Class: NNQ::Routing::Push

Inherits:
Object
  • Object
show all
Includes:
SendPump
Defined in:
lib/nnq/routing/push.rb

Overview

PUSH side of the pipeline pattern.

Architecture: ONE shared bounded send queue per socket. Each peer connection gets its own send pump fiber that races to dequeue from the shared queue and write to its peer (work-stealing). A slow peer’s pump just stops pulling (blocked on its own TCP flush); fast peers’ pumps keep draining. Strictly better than per-pipe round-robin for PUSH semantics — load naturally biases to whoever is keeping up.

Constant Summary

Constants included from SendPump

SendPump::BATCH_BYTE_CAP, SendPump::BATCH_MSG_CAP

Instance Method Summary collapse

Methods included from SendPump

#close, #remove_send_pump_for, #send_queue_drained?

Constructor Details

#initialize(engine) ⇒ Push

Returns a new instance of Push.



21
22
23
# File 'lib/nnq/routing/push.rb', line 21

def initialize(engine)
  init_send_pump(engine)
end

Instance Method Details

#connection_added(conn) ⇒ Object



34
35
36
# File 'lib/nnq/routing/push.rb', line 34

def connection_added(conn)
  spawn_send_pump_for(conn)
end

#connection_removed(conn) ⇒ Object



39
40
41
# File 'lib/nnq/routing/push.rb', line 39

def connection_removed(conn)
  remove_send_pump_for(conn)
end

#send(body) ⇒ Object

User-facing send: enqueue onto the shared send queue.

Parameters:

  • body (String)


29
30
31
# File 'lib/nnq/routing/push.rb', line 29

def send(body)
  enqueue_for_send(body)
end