Class: Zizq::AckProcessor

Inherits:
Object
  • Object
show all
Defined in:
lib/zizq/ack_processor.rb,
sig/generated/zizq/ack_processor.rbs

Overview

Dedicated background thread that processes ack/nack HTTP requests on behalf of worker threads, decoupling job processing from network I/O.

Workers push Ack/Nack items to a thread-safe queue. The processor runs an async event loop that spawns an independent fiber per ack/nack request, enabling true concurrent I/O over a single HTTP/2 connection. Each fiber handles its own retries with exponential backoff.

Defined Under Namespace

Classes: Ack, Nack

Instance Method Summary collapse

Constructor Details

#initialize(client:, capacity:, logger:, backoff:) ⇒ AckProcessor

Returns a new instance of AckProcessor.

Parameters:

  • client: (Client)
  • capacity: (Integer)
  • logger: (Logger)
  • backoff: (Backoff)


30
31
32
33
34
35
# File 'lib/zizq/ack_processor.rb', line 30

def initialize(client:, capacity:, logger:, backoff:)
  @client = client
  @logger = logger
  @backoff = backoff
  @queue = Thread::SizedQueue.new(capacity)
end

Instance Method Details

#process_ack_batch(acks) ⇒ Object

: (Array) -> void

Parameters:

  • acks (Object)

Returns:

  • (Object)


98
99
100
101
102
103
104
105
106
107
108
109
110
# File 'lib/zizq/ack_processor.rb', line 98

def process_ack_batch(acks) #: (Array[Ack]) -> void
  backoff = @backoff.fresh
  ids = acks.map(&:job_id)
  begin
    @client.report_success_bulk(ids)
  rescue ClientError => e
    @logger.warn { "Bulk ack (#{ids.size} jobs) returned #{e.status} (dropping: #{e.message})" }
  rescue => e
    @logger.warn { "Retrying bulk ack (#{ids.size} jobs) in #{backoff.duration}s: #{e.message}" }
    backoff.wait
    retry
  end
end

#process_nack(nack) ⇒ Object

: (Nack) -> void

Parameters:

  • nack (Object)

Returns:

  • (Object)


112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
# File 'lib/zizq/ack_processor.rb', line 112

def process_nack(nack) #: (Nack) -> void
  backoff = @backoff.fresh
  begin
    @client.report_failure(
      nack.job_id,
      message: nack.message,
      error_type: nack.error_type,
      backtrace: nack.backtrace
    )
  rescue NotFoundError
    @logger.debug { "Nack for #{nack.job_id} returned 404 (already handled)" }
  rescue ClientError => e
    @logger.error { "Nack for #{nack.job_id} returned #{e.status} (dropping)" }
  rescue => e
    @logger.warn { "Retrying nack for #{nack.job_id} in #{backoff.duration}s: #{e.message}" }
    backoff.wait
    retry
  end
end

#push(item) ⇒ void

This method returns an undefined value.

Push an Ack or Nack to the processing queue. Blocks if the queue is at capacity (backpressure).

Parameters:



42
43
44
# File 'lib/zizq/ack_processor.rb', line 42

def push(item)
  @queue.push(item)
end

#runObject

: () -> void

Returns:

  • (Object)


64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
# File 'lib/zizq/ack_processor.rb', line 64

def run #: () -> void
  Sync do
    barrier = Async::Barrier.new

    while (item = @queue.pop)
      # Put the item into a batch.
      batch = [item]

      # Drain any additional ready items into the batch.
      loop do
        batch << @queue.pop(true) # non-blocking
      rescue ThreadError
        break
      end

      # Partition: acks go bulk, nacks go individually.
      acks, nacks = batch.partition { |i| i.is_a?(Ack) }

      unless acks.empty?
        barrier.async { process_ack_batch(acks) }
      end

      nacks.each do |nack|
        barrier.async { process_nack(nack) }
      end
    end

    barrier.wait
  end
rescue => e
  @logger.error { "Ack processor crashed: #{e.class}: #{e.message}" }
  @logger.debug { e.backtrace&.join("\n") }
end

#startObject

Start the background processor thread.

Returns:

  • (Object)


47
48
49
50
51
# File 'lib/zizq/ack_processor.rb', line 47

def start #: () -> Thread
  @thread = Thread.new { run }
  @thread.name = "zizq-ack-processor"
  @thread
end

#stopvoid

This method returns an undefined value.

Close the queue and wait for the processor to drain. Waits indefinitely — callers who want a deadline should wrap the call in Timeout::timeout.



57
58
59
60
# File 'lib/zizq/ack_processor.rb', line 57

def stop
  @queue.close
  @thread&.join
end