Module: NNQ::Routing::SendPump

Included in:
Pair, Push
Defined in:
lib/nnq/routing/send_pump.rb

Overview

Mixin for routing strategies that drain a shared bounded send queue via per-connection work-stealing pumps. Used by PUSH (load-balance across N peers) and PAIR (single peer, but the same pump shape).

See DESIGN.md “Per-socket HWM” for the rationale.

The per-pump batch caps (BATCH_MSG_CAP / BATCH_BYTE_CAP) enforce fairness across the work-stealing pumps. Without them, the first pump that wakes up would drain the entire queue in one non-blocking burst before any other pump got a turn (TCP send buffers absorb bursts without forcing a fiber yield).

Including classes must call #init_send_pump from #initialize and #spawn_send_pump_for from their #connection_added hook.

Constant Summary collapse

BATCH_MSG_CAP =

Max messages a single pump will drain from the shared queue in one batch before yielding. Bounds the worst-case latency other pumps wait when the queue is under sustained pressure.

256
BATCH_BYTE_CAP =

Max cumulative body bytes in one batch. Keeps a single pump from monopolising the writer mutex on huge payloads.

256 * 1024

Instance Method Summary collapse

Instance Method Details

#closeObject

Stops all send pump tasks. Each pump’s ensure block calls engine.handle_connection_lost → routing.connection_removed which removes its own entry, so iterate over a snapshot.



57
58
59
60
# File 'lib/nnq/routing/send_pump.rb', line 57

def close
  @pumps.values.each(&:stop)
  @pumps.clear
end

#remove_send_pump_for(conn) ⇒ Object

Removes a pump and stops its task (unless called from inside the pump itself, in which case the pump is already on its way out via the rescue/ensure path).



44
45
46
47
48
49
50
51
# File 'lib/nnq/routing/send_pump.rb', line 44

def remove_send_pump_for(conn)
  task = @pumps.delete(conn)
  return if task.nil? || task == Async::Task.current
  task.stop
rescue IOError, Errno::EPIPE
  # Pump was mid-flush when its conn was closed; cancel surfaced
  # the same IOError. Already handled — pump is gone.
end

#send_queue_drained?Boolean

Returns true once the shared queue is empty AND no batch is mid-write across any pump.

Returns:

  • (Boolean)

    true once the shared queue is empty AND no batch is mid-write across any pump.



36
37
38
# File 'lib/nnq/routing/send_pump.rb', line 36

def send_queue_drained?
  @send_queue.empty? && @in_flight.zero?
end