Class: NNQ::Routing::Pub

Inherits:
Object
Defined in:
lib/nnq/routing/pub.rb

Overview

PUB side of the pub/sub pattern (nng pub0).

Broadcasts every message to every connected SUB. Each peer gets its own bounded send queue (`send_hwm`) and its own send pump fiber, so a slow subscriber cannot block fast ones. When a peer’s queue is full, new messages are dropped for that peer (matching nng’s non-blocking fan-out semantics).

Pub0 has no subscription state on the sender side: SUBs filter locally. Pub0 is strictly one-directional; nothing is read from SUB peers.
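The drop-on-full behaviour described above can be illustrated with a minimal sketch that runs on plain Ruby. `TinyBoundedQueue` and `broadcast_sketch` are hypothetical names introduced here, standing in for `Async::LimitedQueue` and `Pub#send`; they are not part of NNQ.

```ruby
# Hypothetical stand-in for Async::LimitedQueue: a bounded FIFO that
# reports when it has reached its limit. Not part of NNQ.
class TinyBoundedQueue
  def initialize(limit)
    @limit = limit
    @items = []
  end

  # True when the queue holds as many items as its limit allows.
  def limited?
    @items.size >= @limit
  end

  def enqueue(item)
    @items << item
  end

  def dequeue
    @items.shift
  end

  def to_a
    @items.dup
  end
end

# Same loop shape as Pub#send: skip any peer whose queue is full.
def broadcast_sketch(queues, body)
  queues.each_value { |q| q.enqueue(body) unless q.limited? }
end

# One queue per peer, each with a limit of 2 (the role send_hwm plays).
queues = { fast: TinyBoundedQueue.new(2), slow: TinyBoundedQueue.new(2) }

broadcast_sketch(queues, "m1")
broadcast_sketch(queues, "m2")
queues[:fast].dequeue              # the fast peer's pump drains one message
broadcast_sketch(queues, "m3")     # fast accepts m3; slow is full and drops it

p queues[:fast].to_a
p queues[:slow].to_a
```

In this sketch the slow peer never sees `m3`, while the fast peer is unaffected by the slow peer's backlog.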

Constructor Details

#initialize(engine) ⇒ Pub

Returns a new instance of Pub.



# File 'lib/nnq/routing/pub.rb', line 21

def initialize(engine)
  @engine = engine
  @queues = {} # conn => Async::LimitedQueue
end

Instance Method Details

#close ⇒ Object



# File 'lib/nnq/routing/pub.rb', line 60

def close
  @queues.clear
end

#connection_added(conn) ⇒ Object



# File 'lib/nnq/routing/pub.rb', line 38

def connection_added(conn)
  queue = Async::LimitedQueue.new(@engine.options.send_hwm)
  # Register queue BEFORE spawning the pump. spawn_task yields
  # control into the new task body, which parks on queue.dequeue;
  # at that park the publisher fiber can run and must already see
  # this peer's queue.
  @queues[conn] = queue
  spawn_pump(conn, queue)
end
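The body of `spawn_pump` is not shown on this page. As a rough sketch of what a per-peer pump does, the example below drains one peer's queue into a sink. It swaps in a `Thread` and `Thread::SizedQueue` from the standard library in place of an Async task and `Async::LimitedQueue`, and `spawn_pump_sketch` is a hypothetical name, so treat it as an illustration of the register-then-spawn order rather than NNQ's implementation.

```ruby
# Hypothetical pump: forwards everything from one peer's queue to a sink
# until the queue is closed. Uses threads, not Async fibers.
def spawn_pump_sketch(queue, sink)
  Thread.new do
    # Queue#pop returns nil once the queue is closed and empty.
    while (body = queue.pop)
      sink << body
    end
  end
end

queues   = {}
received = []

# Register the queue first, then start the pump, mirroring
# connection_added: a publisher that runs next already sees the peer.
queue = Thread::SizedQueue.new(4)
queues[:peer] = queue
pump = spawn_pump_sketch(queue, received)

queues.each_value { |q| q.push("hello") }

queue.close   # lets the pump exit after draining
pump.join
p received
```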

#connection_removed(conn) ⇒ Object



# File 'lib/nnq/routing/pub.rb', line 49

def connection_removed(conn)
  @queues.delete(conn)
end

#send(body) ⇒ Object

Broadcasts body to every connected peer. Non-blocking per peer: the message is dropped for any peer whose queue is at its high-water mark (`send_hwm`).

Parameters:

  • body (String)


# File 'lib/nnq/routing/pub.rb', line 31

def send(body)
  @queues.each_value do |queue|
    queue.enqueue(body) unless queue.limited?
  end
end

#send_queue_drained? ⇒ Boolean

Returns true once every peer’s queue is empty (trivially true when no peers are connected). The engine’s linger logic polls this.

Returns:

  • (Boolean)


# File 'lib/nnq/routing/pub.rb', line 55

def send_queue_drained?
  @queues.each_value.all? { |q| q.empty? }
end
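How the engine polls this predicate is not shown here. One plausible shape, assuming a fixed polling interval and a linger timeout in seconds, is sketched below; `linger_until_drained` and its parameters are hypothetical and not part of NNQ.

```ruby
# Hypothetical helper: poll a routing object until its send queues drain
# or the linger timeout expires. Returns true if drained in time.
def linger_until_drained(routing, timeout:, interval: 0.01)
  deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout
  until routing.send_queue_drained?
    # Give up once the deadline has passed; pending messages are abandoned.
    return false if Process.clock_gettime(Process::CLOCK_MONOTONIC) >= deadline
    sleep(interval)
  end
  true
end

# Usage with a stub that never drains, standing in for a Pub instance.
never_drained = Object.new
def never_drained.send_queue_drained?
  false
end

p linger_until_drained(never_drained, timeout: 0.05)
```

A monotonic clock is used so that wall-clock adjustments cannot shorten or extend the linger period.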