Class: Karafka::Processing::WorkersPool

Inherits:
Object
Defined in:
lib/karafka/processing/workers_pool.rb

Overview

Dynamic thread pool that manages worker threads. Supports scaling at runtime via #scale.

All public methods that read or mutate `@workers` are synchronized via `@mutex`. `@size` is always updated under `@mutex` but can be read without locking for performance (integer assignment is atomic in MRI).

Constructor Details

#initialize ⇒ WorkersPool

Initializes an empty pool with zero workers. Workers are not started until #scale is called, allowing the pool to be created early (e.g. in Server.run) before the jobs queue exists.



# File 'lib/karafka/processing/workers_pool.rb', line 33

def initialize
  @jobs_queue = nil
  @workers = []
  @size = 0
  @mutex = Mutex.new
  # Monotonically increasing index for naming worker threads. Indices are never reused
  # after a worker exits, so thread names remain unique across the lifetime of the process
  # and make it easy to correlate log entries with specific worker generations.
  @next_index = 0
end
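The comment on `@next_index` describes thread names whose indices are never reused. A minimal sketch of that idea follows; `NamedSpawner` and the `worker-N` name format are hypothetical and not taken from Karafka, which may name its threads differently.

```ruby
# Hypothetical class illustrating never-reused thread name indices.
class NamedSpawner
  def initialize
    @mutex = Mutex.new
    # Only ever incremented, so an index is never handed out twice
    @next_index = 0
  end

  def spawn(&block)
    index = @mutex.synchronize do
      current = @next_index
      @next_index += 1
      current
    end

    thread = Thread.new(&block)
    thread.name = "worker-#{index}" # hypothetical naming scheme
    thread
  end
end

spawner = NamedSpawner.new
first = spawner.spawn { :done }
first.join # the first thread has exited before the next one is created
second = spawner.spawn { :done }
second.join
puts [first.name, second.name].inspect # prints ["worker-0", "worker-1"]
```

Even though the first thread is gone when the second is spawned, the second does not inherit index 0, so log lines tagged with a thread name always point to one specific thread.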

Instance Attribute Details

#jobs_queue=(value) ⇒ Object (writeonly)

Jobs queue reference, set by the Runner after both pool and queue are created. Must be assigned before calling #scale.



# File 'lib/karafka/processing/workers_pool.rb', line 26

def jobs_queue=(value)
  @jobs_queue = value
end
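The required ordering (create the pool, assign the queue, only then scale) can be illustrated with a small stand-in. `LateBoundPool` is hypothetical; it uses `attr_writer`, which behaves like the explicit setter above, and raises `ArgumentError` where the real #scale raises `Karafka::Errors::BaseError`.

```ruby
# Hypothetical class: a dependency injected after construction through a
# write-only attribute, plus a guard that refuses to scale until it is set.
class LateBoundPool
  # Generates only the setter, like the explicit jobs_queue= method above
  attr_writer :jobs_queue

  def initialize
    @jobs_queue = nil
  end

  def scale(target)
    # ArgumentError stands in for Karafka::Errors::BaseError in this sketch
    raise ArgumentError, "jobs_queue must be set before scaling" unless @jobs_queue

    [target, 1].max # the target that would be applied
  end
end

pool = LateBoundPool.new

begin
  pool.scale(2)
rescue ArgumentError => e
  puts e.message # prints: jobs_queue must be set before scaling
end

pool.jobs_queue = Queue.new
puts pool.scale(0) # prints 1
```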

#size ⇒ Integer (readonly)

Returns the current number of workers registered in the pool.

Returns:

  • (Integer)

    current number of workers registered in the pool. Reflects the actual thread count, not a target. After a downscale request, this value converges towards the target as workers pick up nil sentinels and deregister. Updated atomically under the mutex; safe to read without locking.



# File 'lib/karafka/processing/workers_pool.rb', line 22

def size
  @size
end
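Because #size converges towards a downscale target rather than jumping to it, a caller that needs to wait can poll it against a deadline. `wait_for_size` and `FakePool` are hypothetical; the helper only assumes an object exposing an Integer `#size`, as this class does.

```ruby
# Hypothetical helper: polls #size until it drops to `target` or the timeout
# expires. Returns true on success, false on timeout.
def wait_for_size(pool, target, timeout: 5.0, interval: 0.01)
  deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout

  loop do
    return true if pool.size <= target
    return false if Process.clock_gettime(Process::CLOCK_MONOTONIC) >= deadline

    sleep(interval)
  end
end

# Hypothetical stand-in whose size is lowered from another thread
class FakePool
  attr_accessor :size

  def initialize(size)
    @size = size
  end
end

pool = FakePool.new(4)
shrinker = Thread.new { 3.times { sleep(0.01); pool.size -= 1 } }
reached = wait_for_size(pool, 1)
puts reached # prints true
shrinker.join
```

A monotonic clock is used for the deadline so that wall-clock adjustments cannot shorten or extend the wait.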

Instance Method Details

#alive ⇒ Array<Worker>

Returns workers that are still alive.

Returns:

  • (Array<Worker>)

    workers that are still alive



# File 'lib/karafka/processing/workers_pool.rb', line 87

def alive
  snapshot.select(&:alive?)
end
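#alive filters a snapshot of the workers rather than `@workers` itself. The body of the private `snapshot` helper is not shown on this page; the sketch below assumes it copies the array under `@mutex`, so the liveness checks run on the copy without holding the lock. Plain threads stand in for `Worker` objects.

```ruby
mutex = Mutex.new
workers = [
  Thread.new { :finished },
  Thread.new { sleep } # blocks until killed
]
workers.first.join

# Copy under the lock, then inspect the copy without holding it
snapshot = mutex.synchronize { workers.dup }
alive = snapshot.select(&:alive?)
puts alive.size # prints 1

workers.last.kill
workers.last.join
```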

#deregister(worker) ⇒ Object

Called by a worker when it exits (queue closed or pool downscaling). Thread-safe; each worker calls this from its own thread.

Parameters:

  • worker (Worker)

    worker to remove from the pool



# File 'lib/karafka/processing/workers_pool.rb', line 105

def deregister(worker)
  @mutex.synchronize do
    @workers.delete(worker)
    @size = @workers.size
  end
end
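#deregister is invoked from the exiting worker's own thread. One way to guarantee that call on every exit path is an `ensure` clause around the worker loop, sketched below with a hypothetical `Registry` and a plain thread in place of Karafka's `Worker`. `Queue#pop` returns nil both for a nil sentinel and once the queue is closed and empty, so either ends the loop.

```ruby
# Hypothetical registry mirroring the deregister method above
class Registry
  attr_reader :size

  def initialize
    @members = []
    @size = 0
    @mutex = Mutex.new
  end

  def register(member)
    @mutex.synchronize do
      @members << member
      @size = @members.size
    end
  end

  def deregister(member)
    @mutex.synchronize do
      @members.delete(member)
      @size = @members.size
    end
  end
end

registry = Registry.new
queue = Queue.new

worker = Thread.new do
  Thread.current.report_on_exception = false # keep the demo output quiet
  # Loop ends on a nil sentinel or when the queue is closed and drained
  while (job = queue.pop)
    job.call
  end
ensure
  # Runs on the worker's own thread on every exit path, including a job raising
  registry.deregister(Thread.current)
end
registry.register(worker) # safe: the worker cannot exit before anything is pushed
puts registry.size # prints 1

queue << -> { raise "job failed" }

begin
  worker.join # re-raises the job's exception in the joining thread
rescue RuntimeError => e
  puts e.message # prints: job failed
end

puts registry.size # prints 0
```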

#join ⇒ Object

Wait for all current workers to finish.



# File 'lib/karafka/processing/workers_pool.rb', line 97

def join
  snapshot.each(&:join)
end
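#join waits on each worker in the snapshot with no time limit. Where a bounded wait is preferable, `Thread#join(limit)` returns nil once the limit expires; the hypothetical `join_until` helper below applies one shared deadline and returns the threads still running. Note that `Thread#join` re-raises any exception that terminated the thread being joined.

```ruby
# Hypothetical helper: joins each thread against one shared deadline and
# returns the threads that were still running when time ran out.
def join_until(threads, timeout)
  deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout

  threads.reject do |thread|
    remaining = deadline - Process.clock_gettime(Process::CLOCK_MONOTONIC)
    # join returns the thread when it finished, nil when the limit expired
    thread.join([remaining, 0].max)
  end
end

quick = Thread.new { :done }
stuck = Thread.new { sleep } # never finishes on its own
pending = join_until([quick, stuck], 0.1)
puts pending == [stuck] # prints true

stuck.kill
stuck.join
```

Whatever remains in the returned array is a candidate for a forced shutdown such as #terminate.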

#scale(target) ⇒ Object

Scale the pool towards `target` workers (minimum 1).

**Scaling up** is synchronous: new worker threads are spawned and registered before this method returns, so #size reflects the new count immediately.

**Scaling down** is asynchronous: nil sentinels are enqueued and each worker exits when it picks one up, so #size decreases gradually as workers deregister themselves. Callers that need to know when downsizing is complete should poll #size or listen for the `worker.scaling.down` instrumentation event (whose `:to` payload reports the target, not the current count).

The entire read-decide-act cycle is synchronized to prevent stale reads. Instrumentation runs outside the mutex to avoid holding the lock during user callbacks.

Parameters:

  • target (Integer)

    desired number of workers

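Raises:

  • (Karafka::Errors::BaseError)

    if the jobs queue has not been assigned via #jobs_queue= before scaling
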


# File 'lib/karafka/processing/workers_pool.rb', line 59

def scale(target)
  raise(Karafka::Errors::BaseError, "jobs_queue must be set before scaling") unless @jobs_queue

  target = [target, 1].max
  event = nil

  @mutex.synchronize do
    current = @workers.size
    delta = target - current

    if delta.positive?
      event = grow(delta)
    elsif delta.negative?
      event = shrink(delta.abs)
    end
  end

  return unless event

  monitor.instrument(*event)
end
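The grow/shrink split described above can be modeled in a self-contained sketch. `SentinelPool` is hypothetical: Karafka's private `grow`, `shrink` and `monitor` are not shown on this page, so the thread handling below is an assumption, and a listener block stands in for `monitor.instrument`. Growth spawns threads while holding the lock; shrinking only enqueues nil sentinels; the notification is delivered after the lock is released.

```ruby
# Hypothetical pool: grows synchronously, shrinks via nil sentinels.
class SentinelPool
  attr_reader :size

  def initialize(queue, &listener)
    @queue = queue
    @listener = listener # stands in for monitor.instrument
    @threads = []
    @size = 0
    @mutex = Mutex.new
  end

  def scale(target)
    target = [target, 1].max
    event = nil

    # Read, decide and act under one lock so concurrent calls never use stale counts
    @mutex.synchronize do
      delta = target - @threads.size

      if delta.positive?
        delta.times { @threads << spawn_worker }
        @size = @threads.size
        event = [:up, target]
      elsif delta.negative?
        # Each nil sentinel makes exactly one worker leave its loop
        delta.abs.times { @queue << nil }
        event = [:down, target]
      end
    end

    # Notify only after the lock is released
    @listener&.call(*event) if event
  end

  def join
    @mutex.synchronize { @threads.dup }.each(&:join)
  end

  private

  def spawn_worker
    Thread.new do
      while (job = @queue.pop)
        job.call
      end
    ensure
      # If scale still holds the mutex, this waits until the thread is registered
      deregister(Thread.current)
    end
  end

  def deregister(thread)
    @mutex.synchronize do
      @threads.delete(thread)
      @size = @threads.size
    end
  end
end

events = []
queue = Queue.new
pool = SentinelPool.new(queue) { |direction, target| events << [direction, target] }

pool.scale(3)
puts pool.size # prints 3: growing is synchronous

pool.scale(1)
sleep(0.01) until pool.size == 1 # shrinking is asynchronous
p events # prints [[:up, 3], [:down, 1]]

queue.close # the remaining worker pops nil and exits
pool.join
puts pool.size # prints 0
```

This sketch does not track sentinels already waiting in the queue, so two downscale requests issued before workers exit would enqueue too many; whether Karafka's `shrink` accounts for this is not visible on this page.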

#stopped? ⇒ Boolean

Returns true if all workers have stopped.

Returns:

  • (Boolean)

    true if all workers have stopped



# File 'lib/karafka/processing/workers_pool.rb', line 82

def stopped?
  snapshot.none?(&:alive?)
end
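#stopped? relies on `Enumerable#none?`, which returns true for an empty collection. Assuming `snapshot` returns an Array copy of `@workers`, a pool with no registered workers (before the first #scale, or after every worker has deregistered) therefore reports itself as stopped. The sketch shows the three cases with plain threads standing in for `Worker` objects.

```ruby
finished = Thread.new { :done }
finished.join
running = Thread.new { sleep } # blocks until killed

# Enumerable#none? is vacuously true for an empty collection
empty_stopped = [].none?(&:alive?)
finished_stopped = [finished].none?(&:alive?)
mixed_stopped = [finished, running].none?(&:alive?)

puts empty_stopped    # prints true
puts finished_stopped # prints true
puts mixed_stopped    # prints false

running.kill
running.join
```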

#terminate ⇒ Object

Forcefully terminate all worker threads.



# File 'lib/karafka/processing/workers_pool.rb', line 92

def terminate
  snapshot.each(&:terminate)
end
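#terminate is the forced counterpart to #join. The body of `Worker#terminate` is not shown on this page; assuming it ends up calling `Thread#kill` on the worker's thread, the useful property is that Ruby still runs a killed thread's `ensure` clauses, so cleanup placed there (such as a call to #deregister) still happens. The sketch demonstrates that behavior with a plain thread.

```ruby
started = Queue.new
cleanup = Queue.new

worker = Thread.new do
  started << true
  sleep # stands in for a job that never returns
ensure
  # Thread#kill still runs ensure clauses, so cleanup happens on forced exit
  cleanup << :deregistered
end

started.pop # the thread is now inside its block, so its ensure clause is armed
worker.kill
worker.join
result = cleanup.pop
puts result # prints deregistered
```

The handshake through `started` matters: a thread killed before it begins executing its block never enters the `begin`/`ensure` region, so its cleanup would not run.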