Class: Zizq::AckProcessor
- Inherits:
-
Object
- Object
- Zizq::AckProcessor
- Defined in:
- lib/zizq/ack_processor.rb
Overview
Dedicated background thread that processes ack/nack HTTP requests on behalf of worker threads, decoupling job processing from network I/O.
Workers push Ack/Nack items to a thread-safe queue. The processor runs an async event loop that spawns an independent fiber per ack/nack request, enabling true concurrent I/O over a single HTTP/2 connection. Each fiber handles its own retries with exponential backoff.
Defined Under Namespace
Instance Method Summary collapse
-
#initialize(client:, capacity:, logger:, backoff:) ⇒ AckProcessor
constructor
A new instance of AckProcessor.
-
#push(item) ⇒ Object
Push an Ack or Nack to the processing queue.
-
#start ⇒ Object
Start the background processor thread.
-
#stop ⇒ Object
Close the queue and wait for the processor to drain.
Constructor Details
#initialize(client:, capacity:, logger:, backoff:) ⇒ AckProcessor
Returns a new instance of AckProcessor.
30 31 32 33 34 35 |
# File 'lib/zizq/ack_processor.rb', line 30 def initialize(client:, capacity:, logger:, backoff:) @client = client @logger = logger @backoff = backoff @queue = Thread::SizedQueue.new(capacity) end |
Instance Method Details
#push(item) ⇒ Object
Push an Ack or Nack to the processing queue. Blocks if the queue is at capacity (backpressure).
42 43 44 |
# File 'lib/zizq/ack_processor.rb', line 42 def push(item) @queue.push(item) end |
#start ⇒ Object
Start the background processor thread.
47 48 49 50 51 |
# File 'lib/zizq/ack_processor.rb', line 47 def start #: () -> Thread @thread = Thread.new { run } @thread.name = "zizq-ack-processor" @thread end |
#stop ⇒ Object
Close the queue and wait for the processor to drain. Waits indefinitely —callers who want a deadline should wrap the call in ‘Timeout::timeout`.
57 58 59 60 |
# File 'lib/zizq/ack_processor.rb', line 57 def stop @queue.close @thread&.join end |