Class: OMQ::Routing::XSub

Inherits:
Object
Defined in:
lib/omq/routing/xsub.rb

Overview

XSUB socket routing: like SUB, but subscriptions are sent as data messages.

Subscriptions are sent as data frames: \x01 + prefix to subscribe, \x00 + prefix to unsubscribe. Each connected PUB gets its own send queue, so subscription commands are delivered to each peer independently.
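The frame layout described above can be sketched in a few lines. The helper names `subscribe_frame` and `unsubscribe_frame` are hypothetical and not part of OMQ; the only thing taken from this page is the one-byte marker followed by the topic prefix.

```ruby
# Hypothetical helpers (not part of OMQ): build XSUB subscription frames
# as binary strings, one marker byte followed by the topic prefix.
def subscribe_frame(prefix)
  "\x01".b + prefix.b
end

def unsubscribe_frame(prefix)
  "\x00".b + prefix.b
end

sub   = subscribe_frame("weather.")
unsub = unsubscribe_frame("weather.")

# An empty prefix yields a frame containing only the marker byte.
sub_all = subscribe_frame("")
```

Since #enqueue takes an Array<String>, such a frame would presumably be passed as a single-part message, for example `[subscribe_frame("weather.")]`.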

Constructor Details

#initialize(engine) ⇒ XSub

Returns a new instance of XSub.

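Parameters:

  • engine: the owning engine, which supplies options (recv_hwm, send_hwm) and starts the per-connection receive pumps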

# File 'lib/omq/routing/xsub.rb', line 20

def initialize(engine)
  @engine      = engine
  @connections = Set.new
  @recv_queue  = Routing.build_queue(engine.options.recv_hwm, :block)
  @conn_queues = {}
end

Instance Attribute Details

#recv_queue ⇒ Async::LimitedQueue (readonly)

Returns:

  • (Async::LimitedQueue)


# File 'lib/omq/routing/xsub.rb', line 15

def recv_queue
  @recv_queue
end

Instance Method Details

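#connection_added(connection) ⇒ Object

Registers the connection, starts its receive pump into the shared receive queue, and gives it a dedicated send queue with its own send pump.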

Parameters:

  • connection (Protocol::ZMTP::Connection)


# File 'lib/omq/routing/xsub.rb', line 48

def connection_added(connection)
  @connections << connection

  @engine.start_recv_pump(connection, @recv_queue)

  q = Routing.build_queue(@engine.options.send_hwm, :block)
  @conn_queues[connection] = q
  start_conn_send_pump(connection, q)
end

#connection_removed(connection) ⇒ Object

Forgets the connection and drops its send queue.

Parameters:

  • connection (Protocol::ZMTP::Connection)


# File 'lib/omq/routing/xsub.rb', line 61

def connection_removed(connection)
  @connections.delete(connection)
  @conn_queues.delete(connection)
end

#dequeue_recv ⇒ Array<String>?

Dequeues the next received message. Blocks until one is available; returns nil when woken by #unblock_recv.

Returns:

  • (Array<String>, nil)


# File 'lib/omq/routing/xsub.rb', line 32

def dequeue_recv
  @recv_queue.dequeue
end

#enqueue(parts) ⇒ Object

Enqueues a subscription command (fan-out to all connected PUBs).

Parameters:

  • parts (Array<String>)


# File 'lib/omq/routing/xsub.rb', line 71

def enqueue(parts)
  @connections.each do |conn|
    @conn_queues[conn]&.enqueue(parts)
  end
end
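
The fan-out performed by #enqueue can be modelled without OMQ. This is a stand-in sketch: symbols play the role of connections and Ruby's core `Queue` replaces the queues built by Routing.build_queue, so it shows only the shape of the pattern, not the real send pumps.

```ruby
# Stand-in model, not OMQ code: one queue per (fake) connection.
conn_queues = { pub_a: Queue.new, pub_b: Queue.new }

# Same shape as XSub#enqueue: every peer queue receives the same parts.
fan_out = lambda do |parts|
  conn_queues.each_value { |queue| queue << parts }
end

fan_out.call(["\x01weather.".b])

# Each peer drains on its own; taking pub_a's copy leaves pub_b's untouched.
sent_to_a  = conn_queues[:pub_a].pop
pending_b  = conn_queues[:pub_b].size
```

Keeping one queue per peer means a slow PUB only delays its own copy of the command, which matches the "delivered independently per peer" behaviour described in the overview.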

#send_queues_drained? ⇒ Boolean

Returns true when all per-connection send queues are empty.

Returns:

  • (Boolean)

    true when all per-connection send queues are empty



# File 'lib/omq/routing/xsub.rb', line 80

def send_queues_drained?
  @conn_queues.values.all?(&:empty?)
end
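
The drained check reduces to `all?(&:empty?)` over the per-connection queues. The sketch below reproduces that expression with core `Queue` objects as stand-ins (an assumption; the real queues come from Routing.build_queue) to show how it behaves as frames are queued and consumed.

```ruby
# Stand-in model, not OMQ code.
queues = { pub_a: Queue.new, pub_b: Queue.new }
all_drained = -> { queues.values.all?(&:empty?) }

idle = all_drained.call            # nothing queued yet

queues[:pub_b] << ["\x01news.".b]
busy = all_drained.call            # one peer still has a pending frame

queues[:pub_b].pop
done = all_drained.call            # pending frame consumed

# With no connections at all, all? over an empty list is true.
no_peers = [].all?(&:empty?)
```

Note the last case: because `all?` is vacuously true for an empty collection, the same expression reports "drained" when no PUB is connected.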

#unblock_recv ⇒ void

This method returns an undefined value.

Wakes a blocked #dequeue_recv with a nil sentinel.



# File 'lib/omq/routing/xsub.rb', line 41

def unblock_recv
  @recv_queue.enqueue(nil)
end
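
The nil-sentinel wake-up can be illustrated with a plain thread and Ruby's core `Queue`. This is an assumption-laden sketch: the real receive queue is an Async::LimitedQueue consumed from fibers, but the control flow (a blocked consumer treats nil as "stop waiting") is the same idea.

```ruby
# Stand-in model, not OMQ code: a consumer blocks on pop until a message
# or the nil sentinel arrives.
recv_queue = Queue.new
received   = []

consumer = Thread.new do
  # pop blocks; a nil value ends the loop, mirroring a nil from #dequeue_recv.
  while (message = recv_queue.pop)
    received << message
  end
  :stopped
end

recv_queue << ["topic", "payload"]
recv_queue << nil                  # same role as #unblock_recv

result = consumer.value            # waits for the consumer to finish
```

A caller of #dequeue_recv therefore needs to treat nil as a shutdown signal rather than as a message.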