Class: Zizq::Worker

Inherits:
Object
  • Object
show all
Defined in:
lib/zizq/worker.rb

Overview

Top-level worker process which orchestrates fetching jobs from the server and dispatching them to a pool of worker tasks for processing.

Fiber support (when ‘fiber_count > 1`) creates an Async context. When `fiber_count == 1`, no Async context is created.

Total concurrency is calculated as ‘thread_count * fiber_count`.

Constant Summary collapse

DEFAULT_THREADS =

: Integer

5
DEFAULT_FIBERS =

: Integer

1
DEFAULT_RETRY_MIN_WAIT =
1
DEFAULT_RETRY_MAX_WAIT =
30
DEFAULT_RETRY_MULTIPLIER =
2

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(queues: [], thread_count: DEFAULT_THREADS, fiber_count: DEFAULT_FIBERS, prefetch: nil, retry_min_wait: DEFAULT_RETRY_MIN_WAIT, retry_max_wait: DEFAULT_RETRY_MAX_WAIT, retry_multiplier: DEFAULT_RETRY_MULTIPLIER, worker_id: nil, logger: nil, dispatcher: nil) ⇒ Worker

Returns a new instance of Worker.

Raises:

  • (ArgumentError)


80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
# File 'lib/zizq/worker.rb', line 80

def initialize(
  queues: [],
  thread_count: DEFAULT_THREADS,
  fiber_count: DEFAULT_FIBERS,
  prefetch: nil,
  retry_min_wait: DEFAULT_RETRY_MIN_WAIT,
  retry_max_wait: DEFAULT_RETRY_MAX_WAIT,
  retry_multiplier: DEFAULT_RETRY_MULTIPLIER,
  worker_id: nil,
  logger: nil,
  dispatcher: nil
)
  raise ArgumentError, "thread_count must be at least 1 (got #{thread_count})" if thread_count < 1
  raise ArgumentError, "fiber_count must be at least 1 (got #{fiber_count})" if fiber_count < 1

  Zizq.configuration.validate!

  @queues = queues
  @thread_count = thread_count
  @fiber_count = fiber_count
  @prefetch = prefetch || thread_count * fiber_count * 2
  @retry_min_wait = retry_min_wait
  @retry_max_wait = retry_max_wait
  @retry_multiplier = retry_multiplier
  @worker_id_proc = worker_id
  @logger = logger || Zizq.configuration.logger
  @dispatcher = dispatcher || Zizq.configuration.dequeue_middleware

  reset_runtime_state
end

Instance Attribute Details

#dispatcherObject (readonly)

The dispatcher used to handle each job.

Defaults to the globally-configured ‘dequeue_middleware` chain. When a custom dispatcher is provided to `#initialize`, it is used as-is and the configured middleware chain is ignored. Caller may construct their own `Zizq::Middleware::Chain` if middleware needs to be applied.



67
68
69
# File 'lib/zizq/worker.rb', line 67

def dispatcher
  @dispatcher
end

#fiber_countObject (readonly)

The total number of fibers to run within each worker thread.

For applications that cannot handle multi-fiber execution, this should be set to 1. Any value greater than 1 runs workers inside an Async context (default: 1).



40
41
42
# File 'lib/zizq/worker.rb', line 40

def fiber_count
  @fiber_count
end

#loggerObject (readonly)

An instance of a Logger to be used for worker logging.



59
60
61
# File 'lib/zizq/worker.rb', line 59

def logger
  @logger
end

#prefetchObject (readonly)

The total number of jobs to allow to be sent from the server at once.

Defaults to 2x the total concurrency (threads * fibers) to keep the pipeline full while ack round-trips are in flight.



51
52
53
# File 'lib/zizq/worker.rb', line 51

def prefetch
  @prefetch
end

#queuesObject (readonly)

The set of queues from which to fetch jobs.

An empty set (default) means all queues.



45
46
47
# File 'lib/zizq/worker.rb', line 45

def queues
  @queues
end

#thread_countObject (readonly)

The total number of worker threads to run.

For applications that are not threadsafe, this should be set to 1 (default: 5).



33
34
35
# File 'lib/zizq/worker.rb', line 33

def thread_count
  @thread_count
end

#worker_id_procObject (readonly)

Proc to derive a worker ID string for each thread and fiber.

When not present, the Zizq server assigns a random worker ID.



56
57
58
# File 'lib/zizq/worker.rb', line 56

def worker_id_proc
  @worker_id_proc
end

Class Method Details

.runObject

Convenience class method to create and run a worker.



25
26
27
# File 'lib/zizq/worker.rb', line 25

def self.run(...) #: (**untyped) -> void
  new(...).run
end

Instance Method Details

#killObject

Request an immediate shutdown.

Like ‘#stop`, but the streaming connection is closed immediately during teardown (rather than after workers drain), so the server re-dispatches any in-flight jobs after its visibility timeout. Use this when `#stop` has been given adequate time and still hasn’t returned.

In-progress jobs on worker threads continue to completion — we don’t interrupt user code mid-execution — but no new jobs are pulled from the queue and cleanup uses short deadlines.

Safe to call from a signal handler.



139
140
141
142
143
# File 'lib/zizq/worker.rb', line 139

def kill #: () -> void
  @killing = true
  @lifecycle.drain!
  @dispatch_queue.close rescue nil
end

#runObject

Start the worker.

Spawns the desired number of worker threads and fibers, distributes jobs to those workers and then blocks until shutdown. Safe to call multiple times on the same Worker instance — all mutable runtime state (lifecycle, dispatch queue, ack processor, backoff) is reset at the start of each run.



152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
# File 'lib/zizq/worker.rb', line 152

def run #: () -> void
  reset_runtime_state

  logger.info do
    format(
      "Zizq worker starting: %d threads, %d fibers, prefetch=%d",
      thread_count,
      fiber_count,
      prefetch,
    )
  end

  logger.info { "Queues: #{queues.empty? ? '(all)' : queues.join(', ')}" }

  # Everything runs in the background initially.
  @ack_processor.start
  worker_threads = start_worker_threads
  producer_thread = start_producer_thread

  # Block until the lifecycle leaves :running (stop, kill, or crash).
  @lifecycle.wait_while_running

  if @killing
    logger.info { "Killing. Closing stream and forcing shutdown..." }

    # Close the streaming response immediately so the server
    # re-dispatches any in-flight jobs after its visibility timeout.
    # This also unblocks the producer's IO read.
    @streaming_response&.close rescue nil

    # Workers will finish their current job (can't be interrupted)
    # and then see the closed dispatch queue and exit.
    worker_threads.each(&:join)

    # Drain whatever acks happen to flush before their fibers exit.
    # No timeout — workers finish their current job and exit quickly.
    @ack_processor.stop
  else
    logger.info { "Shutting down. Waiting for workers to finish..." }

    # Workers drain remaining jobs from the closed dispatch queue.
    # The producer stays connected so in-flight jobs aren't requeued
    # by the server while workers are still finishing them.
    worker_threads.each(&:join)

    # Drain pending acks/nacks while the connection is still open.
    @ack_processor.stop

    # Close the streaming response to unblock the producer's IO read.
    # This happens after workers and acks have drained so the server
    # doesn't requeue in-flight jobs while workers are still finishing.
    @streaming_response&.close rescue nil
  end

  # Signal the producer that cleanup is complete. The watcher fiber
  # inside the producer's Sync block wakes up on this and cancels
  # the producer's main task, so the stream is closed from its own
  # reactor rather than via a cross-thread close.
  @lifecycle.stop!
  producer_thread.join

  logger.info { "Zizq worker stopped" }
end

#stopObject

Request a graceful shutdown.

Transitions the lifecycle to ‘:draining` and closes the dispatch queue. Worker threads finish any in-flight jobs, the ack processor flushes pending acks, and the producer stays connected to the server while all of that drains — only then is the streaming connection closed and `#run` returns.

Safe to call from a signal handler (uses only atomic ivar assignment and ‘Thread::Queue#close`).



121
122
123
124
# File 'lib/zizq/worker.rb', line 121

def stop #: () -> void
  @lifecycle.drain!
  @dispatch_queue.close rescue nil
end