Class: OMQ::Routing::XSub

Inherits:
Object
Defined in:
lib/omq/routing/xsub.rb

Overview

XSUB socket routing: like SUB, but subscriptions are sent as data messages.

Subscriptions are sent as data frames: \x01 followed by the prefix to subscribe, \x00 followed by the prefix to unsubscribe. Each connected PUB gets its own send queue, so subscription commands are delivered to each peer independently.
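The frame layout described above can be sketched in a few lines of plain Ruby. The helper names below (`subscribe_frame`, `unsubscribe_frame`, `parse_subscription_frame`) are hypothetical and are not part of OMQ; they only illustrate the one-byte marker followed by the topic prefix.

```ruby
# Hypothetical helpers (not OMQ API): build and parse XSUB subscription frames.
SUBSCRIBE_MARKER   = "\x01".b
UNSUBSCRIBE_MARKER = "\x00".b

# Returns a binary frame: marker byte followed by the raw prefix bytes.
def subscribe_frame(prefix)
  SUBSCRIBE_MARKER + prefix.b
end

def unsubscribe_frame(prefix)
  UNSUBSCRIBE_MARKER + prefix.b
end

# Splits a frame back into [:subscribe | :unsubscribe, prefix].
def parse_subscription_frame(frame)
  kind = frame.getbyte(0) == 1 ? :subscribe : :unsubscribe
  [kind, frame.byteslice(1, frame.bytesize - 1)]
end

sub_frame   = subscribe_frame("weather.")
unsub_frame = unsubscribe_frame("weather.")
```

An empty prefix yields a one-byte frame, which in the usual PUB/SUB convention matches every message.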

Constructor Details

#initialize(engine) ⇒ XSub

Returns a new instance of XSub.

Parameters:

  • engine

# File 'lib/omq/routing/xsub.rb', line 20

def initialize(engine)
  @engine          = engine
  @connections     = Set.new
  @recv_queue      = FairQueue.new
  @conn_queues     = {}  # connection => per-connection send queue
  @conn_send_tasks = {}  # connection => send pump task
  @tasks           = []
end

Instance Attribute Details

#recv_queue ⇒ FairQueue (readonly)

Returns:

  • (FairQueue)

# File 'lib/omq/routing/xsub.rb', line 15

def recv_queue
  @recv_queue
end

Instance Method Details

#connection_added(connection) ⇒ Object

Parameters:

  • connection (Connection)


# File 'lib/omq/routing/xsub.rb', line 44

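def connection_added(connection)
  @connections << connection

  # Receive side: a per-connection queue sized by recv_hwm, registered
  # with the shared FairQueue and filled by a recv pump task.
  conn_q    = Routing.build_queue(@engine.options.recv_hwm, @engine.options.on_mute)
  signaling = SignalingQueue.new(conn_q, @recv_queue)
  @recv_queue.add_queue(connection, conn_q)
  task = @engine.start_recv_pump(connection, signaling)
  @tasks << task if task

  # Send side: a per-connection queue for subscription commands,
  # drained by its own send pump.
  q = Routing.build_queue(@engine.options.send_hwm, :block)
  @conn_queues[connection] = q
  start_conn_send_pump(connection, q)
end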

#connection_removed(connection) ⇒ Object

Parameters:

  • connection (Connection)


# File 'lib/omq/routing/xsub.rb', line 61

def connection_removed(connection)
  @connections.delete(connection)
  @recv_queue.remove_queue(connection)
  @conn_queues.delete(connection)
  @conn_send_tasks.delete(connection)&.stop
end
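The last line of the listing above relies on `Hash#delete` returning `nil` for an unknown key, so the safe-navigation call becomes a no-op when the connection has no send task. A minimal sketch of that idiom follows; `FakeTask` is a hypothetical stand-in for whatever task object the engine returns.

```ruby
# Hypothetical task double; only the #stop method matters here.
FakeTask = Struct.new(:stopped) do
  def stop
    self.stopped = true
  end
end

send_tasks = { conn_a: FakeTask.new(false) }
task       = send_tasks[:conn_a]

# First removal: the task is deleted from the hash and stopped.
send_tasks.delete(:conn_a)&.stop

# Second removal: delete returns nil, so &. skips the call entirely.
second_result = send_tasks.delete(:conn_a)&.stop
```

Under this pattern, removing the same connection twice does not raise.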

#dequeue_recv ⇒ Object

Engine-facing recv contract. Delegates to the FairQueue.



# File 'lib/omq/routing/xsub.rb', line 32

def dequeue_recv
  @recv_queue.dequeue
end

#enqueue(parts) ⇒ Object

Enqueues a subscription command (fan-out to all connected PUBs).

Parameters:

  • parts (Array<String>)


# File 'lib/omq/routing/xsub.rb', line 73

def enqueue(parts)
  @connections.each do |conn|
    @conn_queues[conn]&.enqueue(parts)
  end
end
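The fan-out in the listing above can be illustrated with plain stdlib queues. This is a sketch under assumptions: `Queue` stands in for OMQ's per-connection send queues, the symbols stand in for connection objects, and `fan_out` is a hypothetical name.

```ruby
# Stand-ins for the routing state: connection => per-connection send queue.
conn_queues = { pub_a: Queue.new, pub_b: Queue.new }

# Pushes the same message parts onto every peer's queue.
def fan_out(conn_queues, parts)
  conn_queues.each_value { |queue| queue << parts }
end

# A subscribe command for the "weather." prefix, as a single-part message.
fan_out(conn_queues, ["\x01weather.".b])

sizes = conn_queues.transform_values(&:size)
```

Every queue receives a reference to the same `parts` array, so callers should avoid mutating it after enqueueing.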

#send_queues_drained? ⇒ Boolean

Returns true when all per-connection send queues are empty.

Returns:

  • (Boolean)

    true when all per-connection send queues are empty



# File 'lib/omq/routing/xsub.rb', line 92

def send_queues_drained?
  @conn_queues.values.all?(&:empty?)
end
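One consequence of `all?` worth noting is that it returns true for an empty collection, so the check above also reports "drained" when no PUB is connected. A short sketch, with stdlib `Queue` objects standing in for OMQ's send queues:

```ruby
# Plain stdlib queues stand in for the per-connection send queues.
conn_queues = {}

# No connections at all: all? is vacuously true.
no_peers = conn_queues.values.all?(&:empty?)

# One peer with a pending subscription command.
conn_queues[:pub_a] = Queue.new
conn_queues[:pub_a] << ["\x01news".b]
pending = conn_queues.values.all?(&:empty?)

# After the command is taken off the queue, the check passes again.
conn_queues[:pub_a].pop
drained = conn_queues.values.all?(&:empty?)
```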

#stop ⇒ void

This method returns an undefined value.

Stops all background tasks.



# File 'lib/omq/routing/xsub.rb', line 84

def stop
  @tasks.each(&:stop)
  @tasks.clear
end

#unblock_recv ⇒ Object

Pushes nil onto the recv queue so that a blocked receive can return.

# File 'lib/omq/routing/xsub.rb', line 37

def unblock_recv
  @recv_queue.push(nil)
end
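Pushing `nil` as a wake-up value is a common sentinel pattern for blocking queues. The sketch below shows it with Ruby's stdlib `Queue` and a thread; it assumes the consumer treats `nil` as "stop waiting" and does not reflect how OMQ's FairQueue or engine handle the value internally.

```ruby
queue = Queue.new

# Consumer blocks in pop until a message or the nil sentinel arrives.
consumer = Thread.new do
  received = []
  while (message = queue.pop)
    received << message
  end
  received
end

queue << ["hello"]
queue << nil # sentinel: wakes the consumer and ends its loop

result = consumer.value
```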