Class: OMQ::Routing::XPub
- Inherits:
- Object
- Includes:
- FanOut
- Defined in:
- lib/omq/routing/xpub.rb
Overview
XPUB socket routing: like PUB but exposes subscription messages.
Subscription and unsubscription messages from peers are delivered to the application as data frames: \x01 + prefix for subscribe, \x00 + prefix for unsubscribe.
The recv_queue is a simple bounded queue (not a FairQueue) because messages come from subscription commands, not from peer data pumps.
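The frame layout described above can be illustrated with a small decoder. This is a minimal sketch, not part of OMQ: the helper name `decode_subscription_frame` and the returned symbols are hypothetical, and it assumes each frame arrives as a single Ruby String.

```ruby
# Hypothetical helper (not an OMQ API): split a subscription frame into
# its action and topic prefix, based on the leading byte.
def decode_subscription_frame(frame)
  bytes = frame.b # work on raw bytes regardless of the source encoding
  prefix = bytes.byteslice(1, bytes.bytesize - 1) || ""

  case bytes.getbyte(0)
  when 0x01 then [:subscribe, prefix]
  when 0x00 then [:unsubscribe, prefix]
  else [:unknown, bytes] # empty frame or unexpected leading byte
  end
end

action, prefix = decode_subscription_frame("\x01weather")
puts "#{action} #{prefix.inspect}"
```

An empty prefix after the leading byte is a valid result; in PUB/SUB prefix matching it corresponds to subscribing to every topic.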
Constant Summary
Constants included from FanOut
Instance Attribute Summary
- #recv_queue ⇒ Async::LimitedQueue readonly
Attributes included from FanOut
Instance Method Summary
- #connection_added(connection) ⇒ Object
- #connection_removed(connection) ⇒ Object
- #dequeue_recv ⇒ Object
  Engine-facing recv contract.
- #enqueue(parts) ⇒ Object
- #initialize(engine) ⇒ XPub (constructor)
  A new instance of XPub.
- #stop ⇒ void
  Stops all background tasks.
- #unblock_recv ⇒ Object
Methods included from FanOut
Constructor Details
#initialize(engine) ⇒ XPub
Returns a new instance of XPub.
# File 'lib/omq/routing/xpub.rb', line 24
def initialize(engine)
  @engine = engine
  @recv_queue = Routing.build_queue(engine..recv_hwm, :block)
  @tasks = []
  init_fan_out(engine)
end
Instance Attribute Details
#recv_queue ⇒ Async::LimitedQueue (readonly)
# File 'lib/omq/routing/xpub.rb', line 19
def recv_queue
  @recv_queue
end
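To illustrate what a bounded receive queue implies for producers, here is a rough sketch using Ruby's standard-library `Thread::SizedQueue` as a stand-in. It is not the `Async::LimitedQueue` that OMQ returns, and the capacity value is a hypothetical high-water mark; the point is only that a full queue applies backpressure.

```ruby
# Stand-in for the bounded receive queue. Thread::SizedQueue is from the
# Ruby standard library; OMQ itself uses Async::LimitedQueue.
recv_hwm = 2 # hypothetical high-water mark
queue = Thread::SizedQueue.new(recv_hwm)

queue.push("\x01a")
queue.push("\x01b")

# A blocking push would now wait for a consumer. A non-blocking push
# raises ThreadError instead, which makes the full state observable.
overflowed =
  begin
    queue.push("\x01c", true)
    false
  rescue ThreadError
    true
  end

queue.pop           # a consumer frees one slot
queue.push("\x01c") # now succeeds without waiting
puts "overflowed=#{overflowed} size=#{queue.size}"
```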
Instance Method Details
#connection_added(connection) ⇒ Object
# File 'lib/omq/routing/xpub.rb', line 46
def connection_added(connection)
  @connections << connection
  @subscriptions[connection] = Set.new
  start_subscription_listener(connection)
  add_fan_out_send_connection(connection)
end
#connection_removed(connection) ⇒ Object
# File 'lib/omq/routing/xpub.rb', line 56
def connection_removed(connection)
  @connections.delete(connection)
  @subscriptions.delete(connection)
  remove_fan_out_send_connection(connection)
end
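The per-connection bookkeeping in `connection_added` and `connection_removed` (one `Set` of prefixes per connection) can be sketched in isolation. The class below is hypothetical and is not the FanOut implementation; in particular, the prefix-matching rule in `recipients` is an assumption based on ordinary PUB/SUB semantics.

```ruby
require "set"

# Hypothetical stand-in for the @subscriptions bookkeeping: each
# connection owns a Set of topic prefixes.
class SubscriptionTable
  def initialize
    @subscriptions = {}
  end

  def add(connection)
    @subscriptions[connection] = Set.new
  end

  def remove(connection)
    @subscriptions.delete(connection)
  end

  def subscribe(connection, prefix)
    @subscriptions.fetch(connection) << prefix
  end

  def unsubscribe(connection, prefix)
    @subscriptions.fetch(connection).delete(prefix)
  end

  # Connections with at least one prefix that the topic starts with.
  def recipients(topic)
    @subscriptions.select { |_, prefixes| prefixes.any? { |p| topic.start_with?(p) } }.keys
  end
end

table = SubscriptionTable.new
table.add(:peer_a)
table.add(:peer_b)
table.subscribe(:peer_a, "weather.")
table.subscribe(:peer_b, "") # empty prefix matches every topic
puts table.recipients("weather.berlin").inspect
```

Removing a connection drops its whole prefix set at once, which mirrors the `@subscriptions.delete(connection)` call above.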
#dequeue_recv ⇒ Object
Engine-facing recv contract. Delegates to the bounded queue.
# File 'lib/omq/routing/xpub.rb', line 34
def dequeue_recv
  @recv_queue.dequeue
end
#enqueue(parts) ⇒ Object
# File 'lib/omq/routing/xpub.rb', line 65
def enqueue(parts)
  fan_out_enqueue(parts)
end
#stop ⇒ void
This method returns an undefined value.
Stops all background tasks.
# File 'lib/omq/routing/xpub.rb', line 74
def stop
  @tasks.each(&:stop)
  @tasks.clear
end
#unblock_recv ⇒ Object
# File 'lib/omq/routing/xpub.rb', line 39
def unblock_recv
  @recv_queue.push(nil)
end
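Pushing `nil` onto the queue reads as a sentinel that wakes a receiver blocked in `dequeue_recv`. The sketch below shows that pattern with threads and the standard-library `Thread::SizedQueue`, which stands in for the Async-based queue OMQ actually uses. How the engine reacts to the `nil` is an assumption here; the loop simply treats it as "stop receiving".

```ruby
# Stand-in queue; the capacity of 4 is arbitrary.
queue = Thread::SizedQueue.new(4)

# The consumer blocks in pop until a frame or the nil sentinel arrives.
consumer = Thread.new do
  frames = []
  while (frame = queue.pop)
    frames << frame
  end
  frames
end

queue.push("\x01weather") # a subscription frame
queue.push(nil)           # sentinel: unblocks the consumer and ends its loop

frames = consumer.value   # joins the thread and returns the collected frames
puts frames.inspect
```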