Class: OMQ::Routing::Sub
- Inherits: Object (Object > OMQ::Routing::Sub)
- Defined in: lib/omq/routing/sub.rb
Overview
SUB socket routing: subscription-based receive from PUB peers.
Sends SUBSCRIBE/CANCEL commands to connected PUB peers.
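A minimal sketch of what "subscribing to a topic prefix" means in PUB/SUB messaging: a message is of interest when its topic starts with one of the subscribed prefixes, and the empty prefix matches everything. The helper `topic_matches?` is hypothetical and not part of OMQ. Where the filtering actually happens (on the PUB peer, the SUB side, or both) is not stated on this page.

```ruby
require "set"

# Hypothetical helper, not part of OMQ: true when the topic starts with
# at least one of the subscribed prefixes.
def topic_matches?(subscriptions, topic)
  subscriptions.any? { |prefix| topic.start_with?(prefix) }
end

subscriptions = Set.new(["weather.", "news."])

p topic_matches?(subscriptions, "weather.berlin")
p topic_matches?(subscriptions, "sports.f1")
p topic_matches?(Set.new([""]), "anything") # empty prefix subscribes to all topics
```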
Instance Attribute Summary
- #recv_queue ⇒ Async::LimitedQueue (readonly)
Instance Method Summary
- #connection_added(connection) ⇒ Object
- #connection_removed(connection) ⇒ Object
- #dequeue_recv ⇒ Array<String>?
  Dequeues the next received message.
- #enqueue(_parts) ⇒ Object
  SUB is read-only.
- #initialize(engine) ⇒ Sub (constructor)
  A new instance of Sub.
- #stop ⇒ void
  Stops all background tasks.
- #subscribe(prefix) ⇒ Object
  Subscribes to a topic prefix.
- #unblock_recv ⇒ void
  Wakes a blocked #dequeue_recv with a nil sentinel.
- #unsubscribe(prefix) ⇒ Object
  Unsubscribes from a topic prefix.
Constructor Details
#initialize(engine) ⇒ Sub
Returns a new instance of Sub.
# File 'lib/omq/routing/sub.rb', line 18

def initialize(engine)
  @engine = engine
  @connections = Set.new
  @recv_queue = Routing.build_queue(engine..recv_hwm, :block)
  @subscriptions = Set.new
  @tasks = []
end
Instance Attribute Details
#recv_queue ⇒ Async::LimitedQueue (readonly)
# File 'lib/omq/routing/sub.rb', line 13

def recv_queue
  @recv_queue
end
Instance Method Details
#connection_added(connection) ⇒ Object
# File 'lib/omq/routing/sub.rb', line 47

def connection_added(connection)
  @connections << connection
  @subscriptions.each do |prefix|
    send_subscribe(connection, prefix)
  end
  task = @engine.start_recv_pump(connection, @recv_queue)
  @tasks << task if task
end
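The listing above replays every stored subscription to a newly added connection, so a peer that connects after #subscribe was called still learns about the prefix. The sketch below models only that bookkeeping. `RecordingConnection`, `SubSketch`, and `send_command` are hypothetical stand-ins for illustration; the real class calls `send_subscribe` and also starts a receive pump, which is left out here.

```ruby
require "set"

# Hypothetical stand-in for a peer connection: it only records commands.
class RecordingConnection
  attr_reader :commands

  def initialize
    @commands = []
  end

  def send_command(name, prefix)
    @commands << [name, prefix]
  end
end

# Simplified model of the bookkeeping; no receive pump is started.
class SubSketch
  def initialize
    @connections   = Set.new
    @subscriptions = Set.new
  end

  def subscribe(prefix)
    @subscriptions << prefix
    @connections.each { |conn| conn.send_command(:subscribe, prefix) }
  end

  def connection_added(connection)
    @connections << connection
    # Replay every known subscription to the newly connected peer.
    @subscriptions.each { |prefix| connection.send_command(:subscribe, prefix) }
  end
end

sub = SubSketch.new
sub.subscribe("weather.")        # no peers yet, so nothing is sent
late_peer = RecordingConnection.new
sub.connection_added(late_peer)  # the stored prefix is replayed here
p late_peer.commands
```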
#connection_removed(connection) ⇒ Object
# File 'lib/omq/routing/sub.rb', line 61

def connection_removed(connection)
  @connections.delete(connection)
end
#dequeue_recv ⇒ Array<String>?
Dequeues the next received message. Blocks until one is available; returns nil when woken by #unblock_recv.
# File 'lib/omq/routing/sub.rb', line 31

def dequeue_recv
  @recv_queue.dequeue
end
#enqueue(_parts) ⇒ Object
SUB is read-only: sending is not supported, so this method always raises a RuntimeError.
# File 'lib/omq/routing/sub.rb', line 68

def enqueue(_parts)
  raise "SUB sockets cannot send"
end
#stop ⇒ void
This method returns an undefined value.
Stops all background tasks.
# File 'lib/omq/routing/sub.rb', line 97

def stop
  @tasks.each(&:stop)
  @tasks.clear
end
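Because the task list is cleared after each task is stopped, a second call has nothing left to stop. The sketch below illustrates that with a hypothetical `PumpTaskDouble`; the real tasks are whatever `@engine.start_recv_pump` returns, and `stop_all` is an illustrative helper, not an OMQ method.

```ruby
# Hypothetical task double that counts how often it was stopped.
class PumpTaskDouble
  attr_reader :stop_calls

  def initialize
    @stop_calls = 0
  end

  def stop
    @stop_calls += 1
  end
end

# Same two steps as #stop: stop every task, then forget them.
def stop_all(tasks)
  tasks.each(&:stop)
  tasks.clear
end

tasks = [PumpTaskDouble.new, PumpTaskDouble.new]
kept  = tasks.dup  # keep references so the doubles can be inspected
stop_all(tasks)
stop_all(tasks)    # second call finds an empty list and does nothing
p kept.map(&:stop_calls)
```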
#subscribe(prefix) ⇒ Object
Subscribes to a topic prefix.
# File 'lib/omq/routing/sub.rb', line 77

def subscribe(prefix)
  @subscriptions << prefix
  @connections.each { |conn| send_subscribe(conn, prefix) }
end
#unblock_recv ⇒ void
This method returns an undefined value.
Wakes a blocked #dequeue_recv with a nil sentinel.
# File 'lib/omq/routing/sub.rb', line 40

def unblock_recv
  @recv_queue.enqueue(nil)
end
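The nil sentinel lets a reader that is blocked on the queue return without a real message having arrived. The sketch below shows the pattern with Ruby's built-in `Queue` standing in for `Async::LimitedQueue`; that substitution is an assumption made to keep the example dependency-free, and `drain_until_unblocked` is a hypothetical helper.

```ruby
# Ruby's built-in Queue stands in for Async::LimitedQueue (assumption).
def drain_until_unblocked(queue)
  received = []
  # Queue#pop blocks until an item arrives; nil is treated as "stop waiting".
  while (parts = queue.pop)
    received << parts
  end
  received
end

queue    = Queue.new
consumer = Thread.new { drain_until_unblocked(queue) }

queue.push(["weather.berlin", "12C"]) # a normal multipart message
queue.push(nil)                       # what #unblock_recv does: wake the reader

p consumer.value
```

A caller of #dequeue_recv therefore needs to treat nil as "no message", not as an empty message.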
#unsubscribe(prefix) ⇒ Object
Unsubscribes from a topic prefix.
# File 'lib/omq/routing/sub.rb', line 87

def unsubscribe(prefix)
  @subscriptions.delete(prefix)
  @connections.each { |conn| send_cancel(conn, prefix) }
end
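Taken together, #subscribe and #unsubscribe update the local prefix set and then notify every currently connected peer. A minimal sketch of that fan-out follows, with hypothetical `PeerLog` and `SubscriptionFanout` classes; appending to a command log replaces the real `send_subscribe` and `send_cancel` calls.

```ruby
require "set"

# Hypothetical peer double that records the commands it would receive.
class PeerLog
  attr_reader :commands

  def initialize
    @commands = []
  end
end

# Simplified model of subscribe/unsubscribe fan-out.
class SubscriptionFanout
  attr_reader :subscriptions

  def initialize(peers)
    @connections   = Set.new(peers)
    @subscriptions = Set.new
  end

  def subscribe(prefix)
    @subscriptions << prefix
    @connections.each { |peer| peer.commands << [:subscribe, prefix] }
  end

  def unsubscribe(prefix)
    @subscriptions.delete(prefix)
    @connections.each { |peer| peer.commands << [:cancel, prefix] }
  end
end

peers  = [PeerLog.new, PeerLog.new]
fanout = SubscriptionFanout.new(peers)
fanout.subscribe("news.")
fanout.unsubscribe("news.")

peers.each { |peer| p peer.commands }
p fanout.subscriptions.empty?
```

As in the listing above, the sketch sends the cancel to every peer without first checking whether the prefix was subscribed.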