Class: OMQ::Routing::Sub
- Inherits: Object
  - Object
    - OMQ::Routing::Sub
- Defined in: lib/omq/routing/sub.rb
Overview
SUB socket routing: subscription-based receive from PUB peers.
Sends SUBSCRIBE/CANCEL commands to connected PUB peers.
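As a rough illustration of what "subscription-based" means here, the sketch below applies ZeroMQ-style prefix matching to a set of subscriptions. The helper `topic_matches?` is hypothetical and not part of OMQ; where the filtering actually runs (on the PUB peer or locally) is not stated on this page.

```ruby
require "set"

# Hypothetical helper (not part of OMQ): true when the topic starts with
# any subscribed prefix. An empty prefix matches every topic.
def topic_matches?(subscriptions, topic)
  subscriptions.any? { |prefix| topic.start_with?(prefix) }
end

subscriptions = Set["weather.", "news.sports"]

matched   = topic_matches?(subscriptions, "weather.berlin")
unmatched = topic_matches?(subscriptions, "news.politics")
catch_all = topic_matches?(Set[""], "anything")
```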
Instance Attribute Summary
- #recv_queue ⇒ Async::LimitedQueue readonly
Instance Method Summary
- #connection_added(connection) ⇒ Object
- #connection_removed(connection) ⇒ Object
- #dequeue_recv ⇒ Array<String>?
  Dequeues the next received message.
- #enqueue(_parts) ⇒ Object
  SUB is read-only.
- #initialize(engine) ⇒ Sub (constructor)
  A new instance of Sub.
- #subscribe(prefix) ⇒ Object
  Subscribes to a topic prefix.
- #unblock_recv ⇒ void
  Wakes a blocked #dequeue_recv with a nil sentinel.
- #unsubscribe(prefix) ⇒ Object
  Unsubscribes from a topic prefix.
Constructor Details
#initialize(engine) ⇒ Sub
Returns a new instance of Sub.
# File 'lib/omq/routing/sub.rb', line 18
def initialize(engine)
  @engine = engine
  @connections = Set.new
  @recv_queue = Routing.build_queue(engine..recv_hwm, :block)
  @subscriptions = Set.new
end
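The constructor builds the receive queue from a high-water mark and a `:block` policy. What `Routing.build_queue` does internally is not shown on this page. The sketch below assumes it yields a bounded queue that makes producers wait when full, and uses Ruby's core `SizedQueue` as a stand-in for `Async::LimitedQueue`.

```ruby
# Stand-in for the receive queue: a bounded queue with a limit of 2.
# SizedQueue is from Ruby's core library, not the queue class OMQ uses.
recv_hwm = 2
queue = SizedQueue.new(recv_hwm)

queue.push(["topic", "one"])
queue.push(["topic", "two"])

# A blocking push would now wait for a consumer. A non-blocking push
# raises ThreadError instead, which makes the limit observable here.
full = begin
  queue.push(["topic", "three"], true)
  false
rescue ThreadError
  true
end

queue.pop                             # a consumer frees one slot
queue.push(["topic", "three"], true)  # now succeeds without blocking
size = queue.size
```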
Instance Attribute Details
#recv_queue ⇒ Async::LimitedQueue (readonly)
# File 'lib/omq/routing/sub.rb', line 13
def recv_queue
  @recv_queue
end
Instance Method Details
#connection_added(connection) ⇒ Object
# File 'lib/omq/routing/sub.rb', line 46
def connection_added(connection)
  @connections << connection
  @subscriptions.each do |prefix|
    send_subscribe(connection, prefix)
  end
  @engine.start_recv_pump(connection, @recv_queue)
end
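The listing above replays every stored subscription to a newly added connection, so a peer that connects late still learns what to send. A minimal sketch of that replay follows. `RecordingConnection`, `ReplaySketch`, and `send_command` are hypothetical stand-ins for OMQ's connection object and its private `send_subscribe` helper, and the receive pump is left out.

```ruby
require "set"

# Hypothetical stand-in for a peer connection: records commands sent to it.
class RecordingConnection
  attr_reader :commands

  def initialize
    @commands = []
  end

  def send_command(name, prefix)
    @commands << [name, prefix]
  end
end

# Hypothetical reduction of the routing class to its subscription bookkeeping.
class ReplaySketch
  def initialize
    @connections = Set.new
    @subscriptions = Set.new
  end

  def subscribe(prefix)
    @subscriptions << prefix
    @connections.each { |conn| conn.send_command(:subscribe, prefix) }
  end

  # Replays all known subscriptions to the new connection.
  def connection_added(conn)
    @connections << conn
    @subscriptions.each { |prefix| conn.send_command(:subscribe, prefix) }
  end
end

sub = ReplaySketch.new
sub.subscribe("a.")
sub.subscribe("b.")

late = RecordingConnection.new
sub.connection_added(late)
replayed = late.commands
```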
#connection_removed(connection) ⇒ Object
# File 'lib/omq/routing/sub.rb', line 59
def connection_removed(connection)
  @connections.delete(connection)
end
#dequeue_recv ⇒ Array<String>?
Dequeues the next received message. Blocks until one is available.
# File 'lib/omq/routing/sub.rb', line 30
def dequeue_recv
  @recv_queue.dequeue
end
#enqueue(_parts) ⇒ Object
SUB is read-only; calling this method always raises.
# File 'lib/omq/routing/sub.rb', line 66
def enqueue(_parts)
  raise "SUB sockets cannot send"
end
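Because `raise` with a plain string produces a `RuntimeError` in Ruby, a caller that might attempt a send can rescue that class. The sketch below uses a hypothetical `ReadOnlyRouting` class that copies only the `enqueue` body shown above.

```ruby
# Hypothetical class reproducing only the enqueue behaviour shown above.
class ReadOnlyRouting
  def enqueue(_parts)
    raise "SUB sockets cannot send"
  end
end

# Capture the error instead of letting it propagate.
error = begin
  ReadOnlyRouting.new.enqueue(["topic", "payload"])
  nil
rescue RuntimeError => e
  e
end
```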
#subscribe(prefix) ⇒ Object
Subscribes to a topic prefix.
# File 'lib/omq/routing/sub.rb', line 75
def subscribe(prefix)
  @subscriptions << prefix
  @connections.each { |conn| send_subscribe(conn, prefix) }
end
#unblock_recv ⇒ void
This method returns an undefined value.
Wakes a blocked #dequeue_recv with a nil sentinel.
# File 'lib/omq/routing/sub.rb', line 39
def unblock_recv
  @recv_queue.enqueue(nil)
end
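The nil sentinel lets a consumer that is blocked in a dequeue wake up and tell "no more messages" apart from a real message. The sketch below shows the pattern with Ruby's core `Queue` and a thread. OMQ itself uses an Async queue and fibers, so this is an analogy under that assumption rather than the library's code.

```ruby
queue = Queue.new
received = []

consumer = Thread.new do
  # pop blocks until an item arrives; a nil item ends the loop.
  while (msg = queue.pop)
    received << msg
  end
  :stopped
end

queue.push(["topic", "payload"])
queue.push(nil)          # plays the role of unblock_recv

result = consumer.value  # joins the thread and returns its final value
```

A consumer of `#dequeue_recv` would likewise need to treat a `nil` return as a wake-up signal rather than as a message.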
#unsubscribe(prefix) ⇒ Object
Unsubscribes from a topic prefix.
# File 'lib/omq/routing/sub.rb', line 85
def unsubscribe(prefix)
  @subscriptions.delete(prefix)
  @connections.each { |conn| send_cancel(conn, prefix) }
end
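One consequence of the `subscribe` and `unsubscribe` listings is that the stored prefixes form a set, while commands are sent on every call. The sketch below makes that visible. `CommandLog`, `SubscriptionSketch`, and `send_command` are hypothetical stand-ins for OMQ's connection object and its private `send_subscribe` and `send_cancel` helpers.

```ruby
require "set"

# Hypothetical stand-in for a connected PUB peer: records received commands.
class CommandLog
  attr_reader :commands

  def initialize
    @commands = []
  end

  def send_command(name, prefix)
    @commands << [name, prefix]
  end
end

# Hypothetical reduction of the routing class to subscribe/unsubscribe.
class SubscriptionSketch
  attr_reader :subscriptions

  def initialize(connections)
    @connections = connections
    @subscriptions = Set.new
  end

  def subscribe(prefix)
    @subscriptions << prefix
    @connections.each { |conn| conn.send_command(:subscribe, prefix) }
  end

  def unsubscribe(prefix)
    @subscriptions.delete(prefix)
    @connections.each { |conn| conn.send_command(:cancel, prefix) }
  end
end

peer = CommandLog.new
sub = SubscriptionSketch.new([peer])

sub.subscribe("a.")
sub.subscribe("a.")    # stored once, but the command is sent again
sub.unsubscribe("a.")  # a single call removes the stored prefix

log = peer.commands
remaining = sub.subscriptions.size
```

In this sketch a duplicate `subscribe` reaches the peer twice, yet one `unsubscribe` clears the local record. How a PUB peer counts repeated SUBSCRIBE commands is outside what this page documents.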