Class: Zizq::Worker
- Inherits: Object
  - Object
  - Zizq::Worker
- Defined in: lib/zizq/worker.rb, sig/generated/zizq/worker.rbs
Overview
Top-level worker process which orchestrates fetching jobs from the server and dispatching them to a pool of worker tasks for processing.
When fiber_count > 1, each worker thread runs its fibers inside an Async context. When fiber_count == 1, worker threads run jobs directly and no Async context is created for them.
Total concurrency is calculated as thread_count * fiber_count.
Constant Summary
- DEFAULT_THREADS = 1 (Integer)
- DEFAULT_FIBERS = 1 (Integer)
- DEFAULT_RETRY_MIN_WAIT = 1
- DEFAULT_RETRY_MAX_WAIT = 30
- DEFAULT_RETRY_MULTIPLIER = 2
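The three retry constants are passed to the Backoff object built in #reset_runtime_state (min_wait, max_wait, multiplier). Backoff itself is not documented on this page, so the sketch below assumes a plain capped exponential schedule without jitter, measured in seconds: each wait is the previous one times the multiplier, clamped to the maximum, and a successful connection resets it. The SketchBackoff class and its injectable sleeper are hypothetical; only the duration/wait/reset method names mirror calls visible in the producer source.

```ruby
# Hypothetical capped exponential backoff. This ASSUMES Zizq's Backoff behaves
# roughly like this; its real implementation is not shown on this page.
class SketchBackoff
  attr_reader :duration # the wait (in seconds) the next #wait will use

  def initialize(min_wait: 1, max_wait: 30, multiplier: 2, sleeper: ->(s) { sleep(s) })
    @min_wait = min_wait
    @max_wait = max_wait
    @multiplier = multiplier
    @sleeper = sleeper
    reset
  end

  # Return to the shortest wait, e.g. after a successful connection.
  def reset
    @duration = @min_wait
  end

  # Sleep for the current duration, then grow it for the next attempt,
  # never exceeding max_wait.
  def wait
    @sleeper.call(@duration)
    @duration = [@duration * @multiplier, @max_wait].min
  end
end

slept = []
backoff = SketchBackoff.new(sleeper: ->(s) { slept << s }) # record instead of sleeping
7.times { backoff.wait }
p slept # => [1, 2, 4, 8, 16, 30, 30]
backoff.reset
p backoff.duration # => 1
```

With the defaults, the wait doubles from 1 up to the 30 cap after five consecutive failures.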
Instance Attribute Summary
- #dispatcher ⇒ ^(Resources::Job) -> void (readonly)
  The dispatcher used to handle each job.
- #fiber_count ⇒ Integer (readonly)
  The total number of fibers to run within each worker thread.
- #logger ⇒ Logger (readonly)
  The Logger used for worker logging.
- #prefetch ⇒ Integer (readonly)
  The maximum number of jobs the server may send to this worker at once.
- #queues ⇒ Array[String] (readonly)
  The set of queues from which to fetch jobs.
- #thread_count ⇒ Integer (readonly)
  The total number of worker threads to run.
Class Method Summary
- .run ⇒ Object
  Convenience class method to create and run a worker.
Instance Method Summary
- #dispatch(job) ⇒ Object
  Process a single job.
- #initialize(queues: nil, thread_count: nil, fiber_count: nil, prefetch: nil, retry_min_wait: nil, retry_max_wait: nil, retry_multiplier: nil, logger: nil, dispatcher: nil) ⇒ Worker (constructor)
  All keyword arguments default to nil and follow a three-level fallback chain.
- #kill ⇒ Object
  Request an immediate shutdown.
- #push_ack(job_id) ⇒ void
  Queue an ack for the job on the ack processor.
- #push_nack(job_id, error) ⇒ void
  Queue a nack (error class, message and backtrace) for the job on the ack processor.
- #reset_runtime_state ⇒ Object
  Reset all mutable runtime state so #run can be called multiple times on the same Worker instance.
- #run ⇒ Object
  Start the worker.
- #run_fiber_workers(thread_idx) ⇒ Object
  Fiber-based worker loop.
- #run_loop(thread_idx, fiber_idx) ⇒ Object
  Internal worker run loop.
- #start_producer_thread ⇒ Object
  Spawn the producer thread that streams jobs from the server into the dispatch queue. Signature: () -> Thread.
- #start_worker_threads ⇒ Object
  Spawn thread_count worker threads. Signature: () -> Array[Thread].
- #stop ⇒ Object
  Request a graceful shutdown.
Constructor Details
#initialize(queues: nil, thread_count: nil, fiber_count: nil, prefetch: nil, retry_min_wait: nil, retry_max_wait: nil, retry_multiplier: nil, logger: nil, dispatcher: nil) ⇒ Worker
All keyword arguments default to nil and follow a three-level
fallback chain:
1. Explicit kwarg passed to `Worker.new`.
2. `Zizq.configuration.worker.<field>` set in the app's
`Zizq.configure` block.
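3. The Worker's hardcoded `DEFAULT_*` constants.

A few settings end the chain differently: `queues` falls back to an empty list (all queues), `prefetch` to `thread_count * fiber_count * 2`, and `logger` and `dispatcher` skip the worker section entirely, falling back to `Zizq.configuration.logger` and `Zizq.configuration.dequeue_middleware`. An `ArgumentError` is raised if the resolved `thread_count` or `fiber_count` is less than 1.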
```ruby
# File 'lib/zizq/worker.rb', line 82

def initialize(
  queues: nil,
  thread_count: nil,
  fiber_count: nil,
  prefetch: nil,
  retry_min_wait: nil,
  retry_max_wait: nil,
  retry_multiplier: nil,
  logger: nil,
  dispatcher: nil
)
  Zizq.configuration.validate!
  config = Zizq.configuration.worker

  @queues = queues || config.queues || []
  @thread_count = thread_count || config.thread_count || DEFAULT_THREADS
  @fiber_count = fiber_count || config.fiber_count || DEFAULT_FIBERS
  @prefetch = prefetch || config.prefetch || @thread_count * @fiber_count * 2
  @retry_min_wait = retry_min_wait || config.retry_min_wait || DEFAULT_RETRY_MIN_WAIT
  @retry_max_wait = retry_max_wait || config.retry_max_wait || DEFAULT_RETRY_MAX_WAIT
  @retry_multiplier = retry_multiplier || config.retry_multiplier || DEFAULT_RETRY_MULTIPLIER
  @logger = logger || Zizq.configuration.logger
  @dispatcher = dispatcher || Zizq.configuration.dequeue_middleware

  if @thread_count < 1
    raise ArgumentError, "thread_count must be at least 1 (got #{@thread_count})"
  end

  if @fiber_count < 1
    raise ArgumentError, "fiber_count must be at least 1 (got #{@fiber_count})"
  end

  reset_runtime_state
end
```
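To make the fallback order concrete, here is a self-contained sketch of the same `explicit || configured || default` chain for three of the settings. WorkerConfig and resolve_settings are hypothetical stand-ins: the real constructor reads `Zizq.configuration.worker`, which is not reproduced here.

```ruby
# Hypothetical stand-in for Zizq.configuration.worker: every field is nil
# unless the app set it in its configure block.
WorkerConfig = Struct.new(:thread_count, :fiber_count, :prefetch, keyword_init: true)

DEFAULT_THREADS = 1
DEFAULT_FIBERS = 1

# Mirrors the explicit kwarg -> configured value -> default chain of #initialize.
def resolve_settings(config, thread_count: nil, fiber_count: nil, prefetch: nil)
  threads = thread_count || config.thread_count || DEFAULT_THREADS
  fibers = fiber_count || config.fiber_count || DEFAULT_FIBERS
  # prefetch has no constant: it falls back to twice the total concurrency.
  prefetch = prefetch || config.prefetch || threads * fibers * 2
  { thread_count: threads, fiber_count: fibers, prefetch: prefetch }
end

config = WorkerConfig.new(fiber_count: 10)

# thread_count from DEFAULT_THREADS (1), fiber_count from config (10),
# prefetch derived as 1 * 10 * 2 = 20.
defaults = resolve_settings(config)

# Explicit kwargs win over both config and defaults.
overridden = resolve_settings(config, thread_count: 4, prefetch: 50)

puts defaults[:prefetch]       # 20
puts overridden[:thread_count] # 4
```

Because `||` only skips nil and false, an explicit 0 passes through the chain unchanged and is then rejected by the `ArgumentError` checks in #initialize.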
Instance Attribute Details
#dispatcher ⇒ ^(Resources::Job) -> void (readonly)
The dispatcher used to handle each job.
Defaults to the globally configured dequeue_middleware chain.
When a custom dispatcher is passed to #initialize, it is used as-is
and the configured middleware chain is ignored; callers that still need
middleware can construct their own Zizq::Middleware::Chain.
```ruby
# File 'lib/zizq/worker.rb', line 62

def dispatcher
  @dispatcher
end
```
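Per the type signature ^(Resources::Job) -> void, a dispatcher is any callable that accepts a job. Since a custom dispatcher bypasses the configured middleware, cross-cutting behavior has to be composed by hand. The sketch wraps an inner callable with a logging step; FakeJob (a stand-in for Resources::Job exposing only the id and type that #dispatch reads) and with_logging are hypothetical names.

```ruby
# Hypothetical stand-in for Zizq::Resources::Job; #dispatch only reads id and type.
FakeJob = Struct.new(:id, :type)

# Wrap any callable so each job is recorded before it runs. Exceptions from
# the inner callable still propagate, so a worker would nack the job as usual.
def with_logging(inner, lines)
  lambda do |job|
    lines << "running #{job.type} (#{job.id})"
    inner.call(job)
  end
end

handled = []
lines = []
dispatcher = with_logging(->(job) { handled << job.id }, lines)

dispatcher.call(FakeJob.new("j-1", "SendEmail"))
p handled # => ["j-1"]
p lines   # => ["running SendEmail (j-1)"]
```

With Zizq loaded, a callable built this way would be passed as `Zizq::Worker.new(dispatcher: dispatcher)`.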
#fiber_count ⇒ Integer (readonly)
The total number of fibers to run within each worker thread.
For applications that cannot handle multi-fiber execution, this should be set to 1. Any value greater than 1 runs workers inside an Async context (default: 1).
```ruby
# File 'lib/zizq/worker.rb', line 40

def fiber_count
  @fiber_count
end
```
#logger ⇒ Logger (readonly)
The Logger used for worker logging. Defaults to Zizq.configuration.logger.
```ruby
# File 'lib/zizq/worker.rb', line 54

def logger
  @logger
end
```
#prefetch ⇒ Integer (readonly)
The maximum number of jobs the server may send to this worker at once.
Defaults to twice the total concurrency (thread_count * fiber_count) so the pipeline stays full while ack round-trips are in flight.
```ruby
# File 'lib/zizq/worker.rb', line 51

def prefetch
  @prefetch
end
```
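The sizing rules on this page chain together: total concurrency is thread_count * fiber_count, the default prefetch is twice that, and #reset_runtime_state gives the ack processor a capacity of prefetch * 2. The hypothetical sizing helper below just does that arithmetic so the numbers for a given configuration are easy to check.

```ruby
# Hypothetical helper reproducing the sizing arithmetic described on this page.
def sizing(thread_count:, fiber_count:, prefetch: nil)
  concurrency = thread_count * fiber_count
  prefetch ||= concurrency * 2 # default when no explicit/configured prefetch
  # The ack processor capacity is derived from the final prefetch value,
  # whether it was defaulted or set explicitly.
  { concurrency: concurrency, prefetch: prefetch, ack_capacity: prefetch * 2 }
end

s = sizing(thread_count: 4, fiber_count: 10)
puts s[:concurrency]  # 40
puts s[:prefetch]     # 80
puts s[:ack_capacity] # 160
```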
#queues ⇒ Array[String] (readonly)
The set of queues from which to fetch jobs.
An empty set (default) means all queues.
```ruby
# File 'lib/zizq/worker.rb', line 45

def queues
  @queues
end
```
#thread_count ⇒ Integer (readonly)
The total number of worker threads to run.
For applications that are not thread-safe, this should be set to 1 (default: 1, per DEFAULT_THREADS).
```ruby
# File 'lib/zizq/worker.rb', line 33

def thread_count
  @thread_count
end
```
Class Method Details
.run ⇒ Object
Convenience class method to create and run a worker.
```ruby
# File 'lib/zizq/worker.rb', line 25

def self.run(...) #: (**untyped) -> void
  new(...).run
end
```
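.run relies on Ruby's `...` argument forwarding (Ruby 2.7+), so `Zizq::Worker.run(queues: ["emails"])` is equivalent to `Zizq::Worker.new(queues: ["emails"]).run`. The toy class below is hypothetical and uses the same pattern with a trimmed-down constructor, to show that keyword arguments reach #initialize untouched.

```ruby
# Hypothetical toy class using the same `...` forwarding as Zizq::Worker.run.
class ToyWorker
  attr_reader :queues, :thread_count

  # Forward every argument (positional, keyword, block) to new, then run.
  def self.run(...)
    new(...).run
  end

  def initialize(queues: [], thread_count: 1)
    @queues = queues
    @thread_count = thread_count
  end

  def run
    "running #{thread_count} thread(s) on #{queues.empty? ? '(all)' : queues.join(', ')}"
  end
end

puts ToyWorker.run(queues: ["emails"], thread_count: 2) # running 2 thread(s) on emails
puts ToyWorker.run                                      # running 1 thread(s) on (all)
```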
Instance Method Details
#dispatch(job) ⇒ Object
Process a single job.
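Delegates to the configured dispatcher (default: Zizq::Job.dispatch), measures how long the call takes, and reports the outcome: an ack on success, or a nack carrying the error class, message and backtrace on failure. An Async::Stop or ClosedQueueError raised while the worker is being shut down is logged at debug level instead of being reported.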
```ruby
# File 'lib/zizq/worker.rb', line 398

def dispatch(job) #: (Resources::Job) -> void
  job_id, job_type = job.id, job.type

  begin
    start_time = Process.clock_gettime(Process::CLOCK_MONOTONIC)

    begin
      @dispatcher.call(job)
    ensure
      finish_time = Process.clock_gettime(Process::CLOCK_MONOTONIC)
      elapsed_time = finish_time - start_time
    end
  rescue Exception => error
    raise if !@lifecycle.running? && error.is_a?(Async::Stop)

    logger.error do
      format(
        "Job %s (%s) failed in %.4fs: %s: %s",
        job_type,
        job_id,
        elapsed_time,
        error.class,
        error.message
      )
    end

    push_nack(job_id, error)
    return
  end

  push_ack(job_id)

  logger.debug do
    format(
      "Job %s (%s) completed in %.4fs",
      job_type,
      job_id,
      elapsed_time
    )
  end
rescue Async::Stop, ClosedQueueError
  # In the case jobs take too long to terminate, they are force killed
  # which produces errors as they attempt to ack/nack etc.
  #
  # This means those jobs terminate without finishing their work but the
  # Zizq backend automatically returns them to the queue when the client
  # disconnects, so they'll be received by another worker when one connects.
  logger.debug { "Job #{job_type} (#{job_id}) interrupted during shutdown" }
end
```
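The shape of #dispatch (time the call in an `ensure` so the duration is known on both paths, then ack on success or nack on failure) can be tried without a server. This is a minimal sketch: timed_dispatch is a hypothetical helper that records outcomes in an array instead of pushing to an AckProcessor, and it rescues StandardError where the real method rescues Exception.

```ruby
# Hypothetical helper mirroring the control flow of #dispatch.
def timed_dispatch(job_id, dispatcher, outcomes)
  begin
    # Monotonic clock: unaffected by wall-clock adjustments during the job.
    start = Process.clock_gettime(Process::CLOCK_MONOTONIC)
    begin
      dispatcher.call(job_id)
    ensure
      # Runs on success and failure, so elapsed is available on both paths.
      elapsed = Process.clock_gettime(Process::CLOCK_MONOTONIC) - start
    end
  rescue StandardError => error
    outcomes << { status: :nack, job_id: job_id, elapsed: elapsed,
                  message: "#{error.class}: #{error.message}" }
    return
  end

  outcomes << { status: :ack, job_id: job_id, elapsed: elapsed }
end

outcomes = []
timed_dispatch("a", ->(_id) { :ok }, outcomes)
timed_dispatch("b", ->(_id) { raise ArgumentError, "bad payload" }, outcomes)

p outcomes.map { |o| o[:status] } # => [:ack, :nack]
puts outcomes.last[:message]      # ArgumentError: bad payload
```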
#kill ⇒ Object
Request an immediate shutdown.
Like #stop, but the streaming connection is closed immediately
during teardown (rather than after workers drain), so the server
re-dispatches any in-flight jobs after its visibility timeout. Use
this when #stop has been given adequate time and still hasn't
returned.
In-progress jobs on worker threads continue to completion — we don't interrupt user code mid-execution — but no new jobs are pulled from the queue and cleanup uses short deadlines.
Safe to call from a signal handler.
```ruby
# File 'lib/zizq/worker.rb', line 144

def kill #: () -> void
  @killing = true
  @lifecycle.drain!
  @dispatch_queue.close rescue nil
end
```
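Since #kill is meant for when #stop "has been given adequate time and still hasn't returned", one common arrangement is to escalate on repeated signals: the first INT/TERM requests #stop, any later one requests #kill. ShutdownEscalator and RecordingWorker below are hypothetical; the escalator touches only an Integer counter because Mutex#lock raises ThreadError when called from a trap handler.

```ruby
# Hypothetical helper: first signal -> graceful #stop, later signals -> #kill.
class ShutdownEscalator
  def initialize(worker)
    @worker = worker
    @signals = 0
  end

  # Kept trap-safe: no Mutex, just a counter and a method call.
  def signal!
    @signals += 1
    @signals == 1 ? @worker.stop : @worker.kill
  end
end

# Stand-in that records calls instead of shutting anything down.
class RecordingWorker
  attr_reader :calls

  def initialize
    @calls = []
  end

  def stop
    @calls << :stop
  end

  def kill
    @calls << :kill
  end
end

worker = RecordingWorker.new
escalator = ShutdownEscalator.new(worker)
3.times { escalator.signal! }
p worker.calls # => [:stop, :kill, :kill]
```

In a real process the escalator would be wired with `Signal.trap("INT") { escalator.signal! }` (and likewise for TERM) before calling #run on the worker.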
#push_ack(job_id) ⇒ void
This method returns an undefined value.
```ruby
# File 'lib/zizq/worker.rb', line 450

def push_ack(job_id)
  @ack_processor.push(AckProcessor::Ack.new(job_id:))
end
```
#push_nack(job_id, error) ⇒ void
This method returns an undefined value.
```ruby
# File 'lib/zizq/worker.rb', line 457

def push_nack(job_id, error)
  @ack_processor.push(AckProcessor::Nack.new(
    job_id: job_id,
    message: "#{error.class}: #{error.message}",
    error_type: error.class.name,
    backtrace: error.backtrace&.join("\n")
  ))
end
```
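The nack payload is built from plain Exception methods, and the `&.` on backtrace matters: an exception object that was never raised has a nil backtrace. In this sketch the Nack struct and build_nack are hypothetical stand-ins for AckProcessor::Nack and #push_nack.

```ruby
# Hypothetical stand-in for AckProcessor::Nack with the fields #push_nack fills.
Nack = Struct.new(:job_id, :message, :error_type, :backtrace, keyword_init: true)

def build_nack(job_id, error)
  Nack.new(
    job_id: job_id,
    message: "#{error.class}: #{error.message}",
    error_type: error.class.name,
    # backtrace is nil for an exception that was never raised, hence &.
    backtrace: error.backtrace&.join("\n")
  )
end

raised =
  begin
    raise KeyError, "missing payload"
  rescue KeyError => e
    e
  end

nack = build_nack("job-42", raised)
puts nack.message    # KeyError: missing payload
puts nack.error_type # KeyError

p build_nack("job-43", RuntimeError.new("never raised")).backtrace # => nil
```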
#reset_runtime_state ⇒ Object
Reset all mutable runtime state so #run can be called multiple
times on the same Worker instance. Called from #initialize and
from the top of #run.
```ruby
# File 'lib/zizq/worker.rb', line 226

def reset_runtime_state #: () -> void
  @backoff = Backoff.new(
    min_wait: @retry_min_wait,
    max_wait: @retry_max_wait,
    multiplier: @retry_multiplier,
  )

  @lifecycle = Lifecycle.new
  @dispatch_queue = Thread::Queue.new
  @streaming_response = nil #: untyped
  @killing = false
  @ack_processor = AckProcessor.new(
    client: Zizq.client,
    capacity: @prefetch * 2,
    logger: @logger,
    backoff: @backoff,
  )
end
```
#run ⇒ Object
Start the worker.
Spawns the desired number of worker threads and fibers, distributes jobs to those workers and then blocks until shutdown. Safe to call multiple times on the same Worker instance — all mutable runtime state (lifecycle, dispatch queue, ack processor, backoff) is reset at the start of each run.
```ruby
# File 'lib/zizq/worker.rb', line 157

def run #: () -> void
  reset_runtime_state

  logger.info do
    format(
      "Zizq worker starting: %d threads, %d fibers, prefetch=%d",
      thread_count,
      fiber_count,
      prefetch,
    )
  end

  logger.info { "Queues: #{queues.empty? ? '(all)' : queues.join(', ')}" }

  # Everything runs in the background initially.
  @ack_processor.start
  worker_threads = start_worker_threads
  producer_thread = start_producer_thread

  # Block until the lifecycle leaves :running (stop, kill, or crash).
  @lifecycle.wait_while_running

  if @killing
    logger.info { "Killing. Closing stream and forcing shutdown..." }

    # Close the streaming response immediately so the server
    # re-dispatches any in-flight jobs after its visibility timeout.
    # This also unblocks the producer's IO read.
    @streaming_response&.close rescue nil

    # Workers will finish their current job (can't be interrupted)
    # and then see the closed dispatch queue and exit.
    worker_threads.each(&:join)

    # Drain whatever acks happen to flush before their fibers exit.
    # No timeout: workers finish their current job and exit quickly.
    @ack_processor.stop
  else
    logger.info { "Shutting down. Waiting for workers to finish..." }

    # Workers drain remaining jobs from the closed dispatch queue.
    # The producer stays connected so in-flight jobs aren't requeued
    # by the server while workers are still finishing them.
    worker_threads.each(&:join)

    # Drain pending acks/nacks while the connection is still open.
    @ack_processor.stop

    # Close the streaming response to unblock the producer's IO read.
    # This happens after workers and acks have drained so the server
    # doesn't requeue in-flight jobs while workers are still finishing.
    @streaming_response&.close rescue nil
  end

  # Signal the producer that cleanup is complete. The watcher fiber
  # inside the producer's Sync block wakes up on this and cancels
  # the producer's main task, so the stream is closed from its own
  # reactor rather than via a cross-thread close.
  @lifecycle.stop!
  producer_thread.join

  logger.info { "Zizq worker stopped" }
end
```
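#run coordinates three threads of control through @lifecycle: main waits for it to leave :running, the producer's watcher waits for it to reach :stopped, and #stop/#kill/a crashing producer call drain!. The Lifecycle class is not shown on this page, so this is only a sketch of one way to build such a state machine. SketchLifecycle is hypothetical and is built on Thread::Queue#close (the primitive this page names as signal-handler safe) rather than a Mutex: a closed, empty queue makes every blocked #pop return nil at once.

```ruby
# Hypothetical lifecycle: running -> draining -> stopped, using closed queues
# as one-shot broadcast events. Queue#close is idempotent and wakes all waiters.
class SketchLifecycle
  def initialize
    @draining = Thread::Queue.new # closed once we leave :running
    @stopped = Thread::Queue.new  # closed once cleanup is complete
  end

  def running?
    !@draining.closed?
  end

  # Leave :running (what #stop, #kill, or a crashing producer would trigger).
  def drain!
    @draining.close
  end

  # Enter :stopped after cleanup; also implies draining.
  def stop!
    @draining.close
    @stopped.close
  end

  def wait_while_running
    @draining.pop # blocks until closed, then returns nil
  end

  def wait_until_stopped
    @stopped.pop
  end
end

lifecycle = SketchLifecycle.new
runner = Thread.new { lifecycle.wait_while_running; :drained }
watcher = Thread.new { lifecycle.wait_until_stopped; :stopped }

lifecycle.drain!
p runner.value       # => :drained
p lifecycle.running? # => false
p watcher.alive?     # => true (draining does not wake the stop watcher)

lifecycle.stop!
p watcher.value      # => :stopped
```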
#run_fiber_workers(thread_idx) ⇒ Object
Fiber-based worker loop. Requires the async gem.
```ruby
# File 'lib/zizq/worker.rb', line 382

def run_fiber_workers(thread_idx) #: (Integer) -> void
  require "async"

  Async do |task|
    fiber_count.times do |fiber_idx|
      task.async do
        run_loop(thread_idx, fiber_idx)
      end
    end
  end
end
```
#run_loop(thread_idx, fiber_idx) ⇒ Object
Internal worker run loop.
Each worker thread or fiber continually pops jobs from the internal queue and dispatches them to the correct job class until the queue is closed and drained.
```ruby
# File 'lib/zizq/worker.rb', line 363

def run_loop(thread_idx, fiber_idx) #: (Integer, Integer) -> void
  logger.info do
    format("Worker %d:%d started", thread_idx, fiber_idx)
  end

  loop do
    # pop returns nil when queue is closed and empty
    job = @dispatch_queue.pop
    break if job.nil?

    dispatch(job)
  end

  logger.info do
    format("Worker %d:%d stopped", thread_idx, fiber_idx)
  end
end
```
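The run loop depends on a property of Thread::Queue: #pop returns nil only once the queue is closed and empty, so every job buffered before the close is still handed to some worker. Here is a minimal stdlib-only sketch of several threads draining one queue this way; the integer jobs and the processed collector are purely illustrative.

```ruby
# Several worker threads share one queue, as in #run_loop.
queue = Thread::Queue.new
processed = Thread::Queue.new # thread-safe collector for results

workers = 3.times.map do |idx|
  Thread.new do
    # Keep popping until the queue is closed AND empty (pop returns nil).
    while (job = queue.pop)
      processed << [idx, job]
    end
  end
end

10.times { |n| queue << n }
queue.close # like #stop: no more pushes, but buffered jobs are still delivered
workers.each(&:join)

done = Array.new(processed.size) { processed.pop }.map(&:last).sort
p done # => [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
```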
#start_producer_thread ⇒ Object
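Spawn the producer thread, which streams jobs from the server into the dispatch queue and reconnects with backoff after errors. Signature: () -> Thread.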
```ruby
# File 'lib/zizq/worker.rb', line 244

def start_producer_thread #: () -> Thread
  Thread.new do
    Thread.current.name = "zizq-producer"
    logger.info { "Zizq producer thread started" }

    # The producer runs inside its own Sync block so we can spawn a
    # watcher fiber that cancels the main producer task on final
    # shutdown. `task.stop` raises `Async::Stop` at the next fiber
    # yield point, which means we can interrupt the producer
    # wherever it's currently blocked: inside `stream_http.get`
    # reading response headers, inside `parse_ndjson` waiting on
    # the body, or inside `wait_until_stopped`. All of those are
    # fiber yield points, so the cancellation is immediate.
    #
    # The watcher waits on `wait_until_stopped` (not
    # `wait_while_running`) so the producer stays connected through
    # the worker+ack drain phase. Only once main has finished
    # cleanup and called `@lifecycle.stop!` does the producer get
    # cancelled.
    Sync do |task|
      task.async do
        @lifecycle.wait_until_stopped
        task.stop
      end

      while @lifecycle.running?
        begin
          client = Zizq.client
          logger.info { "Connecting to #{client.url}..." }

          client.take_jobs(
            prefetch:,
            queues:,
            on_connect: -> {
              logger.info { "Connected. Listening for jobs." }
              @backoff.reset
            },
            on_response: ->(resp) { @streaming_response = resp },
          ) do |job|
            begin
              logger.debug do
                format(
                  "Received %s (%s), dispatch queue: %d",
                  job.type,
                  job.id,
                  @dispatch_queue.size
                )
              end

              @dispatch_queue.push(job)
            rescue ClosedQueueError
              # Shutdown in progress. Stay connected so in-flight jobs
              # aren't requeued while workers and acks drain. The
              # watcher fiber will cancel this task when main calls
              # `@lifecycle.stop!` at the end of cleanup.
              @lifecycle.wait_until_stopped
              break
            end
          end

          # Stream ended normally: clear stale reference and reset backoff.
          @streaming_response = nil
          @backoff.reset
        rescue Async::Stop
          # Watcher fiber cancelled us; shutdown is complete.
          break
        rescue Zizq::ConnectionError, Zizq::StreamError => error
          break unless @lifecycle.running?

          logger.warn do
            format(
              "%s: %s. Reconnecting in %.2fs...",
              error.class,
              error.message,
              @backoff.duration,
            )
          end

          @backoff.wait
        rescue => error
          break unless @lifecycle.running?

          logger.error { "Error: #{error.class}: #{error.message}" }
          logger.debug { error.backtrace&.join("\n") }
          @backoff.wait
        end
      end
    end

    # Ensure queue is closed so workers can drain and exit
    @dispatch_queue.close rescue nil

    logger.info { "Zizq producer thread stopped" }
  ensure
    # Wake the main thread if the producer crashes during normal
    # operation (before a shutdown signal).
    @lifecycle.drain!
  end
end
```
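Stripped of Async and HTTP details, the producer's outer loop is a reconnect-with-backoff loop: keep connecting while running, wait with a growing delay after a connection error, and start over with the shortest delay once a stream completes. In this sketch, produce, FlakyConnectionError and the injected running/connect/wait callables are all hypothetical, and the delay arithmetic assumes a simple capped doubling like the defaults on this page.

```ruby
# Hypothetical error standing in for Zizq::ConnectionError / Zizq::StreamError.
class FlakyConnectionError < StandardError; end

# Reconnect loop with the same control flow as the producer's while loop.
def produce(running:, connect:, wait:, min_wait: 1, max_wait: 30, multiplier: 2)
  delay = min_wait
  while running.call
    begin
      connect.call
      delay = min_wait # stream ended normally: next failure starts short again
    rescue FlakyConnectionError
      break unless running.call # don't wait if shutdown began meanwhile
      wait.call(delay)
      delay = [delay * multiplier, max_wait].min
    end
  end
end

attempts = 0
waits = []
produce(
  running: -> { attempts < 4 },
  connect: lambda {
    attempts += 1
    raise FlakyConnectionError, "refused" if attempts < 4
  },
  wait: ->(seconds) { waits << seconds }, # record instead of sleeping
)
p waits    # => [1, 2, 4]
p attempts # => 4
```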
#start_worker_threads ⇒ Object
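Spawn thread_count worker threads, each running #run_loop directly or, when fiber_count > 1, #run_fiber_workers. Signature: () -> Array[Thread].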
```ruby
# File 'lib/zizq/worker.rb', line 344

def start_worker_threads #: () -> Array[Thread]
  (0...thread_count).map do |thread_idx|
    Thread.new(thread_idx) do |tidx|
      Thread.current.name = "zizq-worker-#{tidx}"

      if fiber_count > 1
        run_fiber_workers(tidx)
      else
        run_loop(tidx, 0)
      end
    end
  end
end
```
#stop ⇒ Object
Request a graceful shutdown.
Transitions the lifecycle to :draining and closes the dispatch
queue. Worker threads finish any in-flight jobs, the ack processor
flushes pending acks, and the producer stays connected to the server
while all of that drains — only then is the streaming connection
closed and #run returns.
Safe to call from a signal handler (uses only atomic ivar assignment
and Thread::Queue#close).
```ruby
# File 'lib/zizq/worker.rb', line 126

def stop #: () -> void
  @lifecycle.drain!
  @dispatch_queue.close rescue nil
end
```
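The graceful path leans on two behaviors of Thread::Queue#close: later pushes raise ClosedQueueError (which the producer rescues so it can stay connected), while pops keep returning jobs already buffered and return nil only once the queue is empty. A minimal stdlib-only demonstration, with illustrative job strings:

```ruby
queue = Thread::Queue.new
queue.push("job-1")
queue.push("job-2")
queue.close # what #stop does to the dispatch queue

# Producer side: pushing after close raises ClosedQueueError.
rejected =
  begin
    queue.push("job-3")
    false
  rescue ClosedQueueError
    true
  end

# Worker side: buffered jobs are still delivered, then pop returns nil.
drained = []
while (job = queue.pop)
  drained << job
end

p rejected # => true
p drained  # => ["job-1", "job-2"]
```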