Class: OMQ::Routing::Radio

Inherits:
Object
Defined in:
lib/omq/routing/radio.rb

Overview

RADIO socket routing: group-based fan-out to DISH peers.

Like PUB/FanOut but with exact group matching and JOIN/LEAVE commands instead of SUBSCRIBE/CANCEL.
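The difference between the two matching rules can be sketched as follows. This is only an illustration: `pub_match?` and `radio_match?` are hypothetical helpers, not part of OMQ, and the sketch assumes PUB subscriptions match by topic prefix, as is conventional for PUB/SUB.

```ruby
require "set"

# PUB-style matching (assumed): a subscription is a prefix of the topic.
def pub_match?(subscriptions, topic)
  subscriptions.any? { |prefix| topic.start_with?(prefix) }
end

# RADIO-style matching: the group must have been joined under its exact name.
def radio_match?(joined_groups, group)
  joined_groups.include?(group)
end

subscriptions = Set["weather"]
joined_groups = Set["weather"]

prefix_hit = pub_match?(subscriptions, "weather.berlin")   # prefix rule accepts it
exact_miss = radio_match?(joined_groups, "weather.berlin") # exact rule rejects it
exact_hit  = radio_match?(joined_groups, "weather")        # exact rule accepts it
```

Under exact matching, a DISH peer that wants both `weather` and `weather.berlin` has to join each group separately.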

Messages are sent as two frames on the wire:

group (MORE=1) + body (MORE=0)
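As a rough illustration of the two-frame layout, the sketch below encodes a message assuming standard ZMTP 3.x framing (a flags byte whose bit 0 is MORE and bit 1 is LONG, followed by a 1-byte or 8-byte big-endian size). This page does not confirm how OMQ itself encodes frames; `encode_frame` and `encode_radio_message` are hypothetical helpers.

```ruby
MORE_FLAG = 0x01 # more frames follow in this message
LONG_FLAG = 0x02 # size is encoded as 8 bytes instead of 1

# Encodes one frame: flags byte, size, then the payload bytes.
def encode_frame(payload, more:)
  payload = payload.b
  flags   = more ? MORE_FLAG : 0
  if payload.bytesize > 255
    [flags | LONG_FLAG, payload.bytesize].pack("CQ>") + payload
  else
    [flags, payload.bytesize].pack("CC") + payload
  end
end

# A RADIO message: the group frame (MORE=1) followed by the body frame (MORE=0).
def encode_radio_message(group, body)
  encode_frame(group, more: true) + encode_frame(body, more: false)
end

wire = encode_radio_message("weather", "sunny")
```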

Constant Summary

ANY_GROUPS =

Sentinel used for UDP connections, which have no group filter. Its include? method returns true for every argument, so any group is considered a match.

Object.new.tap { |o| o.define_singleton_method(:include?) { |_| true } }.freeze
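Because the sentinel answers `include?` the same way a `Set` does, code that selects recipients does not need a special case for UDP. The following sketch shows the idea; `recipients` and the symbol keys standing in for connections are assumptions made for illustration.

```ruby
require "set"

# Same definition as the constant above: include? is true for any argument.
ANY_GROUPS = Object.new.tap { |o| o.define_singleton_method(:include?) { |_| true } }.freeze

# Hypothetical connection table: symbols stand in for connection objects.
groups = {
  tcp_peer: Set["weather"], # joined exactly one group
  udp_peer: ANY_GROUPS,     # no filter, matches everything
}

# Returns the connections that should receive a message for the given group.
def recipients(groups, group)
  groups.select { |_connection, joined| joined.include?(group) }.keys
end

weather_targets = recipients(groups, "weather")
sports_targets  = recipients(groups, "sports")
```

Here `weather_targets` contains both peers, while `sports_targets` contains only the UDP peer.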


Constructor Details

#initialize(engine) ⇒ Radio

Returns a new instance of Radio.

Parameters:

  • engine (Engine)


# File 'lib/omq/routing/radio.rb', line 22

def initialize(engine)
  @engine            = engine
  @connections       = []
  @groups            = {} # connection => Set of joined groups (or ANY_GROUPS for UDP)
  @send_queue        = Routing.build_queue(engine.options.send_hwm, :block)
  @on_mute           = engine.options.on_mute
  @send_pump_started = false
  @conflate          = engine.options.conflate
  @tasks             = []
  @written           = Set.new
  @latest            = {} if @conflate
end

Instance Attribute Details

#send_queue ⇒ Async::LimitedQueue (readonly)

Returns:

  • (Async::LimitedQueue)


# File 'lib/omq/routing/radio.rb', line 38

def send_queue
  @send_queue
end

Instance Method Details

#connection_added(connection) ⇒ Object

Parameters:

  • connection (Connection)


# File 'lib/omq/routing/radio.rb', line 49

def connection_added(connection)
  @connections << connection
  if connection.respond_to?(:read_frame)
    @groups[connection] = Set.new
    start_group_listener(connection)
  else
    @groups[connection] = ANY_GROUPS  # UDP: fan-out to all groups
  end
  start_send_pump unless @send_pump_started
end
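The branch on `respond_to?(:read_frame)` means a stream connection starts with an empty group set and receives nothing until it joins a group, while a connection without `read_frame` matches every group from the start. A minimal sketch of that registration logic, using hypothetical stand-ins (`StreamConnection`, `DatagramConnection`, `MATCH_ALL`, `register`) rather than the real OMQ classes:

```ruby
require "set"

# Hypothetical stand-ins: only the stream connection exposes read_frame.
StreamConnection   = Struct.new(:name) { def read_frame; end }
DatagramConnection = Struct.new(:name)

# Local copy of the match-everything sentinel.
MATCH_ALL = Object.new.tap { |o| o.define_singleton_method(:include?) { |_| true } }.freeze

# Mirrors the group bookkeeping in connection_added, without listeners or pumps.
def register(groups, connection)
  groups[connection] = connection.respond_to?(:read_frame) ? Set.new : MATCH_ALL
end

groups = {}
tcp = StreamConnection.new("tcp")
udp = DatagramConnection.new("udp")
register(groups, tcp)
register(groups, udp)

before_join = groups[tcp].include?("weather") # empty set: no match yet
groups[tcp] << "weather"                      # what handling a JOIN would presumably do
after_join  = groups[tcp].include?("weather")
udp_match   = groups[udp].include?("weather") # sentinel: always matches
```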

#connection_removed(connection) ⇒ Object

Parameters:

  • connection (Connection)


# File 'lib/omq/routing/radio.rb', line 63

def connection_removed(connection)
  @connections.delete(connection)
  @groups.delete(connection)
end

#enqueue(parts) ⇒ Object

Enqueues a message for sending.

Parameters:

  • parts (Array<String>)
    A two-element array: the group name followed by the message body.


# File 'lib/omq/routing/radio.rb', line 73

def enqueue(parts)
  @send_queue.enqueue(parts)
end
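The shape of `parts` and the effect of a bounded send queue can be sketched with Ruby's built-in `SizedQueue` as a stand-in for `Async::LimitedQueue`. This is an assumption-laden illustration: the limit of 2 plays the role of `send_hwm`, and the non-blocking push is used only to show that the limit was reached without stalling the example (the `:block` argument in the constructor suggests the real queue blocks instead).

```ruby
# Stand-in for the send queue, bounded at a hypothetical send_hwm of 2.
send_queue = SizedQueue.new(2)

# Each message is a [group, body] pair.
send_queue.push(["weather", "sunny"])
send_queue.push(["weather", "rainy"])

# A third message does not fit; a non-blocking push raises ThreadError.
full = begin
  send_queue.push(["weather", "windy"], true)
  false
rescue ThreadError
  true
end

# The consumer side sees messages in FIFO order and can destructure them.
group, body = send_queue.pop
```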

#recv_queue ⇒ Object

RADIO is write-only; this method always raises.



# File 'lib/omq/routing/radio.rb', line 42

def recv_queue
  raise "RADIO sockets cannot receive"
end
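Since `raise` with a plain string produces a `RuntimeError`, callers that probe a routing object generically can rescue that class. A small sketch, where `WriteOnlyRouting` is a hypothetical stand-in that copies only this method:

```ruby
# Hypothetical stand-in reproducing just the recv_queue behaviour.
class WriteOnlyRouting
  def recv_queue
    raise "RADIO sockets cannot receive"
  end
end

error = begin
  WriteOnlyRouting.new.recv_queue
  nil
rescue RuntimeError => e
  e
end

message = error.message
```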

#send_queues_drained? ⇒ Boolean

True when the send queue is empty.

Returns:

  • (Boolean)


# File 'lib/omq/routing/radio.rb', line 87

def send_queues_drained?
  @send_queue.empty?
end

#stop ⇒ Object

Stops all background tasks (send pump, group listeners).



# File 'lib/omq/routing/radio.rb', line 79

def stop
  @tasks.each(&:stop)
  @tasks.clear
end
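One plausible way to combine `send_queues_drained?` and `stop` is to wait for the queue to empty before stopping the tasks, so queued messages are not abandoned. The sketch below is deterministic and uses hypothetical stand-ins throughout: `StubTask`, `StubRouting`, `drain_one` (which plays the role of the send pump), and `shutdown` are not part of OMQ.

```ruby
# Hypothetical task that only records whether stop was called.
StubTask = Struct.new(:stopped) do
  def stop
    self.stopped = true
  end
end

# Hypothetical routing object exposing the same three methods as Radio.
class StubRouting
  def initialize(tasks)
    @send_queue = []
    @tasks      = tasks
  end

  def enqueue(parts)
    @send_queue << parts
  end

  # Stands in for the send pump taking one message off the queue.
  def drain_one
    @send_queue.shift
  end

  def send_queues_drained?
    @send_queue.empty?
  end

  def stop
    @tasks.each(&:stop)
    @tasks.clear
  end
end

# Drain first, then stop the background tasks.
def shutdown(routing)
  routing.drain_one until routing.send_queues_drained?
  routing.stop
end

task    = StubTask.new(false)
routing = StubRouting.new([task])
routing.enqueue(["weather", "sunny"])
shutdown(routing)
```

Note that an empty queue only means the messages have been handed off; whether they have reached the wire is not something this page documents.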