Class: OMQ::Routing::Pub
- Inherits:
-
Object
- Object
- OMQ::Routing::Pub
- Includes:
- FanOut
- Defined in:
- lib/omq/routing/pub.rb
Overview
PUB socket routing: fan-out to all subscribers.
Listens for SUBSCRIBE/CANCEL commands from peers. Each subscriber gets its own bounded send queue; slow subscribers are muted via the socket’s on_mute strategy (drop by default).
Constant Summary
Constants included from FanOut
Instance Attribute Summary
Attributes included from FanOut
Instance Method Summary collapse
- #connection_added(connection) ⇒ Object
- #connection_removed(connection) ⇒ Object
- #dequeue_recv ⇒ Object
- #enqueue(parts) ⇒ Object
-
#initialize(engine) ⇒ Pub
constructor
A new instance of Pub.
-
#recv_queue ⇒ Object
PUB is write-only.
- #unblock_recv ⇒ Object
Methods included from FanOut
Constructor Details
#initialize(engine) ⇒ Pub
Returns a new instance of Pub.
16 17 18 19 |
# File 'lib/omq/routing/pub.rb', line 16 def initialize(engine) @engine = engine init_fan_out(engine) end |
Instance Method Details
#connection_added(connection) ⇒ Object
41 42 43 44 45 46 |
# File 'lib/omq/routing/pub.rb', line 41 def connection_added(connection) @connections << connection @subscriptions[connection] = Set.new start_subscription_listener(connection) add_fan_out_send_connection(connection) end |
#connection_removed(connection) ⇒ Object
51 52 53 54 55 |
# File 'lib/omq/routing/pub.rb', line 51 def connection_removed(connection) @connections.delete(connection) @subscriptions.delete(connection) remove_fan_out_send_connection(connection) end |
#dequeue_recv ⇒ Object
30 31 32 |
# File 'lib/omq/routing/pub.rb', line 30 def dequeue_recv raise "PUB sockets cannot receive" end |
#enqueue(parts) ⇒ Object
60 61 62 |
# File 'lib/omq/routing/pub.rb', line 60 def enqueue(parts) fan_out_enqueue(parts) end |
#recv_queue ⇒ Object
PUB is write-only. Engine-facing recv contract: dequeue raises, unblock is a no-op (fatal-error propagation still calls it).
25 26 27 |
# File 'lib/omq/routing/pub.rb', line 25 def recv_queue raise "PUB sockets cannot receive" end |
#unblock_recv ⇒ Object
35 36 |
# File 'lib/omq/routing/pub.rb', line 35 def unblock_recv end |