Class: NNQ::Routing::Sub

Inherits:
Object
Defined in:
lib/nnq/routing/sub.rb

Overview

SUB side of the pub/sub pattern (nng sub0).

All filtering happens locally — pub0 broadcasts blindly and sub0 drops messages that don’t match any subscription prefix. An empty subscription set means receive nothing (matching nng — unlike ZeroMQ’s pre-4.x “no subscription = receive everything”).

Subscriptions are byte-prefix matches. A subscription to the empty string matches every message.
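The matching rule described above can be sketched in a few lines. This is a minimal illustration, not the library's code: `prefix_match?` is a hypothetical helper (the class's own `matches?` is not shown on this page), and both sides are converted to binary strings so the comparison is byte-wise.

```ruby
# Hypothetical helper, not part of NNQ: a body matches when it starts with
# at least one subscribed prefix, compared byte for byte.
def prefix_match?(subscriptions, body)
  body = body.b
  subscriptions.any? { |prefix| body.start_with?(prefix.b) }
end

subs = []
no_subs = prefix_match?(subs, "weather.nyc 72F")   # empty set: nothing matches

subs << "weather."
weather = prefix_match?(subs, "weather.nyc 72F")   # prefix matches
sports  = prefix_match?(subs, "sports.nba 101-99") # no prefix matches

subs << ""                                         # empty prefix matches everything
catch_all = prefix_match?(subs, "sports.nba 101-99")
```

Because `start_with?("")` is true for every string, subscribing to the empty prefix is how a subscriber opts in to all traffic.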


Constructor Details

#initialize ⇒ Sub

Returns a new instance of Sub.



# File 'lib/nnq/routing/sub.rb', line 18

def initialize
  @queue         = Async::Queue.new
  @subscriptions = [] # array of byte strings
end

Instance Method Details

#close ⇒ Object



# File 'lib/nnq/routing/sub.rb', line 54

def close
  # Queue a nil sentinel; #receive hands it back as nil.
  @queue.enqueue(nil)
end

#close_read ⇒ Object



# File 'lib/nnq/routing/sub.rb', line 59

def close_read
  @queue.enqueue(nil)
end

#direct_recv_for(_conn) ⇒ Object

Inproc fast-path hook: returns the receive queue together with a transform lambda. The transform returns the body when it matches a subscription and nil otherwise, so only matching bodies are enqueued.



# File 'lib/nnq/routing/sub.rb', line 43

def direct_recv_for(_conn)
  [@queue, ->(body) { matches?(body) ? body : nil }]
end
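To make the shape of the returned pair concrete, here is a hedged sketch of how a caller might consume it. Everything in it is an assumption for illustration: `Thread::Queue` stands in for `Async::Queue` so the snippet runs with the standard library alone, the subscription list is a local array, and `deliver` is a hypothetical caller, not the actual inproc transport.

```ruby
# Stand-ins for the [queue, transform] pair returned by #direct_recv_for.
# Thread::Queue replaces Async::Queue here (assumption, stdlib only).
subscriptions = ["alerts.".b]
queue     = Thread::Queue.new
transform = ->(body) { subscriptions.any? { |p| body.b.start_with?(p) } ? body : nil }

# Hypothetical caller: apply the transform and skip nil results, so
# non-matching bodies never reach the queue.
deliver = lambda do |body|
  result = transform.call(body)
  queue.push(result) unless result.nil?
end

deliver.call("alerts.disk full")
deliver.call("metrics.cpu 0.93")

queued = queue.size # only the matching body was queued
```

Filtering in the transform lets the sender write straight into the receiver's queue without a second matching pass on the receiving side.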

#enqueue(body, _conn = nil) ⇒ Object



# File 'lib/nnq/routing/sub.rb', line 35

def enqueue(body, _conn = nil)
  # Drop bodies that match no subscription prefix.
  return unless matches?(body)
  @queue.enqueue(body)
end

#receive ⇒ String?

Returns:

  • (String, nil)


# File 'lib/nnq/routing/sub.rb', line 49

def receive
  @queue.dequeue
end
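Since #close and #close_read both enqueue nil, a nil from #receive can be read as an end-of-stream marker. The loop below is a minimal sketch of that pattern, assuming `Thread::Queue` as a stand-in for `Async::Queue` (its `push`/`pop` play the role of `enqueue`/`dequeue`); it does not use the real class.

```ruby
# Thread::Queue stands in for the Async::Queue used by Sub (assumption).
queue = Thread::Queue.new
queue.push("one")
queue.push("two")
queue.push(nil) # what #close / #close_read place on the queue

received = []
# Keep reading until the nil sentinel arrives.
while (body = queue.pop)
  received << body
end
```

Bodies queued before the sentinel are still delivered in order; the loop stops only when the nil itself is dequeued.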

#subscribe(prefix) ⇒ Object



# File 'lib/nnq/routing/sub.rb', line 24

def subscribe(prefix)
  # Normalize to a binary string so matching is byte-wise.
  prefix = prefix.b
  # Ignore duplicate subscriptions.
  @subscriptions << prefix unless @subscriptions.include?(prefix)
end

#unsubscribe(prefix) ⇒ Object



# File 'lib/nnq/routing/sub.rb', line 30

def unsubscribe(prefix)
  @subscriptions.delete(prefix.b)
end
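The effect of the `.b` normalization and the duplicate check in #subscribe and #unsubscribe can be seen in isolation. This sketch uses a local array and two hypothetical lambdas that mirror the method bodies above; it does not instantiate the real class.

```ruby
subscriptions = []

# Hypothetical lambdas mirroring #subscribe and #unsubscribe.
subscribe = lambda do |prefix|
  prefix = prefix.b
  subscriptions << prefix unless subscriptions.include?(prefix)
end
unsubscribe = ->(prefix) { subscriptions.delete(prefix.b) }

subscribe.call("news.")
subscribe.call("news.")  # duplicate, ignored
subscribe.call("météo.") # non-ASCII prefix, stored as raw bytes

count_before = subscriptions.size
encodings    = subscriptions.map(&:encoding).uniq

unsubscribe.call("news.")
count_after = subscriptions.size
```

Storing every prefix in binary encoding means two prefixes compare equal exactly when their bytes are equal, regardless of the encoding the caller's string carried.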