Class: OMQ::PEER

Inherits:
Socket show all
Includes:
Readable, SingleFrame, Writable
Defined in:
lib/omq/peer.rb

Overview

Bidirectional multi-peer socket with routing IDs (ZeroMQ RFC 51).

Each connected peer is assigned a 4-byte routing ID. Supports directed sends via #send_to and fair-queued receives.

Instance Attribute Summary

Attributes inherited from Socket

#engine, #options

Instance Method Summary collapse

Methods included from SingleFrame

#send

Methods included from Writable

#<<, #send, #wait_writable

Methods included from QueueWritable

#enqueue

Methods included from Readable

#receive, #wait_readable

Methods included from QueueReadable

#dequeue, #each, #wait

Methods inherited from Socket

#all_peers_gone, #attach_endpoints, bind, #bind, #close, #close_read, connect, #connect, #connection_count, #disconnect, #finalize_init, #init_engine, #inspect, #monitor, #peer_connected, #reconnect_enabled=, #set_unbounded, #stop, #subscriber_joined, #unbind

Constructor Details

#initialize(endpoints = nil, linger: Float::INFINITY, backend: nil) ⇒ PEER

Creates a new PEER socket.

Parameters:

  • endpoints (String, Array<String>, nil) (defaults to: nil)

    endpoint(s) to connect to

  • linger (Numeric) (defaults to: Float::INFINITY)

    linger period in seconds (Float::INFINITY = wait forever, 0 = drop)

  • backend (Object, nil) (defaults to: nil)

    optional transport backend



27
28
29
30
31
# File 'lib/omq/peer.rb', line 27

def initialize(endpoints = nil, linger: Float::INFINITY, backend: nil)
  init_engine(:PEER, backend: backend)
  @options.linger = linger
  attach_endpoints(endpoints, default: :connect)
end

Instance Method Details

#send_to(routing_id, message) ⇒ self

Sends a message to a specific peer by routing ID.

Parameters:

  • routing_id (String)

    4-byte routing ID

  • message (String)

    message body

Returns:

  • (self)


40
41
42
43
44
# File 'lib/omq/peer.rb', line 40

def send_to(routing_id, message)
  parts = [routing_id, message]
  Reactor.run(timeout: @options.write_timeout) { @engine.enqueue_send(parts) }
  self
end