Class: NNQ::Routing::Sub
- Inherits: Object
- Defined in: lib/nnq/routing/sub.rb
Overview
SUB side of the pub/sub pattern (nng sub0).
All filtering happens locally — pub0 broadcasts blindly and sub0 drops messages that don’t match any subscription prefix. An empty subscription set means receive nothing (matching nng — unlike ZeroMQ’s pre-4.x “no subscription = receive everything”).
Subscriptions are byte-prefix matches. A subscription to the empty string matches every message.
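The matching rules above can be sketched in a few lines. This is only an illustration: `prefix_match?` is a hypothetical stand-in for the class's private `matches?` check, whose source is not shown on this page, and the message bodies are invented.

```ruby
# Hypothetical helper (not part of NNQ): true when the body starts with
# any subscribed prefix, compared byte by byte.
def prefix_match?(subscriptions, body)
  bytes = body.b # compare as raw bytes, like the stored prefixes
  subscriptions.any? { |prefix| bytes.start_with?(prefix) }
end

subs = []
none = prefix_match?(subs, "weather.nyc 72F")   # empty set: nothing matches

subs << "weather.".b
hit  = prefix_match?(subs, "weather.nyc 72F")   # prefix matches
miss = prefix_match?(subs, "sports.nba 101-99") # no prefix matches

subs << "".b
all  = prefix_match?(subs, "sports.nba 101-99") # empty prefix matches everything
```

Note the difference between an empty subscription set (nothing is delivered) and a subscription to the empty string (everything is delivered).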
Instance Method Summary
- #close ⇒ Object
- #close_read ⇒ Object
- #enqueue(body, _conn = nil) ⇒ Object
- #initialize ⇒ Sub (constructor): A new instance of Sub.
- #receive ⇒ String?
- #subscribe(prefix) ⇒ Object
- #unsubscribe(prefix) ⇒ Object
Constructor Details
#initialize ⇒ Sub
Returns a new instance of Sub.
    # File 'lib/nnq/routing/sub.rb', line 18
    def initialize
      @queue = Async::Queue.new
      @subscriptions = [] # array of byte strings
    end
Instance Method Details
#close ⇒ Object
    # File 'lib/nnq/routing/sub.rb', line 47
    def close
      @queue.enqueue(nil)
    end
#close_read ⇒ Object
    # File 'lib/nnq/routing/sub.rb', line 52
    def close_read
      @queue.enqueue(nil)
    end
#enqueue(body, _conn = nil) ⇒ Object
    # File 'lib/nnq/routing/sub.rb', line 35
    def enqueue(body, _conn = nil)
      return unless matches?(body)
      @queue.enqueue(body)
    end
#receive ⇒ String?
    # File 'lib/nnq/routing/sub.rb', line 42
    def receive
      @queue.dequeue
    end
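Since #close and #close_read both enqueue nil, a consumer can presumably treat a nil result from #receive as the end of the stream. The sketch below illustrates that pattern with a stand-in class, `SketchSub`, which is hypothetical: it swaps `Async::Queue` for the standard library's `Thread::Queue` so it runs without the async gem, and its `matches?` body is an assumption about the private method not shown here.

```ruby
# Hypothetical stand-in for NNQ::Routing::Sub, for illustration only.
class SketchSub
  def initialize
    @queue = Thread::Queue.new # assumption: replaces Async::Queue
    @subscriptions = []
  end

  def subscribe(prefix)
    prefix = prefix.b
    @subscriptions << prefix unless @subscriptions.include?(prefix)
  end

  # Drops the message unless it matches a subscribed prefix.
  def enqueue(body, _conn = nil)
    return unless matches?(body)
    @queue.push(body)
  end

  def receive
    @queue.pop
  end

  # nil acts as the end-of-stream sentinel.
  def close
    @queue.push(nil)
  end

  private

  # Assumed implementation of the byte-prefix check.
  def matches?(body)
    bytes = body.b
    @subscriptions.any? { |prefix| bytes.start_with?(prefix) }
  end
end

sub = SketchSub.new
sub.subscribe("log.")
sub.enqueue("log.info started")
sub.enqueue("metrics.cpu 0.4") # dropped: no matching prefix
sub.enqueue("log.warn disk")
sub.close

received = []
while (msg = sub.receive) # loop ends when the nil sentinel arrives
  received << msg
end
```

After the loop, `received` holds only the two `log.` messages, in the order they were enqueued.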
#subscribe(prefix) ⇒ Object
    # File 'lib/nnq/routing/sub.rb', line 24
    def subscribe(prefix)
      prefix = prefix.b
      @subscriptions << prefix unless @subscriptions.include?(prefix)
    end
#unsubscribe(prefix) ⇒ Object
    # File 'lib/nnq/routing/sub.rb', line 30
    def unsubscribe(prefix)
      @subscriptions.delete(prefix.b)
    end
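The bookkeeping in #subscribe and #unsubscribe can be tried in isolation. The sketch below copies the two method bodies into lambdas over a local array; the lambda names and prefixes are illustrative, not part of the library.

```ruby
subscriptions = [] # array of byte strings, as in the constructor

# Same logic as #subscribe: normalize to binary, skip duplicates.
subscribe = lambda do |prefix|
  prefix = prefix.b
  subscriptions << prefix unless subscriptions.include?(prefix)
end

# Same logic as #unsubscribe: Array#delete returns the removed
# element, or nil when the prefix was never subscribed.
unsubscribe = ->(prefix) { subscriptions.delete(prefix.b) }

subscribe.call("temp.")
subscribe.call("temp.")     # duplicate, ignored
subscribe.call("humidity.")

count_after_subscribe = subscriptions.size
encodings = subscriptions.map(&:encoding).uniq

removed = unsubscribe.call("temp.")
missing = unsubscribe.call("pressure.")
```

Because `String#b` returns a binary-encoded copy, stored prefixes always have the `Encoding::BINARY` encoding, so duplicate detection and deletion compare bytes rather than encoding-tagged text.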