Class: Zizq::Worker
- Inherits:
- Object
- Hierarchy: Object, Zizq::Worker
- Defined in:
- lib/zizq/worker.rb
Overview
Top-level worker process which orchestrates fetching jobs from the server and dispatching them to a pool of worker tasks for processing.
When `fiber_count > 1`, an Async context is created to provide fiber support. When `fiber_count == 1`, no Async context is created.
Total concurrency is calculated as `thread_count * fiber_count`.
Constant Summary
- DEFAULT_THREADS = 5 (type: Integer)
- DEFAULT_FIBERS = 1 (type: Integer)
- DEFAULT_RETRY_MIN_WAIT = 1
- DEFAULT_RETRY_MAX_WAIT = 30
- DEFAULT_RETRY_MULTIPLIER = 2
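The page does not show how the three retry constants are combined. One common reading is a capped exponential backoff, and the sketch below illustrates that reading only. The `retry_wait` helper, its zero-based `attempt` argument, and the assumption that waits are measured in seconds are all hypothetical and are not part of Zizq.

```ruby
# Hypothetical helper, not part of Zizq: capped exponential backoff built
# from the default values listed in the Constant Summary.
def retry_wait(attempt, min_wait: 1, max_wait: 30, multiplier: 2)
  # attempt is zero-based (0 is the first retry). The wait grows by
  # `multiplier` on each attempt and never exceeds `max_wait`.
  [min_wait * (multiplier**attempt), max_wait].min
end

# Waits for the first seven retries under these assumptions.
waits = (0..6).map { |attempt| retry_wait(attempt) }
puts waits.inspect
```

Under these assumptions the wait doubles on each attempt until it reaches the 30 cap and then stays there.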
Instance Attribute Summary
-
#dispatcher ⇒ Object
readonly
The dispatcher used to handle each job.
-
#fiber_count ⇒ Object
readonly
The total number of fibers to run within each worker thread.
-
#logger ⇒ Object
readonly
An instance of a Logger to be used for worker logging.
-
#prefetch ⇒ Object
readonly
The total number of jobs to allow to be sent from the server at once.
-
#queues ⇒ Object
readonly
The set of queues from which to fetch jobs.
-
#thread_count ⇒ Object
readonly
The total number of worker threads to run.
-
#worker_id_proc ⇒ Object
readonly
Proc to derive a worker ID string for each thread and fiber.
Class Method Summary
-
.run ⇒ Object
Convenience class method to create and run a worker.
Instance Method Summary
-
#initialize(queues: [], thread_count: DEFAULT_THREADS, fiber_count: DEFAULT_FIBERS, prefetch: nil, retry_min_wait: DEFAULT_RETRY_MIN_WAIT, retry_max_wait: DEFAULT_RETRY_MAX_WAIT, retry_multiplier: DEFAULT_RETRY_MULTIPLIER, worker_id: nil, logger: nil, dispatcher: nil) ⇒ Worker
constructor
A new instance of Worker.
-
#kill ⇒ Object
Request an immediate shutdown.
-
#run ⇒ Object
Start the worker.
-
#stop ⇒ Object
Request a graceful shutdown.
Constructor Details
#initialize(queues: [], thread_count: DEFAULT_THREADS, fiber_count: DEFAULT_FIBERS, prefetch: nil, retry_min_wait: DEFAULT_RETRY_MIN_WAIT, retry_max_wait: DEFAULT_RETRY_MAX_WAIT, retry_multiplier: DEFAULT_RETRY_MULTIPLIER, worker_id: nil, logger: nil, dispatcher: nil) ⇒ Worker
Returns a new instance of Worker.
# File 'lib/zizq/worker.rb', line 80
def initialize(
  queues: [],
  thread_count: DEFAULT_THREADS,
  fiber_count: DEFAULT_FIBERS,
  prefetch: nil,
  retry_min_wait: DEFAULT_RETRY_MIN_WAIT,
  retry_max_wait: DEFAULT_RETRY_MAX_WAIT,
  retry_multiplier: DEFAULT_RETRY_MULTIPLIER,
  worker_id: nil,
  logger: nil,
  dispatcher: nil
)
  raise ArgumentError, "thread_count must be at least 1 (got #{thread_count})" if thread_count < 1
  raise ArgumentError, "fiber_count must be at least 1 (got #{fiber_count})" if fiber_count < 1

  Zizq.configuration.validate!

  @queues = queues
  @thread_count = thread_count
  @fiber_count = fiber_count
  @prefetch = prefetch || thread_count * fiber_count * 2
  @retry_min_wait = retry_min_wait
  @retry_max_wait = retry_max_wait
  @retry_multiplier = retry_multiplier
  @worker_id_proc = worker_id
  @logger = logger || Zizq.configuration.logger
  @dispatcher = dispatcher || Zizq.configuration.dequeue_middleware

  reset_runtime_state
end
Instance Attribute Details
#dispatcher ⇒ Object (readonly)
The dispatcher used to handle each job.
Defaults to the globally configured `dequeue_middleware` chain. When a custom dispatcher is provided to `#initialize`, it is used as-is and the configured middleware chain is ignored. Callers may construct their own `Zizq::Middleware::Chain` if middleware needs to be applied.
# File 'lib/zizq/worker.rb', line 67
def dispatcher
  @dispatcher
end
#fiber_count ⇒ Object (readonly)
The total number of fibers to run within each worker thread.
For applications that cannot handle multi-fiber execution, this should be set to 1. Any value greater than 1 runs workers inside an Async context (default: 1).
# File 'lib/zizq/worker.rb', line 40
def fiber_count
  @fiber_count
end
#logger ⇒ Object (readonly)
An instance of a Logger to be used for worker logging.
# File 'lib/zizq/worker.rb', line 59
def logger
  @logger
end
#prefetch ⇒ Object (readonly)
The total number of jobs to allow to be sent from the server at once.
Defaults to 2x the total concurrency (threads * fibers) to keep the pipeline full while ack round-trips are in flight.
# File 'lib/zizq/worker.rb', line 51
def prefetch
  @prefetch
end
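The default prefetch rule can be checked in isolation. This sketch reproduces the expression used in the constructor (`prefetch || thread_count * fiber_count * 2`) as a standalone function. The name `effective_prefetch` is hypothetical and exists only for illustration.

```ruby
# Hypothetical standalone mirror of the constructor's prefetch default.
# An explicit prefetch wins; otherwise it is 2x the total concurrency.
def effective_prefetch(thread_count:, fiber_count:, prefetch: nil)
  prefetch || thread_count * fiber_count * 2
end

# Library defaults: 5 threads and 1 fiber per thread.
default_value = effective_prefetch(thread_count: 5, fiber_count: 1)

# Four threads with eight fibers each give a total concurrency of 32.
fibered_value = effective_prefetch(thread_count: 4, fiber_count: 8)

# An explicit value bypasses the calculation entirely.
explicit_value = effective_prefetch(thread_count: 4, fiber_count: 8, prefetch: 20)

puts [default_value, fibered_value, explicit_value].inspect
```

With the library defaults this yields a prefetch of 10, so up to ten jobs may be outstanding from the server while five are being processed.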
#queues ⇒ Object (readonly)
The set of queues from which to fetch jobs.
An empty set (default) means all queues.
# File 'lib/zizq/worker.rb', line 45
def queues
  @queues
end
#thread_count ⇒ Object (readonly)
The total number of worker threads to run.
For applications that are not threadsafe, this should be set to 1 (default: 5).
# File 'lib/zizq/worker.rb', line 33
def thread_count
  @thread_count
end
#worker_id_proc ⇒ Object (readonly)
Proc to derive a worker ID string for each thread and fiber.
When not present, the Zizq server assigns a random worker ID.
# File 'lib/zizq/worker.rb', line 56
def worker_id_proc
  @worker_id_proc
end
Class Method Details
.run ⇒ Object
Convenience class method to create and run a worker.
# File 'lib/zizq/worker.rb', line 25
def self.run(...) #: (**untyped) -> void
  new(...).run
end
Instance Method Details
#kill ⇒ Object
Request an immediate shutdown.
Like `#stop`, but the streaming connection is closed immediately during teardown (rather than after workers drain), so the server re-dispatches any in-flight jobs after its visibility timeout. Use this when `#stop` has been given adequate time and still hasn't returned.
In-progress jobs on worker threads continue to completion, because user code is never interrupted mid-execution, but no new jobs are pulled from the queue and cleanup uses short deadlines.
Safe to call from a signal handler.
# File 'lib/zizq/worker.rb', line 139
def kill #: () -> void
  @killing = true
  @lifecycle.drain!
  @dispatch_queue.close rescue nil
end
#run ⇒ Object
Start the worker.
Spawns the desired number of worker threads and fibers, distributes jobs to those workers and then blocks until shutdown. Safe to call multiple times on the same Worker instance — all mutable runtime state (lifecycle, dispatch queue, ack processor, backoff) is reset at the start of each run.
# File 'lib/zizq/worker.rb', line 152
def run #: () -> void
  reset_runtime_state

  logger.info do
    format(
      "Zizq worker starting: %d threads, %d fibers, prefetch=%d",
      thread_count,
      fiber_count,
      prefetch,
    )
  end
  logger.info { "Queues: #{queues.empty? ? '(all)' : queues.join(', ')}" }

  # Everything runs in the background initially.
  @ack_processor.start
  worker_threads = start_worker_threads
  producer_thread = start_producer_thread

  # Block until the lifecycle leaves :running (stop, kill, or crash).
  @lifecycle.wait_while_running

  if @killing
    logger.info { "Killing. Closing stream and forcing shutdown..." }

    # Close the streaming response immediately so the server
    # re-dispatches any in-flight jobs after its visibility timeout.
    # This also unblocks the producer's IO read.
    @streaming_response&.close rescue nil

    # Workers will finish their current job (can't be interrupted)
    # and then see the closed dispatch queue and exit.
    worker_threads.each(&:join)

    # Drain whatever acks happen to flush before their fibers exit.
    # No timeout: workers finish their current job and exit quickly.
    @ack_processor.stop
  else
    logger.info { "Shutting down. Waiting for workers to finish..." }

    # Workers drain remaining jobs from the closed dispatch queue.
    # The producer stays connected so in-flight jobs aren't requeued
    # by the server while workers are still finishing them.
    worker_threads.each(&:join)

    # Drain pending acks/nacks while the connection is still open.
    @ack_processor.stop

    # Close the streaming response to unblock the producer's IO read.
    # This happens after workers and acks have drained so the server
    # doesn't requeue in-flight jobs while workers are still finishing.
    @streaming_response&.close rescue nil
  end

  # Signal the producer that cleanup is complete. The watcher fiber
  # inside the producer's Sync block wakes up on this and cancels
  # the producer's main task, so the stream is closed from its own
  # reactor rather than via a cross-thread close.
  @lifecycle.stop!
  producer_thread.join

  logger.info { "Zizq worker stopped" }
end
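The graceful path depends on workers draining a closed dispatch queue. Assuming the dispatch queue behaves like Ruby's standard `Thread::Queue` (the `#stop` notes mention `Thread::Queue#close`), the standalone sketch below shows that behaviour using only the standard library. The job strings and the single worker thread are illustrative and do not reflect Zizq's internals.

```ruby
# Standalone illustration of Thread::Queue close-and-drain semantics.
queue = Thread::Queue.new
3.times { |i| queue << "job-#{i}" }

# Closing does not discard items already enqueued.
queue.close

processed = []
worker = Thread.new do
  # On a closed queue, pop returns the remaining items and then nil
  # once the queue is empty, which ends the loop.
  while (job = queue.pop)
    processed << job
  end
end
worker.join

# New items are rejected after close.
rejected = begin
  queue << "late-job"
  false
rescue ClosedQueueError
  true
end

puts processed.inspect
puts rejected
```

This is why closing the queue is enough to stop new work while still letting already-dispatched jobs finish.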
#stop ⇒ Object
Request a graceful shutdown.
Transitions the lifecycle to `:draining` and closes the dispatch queue. Worker threads finish any in-flight jobs, the ack processor flushes pending acks, and the producer stays connected to the server while all of that drains. Only then is the streaming connection closed and `#run` returns.
Safe to call from a signal handler (uses only atomic ivar assignment and `Thread::Queue#close`).
# File 'lib/zizq/worker.rb', line 121
def stop #: () -> void
  @lifecycle.drain!
  @dispatch_queue.close rescue nil
end