Class: NNQ::Routing::Bus
- Inherits: Object
- Defined in: lib/nnq/routing/bus.rb
Overview
BUS0: best-effort bidirectional mesh.
Send side: fan-out to all connected peers. Each peer gets its own bounded send queue and pump fiber — a slow peer drops messages instead of blocking fast ones (same as PUB). Send never blocks.
Recv side: all incoming messages are pushed into a shared unbounded queue (same as PULL).
No SP headers in cooked mode — body on the wire is the user payload.
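The send-side behaviour described above can be sketched with the standard library alone. This is a simplified model, not the NNQ implementation: `PeerModel`, `offer`, `run_pump`, and `burst_demo` are hypothetical names, a plain Array stands in for the bounded send queue, and a manually resumed Fiber stands in for the pump fiber so the outcome is deterministic.

```ruby
# Hypothetical stand-in for one connected peer: a bounded send queue
# plus a pump that moves queued bodies onto the "wire".
class PeerModel
  attr_reader :wire, :dropped

  def initialize(hwm)
    @hwm = hwm
    @queue = []     # stands in for the per-peer bounded send queue
    @wire = []      # what the peer actually receives
    @dropped = 0
    # Each resume drains whatever is queued, then yields back.
    @pump = Fiber.new do
      loop do
        @wire << @queue.shift until @queue.empty?
        Fiber.yield
      end
    end
  end

  # Non-blocking offer: drop the body when the queue is at the HWM.
  def offer(body)
    if @queue.size >= @hwm
      @dropped += 1
    else
      @queue << body
    end
  end

  def run_pump
    @pump.resume
  end
end

# Sends `count` messages to a fast peer (pump runs after every send)
# and a slow peer (pump runs only after the whole burst).
def burst_demo(hwm:, count:)
  fast = PeerModel.new(hwm)
  slow = PeerModel.new(hwm)
  count.times do |i|
    [fast, slow].each { |peer| peer.offer("msg-#{i}") }
    fast.run_pump
  end
  slow.run_pump
  { fast: fast.wire, slow: slow.wire, slow_dropped: slow.dropped }
end

result = burst_demo(hwm: 2, count: 5)
puts "fast peer received #{result[:fast].size}, slow peer received #{result[:slow].size}"
```

In this model the slow peer loses every message offered while its queue is full, and the fast peer is unaffected, which is the isolation property the overview describes.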
Instance Method Summary
- #close ⇒ Object
- #close_read ⇒ Object
- #connection_added(conn) ⇒ Object
- #connection_removed(conn) ⇒ Object
- #direct_recv_for(_conn) ⇒ Object
  Inproc fast-path hook: the peer pipe enqueues directly into the shared recv queue (identity transform, no backtrace or filter).
- #enqueue(body, _conn = nil) ⇒ Object
  Called by the engine recv loop with each received message.
- #initialize(engine) ⇒ Bus (constructor)
  A new instance of Bus.
- #receive ⇒ String?
  Message body, or nil once the socket is closed.
- #send(body) ⇒ Object
  Broadcasts body to every connected peer.
- #send_queue_drained? ⇒ Boolean
Constructor Details
#initialize(engine) ⇒ Bus
Returns a new instance of Bus.
    # File 'lib/nnq/routing/bus.rb', line 22
    def initialize(engine)
      @engine = engine
      @queues = {} # conn => Async::LimitedQueue
      @recv_queue = Async::Queue.new
    end
Instance Method Details
#close ⇒ Object
    # File 'lib/nnq/routing/bus.rb', line 76
    def close
      @queues.clear
      @recv_queue.enqueue(nil)
    end
#close_read ⇒ Object
    # File 'lib/nnq/routing/bus.rb', line 82
    def close_read
      @recv_queue.enqueue(nil)
    end
#connection_added(conn) ⇒ Object
    # File 'lib/nnq/routing/bus.rb', line 59
    def connection_added(conn)
      queue = Async::LimitedQueue.new(@engine..send_hwm)
      @queues[conn] = queue
      spawn_pump(conn, queue)
    end
#connection_removed(conn) ⇒ Object
    # File 'lib/nnq/routing/bus.rb', line 66
    def connection_removed(conn)
      @queues.delete(conn)
    end
#direct_recv_for(_conn) ⇒ Object
Inproc fast-path hook: peer pipe enqueues directly into the shared recv queue — identity transform, no backtrace or filter.
    # File 'lib/nnq/routing/bus.rb', line 48
    def direct_recv_for(_conn)
      [@recv_queue, nil]
    end
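The source shows only that this hook returns a two-element array of the recv queue and `nil`. How the caller consumes that pair is not shown, so the following is a guess at the shape of such a caller rather than the engine's actual code: `deliver_direct` is a hypothetical helper, the second element is assumed to be an optional callable transform with `nil` meaning identity, and `Thread::Queue` stands in for `Async::Queue`.

```ruby
# Hypothetical caller of a [queue, transform] pair.
# Assumption: a nil transform means "enqueue the body unchanged".
def deliver_direct(pair, body)
  queue, transform = pair
  body = transform.call(body) if transform
  queue << body
end

recv_queue = Thread::Queue.new   # stand-in for the shared recv queue
deliver_direct([recv_queue, nil], "payload")
puts recv_queue.pop
```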
#enqueue(body, _conn = nil) ⇒ Object
Called by the engine recv loop with each received message.
    # File 'lib/nnq/routing/bus.rb', line 41
    def enqueue(body, _conn = nil)
      @recv_queue.enqueue(body)
    end
#receive ⇒ String?
Returns the message body, or nil once the socket is closed.
    # File 'lib/nnq/routing/bus.rb', line 54
    def receive
      @recv_queue.dequeue
    end
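Because #close and #close_read signal shutdown by enqueueing `nil`, a consumer can loop until it dequeues `nil`. The sketch below illustrates that sentinel pattern with the standard library's `Thread::Queue` standing in for `Async::Queue`; `drain_until_closed` is a hypothetical helper, not part of NNQ.

```ruby
# Collects bodies until the nil close sentinel is dequeued.
# Thread::Queue is a stand-in for the shared recv queue (assumption).
def drain_until_closed(recv_queue)
  bodies = []
  while (body = recv_queue.pop)   # nil ends the loop
    bodies << body
  end
  bodies
end

queue = Thread::Queue.new
queue << "a" << "b"
queue << nil                      # what #close and #close_read enqueue
p drain_until_closed(queue)
```

Each sentinel is consumed by exactly one dequeue, so in this model a single `nil` ends one receive loop only.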
#send(body) ⇒ Object
Broadcasts body to every connected peer. Non-blocking per peer: the message is dropped for any peer whose send queue is at its high-water mark (HWM).
    # File 'lib/nnq/routing/bus.rb', line 33
    def send(body)
      @queues.each_value do |queue|
        queue.enqueue(body) unless queue.limited?
      end
    end
#send_queue_drained? ⇒ Boolean
    # File 'lib/nnq/routing/bus.rb', line 71
    def send_queue_drained?
      @queues.each_value.all? { |q| q.empty? }
    end
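One plausible use of this predicate is to linger briefly before closing so that queued messages have a chance to reach the pumps. The helper below is a generic polling sketch under that assumption: `wait_until_drained`, its `timeout:` and `interval:` parameters, and the `bus` variable in the usage comment are all hypothetical, and a real Async program would likely sleep via the reactor rather than `Kernel#sleep`.

```ruby
# Polls the given block until it returns true or the timeout elapses.
# Returns true if the condition was met, false on timeout.
def wait_until_drained(timeout:, interval: 0.01)
  deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout
  until yield
    return false if Process.clock_gettime(Process::CLOCK_MONOTONIC) >= deadline
    sleep interval
  end
  true
end

# Hypothetical usage, assuming `bus` is an NNQ::Routing::Bus instance:
#   bus.close if wait_until_drained(timeout: 1.0) { bus.send_queue_drained? }
```

Note that `all?` on an empty collection returns true, so the predicate reports drained when no peers are connected.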