Opening The Ruby Concurrency Toolbox

As a Ruby developer you probably use tools like Sidekiq that rely on concurrency. But would you know how to *build* your own sidekiq, or add concurrency to an existing app? This article will open Ruby's concurrency toolbox and show you how each tool works. It shows you how to solve the same problem in multiple ways, so you can compare tools. And it looks at new tools that might possibly ship with future versions of ruby.

Concurrency and parallelism are more important than ever for Ruby developers. They can make our applications faster, utilizing the hardware that powers them to its fullest potential. In this article, we are going to explore the tools currently available to every Rubyist and also what Ruby promises to soon deliver in this department.

Not everyone uses concurrency directly, but we all use it indirectly via tools like Sidekiq. Understanding Ruby concurrency won't just help you build your own solutions; it will help you understand and troubleshoot existing ones.

But first let's take a step back and look at the big picture.

Concurrency vs. Parallelism

These terms are used loosely, but they do have distinct meanings.

  • Concurrency: The art of doing many tasks, one at a time. By switching between them quickly, it may appear to the user as though they happen simultaneously.
  • Parallelism: Doing many tasks at literally the same time. Instead of appearing simultaneous, they are simultaneous.

Concurrency is most often used for applications that are IO heavy. For example, a web app may regularly interact with a database or make lots of network requests. By using concurrency, we can keep our application responsive, even while we wait for the database to respond to our query.

This is possible because the Ruby VM allows other threads to run while one is waiting during IO. Even if a program has to make dozens of requests, if we use concurrency, the requests will be made at virtually the same time.

Parallelism, on the other hand, is not currently supported by Ruby.

Why No Parallelism in Ruby?

Today, there is no way of achieving parallelism within a single Ruby process using the default Ruby implementation (generally called MRI or CRuby). The Ruby VM enforces a lock (the GVM, or Global VM Lock) that prevents multiple threads from running Ruby code at the same time. This lock exists to protect the internal state of the virtual machine and to prevent scenarios that could result in the VM crashing. This is not a great spot to be in, but all hope is not lost: Ruby 3 is coming soon and it promises to solve this handicap by introducing a concept codenamed Guild (explained in the last sections of this article).

Threads

Threads are Ruby's concurrency workhorse. To better understand how to use them and what pitfalls to be aware of, we're going to give an example. We'll build a little program that consumes an API and stores its results in a datastore using concurrency.

Before we build the API client, we need an API. Below is the implementation of a tiny API that accepts a number and responds as plain text if the number provided is even odd. If the syntax looks strange to you, don't worry. This doesn't have anything to do with concurrency. It's just a tool we'll use.

app =
  Proc.new do |env|
    sleep 0.05
    qs = env['QUERY_STRING']
    number = Integer(qs.match(/number=(\d+)/)[1])
    [
      '200',
      { 'Content-Type' => 'text/plain' },
      [number.even? ? 'even' : 'odd']
    ]
  end

run app

To run this web app you'll need to have the rack gem installed, then execute rackup config.ru.

We also need a mock datastore. Here's a class that simulates a key-value database:

class Datastore
  # ... accessors and initialization omitted ...
  def read(key)
    data[key]
  end

  def write(key, value)
    data[key] = value
  end
end

Now, let's go through the implementation of our concurrent solution. We have a method, run, which concurrently fetches 1,000 records and stores them in our datastore.

class ThreadPoweredIntegration
  # ... accessors and initialization ...
  def run
    threads = []
    (1..1000).each_slice(250) do |subset|
      threads << Thread.new do
        subset.each do |number|
          uri = 'http://localhost:9292/' \
            "even_or_odd?number=#{number}"
          status, body = AdHocHTTP.new(uri).blocking_get
          handle_response(status, body)
        rescue Errno::ETIMEDOUT
          retry # Try again if the server times out.
        end
      end
    end
    threads.each(&:join)
  end
  # ...
end

We create four threads, each processing 250 records. We use this strategy in order not to overwhelm the third-party API or our own systems.

By having the requests being made concurrently using multiple threads, the total execution will take a fraction of the time that a sequential implementation would take. While each thread has moments of inactivity during all the steps necessary to establish and communicate through an HTTP request, the Ruby VM allows a different thread to start running. This is the reason why this implementation is much faster than a sequential one.

The AdHocHTTP class is a straightforward HTTP client implemented specially for this article to allow us to focus only on the differences between code powered by threads and code powered by fibers. It's beyond the scope of this article to discuss its implementation, but you can check it out here if you're curious.

Finally, we handle the server's response by the end of the inner loop. Here's how the method handle_response looks:

# ... inside the ThreadPoweredIntegration class ...

attr_reader :ds

def initialize
  @ds = Datastore.new(even: 0, odd: 0)
end

# ...

def handle_response(status, body)
  return if status != '200'
  key = body.to_sym
  curr_count = ds.read(key)
  ds.write(key, curr_count + 1)
end

This method looks all right, doesn't it? Let's run it and see what ends up at our datastore:

{ even: 497, odd: 489 }

This is pretty strange, as I'm sure that between 1 and 1000 there are 500 even numbers and 500 odd ones. In the next section, let's understand what's happening and briefly explore one of the ways to solve this bug.

Threads and Data Races: The Devil Is in the Details

Using threads allows our IO heavy programs to run much faster, but they're also tough to get right. The error in our results above is caused by a race condition in the handle_response method. A race condition happens when two threads manipulate the same data.

Since we're operating on a shared resource (the ds datastore object), we have to be especially careful with non-atomic operations. Notice that we first read from the datastore and--in a second statement--we write to it the count incremented by 1. This is problematic because our thread may stop running after the read but before the write. Then, if another thread runs and increments the value of the key we're interested in, we'll write an out-of-date count when the original thread resumes.

One way to mitigate the dangers of using threads is to use higher-level abstractions to structure a concurrent implementation. Check out the concurrent-ruby gem for different patterns to use and a safer thread-powered program.

There are many ways to fix a data race. A simple solution is to use a mutex. This synchronization mechanism enforces one-at-a-time access to a given segment of code. Here's our previous implementation fixed by the usage of a mutex:

# ... inside ThreadPoweredIntegration class ...
def initialize
  # ...
  @semaphore = Mutex.new
end
# ...
def handle_response(status, body)
  return if status != '200'
  key = body.to_sym
  semaphore.synchronize do
    curr_count = ds.read(key)
    ds.write(key, curr_count + 1)
  end
end

If you plan to use threads inside a Rails application, the official guide Threading and Code Execution in Rails is a must-read. Failing to follow these guidelines may result in very unpleasant consequences, like leaking database connections.

After running our corrected implementation, we get the expected result:

{ even: 500, odd: 500 }

Instead of using a mutex, we can also get rid of data races by dropping threads altogether and reaching for another concurrency tool available in Ruby. In the next section, we're going to take a look at Fiber as a mechanism for improving the performance of IO-heavy apps.

Fiber: A Slender Tool for Concurrency

Ruby Fibers let you achieve cooperative concurrency within a single thread. This means that fibers are not preempted and the program itself must do the scheduling. Because the programmer controls when fibers start and stop, it is much easier to avoid race conditions.

Unlike threads, fibers do not grant us better performance when IO happens. Fortunately, Ruby provides asynchronous reads and writes through its IO class. By using these async methods we can prevent IO operations from blocking our fiber-based code.

Same Scenario, Now with Fibers

Let's go through the same example, but now using fibers combined with the async capabilities of Ruby's IO class. It's beyond the scope of this article to explain all the details of async IO in Ruby. Still, we'll touch on the essential parts of its workings and you can take a look at the implementation of the relevant methods of AdHocHTTP (the same client appearing in the threaded solution we've just explored) if you're curious.

We'll start by looking at the run method of our fiber-powered implementation:

class FiberPoweredIntegration
  # ... accessors and initialization ...
  def run
    (1..1000).each_slice(250) do |subset|
      Fiber.new do
        subset.each do |number|
          uri = 'http://127.0.0.1:9292/' \
            "even_or_odd?number=#{number}"
          client = AdHocHTTP.new(uri)
          socket = client.init_non_blocking_get
          yield_if_waiting(client,
                           socket,
                           :connect_non_blocking_get)
          yield_if_waiting(client,
                           socket,
                           :write_non_blocking_get)
          status, body =
            yield_if_waiting(client,
                             socket,
                             :read_non_blocking_get)
          handle_response(status, body)
        ensure
          client&.close_non_blocking_get
        end
      end.resume
    end

    wait_all_requests
  end
  # ...
end

We first create a fiber for each subset of the numbers we want to check if even or odd.

Then we loop over the numbers, calling yield_if_waiting. This method is responsible for stopping the current fiber and allowing another one to resume.

Notice also that after creating a fiber, we call resume. This causes the fiber to start running. By calling resume immediately after creation, we start making HTTP requests even before the main loop going from 1 to 1000 finishes.

At the end of the run method, there's a call to wait_all_requests. This method selects fibers that are ready to run and also guarantees we make all the intended requests. We'll take a look at it in the last segment of this section.

Now, let's see yield_if_waiting in detail:

# ... inside FiberPoweredIntegration ...
def initialize
  @ds = Datastore.new(even: 0, odd: 0)
  @waiting = { wait_readable: {}, wait_writable: {} }
end
# ...
def yield_if_waiting(client, socket, operation)
  res_or_status = client.send(operation)
  is_waiting =
    [:wait_readable,
     :wait_writable].include?(res_or_status)
  return res_or_status unless is_waiting

  waiting[res_or_status][socket] = Fiber.current
  Fiber.yield
  waiting[res_or_status].delete(socket)
  yield_if_waiting(client, socket, operation)
rescue Errno::ETIMEDOUT
  retry # Try again if the server times out.
end

We first try to perform an operation (connect, read, or write) using our client. Two primary outcomes are possible:

  • Success: When that happens, we return.
  • We can receive a symbol: This means we have to wait.

How does one "wait"?

  1. We create a kind of checkpoint by adding our socket combined with the current fiber to the instance variable waiting (which is a Hash).
  2. We store this pair inside a collection that holds IO waiting for reading or writing (we'll see why that's important in a moment), depending on the result we get back from the client.
  3. We stop the execution of the current fiber, allowing another one to run. The paused fiber will get the opportunity to resume work at some point after the associated network socket becomes ready. Then, the IO operation will be retried (and this time will succeed).

Every Ruby program runs inside a fiber that itself is part of a thread (everything inside a process). As a consequence, when we create a first fiber, run it, and then at some point yield, we're resuming the execution of the central part of the program.

Now that we understand the mechanism used to yield execution when a fiber is waiting IO, let's explore the last bit needed to comprehend this fiber-powered implementation.

def wait_all_requests
  while(waiting[:wait_readable].any? ||
        waiting[:wait_writable].any?)

    ready_to_read, ready_to_write =
      IO.select(waiting[:wait_readable].keys,
                waiting[:wait_writable].keys)

    ready_to_read.each do |socket|
      waiting[:wait_readable][socket].resume
    end

    ready_to_write.each do |socket|
      waiting[:wait_writable][socket].resume
    end
  end
end

The chief idea here is to wait (in other words, to loop) until all pending IO operations are complete.

To do that, we use IO.select. It accepts two collections of pending IO objects: one for reading and one for writing. It returns those IO objects that have finished their job. Because we associated these IO objects with the fibers responsible for running them, it's simple to resume those fibers.

We keep on repeating these steps until all requests are fired and completed.

The Grand Finale: Comparable Performance, No Need for Locks

Our handle_response method is exactly the same as that initially used in the code using threads (the version without a mutex). However, since all our fibers run inside the same thread, we won't have any data races. When we run our code, we get the expected result:

{ even: 500, odd: 500 }

You probably don't want to deal with all that fiber switching business every time you leverage async IO. Fortunately, some gems abstract all this work and make the usage of fibers something the developer doesn't need to think about. Check out the async project as a great start.

Fibers Shine When High Scalability Is a Must

Although we can reap the benefits of virtually eliminating the risks of data races even in small scale scenarios, fibers are a great tool when high scalability is needed. Fibers are much more lightweight than threads. Given the same available resources, creating threads will overwhelm a system much sooner than fibers. For an excellent exploration on the topic, we recommend the presentation The Journey to One Million by Ruby Core Team's Samuel Williams.

Guild - Parallel Programming in Ruby

So far we've seen two useful tools for concurrency in Ruby. Neither of them, however, can improve the performance of pure computations. For that you would need true parallelism, which doesn't currently exist in Ruby (here we're considering MRI, the default implementation).

This may be changing in Ruby 3 with the coming of a new feature called "Guilds." Details are still hazy, but in the following sections we'll take a look at how this work-in-progress feature promises to allow parallelism in Ruby.

How Guilds Might Work

A significant source of pain when implementing concurrent/parallel solutions is shared memory. In the section on threads, we already saw how easy it is to make a slip and write code that may seem innocuous at first glance but actually contains subtle bugs.

Koichi Sasada--the Ruby Core Team member heading the development of the new Guild feature--is hard at work designing a solution that tackles head on the dangers of sharing memory among multiple threads. In his presentation at the 2018 RubyConf, he explains that when using guilds one won't be able to simply share mutable objects. The main idea is to prevent data races by only allowing immutable objects to be shared between different guilds.

Specialized data structures will be introduced in Ruby to allow some measure of shared memory between guilds, but the details of how exactly this is going to work are still not fully fleshed out. There will also be an API that will allow objects to be copied or moved between guilds, plus a safeguard to impede an object from being referenced after it's been moved to a different guild.

Using Guilds to Explore a Common Scenario

There are many situations where you might wish you could speed up computations by running them in parallel. Let's imagine that we have to calculate the average and mean of the same dataset.

The example below shows how we might do this with guilds. Keep in mind that this code doesn't currently work and might never work, even after guilds are released.

# A frozen array of numeric values is an immutable object.
dataset = [88, 43, 37, 85, 84, 38, 13, 84, 17, 87].freeze
# The overhead of using guilds will probably be
# considerable, so it will only make sense to
# parallelize work when a dataset is large / when
# performing lots of operations.

g1 = Guild.new do
  mean = dataset.reduce(:+).fdiv(dataset.length)
  Guild.send_to(:mean, Guild.parent)
end

g2 = Guild.new do
  median = Median.calculate(dataset.sort)
  Guild.send_to(:median, Guild.parent)
end

results = {}
# Every Ruby program will be run inside a main guild;
# therefore, we can also receive messages in the main
# section of our program.
Guild.receive(:mean, :median) do |tag, result|
  results[tag] = result
end

Summing It Up

Concurrency and parallelism are not the main strengths of Ruby, but even in this department the language does offer tools that are probably good enough to deal with most use cases. Ruby 3 is coming and it seems things will get considerably better with the introduction of the Guild primitive. In my opinion, Ruby is still a very suitable choice in many situations, and its community is clearly hard at work making the language even better. Let's keep an ear to the ground for what's coming!

author photo

Alex Braha Stoll

Alex is a software developer that cannot get tired of attempting to write the next line of code at least a little better than the one before it. In his free time, he likes to study and practice Photography.


“We’ve looked at a lot of error management systems. Honeybadger is head and shoulders above the rest and somehow gets better with every new release.”
Michael Smith
Try Error Monitoring Free for 15 Days
Are you using Bugsnag, Rollbar, or Airbrake for your monitoring? Honeybadger includes exception, uptime, and check-in monitoring — all for probably less than you’re paying now. Discover why so many companies are switching to Honeybadger here.
Try Error Monitoring Free for 15 Days
Stop digging through chat logs to find the bug-fix someone mentioned last month. Honeybadger's built-in issue tracker keeps discussion central to each error, so that if it pops up again you'll be able to pick up right where you left off.
Try Error Monitoring Free for 15 Days
"Wow — Customers are blown away that I email them so quickly after an error."
Chris Patton
Try Error Monitoring Free for 15 Days