Class: Zizq::Worker

Inherits:
Object
Defined in:
lib/zizq/worker.rb,
sig/generated/zizq/worker.rbs

Overview

Top-level worker process which orchestrates fetching jobs from the server and dispatching them to a pool of worker tasks for processing.

Fiber support (when fiber_count > 1) creates an Async context. When fiber_count == 1, no Async context is created.

Total concurrency is calculated as thread_count * fiber_count.

Constant Summary

  • DEFAULT_THREADS = 1 (Integer)
  • DEFAULT_FIBERS = 1 (Integer)
  • DEFAULT_RETRY_MIN_WAIT = 1 (Integer)
  • DEFAULT_RETRY_MAX_WAIT = 30 (Integer)
  • DEFAULT_RETRY_MULTIPLIER = 2 (Integer)

Constructor Details

#initialize(queues: nil, thread_count: nil, fiber_count: nil, prefetch: nil, retry_min_wait: nil, retry_max_wait: nil, retry_multiplier: nil, logger: nil, dispatcher: nil) ⇒ Worker

All keyword arguments default to nil and follow a three-level fallback chain:

1. Explicit kwarg passed to `Worker.new`.
2. `Zizq.configuration.worker.<field>` set in the app's
 `Zizq.configure` block.
3. The Worker's hardcoded `DEFAULT_*` constants.
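
The fallback chain above can be sketched in isolation. This is a minimal illustration, not the gem's code: `resolve_setting` is a hypothetical helper name, and the values passed in stand for a kwarg, a configured value, and a `DEFAULT_*` constant.

```ruby
# Hypothetical helper mirroring the `explicit || configured || default`
# pattern used in the constructor source shown below.
def resolve_setting(explicit, configured, default)
  explicit || configured || default
end

# 1. An explicit kwarg wins over everything else.
resolve_setting(8, 4, 1)

# 2. With no kwarg, the configured value is used.
resolve_setting(nil, 4, 1)

# 3. With neither, the hardcoded default applies.
resolve_setting(nil, nil, 1)
```

Because the chain uses `||`, only `nil` (or `false`) falls through to the next level. An explicit `0` is truthy in Ruby, so it is kept and then rejected by the `thread_count` / `fiber_count` validation rather than silently replaced.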

Parameters:

  • queues: (Array[String], nil) (defaults to: nil)
  • thread_count: (Integer, nil) (defaults to: nil)
  • fiber_count: (Integer, nil) (defaults to: nil)
  • prefetch: (Integer, nil) (defaults to: nil)
  • retry_min_wait: (Float, Integer, nil) (defaults to: nil)
  • retry_max_wait: (Float, Integer, nil) (defaults to: nil)
  • retry_multiplier: (Float, Integer, nil) (defaults to: nil)
  • logger: (Logger, nil) (defaults to: nil)
  • dispatcher: (^(Resources::Job) -> void, nil) (defaults to: nil)


# File 'lib/zizq/worker.rb', line 82

def initialize(
  queues: nil,
  thread_count: nil,
  fiber_count: nil,
  prefetch: nil,
  retry_min_wait: nil,
  retry_max_wait: nil,
  retry_multiplier: nil,
  logger: nil,
  dispatcher: nil
)
  Zizq.configuration.validate!
  config = Zizq.configuration.worker

  @queues = queues || config.queues || []
  @thread_count = thread_count || config.thread_count || DEFAULT_THREADS
  @fiber_count = fiber_count || config.fiber_count || DEFAULT_FIBERS
  @prefetch = prefetch || config.prefetch || @thread_count * @fiber_count * 2
  @retry_min_wait = retry_min_wait || config.retry_min_wait || DEFAULT_RETRY_MIN_WAIT
  @retry_max_wait = retry_max_wait || config.retry_max_wait || DEFAULT_RETRY_MAX_WAIT
  @retry_multiplier = retry_multiplier || config.retry_multiplier || DEFAULT_RETRY_MULTIPLIER
  @logger = logger || Zizq.configuration.logger
  @dispatcher = dispatcher || Zizq.configuration.dequeue_middleware

  if @thread_count < 1
    raise ArgumentError, "thread_count must be at least 1 (got #{@thread_count})"
  end
  if @fiber_count < 1
    raise ArgumentError, "fiber_count must be at least 1 (got #{@fiber_count})"
  end

  reset_runtime_state
end

Instance Attribute Details

#dispatcher ⇒ ^(Resources::Job) -> void (readonly)

The dispatcher used to handle each job.

Defaults to the globally configured dequeue_middleware chain. When a custom dispatcher is passed to #initialize, it is used as-is and the configured middleware chain is ignored. Callers that still need middleware can construct their own Zizq::Middleware::Chain and pass that as the dispatcher.

Returns:

  • (^(Resources::Job) -> void)

# File 'lib/zizq/worker.rb', line 62

def dispatcher
  @dispatcher
end
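
As a rough sketch of what a custom dispatcher might look like: any object responding to `call(job)` fits the `^(Resources::Job) -> void` shape. Everything below is illustrative. `FakeJob` is a hypothetical stand-in for `Resources::Job` (only `id` and `type` are assumed), and `build_dispatcher` and the handler table are invented names.

```ruby
# Hypothetical stand-in for Resources::Job, exposing only id and type.
FakeJob = Struct.new(:id, :type, keyword_init: true)

# Builds a callable that routes each job to a handler keyed by job type.
# `handlers` and `log` are illustrative; they are not part of Zizq.
def build_dispatcher(handlers, log: [])
  lambda do |job|
    log << "start #{job.id}"
    begin
      # Hash#fetch raises KeyError for an unknown type. Per the #dispatch
      # source, any exception raised here results in a nack for the job.
      handlers.fetch(job.type).call(job)
    ensure
      log << "finish #{job.id}"
    end
  end
end

dispatcher = build_dispatcher(
  { "send_email" => ->(job) { puts "sending email for #{job.id}" } }
)
dispatcher.call(FakeJob.new(id: "job-1", type: "send_email"))

# With the gem loaded, the same callable would be passed as:
#   Zizq::Worker.new(dispatcher: dispatcher).run
```

Since a custom dispatcher bypasses the configured middleware chain, any logging or instrumentation that middleware would have provided has to live inside the callable itself, as the `log` lines do here.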

#fiber_count ⇒ Integer (readonly)

The total number of fibers to run within each worker thread.

For applications that cannot handle multi-fiber execution, this should be set to 1. Any value greater than 1 runs workers inside an Async context (default: 1).

Returns:

  • (Integer)


# File 'lib/zizq/worker.rb', line 40

def fiber_count
  @fiber_count
end

#logger ⇒ Logger (readonly)

An instance of a Logger to be used for worker logging.

Returns:

  • (Logger)


# File 'lib/zizq/worker.rb', line 54

def logger
  @logger
end

#prefetch ⇒ Integer (readonly)

The total number of jobs to allow to be sent from the server at once.

Defaults to 2x the total concurrency (threads * fibers) to keep the pipeline full while ack round-trips are in flight.

Returns:

  • (Integer)


# File 'lib/zizq/worker.rb', line 51

def prefetch
  @prefetch
end
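
The default sizing can be worked through with plain arithmetic. The helper names below are hypothetical; the formulas are taken from the constructor (`thread_count * fiber_count * 2`) and from #reset_runtime_state (`capacity: @prefetch * 2`).

```ruby
# Total concurrency: one job per fiber, fiber_count fibers per thread.
def total_concurrency(thread_count:, fiber_count:)
  thread_count * fiber_count
end

# Default prefetch when neither a kwarg nor a configured value is given.
def default_prefetch(thread_count:, fiber_count:)
  total_concurrency(thread_count: thread_count, fiber_count: fiber_count) * 2
end

# Ack processor capacity as passed in #reset_runtime_state.
def ack_capacity(prefetch)
  prefetch * 2
end

# Example: 4 threads with 8 fibers each.
concurrency = total_concurrency(thread_count: 4, fiber_count: 8) # 32
prefetch    = default_prefetch(thread_count: 4, fiber_count: 8)  # 64
capacity    = ack_capacity(prefetch)                             # 128
puts "concurrency=#{concurrency} prefetch=#{prefetch} ack_capacity=#{capacity}"
```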

#queues ⇒ Array[String] (readonly)

The set of queues from which to fetch jobs.

An empty set (default) means all queues.

Returns:

  • (Array[String])


# File 'lib/zizq/worker.rb', line 45

def queues
  @queues
end

#thread_count ⇒ Integer (readonly)

The total number of worker threads to run.

For applications that are not thread-safe, this should be set to 1 (default: 1).

Returns:

  • (Integer)


# File 'lib/zizq/worker.rb', line 33

def thread_count
  @thread_count
end

Class Method Details

.run ⇒ Object

Convenience class method to create and run a worker.

Returns:

  • (Object)


# File 'lib/zizq/worker.rb', line 25

def self.run(...) #: (**untyped) -> void
  new(...).run
end

Instance Method Details

#dispatch(job) ⇒ Object

Process a single job.

Delegates to the configured dispatcher (default: the globally configured dequeue_middleware chain) and reports success or failure by queueing an ack or a nack for the job.

Parameters:

  • job (Object)

Returns:

  • (Object)


# File 'lib/zizq/worker.rb', line 398

def dispatch(job) #: (Resources::Job) -> void
  job_id, job_type = job.id, job.type

  begin
    start_time = Process.clock_gettime(Process::CLOCK_MONOTONIC)

    begin
      @dispatcher.call(job)
    ensure
      finish_time = Process.clock_gettime(Process::CLOCK_MONOTONIC)
      elapsed_time = finish_time - start_time
    end
  rescue Exception => error
    raise if !@lifecycle.running? && error.is_a?(Async::Stop)

    logger.error do
      format(
        "Job %s (%s) failed in %.4fs: %s: %s",
        job_type,
        job_id,
        elapsed_time,
        error.class,
        error.message
      )
    end

    push_nack(job_id, error)
    return
  end

  push_ack(job_id)

  logger.debug do
    format(
      "Job %s (%s) completed in %.4fs",
      job_type,
      job_id,
      elapsed_time
    )
  end
rescue Async::Stop, ClosedQueueError
  # In the case jobs take too long to terminate, they are force killed
  # which produces errors as they attempt to ack/nack etc.
  #
  # This means those jobs terminate without finishing their work but the
  # Zizq backend automatically returns them to the queue when the client
  # disconnects, so they'll be received by another worker when one connects.
  logger.debug { "Job #{job_type} (#{job_id}) interrupted during shutdown" }
end
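
The timing pattern in #dispatch (a monotonic clock read inside `ensure`, so the elapsed time is available on both the success and failure paths) can be isolated as follows. This is a simplified sketch: `timed_dispatch` is a hypothetical name, it returns a tuple instead of pushing to an ack processor, and it rescues `StandardError` where the source above deliberately rescues `Exception`.

```ruby
# Runs `callable` with `job` and returns [outcome, elapsed_seconds, error].
# outcome is :ack on success and :nack when the callable raised.
def timed_dispatch(callable, job)
  start_time = Process.clock_gettime(Process::CLOCK_MONOTONIC)

  begin
    callable.call(job)
  ensure
    # Runs whether or not the call raised, so `elapsed` is always assigned
    # before control reaches the rescue clause below.
    elapsed = Process.clock_gettime(Process::CLOCK_MONOTONIC) - start_time
  end

  [:ack, elapsed, nil]
rescue StandardError => error
  [:nack, elapsed, error]
end

outcome, elapsed, error = timed_dispatch(->(job) { job.upcase }, "payload")
puts format("%s in %.4fs (error: %p)", outcome, elapsed, error)
```

`CLOCK_MONOTONIC` is used rather than wall-clock time so that system clock adjustments during a job cannot produce negative or inflated durations.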

#kill ⇒ Object

Request an immediate shutdown.

Like #stop, but the streaming connection is closed immediately during teardown (rather than after workers drain), so the server re-dispatches any in-flight jobs after its visibility timeout. Use this when #stop has been given adequate time and still hasn't returned.

In-progress jobs on worker threads continue to completion — we don't interrupt user code mid-execution — but no new jobs are pulled from the queue and cleanup uses short deadlines.

Safe to call from a signal handler.

Returns:

  • (Object)


# File 'lib/zizq/worker.rb', line 144

def kill #: () -> void
  @killing = true
  @lifecycle.drain!
  @dispatch_queue.close rescue nil
end

#push_ack(job_id) ⇒ void

This method returns an undefined value.

Parameters:

  • job_id (String)


# File 'lib/zizq/worker.rb', line 450

def push_ack(job_id)
  @ack_processor.push(AckProcessor::Ack.new(job_id:))
end

#push_nack(job_id, error) ⇒ void

This method returns an undefined value.

Parameters:

  • job_id (String)
  • error (Exception)


# File 'lib/zizq/worker.rb', line 457

def push_nack(job_id, error)
  @ack_processor.push(AckProcessor::Nack.new(
    job_id:     job_id,
    message:    "#{error.class}: #{error.message}",
    error_type: error.class.name,
    backtrace:  error.backtrace&.join("\n")
  ))
end

#reset_runtime_state ⇒ Object

Reset all mutable runtime state so #run can be called multiple times on the same Worker instance. Called from #initialize and from the top of #run.

Returns:

  • (Object)


# File 'lib/zizq/worker.rb', line 226

def reset_runtime_state #: () -> void
  @backoff = Backoff.new(
    min_wait:   @retry_min_wait,
    max_wait:   @retry_max_wait,
    multiplier: @retry_multiplier,
  )
  @lifecycle = Lifecycle.new
  @dispatch_queue = Thread::Queue.new
  @streaming_response = nil #: untyped
  @killing = false
  @ack_processor = AckProcessor.new(
    client:   Zizq.client,
    capacity: @prefetch * 2,
    logger:   @logger,
    backoff:  @backoff,
  )
end
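
The `Backoff` class itself is not shown on this page. As an assumption about what `min_wait`, `max_wait`, and `multiplier` typically mean, here is a minimal exponential backoff sketch. `SketchBackoff` and its `advance` method are hypothetical and are not the gem's implementation, which may differ (for example by adding jitter or by sleeping in `wait`).

```ruby
# Hypothetical exponential backoff; NOT the gem's Backoff class.
class SketchBackoff
  def initialize(min_wait: 1, max_wait: 30, multiplier: 2)
    @min_wait = min_wait
    @max_wait = max_wait
    @multiplier = multiplier
    @attempt = 0
  end

  # Delay for the current attempt: min_wait * multiplier^attempt,
  # capped at max_wait.
  def duration
    [@min_wait * (@multiplier**@attempt), @max_wait].min
  end

  # Returns the current delay, then moves on to the next attempt.
  # (A real implementation would likely sleep here instead.)
  def advance
    duration.tap { @attempt += 1 }
  end

  # Called after a successful connection so the next failure starts small.
  def reset
    @attempt = 0
  end
end

backoff = SketchBackoff.new
delays = Array.new(7) { backoff.advance }
puts delays.inspect # [1, 2, 4, 8, 16, 30, 30] with the DEFAULT_RETRY_* values
```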

#run ⇒ Object

Start the worker.

Spawns the desired number of worker threads and fibers, distributes jobs to those workers and then blocks until shutdown. Safe to call multiple times on the same Worker instance — all mutable runtime state (lifecycle, dispatch queue, ack processor, backoff) is reset at the start of each run.

Returns:

  • (Object)


# File 'lib/zizq/worker.rb', line 157

def run #: () -> void
  reset_runtime_state

  logger.info do
    format(
      "Zizq worker starting: %d threads, %d fibers, prefetch=%d",
      thread_count,
      fiber_count,
      prefetch,
    )
  end

  logger.info { "Queues: #{queues.empty? ? '(all)' : queues.join(', ')}" }

  # Everything runs in the background initially.
  @ack_processor.start
  worker_threads = start_worker_threads
  producer_thread = start_producer_thread

  # Block until the lifecycle leaves :running (stop, kill, or crash).
  @lifecycle.wait_while_running

  if @killing
    logger.info { "Killing. Closing stream and forcing shutdown..." }

    # Close the streaming response immediately so the server
    # re-dispatches any in-flight jobs after its visibility timeout.
    # This also unblocks the producer's IO read.
    @streaming_response&.close rescue nil

    # Workers will finish their current job (can't be interrupted)
    # and then see the closed dispatch queue and exit.
    worker_threads.each(&:join)

    # Drain whatever acks happen to flush before their fibers exit.
    # No timeout — workers finish their current job and exit quickly.
    @ack_processor.stop
  else
    logger.info { "Shutting down. Waiting for workers to finish..." }

    # Workers drain remaining jobs from the closed dispatch queue.
    # The producer stays connected so in-flight jobs aren't requeued
    # by the server while workers are still finishing them.
    worker_threads.each(&:join)

    # Drain pending acks/nacks while the connection is still open.
    @ack_processor.stop

    # Close the streaming response to unblock the producer's IO read.
    # This happens after workers and acks have drained so the server
    # doesn't requeue in-flight jobs while workers are still finishing.
    @streaming_response&.close rescue nil
  end

  # Signal the producer that cleanup is complete. The watcher fiber
  # inside the producer's Sync block wakes up on this and cancels
  # the producer's main task, so the stream is closed from its own
  # reactor rather than via a cross-thread close.
  @lifecycle.stop!
  producer_thread.join

  logger.info { "Zizq worker stopped" }
end

#run_fiber_workers(thread_idx) ⇒ Object

Fiber-based worker loop. Requires the async gem.

Parameters:

  • thread_idx (Object)

Returns:

  • (Object)


# File 'lib/zizq/worker.rb', line 382

def run_fiber_workers(thread_idx) #: (Integer) -> void
  require "async"

  Async do |task|
    fiber_count.times do |fiber_idx|
      task.async do
        run_loop(thread_idx, fiber_idx)
      end
    end
  end
end

#run_loop(thread_idx, fiber_idx) ⇒ Object

Internal worker run loop.

Each worker thread or fiber continually pops jobs from the internal queue and dispatches them to the correct job class until the queue is closed and drained.

Parameters:

  • thread_idx (Object)
  • fiber_idx (Object)

Returns:

  • (Object)


# File 'lib/zizq/worker.rb', line 363

def run_loop(thread_idx, fiber_idx) #: (Integer, Integer) -> void
  logger.info do
    format("Worker %d:%d started", thread_idx, fiber_idx)
  end

  loop do
    # pop returns nil when queue is closed and empty
    job = @dispatch_queue.pop
    break if job.nil?

    dispatch(job)
  end

  logger.info do
    format("Worker %d:%d stopped", thread_idx, fiber_idx)
  end
end
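
The loop relies on standard `Thread::Queue` behaviour: `pop` blocks while the queue is open and empty, and returns `nil` once the queue is closed and drained. A self-contained sketch of that drain pattern, with a hypothetical `drain_with_workers` helper and doubling standing in for real job dispatch:

```ruby
# Drains `queue` using `thread_count` consumer threads and returns the
# processed items. Doubling each item stands in for dispatching a job.
def drain_with_workers(queue, thread_count:)
  results = Thread::Queue.new

  threads = Array.new(thread_count) do
    Thread.new do
      # pop returns nil once the queue is closed and empty, ending the loop.
      while (item = queue.pop)
        results << item * 2
      end
    end
  end

  threads.each(&:join)
  Array.new(results.size) { results.pop }
end

queue = Thread::Queue.new
5.times { |i| queue << i + 1 }

# Closing stops new pushes but lets already-queued items drain, which is
# how #stop and #kill signal the worker loops to finish.
queue.close

puts drain_with_workers(queue, thread_count: 2).sort.inspect
```

Pushing to a closed queue raises `ClosedQueueError`, which is why the producer in #start_producer_thread rescues that error around `@dispatch_queue.push`.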

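#start_producer_thread ⇒ Thread

Starts the "zizq-producer" thread, which streams jobs from the server into the dispatch queue and reconnects with backoff after connection or stream errors.

Returns:

  • (Thread)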


# File 'lib/zizq/worker.rb', line 244

def start_producer_thread #: () -> Thread
  Thread.new do
    Thread.current.name = "zizq-producer"

    logger.info { "Zizq producer thread started" }

    # The producer runs inside its own Sync block so we can spawn a
    # watcher fiber that cancels the main producer task on final
    # shutdown. `task.stop` raises `Async::Stop` at the next fiber
    # yield point, which means we can interrupt the producer
    # wherever it's currently blocked — inside `stream_http.get`
    # reading response headers, inside `parse_ndjson` waiting on
    # the body, or inside `wait_until_stopped`. All of those are
    # fiber yield points, so the cancellation is immediate.
    #
    # The watcher waits on `wait_until_stopped` (not
    # `wait_while_running`) so the producer stays connected through
    # the worker+ack drain phase. Only once main has finished
    # cleanup and called `@lifecycle.stop!` does the producer get
    # cancelled.
    Sync do |task|
      task.async do
        @lifecycle.wait_until_stopped
        task.stop
      end

      while @lifecycle.running?
        begin
          client = Zizq.client
          logger.info { "Connecting to #{client.url}..." }

          client.take_jobs(
            prefetch:,
            queues:,
            on_connect: -> {
              logger.info { "Connected. Listening for jobs." }
              @backoff.reset
            },
            on_response: ->(resp) { @streaming_response = resp },
          ) do |job|
            begin
              logger.debug do
                format(
                  "Received %s (%s), dispatch queue: %d",
                  job.type,
                  job.id,
                  @dispatch_queue.size
                )
              end

              @dispatch_queue.push(job)
            rescue ClosedQueueError
              # Shutdown in progress. Stay connected so in-flight jobs
              # aren't requeued while workers and acks drain. The
              # watcher fiber will cancel this task when main calls
              # `@lifecycle.stop!` at the end of cleanup.
              @lifecycle.wait_until_stopped
              break
            end
          end

          # Stream ended normally — clear stale reference and reset backoff.
          @streaming_response = nil
          @backoff.reset
        rescue Async::Stop
          # Watcher fiber cancelled us — shutdown is complete.
          break
        rescue Zizq::ConnectionError, Zizq::StreamError => error
          break unless @lifecycle.running?

          logger.warn do
            format(
              "%s: %s. Reconnecting in %.2fs...",
              error.class,
              error.message,
              @backoff.duration,
            )
          end

          @backoff.wait
        rescue => error
          break unless @lifecycle.running?

          logger.error { "Error: #{error.class}: #{error.message}" }
          logger.debug { error.backtrace&.join("\n") }
          @backoff.wait
        end
      end
    end

    # Ensure queue is closed so workers can drain and exit
    @dispatch_queue.close rescue nil
    logger.info { "Zizq producer thread stopped" }
  ensure
    # Wake the main thread if the producer crashes during normal
    # operation (before a shutdown signal).
    @lifecycle.drain!
  end
end

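#start_worker_threads ⇒ Array[Thread]

Starts thread_count worker threads named "zizq-worker-<index>". Each thread runs #run_fiber_workers when fiber_count > 1 and #run_loop otherwise.

Returns:

  • (Array[Thread])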


# File 'lib/zizq/worker.rb', line 344

def start_worker_threads #: () -> Array[Thread]
  (0...thread_count).map do |thread_idx|
    Thread.new(thread_idx) do |tidx|
      Thread.current.name = "zizq-worker-#{tidx}"

      if fiber_count > 1
        run_fiber_workers(tidx)
      else
        run_loop(tidx, 0)
      end
    end
  end
end

#stop ⇒ Object

Request a graceful shutdown.

Transitions the lifecycle to :draining and closes the dispatch queue. Worker threads finish any in-flight jobs, the ack processor flushes pending acks, and the producer stays connected to the server while all of that drains — only then is the streaming connection closed and #run returns.

Safe to call from a signal handler (uses only atomic ivar assignment and Thread::Queue#close).

Returns:

  • (Object)


# File 'lib/zizq/worker.rb', line 126

def stop #: () -> void
  @lifecycle.drain!
  @dispatch_queue.close rescue nil
end
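
Since both #stop and #kill are documented as safe to call from a signal handler, one plausible wiring is to stop on the first signal and escalate to kill on a repeat. The sketch below is an assumption about usage, not something the gem prescribes: `install_shutdown_handlers` is a hypothetical helper, and `FakeWorker` is a stand-in that only records calls so the example runs without a server.

```ruby
# Hypothetical stand-in exposing the same #stop / #kill surface as the worker.
class FakeWorker
  attr_reader :calls

  def initialize
    @calls = []
  end

  def stop
    @calls << :stop
  end

  def kill
    @calls << :kill
  end
end

# First signal requests a graceful stop; any later signal escalates to kill.
# The escalation policy is an illustrative choice.
def install_shutdown_handlers(worker, signals: %w[INT TERM])
  stop_requested = false

  signals.each do |sig|
    Signal.trap(sig) do
      if stop_requested
        worker.kill
      else
        stop_requested = true
        worker.stop
      end
    end
  end
end

worker = FakeWorker.new
install_shutdown_handlers(worker, signals: %w[USR2])

# With a real worker, handlers would be installed for INT and TERM before
# calling worker.run, which blocks until shutdown completes.
```

Handlers should stay this small: Ruby forbids some operations (such as locking a `Mutex`) in trap context, so logging or other cleanup is better done after #run returns.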