Class: OMQ::Routing::Pub
- Inherits: Object
- Includes: FanOut
- Defined in: lib/omq/routing/pub.rb
Overview
PUB socket routing: fan-out to all subscribers.
Listens for SUBSCRIBE/CANCEL commands from peers. Each subscriber gets its own bounded send queue; slow subscribers are muted via the socket’s on_mute strategy (drop by default).
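A minimal sketch of the per-subscriber bounded queue with a drop strategy may help make the overview concrete. It is not OMQ's implementation: `BoundedSendQueue`, `offer`, and `dropped` are hypothetical names, and the sketch assumes that "drop" means discarding the new message when a subscriber's queue is full.

```ruby
# Hypothetical stand-in for one subscriber's bounded send queue.
# The class and method names are illustrative, not part of OMQ.
class BoundedSendQueue
  attr_reader :dropped

  def initialize(capacity)
    @queue = SizedQueue.new(capacity)
    @dropped = 0
  end

  # Returns true if the message was queued, false if it was dropped.
  def offer(parts)
    @queue.push(parts, true) # non-blocking push raises ThreadError when full
    true
  rescue ThreadError
    @dropped += 1
    false
  end

  def size
    @queue.size
  end
end

# Each subscriber owns its queue, so a full queue drops messages
# without holding back the other subscribers.
fast = BoundedSendQueue.new(8)
slow = BoundedSendQueue.new(2)
5.times { |i| [fast, slow].each { |q| q.offer(["topic", "msg #{i}"]) } }
```

In this sketch the slow subscriber keeps its first two messages and loses the rest, while the fast subscriber receives all five.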
Constant Summary
Constants included from FanOut
Instance Attribute Summary
Attributes included from FanOut
Instance Method Summary
- #connection_added(connection) ⇒ Object
- #connection_removed(connection) ⇒ Object
- #dequeue_recv ⇒ Object
- #enqueue(parts) ⇒ Object
- #initialize(engine) ⇒ Pub (constructor): A new instance of Pub.
- #recv_queue ⇒ Object: PUB is write-only.
- #stop ⇒ void: Stops all background tasks.
- #unblock_recv ⇒ Object
Methods included from FanOut
Constructor Details
#initialize(engine) ⇒ Pub
Returns a new instance of Pub.
    # File 'lib/omq/routing/pub.rb', line 16
    def initialize(engine)
      @engine = engine
      @tasks = []
      init_fan_out(engine)
    end
Instance Method Details
#connection_added(connection) ⇒ Object
    # File 'lib/omq/routing/pub.rb', line 42
    def connection_added(connection)
      @connections << connection
      @subscriptions[connection] = Set.new
      start_subscription_listener(connection)
      add_fan_out_send_connection(connection)
    end
#connection_removed(connection) ⇒ Object
    # File 'lib/omq/routing/pub.rb', line 52
    def connection_removed(connection)
      @connections.delete(connection)
      @subscriptions.delete(connection)
      remove_fan_out_send_connection(connection)
    end
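The per-connection `Set` kept in `@subscriptions` can be pictured with the following sketch. It assumes the usual ZeroMQ convention that a subscription is a prefix matched against the start of the topic frame, with the empty prefix matching everything. The source does not show how FanOut matches, so `SubscriptionTable` and its methods are hypothetical.

```ruby
require "set"

# Hypothetical table mirroring @subscriptions: connection => Set of prefixes.
class SubscriptionTable
  def initialize
    @subscriptions = {}
  end

  # Mirrors connection_added: a new peer starts with no subscriptions.
  def add(connection)
    @subscriptions[connection] = Set.new
  end

  # Mirrors connection_removed: forget the peer and its prefixes.
  def remove(connection)
    @subscriptions.delete(connection)
  end

  # Effect of a SUBSCRIBE command from the peer.
  def subscribe(connection, prefix)
    @subscriptions.fetch(connection) << prefix
  end

  # Effect of a CANCEL command from the peer.
  def cancel(connection, prefix)
    @subscriptions.fetch(connection).delete(prefix)
  end

  # Connections with at least one prefix matching the topic (assumed rule).
  def matching(topic)
    @subscriptions.select { |_conn, prefixes|
      prefixes.any? { |prefix| topic.start_with?(prefix) }
    }.keys
  end
end

table = SubscriptionTable.new
table.add(:alice)
table.add(:bob)
table.subscribe(:alice, "weather.")
table.subscribe(:bob, "") # empty prefix: receive everything
```

Under these assumptions, `table.matching("weather.oslo")` selects both peers, while `table.matching("sports")` selects only `:bob`.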
#dequeue_recv ⇒ Object
    # File 'lib/omq/routing/pub.rb', line 31
    def dequeue_recv
      raise "PUB sockets cannot receive"
    end
#enqueue(parts) ⇒ Object
    # File 'lib/omq/routing/pub.rb', line 61
    def enqueue(parts)
      fan_out_enqueue(parts)
    end
#recv_queue ⇒ Object
PUB is write-only. The engine-facing receive contract still applies: #recv_queue and #dequeue_recv raise, and #unblock_recv is a no-op, kept because fatal-error propagation still calls it.
    # File 'lib/omq/routing/pub.rb', line 26
    def recv_queue
      raise "PUB sockets cannot receive"
    end
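The write-only contract can be exercised with a small stand-in, sketched below. `WriteOnlyRouting` is a hypothetical class that copies only the three receive-side methods documented on this page. Because `raise` with a bare string raises `RuntimeError`, that is the class a caller would rescue.

```ruby
# Hypothetical stand-in reproducing the receive-side methods documented here.
class WriteOnlyRouting
  def recv_queue
    raise "PUB sockets cannot receive"
  end

  def dequeue_recv
    raise "PUB sockets cannot receive"
  end

  # Intentionally empty: callers may invoke it without effect.
  def unblock_recv
  end
end

routing = WriteOnlyRouting.new

# Capture the error a receive attempt produces.
error = begin
  routing.dequeue_recv
rescue RuntimeError => e
  e
end
```

Calling `routing.unblock_recv` returns `nil` and raises nothing, which is what lets an engine call it unconditionally during shutdown or error handling.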
#stop ⇒ void
This method returns an undefined value.
Stops all background tasks.
    # File 'lib/omq/routing/pub.rb', line 70
    def stop
      @tasks.each(&:stop)
      @tasks.clear
    end
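One consequence of clearing `@tasks` after stopping them is that a second call to `stop` has nothing left to stop. The sketch below illustrates this with hypothetical doubles (`FakeTask`, `TaskOwner`). The real task objects come from the engine and are not shown in the source.

```ruby
# Hypothetical task double that counts how often it is stopped.
class FakeTask
  attr_reader :stop_calls

  def initialize
    @stop_calls = 0
  end

  def stop
    @stop_calls += 1
  end
end

# Hypothetical owner using the same stop pattern as Pub#stop.
class TaskOwner
  def initialize(tasks)
    @tasks = tasks
  end

  def stop
    @tasks.each(&:stop)
    @tasks.clear
  end
end

tasks = [FakeTask.new, FakeTask.new]
owner = TaskOwner.new(tasks.dup) # dup so the test can still inspect the tasks
owner.stop
owner.stop # second call iterates an empty list
```

Each task in this sketch is stopped exactly once, even though `stop` is called twice.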
#unblock_recv ⇒ Object
    # File 'lib/omq/routing/pub.rb', line 36
    def unblock_recv
    end