Class: NNQ::Routing::Sub

Inherits:
Object
  • Object
show all
Defined in:
lib/nnq/routing/sub.rb

Overview

SUB side of the pub/sub pattern (nng sub0).

All filtering happens locally — pub0 broadcasts blindly and sub0 drops messages that don’t match any subscription prefix. An empty subscription set means receive nothing (matching nng — unlike ZeroMQ’s pre-4.x “no subscription = receive everything”).

Subscriptions are byte-prefix matches. A subscription to the empty string matches every message.

Instance Method Summary collapse

Constructor Details

#initializeSub

Returns a new instance of Sub.



18
19
20
21
# File 'lib/nnq/routing/sub.rb', line 18

def initialize
  @queue         = Async::Queue.new
  @subscriptions = [] # array of byte strings
end

Instance Method Details

#closeObject



47
48
49
# File 'lib/nnq/routing/sub.rb', line 47

def close
  @queue.enqueue(nil)
end

#close_readObject



52
53
54
# File 'lib/nnq/routing/sub.rb', line 52

def close_read
  @queue.enqueue(nil)
end

#enqueue(body, _conn = nil) ⇒ Object



35
36
37
38
# File 'lib/nnq/routing/sub.rb', line 35

def enqueue(body, _conn = nil)
  return unless matches?(body)
  @queue.enqueue(body)
end

#receiveString?

Returns:

  • (String, nil)


42
43
44
# File 'lib/nnq/routing/sub.rb', line 42

def receive
  @queue.dequeue
end

#subscribe(prefix) ⇒ Object



24
25
26
27
# File 'lib/nnq/routing/sub.rb', line 24

def subscribe(prefix)
  prefix = prefix.b
  @subscriptions << prefix unless @subscriptions.include?(prefix)
end

#unsubscribe(prefix) ⇒ Object



30
31
32
# File 'lib/nnq/routing/sub.rb', line 30

def unsubscribe(prefix)
  @subscriptions.delete(prefix.b)
end