Class: OMQ::Routing::Radio
- Inherits:
-
Object
- Object
- OMQ::Routing::Radio
- Defined in:
- lib/omq/routing/radio.rb
Overview
RADIO socket routing: group-based fan-out to DISH peers.
Like PUB/FanOut but with exact group matching and JOIN/LEAVE commands instead of SUBSCRIBE/CANCEL.
Messages are sent as two frames on the wire:
group (MORE=1) + body (MORE=0)
Constant Summary collapse
- ANY_GROUPS =
Sentinel used for UDP connections that have no group filter: any group is considered a match.
Object.new.tap { |o| o.define_singleton_method(:include?) { |_| true } }.freeze
Instance Attribute Summary collapse
- #send_queue ⇒ Async::LimitedQueue readonly
Instance Method Summary collapse
- #connection_added(connection) ⇒ Object
- #connection_removed(connection) ⇒ Object
-
#enqueue(parts) ⇒ Object
Enqueues a message for sending.
-
#initialize(engine) ⇒ Radio
constructor
A new instance of Radio.
-
#recv_queue ⇒ Object
RADIO is write-only.
-
#send_queues_drained? ⇒ Boolean
True when the send queue is empty.
-
#unblock_recv ⇒ Object
No-op; RADIO has no recv queue to unblock.
Constructor Details
#initialize(engine) ⇒ Radio
Returns a new instance of Radio.
27 28 29 30 31 32 33 34 35 36 37 |
# File 'lib/omq/routing/radio.rb', line 27 def initialize(engine) @engine = engine @connections = [] @groups = {} # connection => Set of joined groups (or ANY_GROUPS for UDP) @send_queue = Routing.build_queue(engine..send_hwm, :block) @on_mute = engine..on_mute @send_pump_started = false @conflate = engine..conflate @written = Set.new @latest = {} if @conflate end |
Instance Attribute Details
#send_queue ⇒ Async::LimitedQueue (readonly)
22 23 24 |
# File 'lib/omq/routing/radio.rb', line 22 def send_queue @send_queue end |
Instance Method Details
#connection_added(connection) ⇒ Object
55 56 57 58 59 60 61 62 63 64 |
# File 'lib/omq/routing/radio.rb', line 55 def connection_added(connection) @connections << connection if connection.respond_to?(:read_frame) @groups[connection] = Set.new start_group_listener(connection) else @groups[connection] = ANY_GROUPS # UDP: fan-out to all groups end start_send_pump unless @send_pump_started end |
#connection_removed(connection) ⇒ Object
69 70 71 72 |
# File 'lib/omq/routing/radio.rb', line 69 def connection_removed(connection) @connections.delete(connection) @groups.delete(connection) end |
#enqueue(parts) ⇒ Object
Enqueues a message for sending.
79 80 81 |
# File 'lib/omq/routing/radio.rb', line 79 def enqueue(parts) @send_queue.enqueue(parts) end |
#recv_queue ⇒ Object
RADIO is write-only.
42 43 44 |
# File 'lib/omq/routing/radio.rb', line 42 def recv_queue raise "RADIO sockets cannot receive" end |
#send_queues_drained? ⇒ Boolean
True when the send queue is empty.
86 87 88 |
# File 'lib/omq/routing/radio.rb', line 86 def send_queues_drained? @send_queue.empty? end |
#unblock_recv ⇒ Object
No-op; RADIO has no recv queue to unblock.
49 50 |
# File 'lib/omq/routing/radio.rb', line 49 def unblock_recv end |