Class: OMQ::XPUB

Inherits:
Socket show all
Includes:
Readable, Writable
Defined in:
lib/omq/pub_sub.rb

Overview

XPUB socket — like PUB but exposes subscription events to the application.

Constant Summary

Constants included from Writable

Writable::EMPTY_PART

Instance Attribute Summary

Attributes inherited from Socket

#engine, #last_tcp_port, #options

Instance Method Summary collapse

Methods included from Writable

#<<, #send, #wait_writable

Methods included from QueueWritable

#enqueue

Methods included from Readable

#receive, #wait_readable

Methods included from QueueReadable

#dequeue, #each, #wait

Methods inherited from Socket

#all_peers_gone, #attach_endpoints, bind, #bind, #close, #close_read, connect, #connect, #connection_count, #disconnect, #finalize_init, #init_engine, #inspect, #last_endpoint, #monitor, #peer_connected, #reconnect_enabled=, #set_unbounded, #stop, #subscriber_joined, #unbind

Constructor Details

#initialize(endpoints = nil, linger: Float::INFINITY, send_hwm: nil, recv_hwm: nil, send_timeout: nil, recv_timeout: nil, on_mute: :drop_newest, backend: nil, &block) ⇒ XPUB

Returns a new instance of XPUB.

Parameters:

  • endpoints (String, nil) (defaults to: nil)

    endpoint to bind/connect

  • linger (Numeric) (defaults to: Float::INFINITY)

    linger period in seconds (Float::INFINITY = wait forever, 0 = drop)

  • send_hwm (Integer, nil) (defaults to: nil)

    send high water mark

  • recv_hwm (Integer, nil) (defaults to: nil)

    receive high water mark

  • send_timeout (Numeric, nil) (defaults to: nil)

    send timeout in seconds

  • recv_timeout (Numeric, nil) (defaults to: nil)

    receive timeout in seconds

  • on_mute (Symbol) (defaults to: :drop_newest)

    mute strategy for slow subscribers

  • backend (Symbol, nil) (defaults to: nil)

    :ruby (default) or :ffi



95
96
97
98
99
100
101
102
103
104
105
# File 'lib/omq/pub_sub.rb', line 95

def initialize(endpoints = nil, linger: Float::INFINITY,
               send_hwm: nil, recv_hwm: nil,
               send_timeout: nil, recv_timeout: nil,
               on_mute: :drop_newest, backend: nil, &block)
  init_engine(:XPUB, send_hwm: send_hwm, recv_hwm: recv_hwm,
              send_timeout: send_timeout, recv_timeout: recv_timeout,
              on_mute: on_mute, backend: backend)
  @options.linger = linger
  attach_endpoints(endpoints, default: :bind)
  finalize_init(&block)
end