Class: OMQ::RADIO

Inherits:
Socket
  • Object
show all
Includes:
Writable
Defined in:
lib/omq/radio_dish.rb

Overview

Group-based publisher socket (ZeroMQ RFC 48).

Sends messages to DISH peers that have joined the target group. Supports both TCP and UDP transports.

Instance Method Summary collapse

Constructor Details

#initialize(endpoints = nil, linger: Float::INFINITY, on_mute: :drop_newest, conflate: false, backend: nil) ⇒ RADIO

Creates a new RADIO socket.

Parameters:

  • endpoints (String, Array<String>, nil) (defaults to: nil)

    endpoint(s) to bind to

  • linger (Numeric) (defaults to: Float::INFINITY)

    linger period in seconds (Float::INFINITY = wait forever, 0 = drop)

  • on_mute (Symbol) (defaults to: :drop_newest)

    behaviour when HWM is reached (:drop_newest or :block)

  • conflate (Boolean) (defaults to: false)

    if true, keep only the latest message per group per peer

  • backend (Object, nil) (defaults to: nil)

    optional transport backend



18
19
20
21
22
# File 'lib/omq/radio_dish.rb', line 18

def initialize(endpoints = nil, linger: Float::INFINITY, on_mute: :drop_newest, conflate: false, backend: nil)
  init_engine(:RADIO, on_mute: on_mute, conflate: conflate, backend: backend)
  @options.linger = linger
  attach_endpoints(endpoints, default: :bind)
end

Instance Method Details

#<<(message) ⇒ self

Sends a message to a group via [group, body] array.

Parameters:

  • message (Array<String>)
    group, body

Returns:

  • (self)

Raises:

  • (ArgumentError)


57
58
59
60
# File 'lib/omq/radio_dish.rb', line 57

def <<(message)
  raise ArgumentError, "RADIO requires [group, body] array" unless message.is_a?(Array) && message.size == 2
  publish(message[0], message[1])
end

#publish(group, body) ⇒ self

Publishes a message to a group.

Parameters:

  • group (String)

    group name

  • body (String)

    message body

Returns:

  • (self)


31
32
33
34
35
36
37
# File 'lib/omq/radio_dish.rb', line 31

def publish(group, body)
  parts = [group.b.freeze, body.b.freeze]
  Reactor.run timeout: @options.write_timeout do
    @engine.enqueue_send(parts)
  end
  self
end

#send(message, group: nil) ⇒ self

Sends a message to a group.

Parameters:

  • message (String)

    message body (requires group: kwarg)

  • group (String) (defaults to: nil)

    group name

Returns:

  • (self)

Raises:

  • (ArgumentError)


46
47
48
49
# File 'lib/omq/radio_dish.rb', line 46

def send(message, group: nil)
  raise ArgumentError, "RADIO requires a group (use group: kwarg, publish, or << [group, body])" unless group
  publish(group, message)
end