Class: AtomicRuby::AtomicThreadPool
- Inherits: Object
- Inheritance chain: Object, AtomicRuby::AtomicThreadPool
- Defined in: lib/atomic-ruby/atomic_thread_pool.rb
Overview
This class is NOT Ractor-safe as it contains mutable thread state that cannot be safely shared across ractors.
Provides a fixed-size thread pool using atomic operations for work queuing.
AtomicThreadPool maintains a fixed number of worker threads that process work items from an atomic queue. The pool uses compare-and-swap operations for thread-safe work enqueueing and state management.
The queue is implemented as a two-stack structure (in/out) backed by immutable frozen linked-list nodes. Enqueueing prepends to the `in` stack in O(1); dequeueing pops from the `out` stack in O(1), reversing `in` into `out` only when `out` is exhausted (amortized O(1) per item).
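The two-stack scheme described above can be sketched in plain Ruby. This is a minimal illustration, not the library's code: the state Hash mirrors the `in:`, `out:`, and `count:` keys used by the pool, while `sketch_enqueue`, `sketch_reverse`, and `sketch_dequeue` are hypothetical helper names, and the dequeue logic is an assumption based on the description rather than on the source.

```ruby
# Hypothetical helpers that model the two-stack queue on an immutable state Hash.

# Prepend a frozen node to the `in` stack: O(1).
def sketch_enqueue(state, work)
  node = { value: work, next: state[:in] }.freeze
  state.merge(in: node, count: state[:count] + 1).freeze
end

# Build a reversed copy of a linked list of frozen nodes: O(n).
def sketch_reverse(node)
  reversed = nil
  while node
    reversed = { value: node[:value], next: reversed }.freeze
    node = node[:next]
  end
  reversed
end

# Pop from `out`, refilling it from `in` only when `out` is empty.
# Returns [value, new_state]; value is nil when the queue is empty.
def sketch_dequeue(state)
  out = state[:out]
  in_stack = state[:in]
  if out.nil?
    return [nil, state] if in_stack.nil?

    out = sketch_reverse(in_stack)
    in_stack = nil
  end
  new_state = state.merge(in: in_stack, out: out[:next], count: state[:count] - 1).freeze
  [out[:value], new_state]
end

state = { in: nil, out: nil, count: 0 }.freeze
state = sketch_enqueue(state, :a)
state = sketch_enqueue(state, :b)
state = sketch_enqueue(state, :c)

first, state = sketch_dequeue(state)   # triggers the one-time reversal
second, state = sketch_dequeue(state)  # plain O(1) pop from `out`
```

Each item is moved from `in` to `out` at most once, which is where the amortized O(1) dequeue cost comes from.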
Defined Under Namespace
Classes: EnqueuedWorkAfterShutdownError, Error
Instance Method Summary
- #<<(work) ⇒ Object
  Enqueues work to be executed by the thread pool.
- #active_count ⇒ Integer
  Returns the number of worker threads currently executing work.
- #initialize(size:, name: nil, on_error: nil) ⇒ AtomicThreadPool (constructor)
  Creates a new thread pool with the specified size.
- #length ⇒ Integer (also: #size)
  Returns the number of currently alive worker threads.
- #queue_length ⇒ Integer (also: #queue_size)
  Returns the number of work items currently queued for execution.
- #shutdown ⇒ void
  Gracefully shuts down the thread pool.
Constructor Details
#initialize(size:, name: nil, on_error: nil) ⇒ AtomicThreadPool
Creates a new thread pool with the specified size.
    # File 'lib/atomic-ruby/atomic_thread_pool.rb', line 79
    def initialize(size:, name: nil, on_error: nil)
      raise ArgumentError, "size must be a positive Integer" unless size.is_a?(Integer) && size > 0
      raise ArgumentError, "name must be a String" unless name.nil? || name.is_a?(String)
      raise ArgumentError, "on_error must be a Proc" unless on_error.nil? || on_error.is_a?(Proc)

      @size = size
      @name = name
      @on_error = on_error

      @state = Atom.new(in: nil, out: nil, count: 0, shutdown: false)
      @started_thread_count = Atom.new(0)
      @active_thread_count = Atom.new(0)
      @threads = []

      start
    end
Instance Method Details
#<<(work) ⇒ Object
Enqueues work to be executed by the thread pool.
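The work item must respond to #call (typically a Proc or lambda). Work items are dequeued in FIFO order by available worker threads. Every item is appended to the queue atomically; if all workers are busy, it waits there until one becomes free. Enqueueing is O(1) regardless of current queue depth. Raises EnqueuedWorkAfterShutdownError if the pool has already been shut down.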
    # File 'lib/atomic-ruby/atomic_thread_pool.rb', line 119
    def <<(work)
      state = @state.swap do |current_state|
        if current_state[:shutdown]
          current_state
        else
          new_node = { value: work, next: current_state[:in] }.freeze
          current_state.merge(in: new_node, count: current_state[:count] + 1)
        end
      end
      raise EnqueuedWorkAfterShutdownError if state[:shutdown]
    end
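The block passed to `swap` above builds a new state from a snapshot without mutating anything. The sketch below shows why that matters, assuming `swap` follows the usual compare-and-swap retry pattern (the library's actual Atom is not shown on this page). `SketchAtom` and `compare_and_set` are hypothetical names, and a Mutex stands in for the hardware compare-and-swap instruction.

```ruby
# Stand-in for an atomic reference. A Mutex emulates the hardware
# compare-and-swap; SketchAtom is hypothetical, not the library's Atom.
class SketchAtom
  def initialize(value)
    @value = value
    @lock = Mutex.new
  end

  def value
    @lock.synchronize { @value }
  end

  # Installs new_value only if the stored object is still `expected`.
  def compare_and_set(expected, new_value)
    @lock.synchronize do
      return false unless @value.equal?(expected)

      @value = new_value
      true
    end
  end

  # Recomputes the block against a fresh snapshot until the CAS succeeds.
  # Returns the value that was installed.
  def swap
    loop do
      current = value
      updated = yield(current)
      return updated if compare_and_set(current, updated)
    end
  end
end

state = SketchAtom.new({ in: nil, count: 0, shutdown: false }.freeze)

# Four producers race to enqueue 250 items each.
producers = Array.new(4) do |producer|
  Thread.new do
    250.times do |i|
      state.swap do |current|
        node = { value: [producer, i], next: current[:in] }.freeze
        current.merge(in: node, count: current[:count] + 1).freeze
      end
    end
  end
end
producers.each(&:join)

final_count = state.value[:count]
```

Because a losing thread simply reruns its block against the newer snapshot, the block may execute more than once per call and should stay free of side effects.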
#active_count ⇒ Integer
Returns the number of worker threads currently executing work.
This represents threads that have picked up a work item and are actively processing it. The count includes threads in the middle of executing work.call, but excludes threads that are idle or waiting for work.
    # File 'lib/atomic-ruby/atomic_thread_pool.rb', line 199
    def active_count
      @active_thread_count.value
    end
#length ⇒ Integer (also known as: size)
Returns the number of currently alive worker threads.
This count decreases as the pool shuts down and threads terminate. During normal operation, this should equal the size parameter passed to the constructor.
    # File 'lib/atomic-ruby/atomic_thread_pool.rb', line 146
    def length
      @threads.select(&:alive?).length
    end
#queue_length ⇒ Integer (also known as: queue_size)
Returns the number of work items currently queued for execution.
This represents work that has been enqueued but not yet picked up by a worker thread. A high queue length indicates that work is being submitted faster than it can be processed.
    # File 'lib/atomic-ruby/atomic_thread_pool.rb', line 168
    def queue_length
      @state.value[:count]
    end
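One way a producer might react to a growing backlog is to pause while `queue_length` is above a threshold. The sketch below is only an illustration of that idea: `enqueue_with_backpressure`, its `max_queue` and `poll_interval` parameters, and the `FakePool` stand-in are all hypothetical and not part of the library. The helper relies only on the documented `#queue_length` and `#<<` methods.

```ruby
# Hypothetical helper: waits until the backlog drops below max_queue,
# then enqueues. Works with any object exposing #queue_length and #<<.
def enqueue_with_backpressure(pool, work, max_queue:, poll_interval: 0.01)
  sleep(poll_interval) while pool.queue_length >= max_queue
  pool << work
end

# Stand-in used only to exercise the helper without a real pool.
class FakePool
  attr_reader :accepted

  def initialize(backlog)
    @backlog = backlog
    @accepted = []
  end

  # Pretends one queued item is consumed each time the backlog is inspected.
  def queue_length
    current = @backlog
    @backlog -= 1 if @backlog > 0
    current
  end

  def <<(work)
    @accepted << work
  end
end

pool = FakePool.new(3)
enqueue_with_backpressure(pool, :job, max_queue: 2, poll_interval: 0.001)
```

Polling is the simplest option here; the check and the enqueue are not atomic together, so the threshold is a soft limit when several producers share one pool.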
#shutdown ⇒ void
This method returns an undefined value.
Gracefully shuts down the thread pool.
This method:
- Marks the pool as shutdown (preventing new work from being enqueued)
- Waits for all currently queued work to complete
- Waits for all worker threads to terminate
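After shutdown, all worker threads will be terminated and the pool cannot be restarted. Attempting to enqueue work after shutdown raises EnqueuedWorkAfterShutdownError. Calling shutdown again on a pool that is already marked as shut down is a no-op that returns immediately.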
    # File 'lib/atomic-ruby/atomic_thread_pool.rb', line 225
    def shutdown
      already_shutdown = false
      @state.swap do |current_state|
        if current_state[:shutdown]
          already_shutdown = true
          current_state
        else
          current_state.merge(shutdown: true)
        end
      end
      return if already_shutdown

      @threads.each(&:join)
    end
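The shutdown contract described above (reject new work, drain the queue, join the workers, ignore repeated calls) can be modelled with the standard library alone. `SketchPool` below is a hypothetical stand-in built on Ruby's `Queue` and a `Mutex`; it is not how AtomicThreadPool is implemented and only mirrors the documented behavior.

```ruby
# Hypothetical stand-in that mirrors the documented shutdown contract.
class SketchPool
  class EnqueuedWorkAfterShutdownError < StandardError; end

  def initialize(size:)
    @queue = Queue.new # stdlib queue, not the atomic two-stack queue
    @lock = Mutex.new
    @shutdown = false
    @threads = Array.new(size) do
      Thread.new do
        # Queue#pop returns nil once the queue is closed and empty.
        while (work = @queue.pop)
          work.call
        end
      end
    end
  end

  def <<(work)
    @lock.synchronize do
      raise EnqueuedWorkAfterShutdownError if @shutdown

      @queue << work
    end
  end

  def shutdown
    @lock.synchronize do
      return if @shutdown # repeated calls are a no-op

      @shutdown = true
    end
    @queue.close          # workers finish the remaining items, then exit
    @threads.each(&:join) # wait for every worker to terminate
  end
end

results = Queue.new
pool = SketchPool.new(size: 2)
5.times { |i| pool << -> { results << i * 2 } }
pool.shutdown
pool.shutdown # second call returns immediately

rejected = begin
  pool << -> {}
  false
rescue SketchPool::EnqueuedWorkAfterShutdownError
  true
end
```

All five work items complete before the first `shutdown` call returns, and the late enqueue is rejected with the same error class name the real pool defines.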