Module: NNQ::Routing::SendPump
Overview
Mixin for routing strategies that drain a shared bounded send queue via per-connection work-stealing pumps. Used by PUSH (load-balance across N peers) and PAIR (single peer, but the same pump shape).
See DESIGN.md “Per-socket HWM” for the rationale.
The per-pump batch caps (BATCH_MSG_CAP / BATCH_BYTE_CAP) enforce fairness across the work-stealing pumps. Without them, the first pump that wakes up would drain the entire queue in one non-blocking burst before any other pump got a turn (TCP send buffers absorb bursts without forcing a fiber yield).
Including classes must call #init_send_pump from #initialize and #spawn_send_pump_for from their #connection_added hook.
Constant Summary collapse
- BATCH_MSG_CAP =
Max messages a single pump will drain from the shared queue in one batch before yielding. Bounds the worst-case latency other pumps wait when the queue is under sustained pressure.
256- BATCH_BYTE_CAP =
Max cumulative body bytes in one batch. Keeps a single pump from monopolising the writer mutex on huge payloads.
256 * 1024
Instance Method Summary collapse
-
#close ⇒ Object
Stops all send pump tasks.
-
#remove_send_pump_for(conn) ⇒ Object
Removes a pump and stops its task (unless called from inside the pump itself, in which case the pump is already on its way out via the rescue/ensure path).
-
#send_queue_drained? ⇒ Boolean
True once the shared queue is empty AND no batch is mid-write across any pump.
Instance Method Details
#close ⇒ Object
Stops all send pump tasks. Each pump’s ensure block calls engine.handle_connection_lost → routing.connection_removed which removes its own entry, so iterate over a snapshot.
57 58 59 60 |
# File 'lib/nnq/routing/send_pump.rb', line 57 def close @pumps.values.each(&:stop) @pumps.clear end |
#remove_send_pump_for(conn) ⇒ Object
Removes a pump and stops its task (unless called from inside the pump itself, in which case the pump is already on its way out via the rescue/ensure path).
44 45 46 47 48 49 50 51 |
# File 'lib/nnq/routing/send_pump.rb', line 44 def remove_send_pump_for(conn) task = @pumps.delete(conn) return if task.nil? || task == Async::Task.current task.stop rescue IOError, Errno::EPIPE # Pump was mid-flush when its conn was closed; cancel surfaced # the same IOError. Already handled — pump is gone. end |
#send_queue_drained? ⇒ Boolean
Returns true once the shared queue is empty AND no batch is mid-write across any pump.
36 37 38 |
# File 'lib/nnq/routing/send_pump.rb', line 36 def send_queue_drained? @send_queue.empty? && @in_flight.zero? end |