Class: Karafka::Processing::WorkersPool
- Inherits: Object
- Defined in:
- lib/karafka/processing/workers_pool.rb
Overview
Dynamic thread pool that manages worker threads. Supports scaling at runtime via #scale.
All public methods that read or mutate `@workers` are synchronized via `@mutex`. `@size` is always updated under `@mutex` but can be read without locking for performance (integer assignment is atomic in MRI).
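The locking discipline above can be sketched in a few lines of plain Ruby. This is a minimal stand-in, not Karafka's actual code: mutations of the worker list and the cached size happen under a mutex, while the integer reader skips the lock.

```ruby
# Minimal sketch of the pattern described above (illustrative, not the
# Karafka implementation). TinyPool and its methods are hypothetical.
class TinyPool
  # Readers call #size without taking the lock; this is safe because
  # the integer is only ever assigned under @mutex and integer
  # assignment is atomic in MRI.
  attr_reader :size

  def initialize
    @workers = []
    @size = 0
    @mutex = Mutex.new
  end

  def register(worker)
    @mutex.synchronize do
      @workers << worker
      @size = @workers.size
    end
  end

  def deregister(worker)
    @mutex.synchronize do
      @workers.delete(worker)
      @size = @workers.size
    end
  end
end

pool = TinyPool.new
pool.register(:w1)
pool.register(:w2)
pool.deregister(:w1)
pool.size # => 1
```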
Instance Attribute Summary collapse
-
#jobs_queue ⇒ Object
writeonly
Jobs queue reference, set by the Runner after both pool and queue are created.
-
#size ⇒ Integer
readonly
Current number of workers registered in the pool.
Instance Method Summary collapse
-
#alive ⇒ Array<Worker>
Workers that are still alive.
-
#deregister(worker) ⇒ Object
Called by a worker when it exits (queue closed or pool downscaling).
-
#initialize ⇒ WorkersPool
constructor
Initializes an empty pool with zero workers.
-
#join ⇒ Object
Wait for all current workers to finish.
-
#scale(target) ⇒ Object
Scale pool towards `target` workers (minimum 1).
-
#stopped? ⇒ Boolean
True if all workers have stopped.
-
#terminate ⇒ Object
Forcefully terminate all worker threads.
Constructor Details
#initialize ⇒ WorkersPool
Initializes an empty pool with zero workers. Workers are not started until #scale is called, allowing the pool to be created early (e.g. in Server.run) before the jobs queue exists.
# File 'lib/karafka/processing/workers_pool.rb', line 33

def initialize
  @jobs_queue = nil
  @workers = []
  @size = 0
  @mutex = Mutex.new
  # Monotonically increasing index for naming worker threads. Indices are never reused
  # after a worker exits, so thread names remain unique across the lifetime of the process
  # and make it easy to correlate log entries with specific worker generations.
  @next_index = 0
end
Instance Attribute Details
#jobs_queue=(value) ⇒ Object (writeonly)
Jobs queue reference, set by the Runner after both pool and queue are created. Must be assigned before calling #scale.
# File 'lib/karafka/processing/workers_pool.rb', line 26

def jobs_queue=(value)
  @jobs_queue = value
end
#size ⇒ Integer (readonly)
Returns current number of workers registered in the pool. Reflects the actual thread count, not a target. After a downscale request this value converges towards the target as workers pick up nil sentinels and deregister. Updated atomically under mutex, safe to read without locking.
# File 'lib/karafka/processing/workers_pool.rb', line 22

def size
  @size
end
Instance Method Details
#alive ⇒ Array<Worker>
Returns workers that are still alive.
# File 'lib/karafka/processing/workers_pool.rb', line 87

def alive
  snapshot.select(&:alive?)
end
#deregister(worker) ⇒ Object
Called by a worker when it exits (queue closed or pool downscaling). Thread-safe – worker threads call this from their own thread.
# File 'lib/karafka/processing/workers_pool.rb', line 105

def deregister(worker)
  @mutex.synchronize do
    @workers.delete(worker)
    @size = @workers.size
  end
end
#join ⇒ Object
Wait for all current workers to finish.
# File 'lib/karafka/processing/workers_pool.rb', line 97

def join
  snapshot.each(&:join)
end
#scale(target) ⇒ Object
Scale pool towards `target` workers (minimum 1).
**Scaling up** is synchronous – new worker threads are spawned and registered before this method returns. #size reflects the new count immediately.
**Scaling down** is asynchronous – nil sentinels are enqueued and workers exit when they pick one up. #size decreases gradually as workers deregister themselves. Callers that need to know when downsizing is complete should poll #size or listen for the `worker.scaling.down` instrumentation event (whose `:to` payload reports the target, not the current count).
The entire read-decide-act cycle is synchronized to prevent stale reads. Instrumentation runs outside the mutex to avoid holding the lock during user callbacks.
# File 'lib/karafka/processing/workers_pool.rb', line 59

def scale(target)
  raise(Karafka::Errors::BaseError, "jobs_queue must be set before scaling") unless @jobs_queue

  target = [target, 1].max
  event = nil

  @mutex.synchronize do
    current = @workers.size
    delta = target - current

    if delta.positive?
      event = grow(delta)
    elsif delta.negative?
      event = shrink(delta.abs)
    end
  end

  return unless event

  monitor.instrument(*event)
end
#stopped? ⇒ Boolean
Returns true if all workers have stopped.
# File 'lib/karafka/processing/workers_pool.rb', line 82

def stopped?
  snapshot.none?(&:alive?)
end
#terminate ⇒ Object
Forcefully terminate all worker threads.
# File 'lib/karafka/processing/workers_pool.rb', line 92

def terminate
  snapshot.each(&:terminate)
end