Class: OMQ::SUB

Inherits:
Socket show all
Includes:
Readable
Defined in:
lib/omq/pub_sub.rb

Overview

SUB socket.

Constant Summary collapse

EVERYTHING =

Returns subscription prefix to subscribe to everything.

Returns:

  • (String)

    subscription prefix to subscribe to everything

''

Instance Attribute Summary

Attributes inherited from Socket

#engine, #last_tcp_port, #options

Instance Method Summary collapse

Methods included from Readable

#receive, #wait_readable

Methods included from QueueReadable

#dequeue, #each, #wait

Methods inherited from Socket

#all_peers_gone, #attach_endpoints, bind, #bind, #close, #close_read, connect, #connect, #connection_count, #disconnect, #finalize_init, #init_engine, #inspect, #last_endpoint, #monitor, #peer_connected, #reconnect_enabled=, #set_unbounded, #stop, #subscriber_joined, #unbind

Constructor Details

#initialize(endpoints = nil, recv_hwm: nil, recv_timeout: nil, subscribe: nil, on_mute: :block, backend: nil, &block) ⇒ SUB

Returns a new instance of SUB.

Parameters:

  • endpoints (String, nil) (defaults to: nil)

    endpoint to bind/connect

  • recv_hwm (Integer, nil) (defaults to: nil)

    receive high water mark

  • recv_timeout (Numeric, nil) (defaults to: nil)

    receive timeout in seconds

  • subscribe (String, nil) (defaults to: nil)

    subscription prefix; nil (default) means no subscription — call #subscribe explicitly.

  • on_mute (Symbol) (defaults to: :block)

    :block (default), :drop_newest, or :drop_oldest

  • backend (Symbol, nil) (defaults to: nil)

    :ruby (default) or :ffi



48
49
50
51
52
53
54
55
# File 'lib/omq/pub_sub.rb', line 48

def initialize(endpoints = nil, recv_hwm: nil, recv_timeout: nil,
               subscribe: nil, on_mute: :block, backend: nil, &block)
  init_engine(:SUB, recv_hwm: recv_hwm, recv_timeout: recv_timeout,
              on_mute: on_mute, backend: backend)
  attach_endpoints(endpoints, default: :connect)
  self.subscribe(subscribe) unless subscribe.nil?
  finalize_init(&block)
end

Instance Method Details

#subscribe(prefix = EVERYTHING) ⇒ void

This method returns an undefined value.

Subscribes to a topic prefix.

Parameters:

  • prefix (String) (defaults to: EVERYTHING)


63
64
65
# File 'lib/omq/pub_sub.rb', line 63

def subscribe(prefix = EVERYTHING)
  @engine.routing.subscribe(prefix)
end

#unsubscribe(prefix) ⇒ void

This method returns an undefined value.

Unsubscribes from a topic prefix.

Parameters:

  • prefix (String)


73
74
75
# File 'lib/omq/pub_sub.rb', line 73

def unsubscribe(prefix)
  @engine.routing.unsubscribe(prefix)
end