Class: OMQ::Socket

Inherits:
Object
  • Object
show all
Extended by:
Forwardable
Defined in:
lib/omq/socket.rb

Overview

Socket base class.

Direct Known Subclasses

DEALER, PAIR, PUB, PULL, PUSH, REP, REQ, ROUTER, SUB, XPUB, XSUB

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(endpoints = nil, linger: 0) ⇒ Socket

Returns a new instance of Socket.

Parameters:

  • endpoints (String, nil) (defaults to: nil)

    optional endpoint with prefix convention (+@+ for bind, > for connect, plain uses subclass default)

  • linger (Integer) (defaults to: 0)

    linger period in seconds (default 0)



78
79
# File 'lib/omq/socket.rb', line 78

def initialize(endpoints = nil, linger: 0)
end

Instance Attribute Details

#engineEngine (readonly)

Returns the socket’s engine. Exposed for peer tooling (omq-cli, omq-ffi, omq-ractor) that needs to reach into the socket’s internals — not part of the stable user API.

Returns:

  • (Engine)

    the socket’s engine. Exposed for peer tooling (omq-cli, omq-ffi, omq-ractor) that needs to reach into the socket’s internals — not part of the stable user API.



47
48
49
# File 'lib/omq/socket.rb', line 47

def engine
  @engine
end

#last_tcp_portInteger? (readonly)

Returns last auto-selected TCP port.

Returns:

  • (Integer, nil)

    last auto-selected TCP port



40
41
42
# File 'lib/omq/socket.rb', line 40

def last_tcp_port
  @last_tcp_port
end

#optionsOptions (readonly)

Returns:



35
36
37
# File 'lib/omq/socket.rb', line 35

def options
  @options
end

Class Method Details

.bind(endpoint, **opts) ⇒ Socket

Creates a new socket and binds it to the given endpoint.

Parameters:

  • endpoint (String)
  • opts (Hash)

    keyword arguments forwarded to #initialize

Returns:



17
18
19
# File 'lib/omq/socket.rb', line 17

def self.bind(endpoint, **opts)
  new("@#{endpoint}", **opts)
end

.connect(endpoint, **opts) ⇒ Socket

Creates a new socket and connects it to the given endpoint.

Parameters:

  • endpoint (String)
  • opts (Hash)

    keyword arguments forwarded to #initialize

Returns:



28
29
30
# File 'lib/omq/socket.rb', line 28

def self.connect(endpoint, **opts)
  new(">#{endpoint}", **opts)
end

Instance Method Details

#all_peers_goneAsync::Promise

Returns resolves when all peers disconnect (after having had peers).

Returns:

  • (Async::Promise)

    resolves when all peers disconnect (after having had peers)



156
157
158
# File 'lib/omq/socket.rb', line 156

def all_peers_gone
  @engine.all_peers_gone
end

#attach_endpoints(endpoints, default:) ⇒ Object

Connects or binds based on endpoint prefix convention. Called from subclass initializers (including out-of-tree socket types).

Parameters:

  • endpoints (String, nil)
  • default (Symbol)

    :connect or :bind



279
280
281
282
283
284
285
286
287
288
289
290
# File 'lib/omq/socket.rb', line 279

def attach_endpoints(endpoints, default:)
  return unless endpoints

  case endpoints
  when /\A@(.+)\z/
    bind($1)
  when /\A>(.+)\z/
    connect($1)
  else
    __send__(default, endpoints)
  end
end

#bind(endpoint, parent: nil) ⇒ void

This method returns an undefined value.

Binds to an endpoint.

Parameters:

  • endpoint (String)
  • parent (#async, nil) (defaults to: nil)

    Async parent for the socket’s task tree. Accepts any object that responds to #asyncAsync::Task, Async::Barrier, Async::Semaphore. When given, every task spawned under this socket (connection supervisors, reconnect loops, heartbeat, monitor) is placed under parent, so callers can coordinate teardown with their own Async tree. Only the first bind/connect call captures the parent — subsequent calls ignore the kwarg.



95
96
97
98
99
100
101
# File 'lib/omq/socket.rb', line 95

def bind(endpoint, parent: nil)
  ensure_parent_task(parent: parent)
  Reactor.run do
    @engine.bind(endpoint) # TODO: use timeout?
    @last_tcp_port = @engine.last_tcp_port
  end
end

#closenil

Closes the socket and releases all resources. Drains pending sends up to linger seconds, then cascades teardown through the socket-level Async::Barrier — every connection’s per-connection barrier is stopped, cancelling every pump.

Returns:

  • (nil)


237
238
239
240
# File 'lib/omq/socket.rb', line 237

def close
  Reactor.run { @engine.close } # TODO: use timeout?
  nil
end

#close_readvoid

This method returns an undefined value.

Signals end-of-stream on the receive side. A subsequent #receive call that would otherwise block returns nil.



172
173
174
# File 'lib/omq/socket.rb', line 172

def close_read
  @engine.dequeue_recv_sentinel
end

#connect(endpoint, parent: nil) ⇒ void

This method returns an undefined value.

Connects to an endpoint.

Parameters:

  • endpoint (String)
  • parent (#async, nil) (defaults to: nil)

    see #bind.



110
111
112
113
# File 'lib/omq/socket.rb', line 110

def connect(endpoint, parent: nil)
  ensure_parent_task(parent: parent)
  Reactor.run { @engine.connect(endpoint) } # TODO: use timeout?
end

#connection_countInteger

Returns current number of peer connections.

Returns:

  • (Integer)

    current number of peer connections



162
163
164
# File 'lib/omq/socket.rb', line 162

def connection_count
  @engine.connections.size
end

#disconnect(endpoint) ⇒ void

This method returns an undefined value.

Disconnects from an endpoint.

Parameters:

  • endpoint (String)


121
122
123
# File 'lib/omq/socket.rb', line 121

def disconnect(endpoint)
  Reactor.run { @engine.disconnect(endpoint) } # TODO: use timeout?
end

#init_engine(socket_type, linger:, send_hwm: nil, recv_hwm: nil, send_timeout: nil, recv_timeout: nil, conflate: false, on_mute: nil, backend: nil) ⇒ Object

Initializes engine and options for a socket type. Called from subclass initializers (including out-of-tree socket types).

Parameters:

  • socket_type (Symbol)
  • linger (Integer)


299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
# File 'lib/omq/socket.rb', line 299

def init_engine(socket_type, linger:, send_hwm: nil, recv_hwm: nil,
                send_timeout: nil, recv_timeout: nil, conflate: false,
                on_mute: nil, backend: nil)
  @options = Options.new(linger: linger)
  @options.send_hwm     = send_hwm     if send_hwm
  @options.recv_hwm     = recv_hwm     if recv_hwm
  @options.send_timeout = send_timeout if send_timeout
  @options.recv_timeout = recv_timeout if recv_timeout
  @options.conflate     = conflate
  @options.on_mute      = on_mute      if on_mute
  @engine = case backend
  when nil, :ruby
    Engine.new(socket_type, @options)
  when :ffi
    FFI::Engine.new(socket_type, @options)
  else
    raise ArgumentError, "unknown backend: #{backend}"
  end
end

#inspectString

Returns:

  • (String)


268
269
270
# File 'lib/omq/socket.rb', line 268

def inspect
  format("#<%s last_endpoint=%p>", self.class, last_endpoint)
end

#last_endpointString?

Returns last bound endpoint.

Returns:

  • (String, nil)

    last bound endpoint



138
139
140
# File 'lib/omq/socket.rb', line 138

def last_endpoint
  @engine.last_endpoint
end

#monitor(verbose: false) {|event| ... } ⇒ Async::Task

Yields lifecycle events for this socket.

Spawns a background fiber that reads from an internal event queue. The block receives MonitorEvent instances until the socket is closed or the returned task is stopped.

Examples:

task = socket.monitor do |event|
  case event
  in type: :connected, endpoint:
    puts "peer up: #{endpoint}"
  in type: :disconnected, endpoint:
    puts "peer down: #{endpoint}"
  end
end
# later:
task.stop

Yields:

  • (event)

    called for each lifecycle event

Yield Parameters:

Returns:

  • (Async::Task)

    the monitor task (call #stop to end early)



199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
# File 'lib/omq/socket.rb', line 199

def monitor(verbose: false, &block)
  ensure_parent_task

  queue                   = Async::LimitedQueue.new(64)
  @engine.monitor_queue   = queue
  @engine.verbose_monitor = verbose

  Reactor.run do
    @engine.parent_task.async(transient: true, annotation: "monitor") do
      while (event = queue.dequeue)
        block.call(event)
      end
    rescue Async::Stop
    ensure
      @engine.monitor_queue = nil
      block.call(MonitorEvent.new(type: :monitor_stopped))
    end
  end
end

#peer_connectedAsync::Promise

Returns resolves when first peer completes handshake.

Returns:

  • (Async::Promise)

    resolves when first peer completes handshake



144
145
146
# File 'lib/omq/socket.rb', line 144

def peer_connected
  @engine.peer_connected
end

#reconnect_enabled=(val) ⇒ void

This method returns an undefined value.

Disable auto-reconnect for connected endpoints.

Parameters:

  • val (Boolean)


225
226
227
# File 'lib/omq/socket.rb', line 225

def reconnect_enabled=(val)
  @engine.reconnect_enabled = val
end

#set_unboundednil

Set socket to use unbounded pipes (HWM=0).

Returns:

  • (nil)


259
260
261
262
263
# File 'lib/omq/socket.rb', line 259

def set_unbounded
  @options.send_hwm = 0
  @options.recv_hwm = 0
  nil
end

#stopnil

Immediate hard stop. Skips the linger drain and cascades teardown through the socket-level Async::Barrier. Intended for crash-path cleanup or when the caller already knows no pending sends matter.

Returns:

  • (nil)


249
250
251
252
# File 'lib/omq/socket.rb', line 249

def stop
  Reactor.run { @engine.stop } # TODO: use timeout?
  nil
end

#subscriber_joinedAsync::Promise

Returns resolves when first subscriber joins (PUB/XPUB only).

Returns:

  • (Async::Promise)

    resolves when first subscriber joins (PUB/XPUB only)



150
151
152
# File 'lib/omq/socket.rb', line 150

def subscriber_joined
  @engine.routing.subscriber_joined
end

#unbind(endpoint) ⇒ void

This method returns an undefined value.

Unbinds from an endpoint.

Parameters:

  • endpoint (String)


131
132
133
# File 'lib/omq/socket.rb', line 131

def unbind(endpoint)
  Reactor.run { @engine.unbind(endpoint) } # TODO: use timeout?
end