Class: Zizq::AckProcessor

Inherits:
Object
Defined in:
lib/zizq/ack_processor.rb

Overview

Dedicated background thread that processes ack/nack HTTP requests on behalf of worker threads, decoupling job processing from network I/O.

Workers push Ack/Nack items to a thread-safe queue. The processor runs an async event loop that spawns an independent fiber per ack/nack request, enabling true concurrent I/O over a single HTTP/2 connection. Each fiber handles its own retries with exponential backoff.
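The hand-off described above can be sketched with the standard library alone. This is a simplified stand-in, not Zizq's implementation: `SketchProcessor` is a hypothetical class, it handles items sequentially instead of spawning a fiber per request, and it records items in an array where the real processor would send HTTP requests.

```ruby
# Hypothetical stand-in for the queue hand-off; not Zizq's code.
class SketchProcessor
  attr_reader :handled

  def initialize(capacity:)
    # Bounded queue: producers block in #push once `capacity` items are waiting.
    @queue = Thread::SizedQueue.new(capacity)
    @handled = []
  end

  # Called by worker threads.
  def push(item)
    @queue.push(item)
  end

  # Start the single consumer thread.
  def start
    @thread = Thread.new do
      # Queue#pop returns nil once the queue is closed and empty,
      # so the loop ends only after every pending item is drained.
      while (item = @queue.pop)
        @handled << item # the real processor would issue an ack/nack request here
      end
    end
  end

  # Close the queue and wait for the consumer to finish draining.
  def stop
    @queue.close
    @thread&.join
  end
end

processor = SketchProcessor.new(capacity: 2)
processor.start
5.times { |i| processor.push([:ack, i]) }
processor.stop
```

Because a single consumer pops from a FIFO queue, items are handled in the order they were pushed in this sketch. The fiber-per-request design described above does not necessarily complete requests in that order.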

Defined Under Namespace

Classes: Ack, Nack

Instance Method Summary

Constructor Details

#initialize(client:, capacity:, logger:, backoff:) ⇒ AckProcessor

Returns a new instance of AckProcessor.



# File 'lib/zizq/ack_processor.rb', line 30

def initialize(client:, capacity:, logger:, backoff:)
  @client = client
  @logger = logger
  @backoff = backoff
  @queue = Thread::SizedQueue.new(capacity)
end
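The interface of the `backoff:` argument is not shown on this page. As a general illustration of retrying with exponential backoff, here is a minimal sketch. The helper `with_backoff` and all of its parameters are hypothetical and are not part of Zizq.

```ruby
# Hypothetical helper (not part of Zizq): runs the block, and on failure
# waits base, 2*base, 4*base, ... seconds (capped at `max`) before retrying.
# Re-raises the last error once `attempts` tries have been made.
def with_backoff(attempts: 5, base: 0.1, max: 5.0, sleeper: ->(s) { sleep(s) })
  tries = 0
  begin
    yield tries
  rescue StandardError
    tries += 1
    raise if tries >= attempts # out of attempts: propagate the error

    delay = [base * (2**(tries - 1)), max].min
    sleeper.call(delay)
    retry
  end
end
```

The `sleeper` parameter exists only so the delay can be replaced in tests. Production retry loops often add random jitter to the delay as well, which this sketch omits.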

Instance Method Details

#push(item) ⇒ Object

Push an Ack or Nack to the processing queue. Blocks if the queue is at capacity (backpressure).



# File 'lib/zizq/ack_processor.rb', line 42

def push(item)
  @queue.push(item)
end
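The blocking behaviour comes from `Thread::SizedQueue`. A caller that cannot afford to block can see the same capacity limit through the queue's non-blocking mode, which raises `ThreadError` when the queue is full. The sketch below uses a bare queue and a hypothetical `try_push` helper, since `AckProcessor#push` as shown above only exposes the blocking form.

```ruby
# Hypothetical helper (not part of Zizq): attempts a non-blocking push and
# reports whether the item was accepted.
def try_push(queue, item)
  queue.push(item, true) # second argument is non_block
  true
rescue ThreadError
  false # queue is at capacity
end

queue = Thread::SizedQueue.new(1)
try_push(queue, :first)  # accepted, queue is now full
try_push(queue, :second) # rejected until a consumer pops an item
```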

#start ⇒ Object

Start the background processor thread.



# File 'lib/zizq/ack_processor.rb', line 47

def start #: () -> Thread
  @thread = Thread.new { run }
  @thread.name = "zizq-ack-processor"
  @thread
end

#stop ⇒ Object

Close the queue and wait for the processor to drain. Waits indefinitely; callers who want a deadline should wrap the call in `Timeout::timeout`.



# File 'lib/zizq/ack_processor.rb', line 57

def stop
  @queue.close
  @thread&.join
end
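A deadline around `#stop` might look like the following sketch. `stop_with_deadline` is a hypothetical helper, and `processor` is assumed to be any object that responds to `#stop`.

```ruby
require "timeout"

# Hypothetical helper (not part of Zizq): returns true if the processor
# drained within `seconds`, false if the deadline expired first.
def stop_with_deadline(processor, seconds)
  Timeout.timeout(seconds) { processor.stop }
  true
rescue Timeout::Error
  false
end
```

If the deadline expires, only the wait is abandoned. The queue has already been closed and the background thread keeps running, so any acks or nacks still pending may or may not be delivered before the process exits.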