Class: NNQ::Routing::Pub

Inherits:
Object
  • Object
show all
Defined in:
lib/nnq/routing/pub.rb

Overview

PUB side of the pub/sub pattern (nng pub0).

Broadcasts every message to every connected SUB. Each peer gets its own bounded send queue (‘send_hwm`) and its own send pump fiber — a slow subscriber cannot block fast ones. When a peer’s queue is full, new messages are dropped for that peer (matching nng’s non-blocking fan-out semantics).

Pub0 has no subscription state on the sender side: SUBs filter locally. Pub0 is strictly one-directional; nothing is read from SUB peers.

Instance Method Summary collapse

Constructor Details

#initialize(engine) ⇒ Pub

Returns a new instance of Pub.



21
22
23
24
25
# File 'lib/nnq/routing/pub.rb', line 21

def initialize(engine)
  @engine     = engine
  @queues     = {} # conn => Async::LimitedQueue
  @pump_tasks = {} # conn => Async::Task
end

Instance Method Details

#closeObject



67
68
69
70
71
# File 'lib/nnq/routing/pub.rb', line 67

def close
  @pump_tasks.each_value(&:stop)
  @pump_tasks.clear
  @queues.clear
end

#connection_added(conn) ⇒ Object



39
40
41
42
43
44
45
46
47
# File 'lib/nnq/routing/pub.rb', line 39

def connection_added(conn)
  queue = Async::LimitedQueue.new(@engine.options.send_hwm)
  # Register queue BEFORE spawning the pump. spawn_task yields
  # control into the new task body, which parks on queue.dequeue;
  # at that park the publisher fiber can run and must already see
  # this peer's queue.
  @queues[conn]     = queue
  @pump_tasks[conn] = spawn_pump(conn, queue)
end

#connection_removed(conn) ⇒ Object



50
51
52
53
54
55
56
57
58
# File 'lib/nnq/routing/pub.rb', line 50

def connection_removed(conn)
  @queues.delete(conn)
  task = @pump_tasks.delete(conn)
  return unless task
  return if task == Async::Task.current
  task.stop
rescue IOError, Errno::EPIPE
  # pump was mid-flush; already unwinding
end

#send(body) ⇒ Object

Broadcasts body to every connected peer. Non-blocking per peer: drops when a peer’s queue is at HWM.

Parameters:

  • body (String)


32
33
34
35
36
# File 'lib/nnq/routing/pub.rb', line 32

def send(body)
  @queues.each_value do |queue|
    queue.enqueue(body) unless queue.limited?
  end
end

#send_queue_drained?Boolean

True once every peer’s queue is empty. Engine linger polls this.

Returns:

  • (Boolean)


62
63
64
# File 'lib/nnq/routing/pub.rb', line 62

def send_queue_drained?
  @queues.each_value.all? { |q| q.empty? }
end