Class: OMQ::Socket

Inherits:
Object
Extended by:
Forwardable
Defined in:
lib/omq/socket.rb

Overview

Socket base class.

Direct Known Subclasses

CHANNEL, CLIENT, DEALER, DISH, GATHER, PAIR, PEER, PUB, PULL, PUSH, RADIO, REP, REQ, ROUTER, SCATTER, SERVER, SUB, XPUB, XSUB

Constructor Details

#initialize(endpoints = nil) {|socket| ... } ⇒ Socket

Use option accessors (e.g. socket.linger = 0) to configure post-construction.

Parameters:

  • endpoints (String, nil) (defaults to: nil)

    optional endpoint with prefix convention (@ for bind, > for connect, plain uses subclass default)

Yield Parameters:

  • socket (self)

    the socket, when a block is passed; the socket is closed (see #close) when the block returns or raises.



# File 'lib/omq/socket.rb', line 78

def initialize(endpoints = nil)
end

Instance Attribute Details

#engine ⇒ Engine (readonly)

Returns the socket’s engine. Exposed for peer tooling (omq-cli, omq-ffi, omq-ractor) that needs to reach into the socket’s internals — not part of the stable user API.

Returns:

  • (Engine)

    the socket’s engine. Exposed for peer tooling (omq-cli, omq-ffi, omq-ractor) that needs to reach into the socket’s internals — not part of the stable user API.



# File 'lib/omq/socket.rb', line 42

def engine
  @engine
end

#options ⇒ Options (readonly)

Returns:

  • (Options)



# File 'lib/omq/socket.rb', line 35

def options
  @options
end

Class Method Details

.bind(endpoint, **opts) ⇒ Socket

Creates a new socket and binds it to the given endpoint.

Parameters:

  • endpoint (String)
  • opts (Hash)

    keyword arguments forwarded to #initialize

Returns:

  • (Socket)



# File 'lib/omq/socket.rb', line 17

def self.bind(endpoint, **opts)
  new("@#{endpoint}", **opts)
end

.connect(endpoint, **opts) ⇒ Socket

Creates a new socket and connects it to the given endpoint.

Parameters:

  • endpoint (String)
  • opts (Hash)

    keyword arguments forwarded to #initialize

Returns:

  • (Socket)



# File 'lib/omq/socket.rb', line 28

def self.connect(endpoint, **opts)
  new(">#{endpoint}", **opts)
end

Instance Method Details

#all_peers_gone ⇒ Async::Promise

Returns a promise that resolves when all peers disconnect (after having had peers).

Returns:

  • (Async::Promise)

    resolves when all peers disconnect (after having had peers)



# File 'lib/omq/socket.rb', line 160

def all_peers_gone
  @engine.all_peers_gone
end

#attach_endpoints(endpoints, default:) ⇒ Object

Connects or binds based on endpoint prefix convention. Called from subclass initializers (including out-of-tree socket types).

Parameters:

  • endpoints (String, nil)
  • default (Symbol)

    :connect or :bind



# File 'lib/omq/socket.rb', line 283

def attach_endpoints(endpoints, default:)
  return unless endpoints

  if endpoints.is_a?(Array)
    endpoints.each { |ep| attach_endpoints(ep, default: default) }
    return
  end

  case endpoints
  when /\A@(.+)\z/
    bind($1)
  when /\A>(.+)\z/
    connect($1)
  else
    __send__(default, endpoints)
  end
end
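
The prefix dispatch above can be exercised without opening real sockets. The following is a minimal sketch, assuming a hypothetical RecordingSocket stand-in (not part of OMQ) whose bind and connect only record what they were asked to do; the attach_endpoints body follows the listing above.

```ruby
# Hypothetical stand-in: records calls instead of opening real sockets.
class RecordingSocket
  attr_reader :calls

  def initialize
    @calls = []
  end

  def bind(endpoint)
    @calls << [:bind, endpoint]
  end

  def connect(endpoint)
    @calls << [:connect, endpoint]
  end

  # Same rules as the listing: "@" binds, ">" connects,
  # anything else falls back to the default action.
  def attach_endpoints(endpoints, default:)
    return unless endpoints

    if endpoints.is_a?(Array)
      endpoints.each { |ep| attach_endpoints(ep, default: default) }
      return
    end

    case endpoints
    when /\A@(.+)\z/ then bind($1)
    when /\A>(.+)\z/ then connect($1)
    else __send__(default, endpoints)
    end
  end
end

sock = RecordingSocket.new
sock.attach_endpoints(
  ["@tcp://127.0.0.1:5555", ">ipc:///tmp/feed", "inproc://jobs"],
  default: :connect
)
sock.calls.each { |action, endpoint| puts "#{action} #{endpoint}" }
```

The unprefixed endpoint takes the default action, which is why each subclass passes its own default (:connect or :bind).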

#bind(endpoint, parent: nil, **opts) ⇒ URI::Generic

Binds to an endpoint.

Parameters:

  • endpoint (String)
  • parent (#async, nil) (defaults to: nil)

    Async parent for the socket’s task tree. Accepts any object that responds to #async, such as Async::Task, Async::Barrier, or Async::Semaphore. When given, every task spawned under this socket (connection supervisors, reconnect loops, heartbeat, monitor) is placed under parent, so callers can coordinate teardown with their own Async tree. Only the first bind/connect call captures the parent; subsequent calls ignore the kwarg.

Returns:

  • (URI::Generic)

    resolved endpoint URI (with auto-selected port for “tcp://host:0”)



# File 'lib/omq/socket.rb', line 109

def bind(endpoint, parent: nil, **opts)
  ensure_parent_task(parent: parent)
  Reactor.run { @engine.bind(endpoint, **opts) } # TODO: use timeout?
end

#close ⇒ nil

Closes the socket and releases all resources. Drains pending sends up to linger seconds, then cascades teardown through the socket-level Async::Barrier — every connection’s per-connection barrier is stopped, cancelling every pump.

Returns:

  • (nil)


# File 'lib/omq/socket.rb', line 241

def close
  Reactor.run { @engine.close } # TODO: use timeout?
  nil
end

#close_read ⇒ void

This method returns an undefined value.

Signals end-of-stream on the receive side. A subsequent #receive call that would otherwise block returns nil.



# File 'lib/omq/socket.rb', line 176

def close_read
  @engine.dequeue_recv_sentinel
end

#connect(endpoint, parent: nil, **opts) ⇒ URI::Generic

Connects to an endpoint.

Parameters:

  • endpoint (String)
  • parent (#async, nil) (defaults to: nil)

    see #bind.

Returns:

  • (URI::Generic)

    parsed endpoint URI



# File 'lib/omq/socket.rb', line 121

def connect(endpoint, parent: nil, **opts)
  ensure_parent_task(parent: parent)
  Reactor.run { @engine.connect(endpoint, **opts) } # TODO: use timeout?
end

#connection_count ⇒ Integer

Returns the current number of peer connections.

Returns:

  • (Integer)

    current number of peer connections



# File 'lib/omq/socket.rb', line 166

def connection_count
  @engine.connections.size
end

#disconnect(endpoint) ⇒ void

This method returns an undefined value.

Disconnects from an endpoint.

Parameters:

  • endpoint (String)


# File 'lib/omq/socket.rb', line 132

def disconnect(endpoint)
  Reactor.run { @engine.disconnect(endpoint) } # TODO: use timeout?
end

#finalize_init ⇒ Object

Yields self if a block was given, then closes. Called by every subclass initializer so OMQ::PUSH.new { |p| … } behaves like File.open: configure, use, auto-close.



# File 'lib/omq/socket.rb', line 86

def finalize_init
  return unless block_given?
  begin
    yield self
  ensure
    close
  end
end
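
The File.open-like behavior can be checked in isolation. This is a minimal sketch, assuming a hypothetical DemoSocket class (not part of OMQ) whose initializer forwards its block to finalize_init the way a subclass initializer presumably would; close only flips a flag here.

```ruby
# Hypothetical stand-in for a socket subclass; it opens nothing.
class DemoSocket
  attr_reader :closed

  def initialize(&block)
    @closed = false
    finalize_init(&block)
  end

  def close
    @closed = true
    nil
  end

  # Yield self when a block is given, and close even if the block raises.
  def finalize_init
    return unless block_given?

    begin
      yield self
    ensure
      close
    end
  end
end

# Without a block the socket stays open and the caller owns its lifetime.
manual = DemoSocket.new
puts "manual closed? #{manual.closed}"

# With a block the socket is closed as soon as the block returns.
scoped = nil
DemoSocket.new { |s| scoped = s }
puts "scoped closed? #{scoped.closed}"
```

Because close sits in an ensure clause, an exception raised inside the block still closes the socket before it propagates.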

#init_engine(socket_type, send_hwm: nil, recv_hwm: nil, send_timeout: nil, recv_timeout: nil, conflate: false, on_mute: nil, backend: nil) ⇒ Object

Initializes engine and options for a socket type. Called from subclass initializers (including out-of-tree socket types).

Parameters:

  • socket_type (Symbol)


# File 'lib/omq/socket.rb', line 307

def init_engine(socket_type, send_hwm: nil, recv_hwm: nil,
                send_timeout: nil, recv_timeout: nil, conflate: false,
                on_mute: nil, backend: nil)
  @options = Options.new
  @options.send_hwm     = send_hwm     if send_hwm
  @options.recv_hwm     = recv_hwm     if recv_hwm
  @options.send_timeout = send_timeout if send_timeout
  @options.recv_timeout = recv_timeout if recv_timeout
  @options.conflate     = conflate
  @options.on_mute      = on_mute      if on_mute
  @engine = case backend
  when nil, :ruby
    Engine.new(socket_type, @options)
  when :ffi
    FFI::Engine.new(socket_type, @options)
  else
    raise ArgumentError, "unknown backend: #{backend}"
  end
end
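
Two details of the listing are easy to miss: a keyword left at nil does not overwrite the option's default, and an unrecognized backend is rejected with ArgumentError. The sketch below illustrates both with a hypothetical DemoOptions struct and build_options helper; neither is part of OMQ, and the default values of 1000 are illustrative only.

```ruby
# Hypothetical options holder; the defaults used below are illustrative only.
DemoOptions = Struct.new(:send_hwm, :recv_hwm, :conflate)

# Mirrors the pattern in the listing: nil keywords leave defaults untouched,
# conflate is always assigned, and an unknown backend raises ArgumentError.
def build_options(send_hwm: nil, recv_hwm: nil, conflate: false, backend: nil)
  options = DemoOptions.new(1000, 1000, false)
  options.send_hwm = send_hwm if send_hwm
  options.recv_hwm = recv_hwm if recv_hwm
  options.conflate = conflate

  case backend
  when nil, :ruby, :ffi
    options
  else
    raise ArgumentError, "unknown backend: #{backend}"
  end
end

p build_options(send_hwm: 5)

begin
  build_options(backend: :bogus)
rescue ArgumentError => e
  puts e.message
end
```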

#inspect ⇒ String

Returns:

  • (String)


# File 'lib/omq/socket.rb', line 272

def inspect
  format("#<%s bound=%p>", self.class, @engine.listeners.keys)
end

#monitor(verbose: false) {|event| ... } ⇒ Async::Task

Yields lifecycle events for this socket.

Spawns a background fiber that reads from an internal event queue. The block receives MonitorEvent instances until the socket is closed or the returned task is stopped.

Examples:

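task = socket.monitor do |event|
  case event
  in type: :connected, endpoint:
    puts "peer up: #{endpoint}"
  in type: :disconnected, endpoint:
    puts "peer down: #{endpoint}"
  else
    # ignore other events, including the final :monitor_stopped;
    # case/in raises NoMatchingPatternError when nothing matches
  end
end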
# later:
task.stop

Yields:

  • (event)

    called for each lifecycle event

Yield Parameters:

  • event (MonitorEvent)

Returns:

  • (Async::Task)

    the monitor task (call #stop to end early)



# File 'lib/omq/socket.rb', line 203

def monitor(verbose: false, &block)
  ensure_parent_task

  queue                   = Async::LimitedQueue.new(64)
  @engine.monitor_queue   = queue
  @engine.verbose_monitor = verbose

  Reactor.run do
    @engine.parent_task.async(transient: true, annotation: "monitor") do
      while (event = queue.dequeue)
        block.call(event)
      end
    rescue Async::Stop
    ensure
      @engine.monitor_queue = nil
      block.call(MonitorEvent.new(type: :monitor_stopped))
    end
  end
end
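
The consumer loop in the listing can be sketched without Async. The version below is an approximation under stated assumptions: it uses the standard library's SizedQueue and a plain Thread in place of Async::LimitedQueue and an Async task, and DemoEvent and run_monitor are hypothetical names, not OMQ APIs. It shows the two behaviors a monitor block should expect: events arrive in order, and a final :monitor_stopped event is delivered from the ensure clause.

```ruby
# Hypothetical event type; Struct supplies #deconstruct_keys for case/in.
DemoEvent = Struct.new(:type, :endpoint, keyword_init: true)

# Consumes events until the queue is closed, then emits a final
# :monitor_stopped event, mirroring the ensure clause in the listing.
def run_monitor(queue, &block)
  Thread.new do
    # pop returns nil once the queue is closed and empty, ending the loop
    while (event = queue.pop)
      block.call(event)
    end
  ensure
    block.call(DemoEvent.new(type: :monitor_stopped))
  end
end

queue = SizedQueue.new(64)
log = []
worker = run_monitor(queue) do |event|
  case event
  in type: :connected, endpoint:
    log << "peer up: #{endpoint}"
  in type: :disconnected, endpoint:
    log << "peer down: #{endpoint}"
  in type: :monitor_stopped
    log << "monitor stopped"
  end
end

queue << DemoEvent.new(type: :connected, endpoint: "tcp://127.0.0.1:5555")
queue << DemoEvent.new(type: :disconnected, endpoint: "tcp://127.0.0.1:5555")
queue.close
worker.join
puts log
```

Handling :monitor_stopped explicitly (or adding an else branch) matters because case/in raises when no pattern matches.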

#peer_connected ⇒ Async::Promise

Returns a promise that resolves when the first peer completes its handshake.

Returns:

  • (Async::Promise)

    resolves when first peer completes handshake



# File 'lib/omq/socket.rb', line 148

def peer_connected
  @engine.peer_connected
end

#reconnect_enabled=(val) ⇒ void

This method returns an undefined value.

Enables or disables auto-reconnect for connected endpoints. Pass false to disable.

Parameters:

  • val (Boolean)


# File 'lib/omq/socket.rb', line 229

def reconnect_enabled=(val)
  @engine.reconnect_enabled = val
end

#set_unbounded ⇒ nil

Sets the socket to use unbounded pipes (HWM=0) for both send and receive.

Returns:

  • (nil)


# File 'lib/omq/socket.rb', line 263

def set_unbounded
  @options.send_hwm = 0
  @options.recv_hwm = 0
  nil
end

#stop ⇒ nil

Immediate hard stop. Skips the linger drain and cascades teardown through the socket-level Async::Barrier. Intended for crash-path cleanup or when the caller already knows no pending sends matter.

Returns:

  • (nil)


# File 'lib/omq/socket.rb', line 253

def stop
  Reactor.run { @engine.stop } # TODO: use timeout?
  nil
end

#subscriber_joined ⇒ Async::Promise

Returns a promise that resolves when the first subscriber joins (PUB/XPUB only).

Returns:

  • (Async::Promise)

    resolves when first subscriber joins (PUB/XPUB only)



# File 'lib/omq/socket.rb', line 154

def subscriber_joined
  @engine.subscriber_joined
end

#unbind(endpoint) ⇒ void

This method returns an undefined value.

Unbinds from an endpoint.

Parameters:

  • endpoint (String)


# File 'lib/omq/socket.rb', line 142

def unbind(endpoint)
  Reactor.run { @engine.unbind(endpoint) } # TODO: use timeout?
end