Class: OMQ::Routing::Sub
- Inherits: Object
  - Object
  - OMQ::Routing::Sub
- Defined in: lib/omq/routing/sub.rb
Overview
SUB socket routing: subscription-based receive from PUB peers.
Sends SUBSCRIBE/CANCEL commands to connected PUB peers.
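To make "subscription-based" concrete, here is a minimal sketch of the prefix matching that ZeroMQ-style PUB/SUB topics conventionally use. `topic_matches?` is a hypothetical helper, not part of OMQ, and this page does not show where OMQ actually applies the filter.

```ruby
# Hypothetical helper (not part of OMQ): true when the topic starts with
# any subscribed prefix. An empty prefix matches every topic.
def topic_matches?(subscriptions, topic)
  subscriptions.any? { |prefix| topic.start_with?(prefix) }
end

subscriptions = ["weather.", "news.sports"]

weather   = topic_matches?(subscriptions, "weather.berlin") # matches "weather."
finance   = topic_matches?(subscriptions, "finance.eur")    # no matching prefix
catch_all = topic_matches?([""], "finance.eur")             # empty prefix matches all
```

Subscribing to the empty prefix is the usual way to receive every message from a publisher.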
Instance Attribute Summary
- #recv_queue ⇒ FairQueue readonly
Instance Method Summary
- #connection_added(connection) ⇒ Object
- #connection_removed(connection) ⇒ Object
- #dequeue_recv ⇒ Object
  Engine-facing recv contract.
- #enqueue(_parts) ⇒ Object
  SUB is read-only.
- #initialize(engine) ⇒ Sub (constructor)
  A new instance of Sub.
- #stop ⇒ void
  Stops all background tasks.
- #subscribe(prefix) ⇒ Object
  Subscribes to a topic prefix.
- #unblock_recv ⇒ Object
- #unsubscribe(prefix) ⇒ Object
  Unsubscribes from a topic prefix.
Constructor Details
Instance Attribute Details
#recv_queue ⇒ FairQueue (readonly)
    # File 'lib/omq/routing/sub.rb', line 13
    def recv_queue
      @recv_queue
    end
Instance Method Details
#connection_added(connection) ⇒ Object
    # File 'lib/omq/routing/sub.rb', line 41
    def connection_added(connection)
      @connections << connection
      # Replay every current subscription to the newly connected peer.
      @subscriptions.each do |prefix|
        connection.send_command(Protocol::ZMTP::Codec::Command.subscribe(prefix))
      end
      # Give the connection its own queue and register it with the shared FairQueue.
      conn_q = Routing.build_queue(@engine..recv_hwm, @engine..on_mute)
      signaling = SignalingQueue.new(conn_q, @recv_queue)
      @recv_queue.add_queue(connection, conn_q)
      # Track the recv pump task (if one was started) so #stop can stop it.
      task = @engine.start_recv_pump(connection, signaling)
      @tasks << task if task
    end
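The replay step in `#connection_added` is what lets a peer that connects late still learn about earlier subscriptions. The sketch below isolates that step. `FakeConnection` and `ReplaySketch` are hypothetical stand-ins, and a plain `[:subscribe, prefix]` array takes the place of the real ZMTP command object.

```ruby
# Hypothetical stand-in for a peer connection: records commands instead of
# writing them to a socket.
class FakeConnection
  attr_reader :sent

  def initialize
    @sent = []
  end

  def send_command(command)
    @sent << command
  end
end

# Simplified sketch of the replay step only (no queues, no recv pump).
class ReplaySketch
  def initialize
    @connections = []
    @subscriptions = ["weather.", "news."] # subscriptions made before any peer connected
  end

  def connection_added(connection)
    @connections << connection
    @subscriptions.each do |prefix|
      # The real code builds a ZMTP SUBSCRIBE command; an array stands in here.
      connection.send_command([:subscribe, prefix])
    end
  end
end

late_peer = FakeConnection.new
ReplaySketch.new.connection_added(late_peer)
replayed = late_peer.sent
```

After the call, `replayed` holds one `[:subscribe, prefix]` entry per stored prefix, in insertion order.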
#connection_removed(connection) ⇒ Object
    # File 'lib/omq/routing/sub.rb', line 56
    def connection_removed(connection)
      @connections.delete(connection)
      @recv_queue.remove_queue(connection)
    end
#dequeue_recv ⇒ Object
Engine-facing recv contract. Delegates to the FairQueue.
    # File 'lib/omq/routing/sub.rb', line 29
    def dequeue_recv
      @recv_queue.dequeue
    end
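This page does not show how `FairQueue` is implemented. As an illustration of the general idea of fair dequeuing across per-connection queues, here is a hypothetical, non-blocking round-robin sketch. `TinyFairQueue` is an invented name, plain arrays stand in for the per-connection queues, and the real class may behave differently (for example, by blocking when empty).

```ruby
# Hypothetical round-robin queue: NOT OMQ's FairQueue, just the general idea.
class TinyFairQueue
  def initialize
    @queues = {} # connection => Array of pending messages
    @order  = [] # connections in round-robin order
  end

  def add_queue(connection, queue)
    @queues[connection] = queue
    @order << connection
  end

  def remove_queue(connection)
    @queues.delete(connection)
    @order.delete(connection)
  end

  # Returns the next message, visiting connections in turn, or nil if all
  # queues are empty. A real implementation would likely block instead.
  def dequeue
    @order.size.times do
      connection = @order.shift
      @order.push(connection) # rotate so the next call starts at the next peer
      queue = @queues[connection]
      return queue.shift unless queue.empty?
    end
    nil
  end
end

fair = TinyFairQueue.new
fair.add_queue(:peer_a, ["a1", "a2"])
fair.add_queue(:peer_b, ["b1"])
drained = 4.times.map { fair.dequeue }
```

The point of rotating on every call is that a busy peer cannot starve a quiet one: messages from `:peer_a` and `:peer_b` are interleaved rather than drained one peer at a time.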
#enqueue(_parts) ⇒ Object
SUB is read-only.
    # File 'lib/omq/routing/sub.rb', line 64
    def enqueue(_parts)
      raise "SUB sockets cannot send"
    end
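Because `raise` with only a string raises a `RuntimeError`, a caller that might attempt a send on a SUB routing object can rescue that class. The sketch below uses a hypothetical `ReadOnlyRouting` class that copies only the `#enqueue` body shown above.

```ruby
# Hypothetical stand-in that mirrors only the #enqueue behaviour shown above.
class ReadOnlyRouting
  def enqueue(_parts)
    raise "SUB sockets cannot send"
  end
end

error_message = nil
begin
  ReadOnlyRouting.new.enqueue(["topic", "payload"])
rescue RuntimeError => e
  # `raise "string"` produces a RuntimeError carrying that string as its message.
  error_message = e.message
end
```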
#stop ⇒ void
This method returns an undefined value.
Stops all background tasks.
    # File 'lib/omq/routing/sub.rb', line 97
    def stop
      @tasks.each(&:stop)
      @tasks.clear
    end
#subscribe(prefix) ⇒ Object
Subscribes to a topic prefix.
    # File 'lib/omq/routing/sub.rb', line 73
    def subscribe(prefix)
      @subscriptions << prefix
      @connections.each do |conn|
        conn.send_command(Protocol::ZMTP::Codec::Command.subscribe(prefix))
      end
    end
#unblock_recv ⇒ Object
    # File 'lib/omq/routing/sub.rb', line 34
    def unblock_recv
      @recv_queue.push(nil)
    end
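Pushing `nil` into a queue is a common way to wake a blocked consumer so it can exit. The sketch below shows that sentinel pattern with Ruby's standard `Queue`. It is an assumption that OMQ's `FairQueue` treats the pushed `nil` in a comparable way, since only the `push(nil)` call is visible here.

```ruby
# Sentinel pattern with Ruby's built-in thread-safe Queue (not OMQ's FairQueue).
queue = Queue.new
received = []

consumer = Thread.new do
  # Queue#pop blocks until an item arrives; a nil item ends the loop.
  while (message = queue.pop)
    received << message
  end
  :unblocked
end

queue.push(["topic", "payload"])
queue.push(nil) # wakes the consumer and tells it to stop

result = consumer.value # joins the thread and returns the block's value
```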
#unsubscribe(prefix) ⇒ Object
Unsubscribes from a topic prefix.
    # File 'lib/omq/routing/sub.rb', line 85
    def unsubscribe(prefix)
      @subscriptions.delete(prefix)
      @connections.each do |conn|
        conn.send_command(Protocol::ZMTP::Codec::Command.cancel(prefix))
      end
    end
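Taken together, `#subscribe` and `#unsubscribe` keep a local set of prefixes and broadcast each change to every connected peer. The following sketch mirrors that flow with hypothetical stand-ins: `RecordingPeer` and `SubscriptionSketch` are invented names, `[:subscribe, prefix]` and `[:cancel, prefix]` arrays replace the real ZMTP commands, and using a `Set` for the subscriptions is an assumption.

```ruby
require "set"

# Hypothetical peer that records the commands it is sent.
class RecordingPeer
  attr_reader :sent

  def initialize
    @sent = []
  end

  def send_command(command)
    @sent << command
  end
end

# Simplified sketch of the subscribe/unsubscribe bookkeeping.
class SubscriptionSketch
  def initialize(connections)
    @connections = connections
    @subscriptions = Set.new # assumption: the real container type is not shown
  end

  def subscribe(prefix)
    @subscriptions << prefix
    @connections.each { |conn| conn.send_command([:subscribe, prefix]) }
  end

  def unsubscribe(prefix)
    @subscriptions.delete(prefix)
    @connections.each { |conn| conn.send_command([:cancel, prefix]) }
  end

  def subscriptions
    @subscriptions.to_a
  end
end

peers = [RecordingPeer.new, RecordingPeer.new]
sketch = SubscriptionSketch.new(peers)
sketch.subscribe("weather.")
sketch.subscribe("news.")
sketch.unsubscribe("weather.")
remaining = sketch.subscriptions
```

Every peer sees the same command sequence, and only `"news."` remains in the local set, so a peer connecting afterwards would be told about that prefix alone.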