Class: OMQ::Routing::Radio
- Inherits: Object
- Defined in: lib/omq/routing/radio.rb
Overview
RADIO socket routing: group-based fan-out to DISH peers.
Like PUB/FanOut but with exact group matching and JOIN/LEAVE commands instead of SUBSCRIBE/CANCEL.
Messages are sent as two frames on the wire:
group (MORE=1) + body (MORE=0)
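The contrast between exact group matching and PUB-style prefix matching, together with the two-frame layout, can be sketched in a few lines. This is an illustration only: `radio_frames`, `group_match?`, and `prefix_match?` are hypothetical helpers, not part of OMQ, and the `[payload, more_flag]` pair is just a stand-in for however frames are really represented.

```ruby
require "set"

# Hypothetical helper (not OMQ API): a RADIO message as two frames,
# each modelled here as a [payload, more_flag] pair.
def radio_frames(group, body)
  [[group, true], [body, false]]
end

# RADIO/DISH style: the message group must equal a joined group exactly.
def group_match?(joined_groups, group)
  joined_groups.include?(group)
end

# PUB/SUB style, for contrast: any subscription that is a prefix matches.
def prefix_match?(subscriptions, topic)
  subscriptions.any? { |prefix| topic.start_with?(prefix) }
end

joined = Set["weather"]
group_match?(joined, "weather")          # exact name: matches
group_match?(joined, "weather.eu")       # longer name: no match
prefix_match?(["weather"], "weather.eu") # a prefix rule would match
```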
Constant Summary
- ANY_GROUPS =
  Object.new.tap { |o| o.define_singleton_method(:include?) { |_| true } }.freeze
  Sentinel used for UDP connections that have no group filter: any group is considered a match.
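The sentinel works through duck typing: it answers `include?` just as a `Set` does, so code that checks a connection's filter does not need a special case. A minimal sketch, in which `any_groups_sketch` and `deliver?` are hypothetical names used only for this illustration:

```ruby
require "set"

# Builds an object with the same construction as ANY_GROUPS.
def any_groups_sketch
  Object.new.tap { |o| o.define_singleton_method(:include?) { |_| true } }.freeze
end

# Hypothetical helper: the caller does not care which kind of filter it holds.
def deliver?(filter, group)
  filter.include?(group)
end

deliver?(Set["weather"], "weather")    # joined group: deliver
deliver?(Set["weather"], "sports")     # not joined: skip
deliver?(any_groups_sketch, "sports")  # sentinel: always deliver
```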
Instance Attribute Summary
- #send_queue ⇒ Async::LimitedQueue (readonly)
Instance Method Summary
- #connection_added(connection) ⇒ Object
- #connection_removed(connection) ⇒ Object
- #enqueue(parts) ⇒ Object
  Enqueues a message for sending.
- #initialize(engine) ⇒ Radio (constructor)
  A new instance of Radio.
- #recv_queue ⇒ Object
  RADIO is write-only.
- #send_queues_drained? ⇒ Boolean
  True when the send queue is empty.
- #stop ⇒ Object
  Stops all background tasks (send pump, group listeners).
Constructor Details
#initialize(engine) ⇒ Radio
Returns a new instance of Radio.
# File 'lib/omq/routing/radio.rb', line 22
def initialize(engine)
  @engine = engine
  @connections = []
  @groups = {} # connection => Set of joined groups (or ANY_GROUPS for UDP)
  @send_queue = Routing.build_queue(engine..send_hwm, :block)
  @on_mute = engine..on_mute
  @send_pump_started = false
  @conflate = engine..conflate
  @tasks = []
  @written = Set.new
  @latest = {} if @conflate
end
Instance Attribute Details
#send_queue ⇒ Async::LimitedQueue (readonly)
# File 'lib/omq/routing/radio.rb', line 38
def send_queue
  @send_queue
end
Instance Method Details
#connection_added(connection) ⇒ Object
# File 'lib/omq/routing/radio.rb', line 49
def connection_added(connection)
  @connections << connection
  if connection.respond_to?(:read_frame)
    @groups[connection] = Set.new
    start_group_listener(connection)
  else
    @groups[connection] = ANY_GROUPS # UDP: fan-out to all groups
  end
  start_send_pump unless @send_pump_started
end
#connection_removed(connection) ⇒ Object
# File 'lib/omq/routing/radio.rb', line 63
def connection_removed(connection)
  @connections.delete(connection)
  @groups.delete(connection)
end
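The bookkeeping done by `connection_added` and `connection_removed` can be modelled without any networking. The sketch below is a simplified stand-in, not the OMQ implementation: `StreamConn`, `DatagramConn`, and `GroupTable` are hypothetical, and `join` assumes that a JOIN command simply adds a group name to the connection's set.

```ruby
require "set"

# Stand-ins for real connections (hypothetical, for illustration only).
class StreamConn
  def read_frame; end # presence of read_frame marks a connection that can send commands
end

class DatagramConn; end # no read_frame, so no JOIN/LEAVE can arrive

class GroupTable
  ANY = Object.new.tap { |o| o.define_singleton_method(:include?) { |_| true } }.freeze

  def initialize
    @groups = {} # connection => Set of joined groups, or ANY
  end

  def add(conn)
    @groups[conn] = conn.respond_to?(:read_frame) ? Set.new : ANY
  end

  def remove(conn)
    @groups.delete(conn)
  end

  # Only meaningful for connections registered with a Set filter.
  def join(conn, group)
    @groups.fetch(conn).add(group)
  end

  # Connections whose filter accepts the given group.
  def targets(group)
    @groups.select { |_, filter| filter.include?(group) }.keys
  end
end

table = GroupTable.new
tcp = StreamConn.new
udp = DatagramConn.new
table.add(tcp)
table.add(udp)
table.join(tcp, "weather")
table.targets("weather") # both connections
table.targets("sports")  # only the datagram connection
table.remove(udp)
table.targets("sports")  # no connections left
```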
#enqueue(parts) ⇒ Object
Enqueues a message for sending.
# File 'lib/omq/routing/radio.rb', line 73
def enqueue(parts)
  @send_queue.enqueue(parts)
end
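The constructor builds the send queue with `:block`, which suggests that `enqueue` waits when the queue is at its high-water mark rather than dropping the message. That reading is an assumption, since the queue implementation is not shown here. The sketch below uses Ruby's standard `SizedQueue` purely as a stand-in for a bounded queue; it is not what OMQ uses, the limit of 2 is arbitrary, and the `[group, body]` shape of each message is likewise assumed from the two-frame wire format.

```ruby
# Stand-in for a bounded send queue with a high-water mark of 2.
queue = SizedQueue.new(2)

queue.push(["weather", "sunny"])
queue.push(["weather", "rain"])

# A third blocking push would wait for a consumer. A non-blocking push
# shows that the queue is full by raising ThreadError instead.
full = begin
  queue.push(["weather", "snow"], true)
  false
rescue ThreadError
  true
end

queue.pop                       # a consumer takes one message
queue.push(["weather", "snow"]) # now there is room again

# Draining check, analogous to send_queues_drained?
queue.pop
queue.pop
drained = queue.empty?
```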
#recv_queue ⇒ Object
RADIO is write-only.
# File 'lib/omq/routing/radio.rb', line 42
def recv_queue
  raise "RADIO sockets cannot receive"
end
#send_queues_drained? ⇒ Boolean
True when the send queue is empty.
# File 'lib/omq/routing/radio.rb', line 87
def send_queues_drained?
  @send_queue.empty?
end
#stop ⇒ Object
Stops all background tasks (send pump, group listeners).
# File 'lib/omq/routing/radio.rb', line 79
def stop
  @tasks.each(&:stop)
  @tasks.clear
end