Class: OMQ::RADIO
- Includes:
- Writable
- Defined in:
- lib/omq/radio_dish.rb
Overview
Group-based publisher socket (ZeroMQ RFC 48).
Sends messages to DISH peers that have joined the target group. Supports both TCP and UDP transports.
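The group-based delivery described above can be modelled in plain Ruby. The sketch below is not OMQ's implementation and calls no OMQ API. `ToyDish`, `ToyRadio`, `join`, and `attach` are hypothetical names, assumed here only to illustrate that a message reaches exactly those peers that joined its group.

```ruby
require "set"

# Hypothetical stand-in for a DISH peer: tracks joined groups and received messages.
class ToyDish
  attr_reader :groups, :inbox

  def initialize
    @groups = Set.new
    @inbox = []
  end

  # Record interest in a group; returns self so calls can be chained.
  def join(group)
    @groups << group
    self
  end
end

# Hypothetical stand-in for a RADIO socket: fans out by group membership.
class ToyRadio
  def initialize
    @peers = []
  end

  def attach(dish)
    @peers << dish
    self
  end

  # Deliver [group, body] only to peers that joined `group`.
  def publish(group, body)
    @peers.each do |peer|
      peer.inbox << [group, body] if peer.groups.include?(group)
    end
    self
  end
end

weather = ToyDish.new.join("weather")
sports  = ToyDish.new.join("sports")
radio   = ToyRadio.new.attach(weather).attach(sports)

radio.publish("weather", "sunny")
p weather.inbox  # => [["weather", "sunny"]]
p sports.inbox   # => []
```

A peer that never joined the target group sees nothing, which is the filtering behaviour the overview attributes to RADIO and DISH.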
Constant Summary
Constants included from Writable
Instance Attribute Summary
Attributes inherited from Socket
Instance Method Summary
- #<<(message) ⇒ self
  Sends a message to a group via [group, body] array.
- #initialize(endpoints = nil, linger: Float::INFINITY, on_mute: :drop_newest, conflate: false, backend: nil) ⇒ RADIO (constructor)
  Creates a new RADIO socket.
- #publish(group, body) ⇒ self
  Publishes a message to a group.
- #send(message, group: nil) ⇒ self
  Sends a message to a group.
Methods included from Writable
Methods included from QueueWritable
Methods inherited from Socket
#all_peers_gone, #attach_endpoints, bind, #bind, #close, #close_read, connect, #connect, #connection_count, #disconnect, #finalize_init, #init_engine, #inspect, #monitor, #peer_connected, #reconnect_enabled=, #set_unbounded, #stop, #subscriber_joined, #unbind
Constructor Details
#initialize(endpoints = nil, linger: Float::INFINITY, on_mute: :drop_newest, conflate: false, backend: nil) ⇒ RADIO
Creates a new RADIO socket and attaches it to endpoints, if given (bind is the default attach mode).
    # File 'lib/omq/radio_dish.rb', line 31
    def initialize(endpoints = nil, linger: Float::INFINITY, on_mute: :drop_newest, conflate: false, backend: nil)
      init_engine(:RADIO, on_mute: on_mute, conflate: conflate, backend: backend)
      @options.linger = linger
      attach_endpoints(endpoints, default: :bind)
    end
Instance Method Details
#<<(message) ⇒ self
Sends a message given as a two-element [group, body] array. Raises ArgumentError for any other argument.
    # File 'lib/omq/radio_dish.rb', line 70
    def <<(message)
      raise ArgumentError, "RADIO requires [group, body] array" unless message.is_a?(Array) && message.size == 2
      publish(message[0], message[1])
    end
#publish(group, body) ⇒ self
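Publishes body to group. Both parts are converted to frozen binary strings and enqueued on the engine under the socket's write timeout.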
    # File 'lib/omq/radio_dish.rb', line 44
    def publish(group, body)
      parts = [group.b.freeze, body.b.freeze]
      Reactor.run timeout: @options.write_timeout do
        @engine.enqueue_send(parts)
      end
      self
    end
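The `publish` source applies `String#b` and `freeze` to both parts before enqueuing them. As a small illustration using only core Ruby (no OMQ calls; the variable names are illustrative), `String#b` returns a copy tagged as binary with the bytes unchanged, and `freeze` then makes that copy immutable:

```ruby
group = "météo"         # source string containing multibyte characters
frame = group.b.freeze  # binary-encoded, frozen copy, built the same way as in publish

puts frame.encoding == Encoding::BINARY  # true
puts frame.frozen?                       # true
puts frame.bytes == group.bytes          # true: the bytes are untouched
puts frame.equal?(group)                 # false: group itself is not modified
```

Because `b` copies, the caller's strings keep their original encoding and remain mutable.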
#send(message, group: nil) ⇒ self
Sends message to the group named by the group: keyword. Raises ArgumentError when group is nil.
    # File 'lib/omq/radio_dish.rb', line 59
    def send(message, group: nil)
      raise ArgumentError, "RADIO requires a group (use group: kwarg, publish, or << [group, body])" unless group
      publish(group, message)
    end
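All three send methods converge on `publish`. The following self-contained sketch mirrors that delegation with a hypothetical `SketchRadio` class that appends frames to an array instead of handing them to an engine. It is a model for illustration under that assumption, not the OMQ class itself, and its error messages are shortened.

```ruby
# Hypothetical model of RADIO's send paths; frames are recorded in @sent
# rather than enqueued on a real engine.
class SketchRadio
  attr_reader :sent

  def initialize
    @sent = []
  end

  # Core path: store [group, body] as frozen binary strings.
  def publish(group, body)
    @sent << [group.b.freeze, body.b.freeze]
    self
  end

  # Body first; the group: keyword defaults to nil but must be supplied.
  def send(message, group: nil)
    raise ArgumentError, "RADIO requires a group" unless group
    publish(group, message)
  end

  # Accepts only a two-element [group, body] array.
  def <<(message)
    unless message.is_a?(Array) && message.size == 2
      raise ArgumentError, "RADIO requires [group, body] array"
    end
    publish(message[0], message[1])
  end
end

radio = SketchRadio.new
radio.publish("weather", "sunny")
radio.send("rain", group: "weather")
radio << ["weather", "fog"]

p radio.sent.map { |_group, body| body }  # => ["sunny", "rain", "fog"]

begin
  radio.send("no group given")
rescue ArgumentError => e
  puts e.message  # RADIO requires a group
end
```

Note the argument order: `publish` takes the group first, while `send` takes the body first and the group as a keyword.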