Class: OMQ::Routing::XSub

Inherits:
Object
Defined in:
lib/omq/routing/xsub.rb

Overview

XSUB socket routing: like SUB, but subscriptions are sent as data messages.

Subscriptions are sent as data frames: \x01 followed by the prefix to subscribe, \x00 followed by the prefix to unsubscribe. Each connected PUB gets its own send queue, so subscription commands are delivered to each peer independently.
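The frame layout described above can be sketched in a few lines. The helper names `subscribe_frame` and `unsubscribe_frame` are hypothetical and not part of OMQ; they only show how a one-byte command prefix is combined with the topic prefix.

```ruby
# Hypothetical helpers (not part of OMQ) that build XSUB subscription frames.
SUBSCRIBE   = "\x01".b  # command byte for "subscribe"
UNSUBSCRIBE = "\x00".b  # command byte for "unsubscribe"

# Returns a binary frame: the subscribe byte followed by the topic prefix.
def subscribe_frame(prefix)
  SUBSCRIBE + prefix.b
end

# Returns a binary frame: the unsubscribe byte followed by the topic prefix.
def unsubscribe_frame(prefix)
  UNSUBSCRIBE + prefix.b
end

sub   = subscribe_frame("weather.")
unsub = unsubscribe_frame("weather.")
```

Since #enqueue takes an Array<String>, such a frame would presumably be passed as a single-part message, for example `[subscribe_frame("weather.")]`.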

Constructor Details

#initialize(engine) ⇒ XSub

Returns a new instance of XSub.

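Parameters:

  • engine: supplies options.recv_hwm and options.send_hwm for queue sizing, and starts the receive pumps
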
# File 'lib/omq/routing/xsub.rb', line 20

def initialize(engine)
  @engine          = engine
  @connections     = Set.new
  @recv_queue      = Routing.build_queue(engine.options.recv_hwm, :block)
  @conn_queues     = {}  # connection => per-connection send queue
  @conn_send_tasks = {}  # connection => send pump task
  @tasks           = []
end

Instance Attribute Details

#recv_queue ⇒ Async::LimitedQueue (readonly)

Returns:

  • (Async::LimitedQueue)


# File 'lib/omq/routing/xsub.rb', line 15

def recv_queue
  @recv_queue
end

Instance Method Details

#connection_added(connection) ⇒ Object

Parameters:

  • connection (Connection)


# File 'lib/omq/routing/xsub.rb', line 50

def connection_added(connection)
  @connections << connection

  task = @engine.start_recv_pump(connection, @recv_queue)
  @tasks << task if task

  q = Routing.build_queue(@engine.options.send_hwm, :block)
  @conn_queues[connection] = q
  start_conn_send_pump(connection, q)
end

#connection_removed(connection) ⇒ Object

Parameters:

  • connection (Connection)


# File 'lib/omq/routing/xsub.rb', line 64

def connection_removed(connection)
  @connections.delete(connection)
  @conn_queues.delete(connection)
  @conn_send_tasks.delete(connection)&.stop
end
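
The per-connection lifecycle in #connection_added and #connection_removed can be illustrated without the OMQ engine. This is a rough sketch under stated assumptions: `SendPumpSketch` is a hypothetical class, `Thread` stands in for the send pump task, `Thread::Queue` stands in for the queue returned by Routing.build_queue, and `wire` is a stand-in for the socket.

```ruby
# Sketch only: Thread and Thread::Queue replace the real task and queue types.
class SendPumpSketch
  attr_reader :conn_send_tasks

  def initialize(wire)
    @wire            = wire  # stand-in for the socket: records what was "sent"
    @conn_queues     = {}    # connection => per-connection send queue
    @conn_send_tasks = {}    # connection => send pump thread
  end

  def connection_added(conn)
    q = Thread::Queue.new
    @conn_queues[conn] = q
    @conn_send_tasks[conn] = Thread.new do
      # Drain this peer's queue only; every other peer has its own pump.
      loop { @wire.push([conn, q.pop]) }
    end
  end

  def connection_removed(conn)
    @conn_queues.delete(conn)
    @conn_send_tasks.delete(conn)&.kill  # stop the pump if one was running
  end

  def enqueue(parts)
    @conn_queues.each_value { |q| q.push(parts) }
  end
end

wire  = Thread::Queue.new
pumps = SendPumpSketch.new(wire)
pumps.connection_added(:pub_a)
pump_a = pumps.conn_send_tasks[:pub_a]

pumps.enqueue(["\x01news.".b])
sent = wire.pop                  # blocks until the pump forwards the frame

pumps.connection_removed(:pub_a)
pump_a.join                      # wait for the killed pump thread to finish
```

Removing a connection drops both its queue and its pump, so commands still waiting in that queue are discarded with it.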

#dequeue_recv ⇒ Array<String>?

Dequeues the next received message. Blocks until one is available.

Returns:

  • (Array<String>, nil)


# File 'lib/omq/routing/xsub.rb', line 34

def dequeue_recv
  @recv_queue.dequeue
end

#enqueue(parts) ⇒ Object

Enqueues a subscription command (fan-out to all connected PUBs).

Parameters:

  • parts (Array<String>)


# File 'lib/omq/routing/xsub.rb', line 75

def enqueue(parts)
  @connections.each do |conn|
    @conn_queues[conn]&.enqueue(parts)
  end
end
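
The fan-out in #enqueue can be tried in isolation. In this minimal sketch, `FanOutSketch` is a hypothetical class, plain symbols stand in for connections, and `Thread::Queue` stands in for the per-connection send queue; the real queue type may behave differently at its high-water mark.

```ruby
require "set"

# Sketch only: symbols replace connections, Thread::Queue replaces the real queue.
class FanOutSketch
  def initialize
    @connections = Set.new
    @conn_queues = {}
  end

  def add(conn)
    @connections << conn
    @conn_queues[conn] = Thread::Queue.new
  end

  def remove(conn)
    @connections.delete(conn)
    @conn_queues.delete(conn)
  end

  # Same shape as XSub#enqueue: the command is queued once per connected peer.
  # The safe navigation (&.) skips a connection that has no queue.
  def enqueue(parts)
    @connections.each { |conn| @conn_queues[conn]&.push(parts) }
  end

  # Number of queued commands for a peer, or nil if the peer is unknown.
  def pending(conn)
    @conn_queues[conn]&.size
  end
end

fan = FanOutSketch.new
fan.add(:pub_a)
fan.add(:pub_b)
fan.enqueue(["\x01weather.".b])  # both peers get the subscribe command
fan.remove(:pub_b)
fan.enqueue(["\x00weather.".b])  # only pub_a is still connected
```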

#send_queues_drained? ⇒ Boolean

Returns true when all per-connection send queues are empty.

Returns:

  • (Boolean)

    true when all per-connection send queues are empty



# File 'lib/omq/routing/xsub.rb', line 94

def send_queues_drained?
  @conn_queues.values.all?(&:empty?)
end
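
The drained check can be exercised on its own. The sketch below uses `Thread::Queue` as a stand-in for the per-connection queues and a hypothetical `drained` lambda that mirrors the method body. One detail worth noting: `all?` on an empty collection is true, so the check also passes when no peers are connected.

```ruby
# Sketch only: Thread::Queue replaces the real per-connection send queues.
queues  = { pub_a: Thread::Queue.new, pub_b: Thread::Queue.new }
drained = ->(qs) { qs.values.all?(&:empty?) }

no_peers = drained.call({})       # no queues at all counts as drained
before   = drained.call(queues)   # both queues start empty

queues[:pub_b].push(["\x01topic".b])
pending  = drained.call(queues)   # one queue still holds a command

queues[:pub_b].pop
after    = drained.call(queues)   # empty again once the command is taken
```

An empty queue only shows that the command was dequeued; whether it has reached the peer depends on the send pump.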

#stop ⇒ void

This method returns an undefined value.

Stops all background tasks.



# File 'lib/omq/routing/xsub.rb', line 86

def stop
  @tasks.each(&:stop)
  @tasks.clear
end

#unblock_recv ⇒ void

This method returns an undefined value.

Wakes a blocked #dequeue_recv with a nil sentinel.



# File 'lib/omq/routing/xsub.rb', line 43

def unblock_recv
  @recv_queue.enqueue(nil)
end
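
The nil sentinel also explains why #dequeue_recv may return nil. The pattern can be sketched with a plain `Thread::Queue` and a consumer thread standing in for the receive queue and the blocked caller; both are assumptions made for illustration, not the Async types used by OMQ.

```ruby
# Sketch only: Thread::Queue and Thread replace the Async queue and task.
recv_queue = Thread::Queue.new
received   = []

consumer = Thread.new do
  # pop blocks until something arrives; nil ends the loop.
  while (msg = recv_queue.pop)
    received << msg
  end
  :stopped
end

recv_queue.push(["topic", "payload"])  # a normal multi-part message
recv_queue.push(nil)                   # sentinel: wake the consumer and stop it

status = consumer.value                # joins the thread and returns its result
```

A caller that loops on #dequeue_recv would treat nil the same way, as a signal to stop reading.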