Class: OMQ::RADIO
- Inherits:
-
Socket
- Object
- Socket
- OMQ::RADIO
- Includes:
- Writable
- Defined in:
- lib/omq/radio_dish.rb
Overview
Group-based publisher socket (ZeroMQ RFC 48).
Sends messages to DISH peers that have joined the target group. Supports both TCP and UDP transports.
Instance Method Summary collapse
-
#<<(message) ⇒ self
Sends a message to a group via [group, body] array.
-
#initialize(endpoints = nil, linger: Float::INFINITY, on_mute: :drop_newest, conflate: false, backend: nil) ⇒ RADIO
constructor
Creates a new RADIO socket.
-
#publish(group, body) ⇒ self
Publishes a message to a group.
-
#send(message, group: nil) ⇒ self
Sends a message to a group.
Constructor Details
#initialize(endpoints = nil, linger: Float::INFINITY, on_mute: :drop_newest, conflate: false, backend: nil) ⇒ RADIO
Creates a new RADIO socket.
18 19 20 21 22 |
# File 'lib/omq/radio_dish.rb', line 18 def initialize(endpoints = nil, linger: Float::INFINITY, on_mute: :drop_newest, conflate: false, backend: nil) init_engine(:RADIO, on_mute: on_mute, conflate: conflate, backend: backend) @options.linger = linger attach_endpoints(endpoints, default: :bind) end |
Instance Method Details
#<<(message) ⇒ self
Sends a message to a group via [group, body] array.
57 58 59 60 |
# File 'lib/omq/radio_dish.rb', line 57 def <<() raise ArgumentError, "RADIO requires [group, body] array" unless .is_a?(Array) && .size == 2 publish([0], [1]) end |
#publish(group, body) ⇒ self
Publishes a message to a group.
31 32 33 34 35 36 37 |
# File 'lib/omq/radio_dish.rb', line 31 def publish(group, body) parts = [group.b.freeze, body.b.freeze] Reactor.run timeout: @options.write_timeout do @engine.enqueue_send(parts) end self end |
#send(message, group: nil) ⇒ self
Sends a message to a group.
46 47 48 49 |
# File 'lib/omq/radio_dish.rb', line 46 def send(, group: nil) raise ArgumentError, "RADIO requires a group (use group: kwarg, publish, or << [group, body])" unless group publish(group, ) end |