Class: OMQ::Socket

Inherits:
Object
  • Object
Extended by:
Forwardable
Defined in:
lib/omq/socket.rb

Overview

Socket base class.

Direct Known Subclasses

DEALER, PAIR, PUB, PULL, PUSH, REP, REQ, ROUTER, SUB, XPUB, XSUB

Constructor Details

#initialize(endpoints = nil) {|the| ... } ⇒ Socket

Use option accessors (e.g. socket.linger = 0) to configure post-construction.

Parameters:

  • endpoints (String, nil) (defaults to: nil)

    optional endpoint with prefix convention (@ for bind, > for connect; a plain endpoint uses the subclass default)

Yield Parameters:

  • the (self)

    the socket, when a block is passed; it is closed (see #close) when the block returns or raises.



# File 'lib/omq/socket.rb', line 83

def initialize(endpoints = nil)
end

Instance Attribute Details

#engine ⇒ Engine (readonly)

Returns the socket’s engine. Exposed for peer tooling (omq-cli, omq-ffi, omq-ractor) that needs to reach into the socket’s internals — not part of the stable user API.

Returns:

  • (Engine)

    the socket’s engine. Exposed for peer tooling (omq-cli, omq-ffi, omq-ractor) that needs to reach into the socket’s internals — not part of the stable user API.



# File 'lib/omq/socket.rb', line 47

def engine
  @engine
end

#last_tcp_port ⇒ Integer? (readonly)

Returns the last auto-selected TCP port.

Returns:

  • (Integer, nil)

    last auto-selected TCP port



# File 'lib/omq/socket.rb', line 40

def last_tcp_port
  @last_tcp_port
end

#options ⇒ Options (readonly)

Returns:

  • (Options)

# File 'lib/omq/socket.rb', line 35

def options
  @options
end

Class Method Details

.bind(endpoint, **opts) ⇒ Socket

Creates a new socket and binds it to the given endpoint.

Parameters:

  • endpoint (String)
  • opts (Hash)

    keyword arguments forwarded to #initialize

Returns:

  • (Socket)

# File 'lib/omq/socket.rb', line 17

def self.bind(endpoint, **opts)
  new("@#{endpoint}", **opts)
end

.connect(endpoint, **opts) ⇒ Socket

Creates a new socket and connects it to the given endpoint.

Parameters:

  • endpoint (String)
  • opts (Hash)

    keyword arguments forwarded to #initialize

Returns:

  • (Socket)

# File 'lib/omq/socket.rb', line 28

def self.connect(endpoint, **opts)
  new(">#{endpoint}", **opts)
end
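
Both factories only add a prefix and delegate to the constructor. The sketch below shows that forwarding with a hypothetical stand-in class, FactoryDemo, which records what it receives instead of opening a socket; the class name, the endpoint, and the send_hwm keyword are illustrative assumptions, not part of the documented API.

```ruby
# Hypothetical stand-in: records what .bind and .connect hand to .new.
class FactoryDemo
  attr_reader :spec, :opts

  # Same shape as Socket.bind: prefix the endpoint with "@".
  def self.bind(endpoint, **opts)
    new("@#{endpoint}", **opts)
  end

  # Same shape as Socket.connect: prefix the endpoint with ">".
  def self.connect(endpoint, **opts)
    new(">#{endpoint}", **opts)
  end

  def initialize(spec = nil, **opts)
    @spec = spec
    @opts = opts
  end
end

bound = FactoryDemo.bind("tcp://127.0.0.1:5555", send_hwm: 100)
p bound.spec   # the endpoint with the bind prefix added
p bound.opts   # keyword arguments arrive unchanged

connected = FactoryDemo.connect("tcp://127.0.0.1:5555")
p connected.spec
```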

Instance Method Details

#all_peers_gone ⇒ Async::Promise

Returns a promise that resolves when all peers disconnect (after having had peers).

Returns:

  • (Async::Promise)

    resolves when all peers disconnect (after having had peers)



# File 'lib/omq/socket.rb', line 175

def all_peers_gone
  @engine.all_peers_gone
end

#attach_endpoints(endpoints, default:) ⇒ Object

Connects or binds based on endpoint prefix convention. Called from subclass initializers (including out-of-tree socket types).

Parameters:

  • endpoints (String, Array<String>, nil)
  • default (Symbol)

    :connect or :bind



# File 'lib/omq/socket.rb', line 298

def attach_endpoints(endpoints, default:)
  return unless endpoints

  if endpoints.is_a?(Array)
    endpoints.each { |ep| attach_endpoints(ep, default: default) }
    return
  end

  case endpoints
  when /\A@(.+)\z/
    bind($1)
  when /\A>(.+)\z/
    connect($1)
  else
    __send__(default, endpoints)
  end
end
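
The prefix convention can be tried without a live socket. The sketch below mirrors the dispatch above with a hypothetical helper, classify_endpoint, that returns the chosen action and the bare address instead of calling #bind or #connect; the helper name and the sample endpoints are assumptions for illustration.

```ruby
# Hypothetical helper: mirrors the prefix dispatch in #attach_endpoints,
# but returns [action, address] pairs instead of binding or connecting.
def classify_endpoint(endpoints, default:)
  return [] unless endpoints

  # Arrays are handled element by element, like the recursive call above.
  if endpoints.is_a?(Array)
    return endpoints.flat_map { |ep| classify_endpoint(ep, default: default) }
  end

  case endpoints
  when /\A@(.+)\z/ then [[:bind, $1]]      # "@" prefix forces a bind
  when /\A>(.+)\z/ then [[:connect, $1]]   # ">" prefix forces a connect
  else                  [[default, endpoints]]
  end
end

p classify_endpoint("@tcp://127.0.0.1:5555", default: :connect)
p classify_endpoint(">ipc:///tmp/demo.sock", default: :bind)
p classify_endpoint("inproc://demo", default: :connect)
```

A plain endpoint takes whatever the caller passes as default, which is how each socket type can pick its own natural direction.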

#bind(endpoint, parent: nil) ⇒ void

This method returns an undefined value.

Binds to an endpoint.

Parameters:

  • endpoint (String)
  • parent (#async, nil) (defaults to: nil)

    Async parent for the socket’s task tree. Accepts any object that responds to #async, such as Async::Task, Async::Barrier, or Async::Semaphore. When given, every task spawned under this socket (connection supervisors, reconnect loops, heartbeat, monitor) is placed under parent, so callers can coordinate teardown with their own Async tree. Only the first bind/connect call captures the parent; subsequent calls ignore the kwarg.



# File 'lib/omq/socket.rb', line 114

def bind(endpoint, parent: nil)
  ensure_parent_task(parent: parent)
  Reactor.run do
    @engine.bind(endpoint) # TODO: use timeout?
    @last_tcp_port = @engine.last_tcp_port
  end
end

#close ⇒ nil

Closes the socket and releases all resources. Drains pending sends up to linger seconds, then cascades teardown through the socket-level Async::Barrier — every connection’s per-connection barrier is stopped, cancelling every pump.

Returns:

  • (nil)


# File 'lib/omq/socket.rb', line 256

def close
  Reactor.run { @engine.close } # TODO: use timeout?
  nil
end

#close_read ⇒ void

This method returns an undefined value.

Signals end-of-stream on the receive side. A subsequent #receive call that would otherwise block returns nil.



# File 'lib/omq/socket.rb', line 191

def close_read
  @engine.dequeue_recv_sentinel
end

#connect(endpoint, parent: nil) ⇒ void

This method returns an undefined value.

Connects to an endpoint.

Parameters:

  • endpoint (String)
  • parent (#async, nil) (defaults to: nil)

    see #bind.



# File 'lib/omq/socket.rb', line 129

def connect(endpoint, parent: nil)
  ensure_parent_task(parent: parent)
  Reactor.run { @engine.connect(endpoint) } # TODO: use timeout?
end

#connection_count ⇒ Integer

Returns the current number of peer connections.

Returns:

  • (Integer)

    current number of peer connections



# File 'lib/omq/socket.rb', line 181

def connection_count
  @engine.connections.size
end

#disconnect(endpoint) ⇒ void

This method returns an undefined value.

Disconnects from an endpoint.

Parameters:

  • endpoint (String)


# File 'lib/omq/socket.rb', line 140

def disconnect(endpoint)
  Reactor.run { @engine.disconnect(endpoint) } # TODO: use timeout?
end

#finalize_init ⇒ Object

Yields self if a block was given, then closes. Called by every subclass initializer so OMQ::PUSH.new { |p| … } behaves like File.open: configure, use, auto-close.



# File 'lib/omq/socket.rb', line 91

def finalize_init
  return unless block_given?
  begin
    yield self
  ensure
    close
  end
end
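
The File.open-style behaviour can be seen in isolation. The sketch below uses a hypothetical stand-in class, AutoCloseDemo, that does no networking and only tracks whether #close was called; the class and its closed reader are assumptions for illustration, while the body of finalize_init follows the listing above.

```ruby
# Hypothetical stand-in for a socket subclass; it does no networking.
class AutoCloseDemo
  attr_reader :closed

  def initialize(&block)
    @closed = false
    finalize_init(&block)
  end

  def close
    @closed = true
    nil
  end

  private

  # Same shape as the method above: yield, then always close.
  def finalize_init
    return unless block_given?
    begin
      yield self
    ensure
      close
    end
  end
end

plain = AutoCloseDemo.new
puts plain.closed            # no block given, so the socket stays open

seen = nil
AutoCloseDemo.new { |s| seen = s }
puts seen.closed             # closed once the block returned

begin
  AutoCloseDemo.new { |s| seen = s; raise "boom" }
rescue RuntimeError
  puts seen.closed           # closed even though the block raised
end
```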

#init_engine(socket_type, send_hwm: nil, recv_hwm: nil, send_timeout: nil, recv_timeout: nil, conflate: false, on_mute: nil, backend: nil) ⇒ Object

Initializes engine and options for a socket type. Called from subclass initializers (including out-of-tree socket types).

Parameters:

  • socket_type (Symbol)


# File 'lib/omq/socket.rb', line 322

def init_engine(socket_type, send_hwm: nil, recv_hwm: nil,
                send_timeout: nil, recv_timeout: nil, conflate: false,
                on_mute: nil, backend: nil)
  @options = Options.new
  @options.send_hwm     = send_hwm     if send_hwm
  @options.recv_hwm     = recv_hwm     if recv_hwm
  @options.send_timeout = send_timeout if send_timeout
  @options.recv_timeout = recv_timeout if recv_timeout
  @options.conflate     = conflate
  @options.on_mute      = on_mute      if on_mute
  @engine = case backend
  when nil, :ruby
    Engine.new(socket_type, @options)
  when :ffi
    FFI::Engine.new(socket_type, @options)
  else
    raise ArgumentError, "unknown backend: #{backend}"
  end
end

#inspect ⇒ String

Returns:

  • (String)


# File 'lib/omq/socket.rb', line 287

def inspect
  format("#<%s last_endpoint=%p>", self.class, last_endpoint)
end

#last_endpoint ⇒ String?

Returns the last bound endpoint.

Returns:

  • (String, nil)

    last bound endpoint



# File 'lib/omq/socket.rb', line 157

def last_endpoint
  @engine.last_endpoint
end

#monitor(verbose: false) {|event| ... } ⇒ Async::Task

Yields lifecycle events for this socket.

Spawns a background fiber that reads from an internal event queue. The block receives MonitorEvent instances until the socket is closed or the returned task is stopped.

Examples:

task = socket.monitor do |event|
  case event
  in type: :connected, endpoint:
    puts "peer up: #{endpoint}"
  in type: :disconnected, endpoint:
    puts "peer down: #{endpoint}"
  else
    # ignore other event types (e.g. :monitor_stopped)
  end
end
# later:
task.stop

Yields:

  • (event)

    called for each lifecycle event

Yield Parameters:

  • event (MonitorEvent)

Returns:

  • (Async::Task)

    the monitor task (call #stop to end early)



# File 'lib/omq/socket.rb', line 218

def monitor(verbose: false, &block)
  ensure_parent_task

  queue                   = Async::LimitedQueue.new(64)
  @engine.monitor_queue   = queue
  @engine.verbose_monitor = verbose

  Reactor.run do
    @engine.parent_task.async(transient: true, annotation: "monitor") do
      while (event = queue.dequeue)
        block.call(event)
      end
    rescue Async::Stop
    ensure
      @engine.monitor_queue = nil
      block.call(MonitorEvent.new(type: :monitor_stopped))
    end
  end
end
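
The consumer loop above can be approximated with plain Ruby to see how events flow to the block. In this sketch DemoEvent is a hypothetical stand-in for MonitorEvent (its type and endpoint fields are assumed from the example above), Ruby's core Queue replaces Async::LimitedQueue, and :something_else is a made-up event type. It also shows why a handler that pattern-matches on events wants an else branch: the loop always delivers a final :monitor_stopped event.

```ruby
# Stand-ins only: DemoEvent mimics the assumed shape of MonitorEvent,
# and the core Queue class replaces the Async queue used by the library.
DemoEvent = Struct.new(:type, :endpoint, keyword_init: true)

def describe(event)
  case event
  in type: :connected, endpoint:
    "peer up: #{endpoint}"
  in type: :disconnected, endpoint:
    "peer down: #{endpoint}"
  in type: :monitor_stopped
    "monitor stopped"
  else
    # Without this branch an unknown event raises NoMatchingPatternError.
    "other: #{event.type}"
  end
end

queue = Queue.new
queue << DemoEvent.new(type: :connected, endpoint: "tcp://127.0.0.1:5555")
queue << DemoEvent.new(type: :disconnected, endpoint: "tcp://127.0.0.1:5555")
queue.close # a closed, drained queue makes #pop return nil

log = []
begin
  while (event = queue.pop)
    log << describe(event)
  end
ensure
  # Like the ensure clause above, report that monitoring has ended.
  log << describe(DemoEvent.new(type: :monitor_stopped))
end

puts log
```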

#peer_connected ⇒ Async::Promise

Returns a promise that resolves when the first peer completes its handshake.

Returns:

  • (Async::Promise)

    resolves when first peer completes handshake



# File 'lib/omq/socket.rb', line 163

def peer_connected
  @engine.peer_connected
end

#reconnect_enabled=(val) ⇒ void

This method returns an undefined value.

Enables or disables auto-reconnect for connected endpoints. Pass false to disable it.

Parameters:

  • val (Boolean)


# File 'lib/omq/socket.rb', line 244

def reconnect_enabled=(val)
  @engine.reconnect_enabled = val
end

#set_unbounded ⇒ nil

Sets the socket to use unbounded pipes (send and receive HWM = 0).

Returns:

  • (nil)


# File 'lib/omq/socket.rb', line 278

def set_unbounded
  @options.send_hwm = 0
  @options.recv_hwm = 0
  nil
end

#stop ⇒ nil

Immediate hard stop. Skips the linger drain and cascades teardown through the socket-level Async::Barrier. Intended for crash-path cleanup or when the caller already knows no pending sends matter.

Returns:

  • (nil)


# File 'lib/omq/socket.rb', line 268

def stop
  Reactor.run { @engine.stop } # TODO: use timeout?
  nil
end

#subscriber_joined ⇒ Async::Promise

Returns a promise that resolves when the first subscriber joins (PUB/XPUB only).

Returns:

  • (Async::Promise)

    resolves when first subscriber joins (PUB/XPUB only)



# File 'lib/omq/socket.rb', line 169

def subscriber_joined
  @engine.routing.subscriber_joined
end

#unbind(endpoint) ⇒ void

This method returns an undefined value.

Unbinds from an endpoint.

Parameters:

  • endpoint (String)


# File 'lib/omq/socket.rb', line 150

def unbind(endpoint)
  Reactor.run { @engine.unbind(endpoint) } # TODO: use timeout?
end