Class: NNQ::Routing::Bus

Inherits:
Object
  • Object
show all
Defined in:
lib/nnq/routing/bus.rb

Overview

BUS0: best-effort bidirectional mesh.

Send side: fan-out to all connected peers. Each peer gets its own bounded send queue and pump fiber — a slow peer drops messages instead of blocking fast ones (same as PUB). Send never blocks.

Recv side: all incoming messages are pushed into a shared unbounded queue (same as PULL).

No SP headers in cooked mode — body on the wire is the user payload.

Instance Method Summary collapse

Constructor Details

#initialize(engine) ⇒ Bus

Returns a new instance of Bus.



22
23
24
25
26
# File 'lib/nnq/routing/bus.rb', line 22

def initialize(engine)
  @engine     = engine
  @queues     = {} # conn => Async::LimitedQueue
  @recv_queue = Async::Queue.new
end

Instance Method Details

#closeObject



76
77
78
79
# File 'lib/nnq/routing/bus.rb', line 76

def close
  @queues.clear
  @recv_queue.enqueue(nil)
end

#close_readObject



82
83
84
# File 'lib/nnq/routing/bus.rb', line 82

def close_read
  @recv_queue.enqueue(nil)
end

#connection_added(conn) ⇒ Object



59
60
61
62
63
# File 'lib/nnq/routing/bus.rb', line 59

def connection_added(conn)
  queue = Async::LimitedQueue.new(@engine.options.send_hwm)
  @queues[conn] = queue
  spawn_pump(conn, queue)
end

#connection_removed(conn) ⇒ Object



66
67
68
# File 'lib/nnq/routing/bus.rb', line 66

def connection_removed(conn)
  @queues.delete(conn)
end

#direct_recv_for(_conn) ⇒ Object

Inproc fast-path hook: peer pipe enqueues directly into the shared recv queue — identity transform, no backtrace or filter.



48
49
50
# File 'lib/nnq/routing/bus.rb', line 48

def direct_recv_for(_conn)
  [@recv_queue, nil]
end

#enqueue(body, _conn = nil) ⇒ Object

Called by the engine recv loop with each received message.



41
42
43
# File 'lib/nnq/routing/bus.rb', line 41

def enqueue(body, _conn = nil)
  @recv_queue.enqueue(body)
end

#receiveString?

Returns message body, or nil once the socket is closed.

Returns:

  • (String, nil)

    message body, or nil once the socket is closed



54
55
56
# File 'lib/nnq/routing/bus.rb', line 54

def receive
  @recv_queue.dequeue
end

#send(body) ⇒ Object

Broadcasts body to every connected peer. Non-blocking per peer: drops when a peer’s queue is at HWM.

Parameters:

  • body (String)


33
34
35
36
37
# File 'lib/nnq/routing/bus.rb', line 33

def send(body)
  @queues.each_value do |queue|
    queue.enqueue(body) unless queue.limited?
  end
end

#send_queue_drained?Boolean

Returns:

  • (Boolean)


71
72
73
# File 'lib/nnq/routing/bus.rb', line 71

def send_queue_drained?
  @queues.each_value.all? { |q| q.empty? }
end