Class: OMQ::Routing::Pub

Inherits:
Object
  • Object
show all
Includes:
FanOut
Defined in:
lib/omq/routing/pub.rb

Overview

PUB socket routing: fan-out to all subscribers.

Listens for SUBSCRIBE/CANCEL commands from peers. Each subscriber gets its own bounded send queue; slow subscribers are muted via the socket’s on_mute strategy (drop by default).

Constant Summary

Constants included from FanOut

FanOut::EMPTY_BINARY

Instance Attribute Summary

Attributes included from FanOut

#subscriber_joined

Instance Method Summary collapse

Methods included from FanOut

#send_queues_drained?

Constructor Details

#initialize(engine) ⇒ Pub

Returns a new instance of Pub.

Parameters:



16
17
18
19
20
# File 'lib/omq/routing/pub.rb', line 16

def initialize(engine)
  @engine = engine
  @tasks  = []
  init_fan_out(engine)
end

Instance Method Details

#connection_added(connection) ⇒ Object

Parameters:

  • connection (Connection)


42
43
44
45
46
47
# File 'lib/omq/routing/pub.rb', line 42

def connection_added(connection)
  @connections << connection
  @subscriptions[connection] = Set.new
  start_subscription_listener(connection)
  add_fan_out_send_connection(connection)
end

#connection_removed(connection) ⇒ Object

Parameters:

  • connection (Connection)


52
53
54
55
56
# File 'lib/omq/routing/pub.rb', line 52

def connection_removed(connection)
  @connections.delete(connection)
  @subscriptions.delete(connection)
  remove_fan_out_send_connection(connection)
end

#dequeue_recvObject



31
32
33
# File 'lib/omq/routing/pub.rb', line 31

def dequeue_recv
  raise "PUB sockets cannot receive"
end

#enqueue(parts) ⇒ Object

Parameters:

  • parts (Array<String>)


61
62
63
# File 'lib/omq/routing/pub.rb', line 61

def enqueue(parts)
  fan_out_enqueue(parts)
end

#recv_queueObject

PUB is write-only. Engine-facing recv contract: dequeue raises, unblock is a no-op (fatal-error propagation still calls it).



26
27
28
# File 'lib/omq/routing/pub.rb', line 26

def recv_queue
  raise "PUB sockets cannot receive"
end

#stopvoid

This method returns an undefined value.

Stops all background tasks.



70
71
72
73
# File 'lib/omq/routing/pub.rb', line 70

def stop
  @tasks.each(&:stop)
  @tasks.clear
end

#unblock_recvObject



36
37
# File 'lib/omq/routing/pub.rb', line 36

def unblock_recv
end