/ Shayon Mukherjee / blog

Fast, Simple and Metered Concurrency in Ruby with Concurrent::Semaphore

May 27, 2024
~3 mins

Let’s say you need to fetch a lot of data from an upstream API, then you want to manipulate that data, maybe even enrich it, and then send it downstream to a database or another API. You aim for high concurrency in fetching data (since it’s allowed), but you need to be cautious when sending the enriched data to the downstream API due to system limits. You can’t send events downstream at the same rate as you fetch them upstream. Additionally, you want to keep the overall system quick, processing, enriching, and emitting each event as soon as possible.

In a traditional Rails/Sidekiq application, you can somewhat manage this by configuring worker sizes on Sidekiq and scaling out horizontally. However, I was curious about handling some operations inline since most are I/O bound, and I wondered if we could use resources optimally in addition to scaling out. I explored various approaches and ended up with a worker pool model using a Semaphore, here they are:

Approach 1: Sequential Processing (The Simple Start)

resources.each do |resource|
  data = fetch_data_from_api(resource)
  event = enrich_data(data)
  update_database(event)
end

This method processes one resource at a time, makes an API call, enriches the data and then updates the database one at a time. It works! However, its slow. So lets keep exploring.

Approach 2: Full Concurrency with Parallel (quick fix)

Parallel.each(resources, in_threads: 100) do |resource|
  data = fetch_data_from_api(resource)
  event = enrich_data(data)
  update_database(event)
end

Having known about Parallel, I was tempted to try it. This method allows both API calls and database updates to run concurrently, aiming to maximize throughput. However, it eventually led to overwhelming the ActiveRecord connection pool, with simultaneous 100 concurrent workloads trying to fetch and update the database at times. While this approach works most of the time and is faster, I am concerned about the risk of exhausting all the connections in the local ActiveRecord connection pool. I was not interested in significantly increasing the pool size either. I want to be able to scale this service horizontally without exhausting the database. That said, we’ve made an interesting discovery—I now know the “upper limit” for this task. It’s determined by the number of connections I can check out from the ActiveRecord pool. As long as I leave enough connections for the rest of the application and still achieve a healthy level of concurrency, I am satisfied with this setup.

Approach 3: Metered Concurrency with Semaphores (the balance)

fetch_pool = Concurrent::ThreadPoolExecutor.new(min_threads: 50, max_threads: 100, max_queue: 10000)
db_semaphore = Concurrent::Semaphore.new(50)

resources.each do |resource|
  fetch_pool.post do
    data = fetch_data_from_api(resource)
    event = enrich_data(data)
    db_pool.post do
      db_semaphore.acquire
      begin
        update_database(event)
      ensure
        db_semaphore.release
      end
    end
  end
end

This setup introduces two levels of concurrency management (via the concurrent-ruby gem): one for fetching data and another for database updates, both controlled by a semaphore. The fetch pool operates with high concurrency, optimizing the time spent on API calls. Meanwhile, the semaphore limits database operations to 50 concurrent updates to protect the application from overwhelming the local ActiveRecord connection pool. This straightforward strategy allows me to balance load and efficiency, reducing total processing time without risking system stability.

While I am sure there are many other methods to address a problem of this nature, these approaches have helped me step back and identify some more simpler techniques to maximize system resources while building a simple, faster, and metered concurrent application in Ruby.

If you are not familiar with the gem, I recommend you check it out: https://github.com/ruby-concurrency/concurrent-ruby

last modified May 27, 2024