Class: Zizq::AckProcessor
- Inherits:
-
Object
- Object
- Zizq::AckProcessor
- Defined in:
- lib/zizq/ack_processor.rb,
sig/generated/zizq/ack_processor.rbs
Overview
Dedicated background thread that processes ack/nack HTTP requests on behalf of worker threads, decoupling job processing from network I/O.
Workers push Ack/Nack items to a thread-safe queue. The processor runs an async event loop that spawns an independent fiber per ack/nack request, enabling true concurrent I/O over a single HTTP/2 connection. Each fiber handles its own retries with exponential backoff.
Defined Under Namespace
Instance Method Summary collapse
-
#initialize(client:, capacity:, logger:, backoff:) ⇒ AckProcessor
constructor
A new instance of AckProcessor.
-
#process_ack_batch(acks) ⇒ Object
: (Array) -> void.
-
#process_nack(nack) ⇒ Object
: (Nack) -> void.
-
#push(item) ⇒ void
Push an Ack or Nack to the processing queue.
-
#run ⇒ Object
: () -> void.
-
#start ⇒ Object
Start the background processor thread.
-
#stop ⇒ void
Close the queue and wait for the processor to drain.
Constructor Details
#initialize(client:, capacity:, logger:, backoff:) ⇒ AckProcessor
Returns a new instance of AckProcessor.
30 31 32 33 34 35 |
# File 'lib/zizq/ack_processor.rb', line 30 def initialize(client:, capacity:, logger:, backoff:) @client = client @logger = logger @backoff = backoff @queue = Thread::SizedQueue.new(capacity) end |
Instance Method Details
#process_ack_batch(acks) ⇒ Object
: (Array) -> void
98 99 100 101 102 103 104 105 106 107 108 109 110 |
# File 'lib/zizq/ack_processor.rb', line 98 def process_ack_batch(acks) #: (Array[Ack]) -> void backoff = @backoff.fresh ids = acks.map(&:job_id) begin @client.report_success_bulk(ids) rescue ClientError => e @logger.warn { "Bulk ack (#{ids.size} jobs) returned #{e.status} (dropping: #{e.})" } rescue => e @logger.warn { "Retrying bulk ack (#{ids.size} jobs) in #{backoff.duration}s: #{e.}" } backoff.wait retry end end |
#process_nack(nack) ⇒ Object
: (Nack) -> void
112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 |
# File 'lib/zizq/ack_processor.rb', line 112 def process_nack(nack) #: (Nack) -> void backoff = @backoff.fresh begin @client.report_failure( nack.job_id, message: nack., error_type: nack.error_type, backtrace: nack.backtrace ) rescue NotFoundError @logger.debug { "Nack for #{nack.job_id} returned 404 (already handled)" } rescue ClientError => e @logger.error { "Nack for #{nack.job_id} returned #{e.status} (dropping)" } rescue => e @logger.warn { "Retrying nack for #{nack.job_id} in #{backoff.duration}s: #{e.}" } backoff.wait retry end end |
#push(item) ⇒ void
This method returns an undefined value.
Push an Ack or Nack to the processing queue. Blocks if the queue is at capacity (backpressure).
42 43 44 |
# File 'lib/zizq/ack_processor.rb', line 42 def push(item) @queue.push(item) end |
#run ⇒ Object
: () -> void
64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 |
# File 'lib/zizq/ack_processor.rb', line 64 def run #: () -> void Sync do = Async::Barrier.new while (item = @queue.pop) # Put the item into a batch. batch = [item] # Drain any additional ready items into the batch. loop do batch << @queue.pop(true) # non-blocking rescue ThreadError break end # Partition: acks go bulk, nacks go individually. acks, nacks = batch.partition { |i| i.is_a?(Ack) } unless acks.empty? .async { process_ack_batch(acks) } end nacks.each do |nack| .async { process_nack(nack) } end end .wait end rescue => e @logger.error { "Ack processor crashed: #{e.class}: #{e.}" } @logger.debug { e.backtrace&.join("\n") } end |
#start ⇒ Object
Start the background processor thread.
47 48 49 50 51 |
# File 'lib/zizq/ack_processor.rb', line 47 def start #: () -> Thread @thread = Thread.new { run } @thread.name = "zizq-ack-processor" @thread end |
#stop ⇒ void
This method returns an undefined value.
Close the queue and wait for the processor to drain. Waits indefinitely —
callers who want a deadline should wrap the call in Timeout::timeout.
57 58 59 60 |
# File 'lib/zizq/ack_processor.rb', line 57 def stop @queue.close @thread&.join end |