Class: AtomicRuby::AtomicThreadPool

Inherits:
Object
Defined in:
lib/atomic-ruby/atomic_thread_pool.rb

Overview

Note:

This class is NOT Ractor-safe, as it contains mutable thread state that cannot be safely shared across Ractors.

Provides a fixed-size thread pool using atomic operations for work queuing.

AtomicThreadPool maintains a fixed number of worker threads that process work items from an atomic queue. The pool uses compare-and-swap operations for thread-safe work enqueueing and state management.
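The compare-and-swap pattern behind the pool's state management can be sketched in plain Ruby. This is an illustration only: plain Ruby has no lock-free CAS primitive, so `IllustrativeAtom` below is a hypothetical, Mutex-backed stand-in that models the retry-loop shape of an atomic `swap`, not the gem's actual implementation.

```ruby
# Hypothetical Mutex-backed model of an atomic reference with a
# swap-with-retry loop. Names here are illustrative, not the gem's API.
class IllustrativeAtom
  def initialize(value)
    @mutex = Mutex.new
    @value = value
  end

  def value
    @mutex.synchronize { @value }
  end

  # Succeeds only if the stored value is still the object we read.
  def compare_and_set(expected, new_value)
    @mutex.synchronize do
      return false unless @value.equal?(expected)
      @value = new_value
      true
    end
  end

  # Retry loop: re-read and re-apply the block until the CAS succeeds,
  # then return the value that was installed.
  def swap
    loop do
      current = value
      updated = yield(current)
      return updated if compare_and_set(current, updated)
    end
  end
end

counter = IllustrativeAtom.new(0)
threads = 4.times.map do
  Thread.new { 1000.times { counter.swap { |n| n + 1 } } }
end
threads.each(&:join)
counter.value #=> 4000
```

Because the block may run more than once under contention, swap blocks should be pure functions of the current state, as in the `#<<` and `#shutdown` sources below.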

The queue is implemented as a two-stack structure (in/out) backed by immutable frozen linked-list nodes. Enqueueing prepends to the `in` stack in O(1); dequeueing pops from the `out` stack in O(1), reversing `in` into `out` only when `out` is exhausted (amortized O(1) per item).
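The two-stack structure described above can be sketched as pure functions over a state hash. This sketch omits thread safety (in the pool, the whole `{in, out, count}` state is swapped atomically) and is a simplified model, not the gem's code.

```ruby
# Minimal sketch of a two-stack FIFO queue backed by frozen
# linked-list nodes, as described above. Not thread-safe on its own.
def enqueue(state, value)
  node = { value: value, next: state[:in] }.freeze
  state.merge(in: node, count: state[:count] + 1)
end

def dequeue(state)
  if state[:out].nil?
    # Reverse the `in` stack into `out` only when `out` is exhausted.
    out = nil
    node = state[:in]
    while node
      out = { value: node[:value], next: out }.freeze
      node = node[:next]
    end
    return [nil, state] if out.nil? # queue is empty
    state = state.merge(in: nil, out: out)
  end
  value = state[:out][:value]
  [value, state.merge(out: state[:out][:next], count: state[:count] - 1)]
end

state = { in: nil, out: nil, count: 0 }
state = enqueue(state, :a)
state = enqueue(state, :b)
state = enqueue(state, :c)
v1, state = dequeue(state)
v2, state = dequeue(state)
[v1, v2] #=> [:a, :b] (FIFO order preserved)
```

Each operation returns a fresh state built from frozen nodes, which is what makes it safe to retry inside a compare-and-swap loop.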

Examples:

Basic usage

pool = AtomicThreadPool.new(size: 4)
pool << proc { puts "Hello from worker thread!" }
pool << proc { puts "Another work item" }
pool.shutdown

Processing work with results

results = []
pool = AtomicThreadPool.new(size: 2, name: "Calculator")

10.times do |index|
  pool << proc { results << index * 2 }
end

pool.shutdown
p results.sort #=> [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]

Monitoring pool state

pool = AtomicThreadPool.new(size: 3)
puts pool.length        #=> 3
puts pool.queue_length  #=> 0
puts pool.active_count  #=> 0

5.times { pool << proc { sleep(1) } }
sleep(0.1) # Give threads time to pick up work
puts pool.queue_length  #=> 2 (3 workers busy, 2 queued)
puts pool.active_count  #=> 3 (3 workers processing)

Defined Under Namespace

Classes: EnqueuedWorkAfterShutdownError, Error

Instance Method Summary

Constructor Details

#initialize(size:, name: nil, on_error: nil) ⇒ AtomicThreadPool

Creates a new thread pool with the specified size.

Examples:

Create a basic pool

pool = AtomicThreadPool.new(size: 4)

Create a named pool

pool = AtomicThreadPool.new(size: 2, name: "Database Workers")

Create a pool with a custom error handler

errors = []
pool = AtomicThreadPool.new(size: 2, on_error: ->(err) { errors << err })

Parameters:

  • size (Integer)

    The number of worker threads to create (must be positive)

  • name (String, nil) (defaults to: nil)

    Optional name for the thread pool (used in thread names)

  • on_error (Proc, nil) (defaults to: nil)

    Optional error handler called with the exception when a work item raises. Receives the exception as its argument. When nil, errors are printed to stderr

Raises:

  • (ArgumentError)

    if size is not a positive integer

  • (ArgumentError)

    if name is provided but not a string

  • (ArgumentError)

    if on_error is provided but not a Proc



# File 'lib/atomic-ruby/atomic_thread_pool.rb', line 79

def initialize(size:, name: nil, on_error: nil)
  raise ArgumentError, "size must be a positive Integer" unless size.is_a?(Integer) && size > 0
  raise ArgumentError, "name must be a String" unless name.nil? || name.is_a?(String)
  raise ArgumentError, "on_error must be a Proc" unless on_error.nil? || on_error.is_a?(Proc)

  @size = size
  @name = name
  @on_error = on_error

  @state = Atom.new(in: nil, out: nil, count: 0, shutdown: false)
  @started_thread_count = Atom.new(0)
  @active_thread_count = Atom.new(0)
  @threads = []

  start
end

Instance Method Details

#<<(work) ⇒ Object

Enqueues work to be executed by the thread pool.

The work item must respond to #call (typically a Proc or lambda). Work items are executed in FIFO order by available worker threads. If all workers are busy, the work is queued atomically. Enqueueing is O(1) regardless of current queue depth.

Examples:

Enqueue a simple task

pool << proc { puts "Hello World" }

Enqueue a lambda with parameters

calculator = ->(a, b) { puts a + b }
pool << proc { calculator.call(2, 3) }

Enqueue work that captures variables

name = "Alice"
pool << proc { puts "Processing #{name}" }

Parameters:

  • work (#call)

    A callable object to be executed by a worker thread

Raises:

  • (EnqueuedWorkAfterShutdownError)

    if the pool has already been shut down

# File 'lib/atomic-ruby/atomic_thread_pool.rb', line 119

def <<(work)
  state = @state.swap do |current_state|
    if current_state[:shutdown]
      current_state
    else
      new_node = { value: work, next: current_state[:in] }.freeze
      current_state.merge(in: new_node, count: current_state[:count] + 1)
    end
  end
  raise EnqueuedWorkAfterShutdownError if state[:shutdown]
end

#active_countInteger

Returns the number of worker threads currently executing work.

This represents threads that have picked up a work item and are actively processing it. The count includes threads in the middle of executing work.call, but excludes threads that are idle or waiting for work.

Examples:

Monitor active workers

pool = AtomicThreadPool.new(size: 4)
puts pool.active_count #=> 0

5.times { pool << proc { sleep(1) } }
sleep(0.1) # Give threads time to pick up work
puts pool.active_count #=> 4 (all workers busy)
puts pool.queue_length #=> 1 (one item still queued)

Calculate total load

total_load = pool.active_count + pool.queue_length
puts "Total pending work: #{total_load}"

Returns:

  • (Integer)

    The number of threads actively processing work



# File 'lib/atomic-ruby/atomic_thread_pool.rb', line 199

def active_count
  @active_thread_count.value
end

#lengthInteger Also known as: size

Returns the number of currently alive worker threads.

This count decreases as the pool shuts down and threads terminate. During normal operation, this should equal the size parameter passed to the constructor.

Examples:

pool = AtomicThreadPool.new(size: 4)
puts pool.length #=> 4
pool.shutdown
puts pool.length #=> 0

Returns:

  • (Integer)

    The number of alive worker threads



# File 'lib/atomic-ruby/atomic_thread_pool.rb', line 146

def length
  @threads.select(&:alive?).length
end

#queue_lengthInteger Also known as: queue_size

Returns the number of work items currently queued for execution.

This represents work that has been enqueued but not yet picked up by a worker thread. A high queue length indicates that work is being submitted faster than it can be processed.

Examples:

pool = AtomicThreadPool.new(size: 2)
5.times { pool << proc { sleep(1) } }
sleep(0.1) # Give threads time to pick up work
puts pool.queue_length #=> 3 (2 workers busy, 3 queued)

Returns:

  • (Integer)

    The number of queued work items



# File 'lib/atomic-ruby/atomic_thread_pool.rb', line 168

def queue_length
  @state.value[:count]
end

#shutdownvoid

This method returns an undefined value.

Gracefully shuts down the thread pool.

This method:

  1. Marks the pool as shutdown (preventing new work from being enqueued)

  2. Waits for all currently queued work to complete

  3. Waits for all worker threads to terminate

After shutdown, all worker threads will be terminated and the pool cannot be restarted. Attempting to enqueue work after shutdown will raise an exception.
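The shutdown protocol above can be sketched with Ruby's built-in `Thread::Queue` in place of the gem's atomic two-stack queue. `MiniPool` is a hypothetical stand-in used only to show the shape of a graceful shutdown, not the gem's implementation.

```ruby
# Hedged sketch of graceful shutdown: mark the queue closed so no new
# work is accepted, let workers drain what remains, then join them.
class MiniPool
  def initialize(size:)
    @queue = Thread::Queue.new
    @threads = Array.new(size) do
      Thread.new do
        # pop returns queued items until the closed queue is empty,
        # then returns nil, which ends the worker loop.
        while (work = @queue.pop)
          work.call
        end
      end
    end
  end

  def <<(work)
    raise "enqueued work after shutdown" if @queue.closed?
    @queue << work
  end

  def shutdown
    @queue.close          # 1. reject new work
    @threads.each(&:join) # 2-3. queued work finishes, threads exit
  end
end

results = Thread::Queue.new
pool = MiniPool.new(size: 2)
5.times { |i| pool << proc { results << i } }
pool.shutdown
results.size #=> 5
```

The key property, shared with AtomicThreadPool, is that `shutdown` is a barrier: every item enqueued before it completes before the method returns.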

Examples:

pool = AtomicThreadPool.new(size: 4)
10.times { |index| pool << proc { puts index } }
pool.shutdown # waits for all work to complete
puts pool.length #=> 0


# File 'lib/atomic-ruby/atomic_thread_pool.rb', line 225

def shutdown
  already_shutdown = false
  @state.swap do |current_state|
    if current_state[:shutdown]
      already_shutdown = true
      current_state
    else
      current_state.merge(shutdown: true)
    end
  end
  return if already_shutdown

  @threads.each(&:join)
end