Class: NNQ::Routing::Bus

Inherits:
Object
  • Object
show all
Defined in:
lib/nnq/routing/bus.rb

Overview

BUS0: best-effort bidirectional mesh.

Send side: fan-out to all connected peers. Each peer gets its own bounded send queue and pump fiber — a slow peer drops messages instead of blocking fast ones (same as PUB). Send never blocks.

Recv side: all incoming messages are pushed into a shared unbounded queue (same as PULL).

No SP headers in cooked mode — body on the wire is the user payload.

Instance Method Summary collapse

Constructor Details

#initialize(engine) ⇒ Bus

Returns a new instance of Bus.



22
23
24
25
26
27
# File 'lib/nnq/routing/bus.rb', line 22

def initialize(engine)
  @engine     = engine
  @queues     = {} # conn => Async::LimitedQueue
  @pump_tasks = {} # conn => Async::Task
  @recv_queue = Async::Queue.new
end

Instance Method Details

#closeObject



75
76
77
78
79
80
# File 'lib/nnq/routing/bus.rb', line 75

def close
  @pump_tasks.each_value(&:stop)
  @pump_tasks.clear
  @queues.clear
  @recv_queue.enqueue(nil)
end

#close_readObject



83
84
85
# File 'lib/nnq/routing/bus.rb', line 83

def close_read
  @recv_queue.enqueue(nil)
end

#connection_added(conn) ⇒ Object



53
54
55
56
57
# File 'lib/nnq/routing/bus.rb', line 53

def connection_added(conn)
  queue = Async::LimitedQueue.new(@engine.options.send_hwm)
  @queues[conn]     = queue
  @pump_tasks[conn] = spawn_pump(conn, queue)
end

#connection_removed(conn) ⇒ Object



60
61
62
63
64
65
66
67
# File 'lib/nnq/routing/bus.rb', line 60

def connection_removed(conn)
  @queues.delete(conn)
  task = @pump_tasks.delete(conn)
  return unless task
  return if task == Async::Task.current
  task.stop
rescue IOError, Errno::EPIPE
end

#enqueue(body, _conn = nil) ⇒ Object

Called by the engine recv loop with each received message.



42
43
44
# File 'lib/nnq/routing/bus.rb', line 42

def enqueue(body, _conn = nil)
  @recv_queue.enqueue(body)
end

#receiveString?

Returns message body, or nil once the socket is closed.

Returns:

  • (String, nil)

    message body, or nil once the socket is closed



48
49
50
# File 'lib/nnq/routing/bus.rb', line 48

def receive
  @recv_queue.dequeue
end

#send(body) ⇒ Object

Broadcasts body to every connected peer. Non-blocking per peer: drops when a peer’s queue is at HWM.

Parameters:

  • body (String)


34
35
36
37
38
# File 'lib/nnq/routing/bus.rb', line 34

def send(body)
  @queues.each_value do |queue|
    queue.enqueue(body) unless queue.limited?
  end
end

#send_queue_drained?Boolean

Returns:

  • (Boolean)


70
71
72
# File 'lib/nnq/routing/bus.rb', line 70

def send_queue_drained?
  @queues.each_value.all? { |q| q.empty? }
end