Class: OMQ::Engine

Inherits:
Object
Defined in:
lib/omq/engine.rb,
lib/omq/engine/heartbeat.rb,
lib/omq/engine/reconnect.rb,
lib/omq/engine/recv_pump.rb,
lib/omq/engine/maintenance.rb,
lib/omq/engine/socket_lifecycle.rb,
lib/omq/engine/connection_lifecycle.rb

Overview

Per-socket orchestrator.

Manages connections, transports, and the routing strategy for one OMQ::Socket instance. Each socket type creates one Engine.

Defined Under Namespace

Modules: Heartbeat, Maintenance
Classes: ConnectionLifecycle, Reconnect, RecvPump, SocketLifecycle

Constructor Details

#initialize(socket_type, options) ⇒ Engine

Returns a new instance of Engine.

Parameters:

  • socket_type (Symbol)

    e.g. :REQ, :REP, :PAIR

  • options (Options)


# File 'lib/omq/engine.rb', line 83

def initialize(socket_type, options)
  @socket_type     = socket_type
  @options         = options
  @routing         = nil
  @connections     = {} # connection => ConnectionLifecycle
  @dialers         = {} # endpoint => Dialer (reconnect intent + connect logic)
  @listeners       = {} # endpoint => Listener
  @lifecycle       = SocketLifecycle.new
  @fatal_error     = nil
  @monitor_queue   = nil
  @verbose_monitor = false
end

Class Attribute Details

.transports ⇒ Hash{String => Module} (readonly)

Returns registered transports.

Returns:

  • (Hash{String => Module})

    registered transports



# File 'lib/omq/engine.rb', line 27

def transports
  @transports
end

Instance Attribute Details

#connection_wrapper ⇒ Object

Optional proc that wraps new connections (e.g. for serialization). Called with the raw connection; must return the (possibly wrapped) connection.



# File 'lib/omq/engine.rb', line 54

def connection_wrapper
  @connection_wrapper
end

#connections ⇒ Hash{Connection => ConnectionLifecycle} (readonly)

Returns active connections.

Returns:

  • (Hash{Connection => ConnectionLifecycle})

    active connections

# File 'lib/omq/engine.rb', line 48

def connections
  @connections
end

#lifecycle ⇒ SocketLifecycle (readonly)

Returns socket-level state + signaling.

Returns:

  • (SocketLifecycle)

    socket-level state + signaling

# File 'lib/omq/engine.rb', line 59

def lifecycle
  @lifecycle
end

#listeners ⇒ Hash{String => Listener} (readonly)

Returns active listeners keyed by resolved endpoint.

Returns:

  • (Hash{String => Listener})

    active listeners keyed by resolved endpoint



# File 'lib/omq/engine.rb', line 43

def listeners
  @listeners
end

#monitor_queue=(value) ⇒ Object (writeonly)

Sets the attribute monitor_queue

Parameters:

  • value

    the value to set the attribute monitor_queue to.



# File 'lib/omq/engine.rb', line 65

def monitor_queue=(value)
  @monitor_queue = value
end

#options ⇒ Options (readonly)

Returns socket options.

Returns:

  • (Options)

    socket options

# File 'lib/omq/engine.rb', line 38

def options
  @options
end

#socket_type ⇒ Symbol (readonly)

Returns socket type (e.g. :REQ, :PAIR).

Returns:

  • (Symbol)

    socket type (e.g. :REQ, :PAIR)



# File 'lib/omq/engine.rb', line 33

def socket_type
  @socket_type
end

#verbose_monitor ⇒ Boolean

When true, every monitor event is also printed to stderr for debugging. Set via Socket#monitor.

Returns:

  • (Boolean)

    when true, every monitor event is also printed to stderr for debugging. Set via Socket#monitor.



# File 'lib/omq/engine.rb', line 70

def verbose_monitor
  @verbose_monitor
end

Instance Method Details

#all_peers_gone ⇒ Object



# File 'lib/omq/engine.rb', line 99

def all_peers_gone    = @lifecycle.all_peers_gone

#barrier ⇒ Object



# File 'lib/omq/engine.rb', line 101

def barrier           = @lifecycle.barrier

#bind(endpoint, parent: nil, **opts) ⇒ URI::Generic

Binds to an endpoint.

Parameters:

  • endpoint (String)

    e.g. “tcp://127.0.0.1:5555”, “inproc://foo”

Returns:

  • (URI::Generic)

    resolved endpoint URI (with auto-selected port for “tcp://host:0”)

Raises:

  • (ArgumentError)

    on unsupported transport



# File 'lib/omq/engine.rb', line 192

def bind(endpoint, parent: nil, **opts)
  OMQ.freeze_for_ractors!
  capture_parent_task(parent: parent)
  transport = transport_for(endpoint)
  listener  = transport.listener(endpoint, self, **opts)

  start_accept_loops(listener)

  @listeners[listener.endpoint] = listener
  emit_monitor_event(:listening, endpoint: listener.endpoint)
  URI.parse(listener.endpoint)
rescue => error
  emit_monitor_event(:bind_failed, endpoint: endpoint, detail: { error: error })
  raise
end
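
As a rough illustration of the return value, the sketch below uses only Ruby's standard URI library to show how a caller might read the auto-selected port from the resolved endpoint. The endpoint string and the helper name `describe_endpoint` are made up for this example; the string stands in for what a listener would report after binding to port 0, and no OMQ code is involved.

```ruby
require "uri"

# Hypothetical resolved endpoint, standing in for the value a listener
# would report after the OS picked a port for "tcp://127.0.0.1:0".
RESOLVED_ENDPOINT = "tcp://127.0.0.1:49152"

# Splits an endpoint string into the pieces a caller usually needs.
def describe_endpoint(endpoint)
  uri = URI.parse(endpoint)
  { klass: uri.class, scheme: uri.scheme, host: uri.host, port: uri.port }
end

info = describe_endpoint(RESOLVED_ENDPOINT)

# Schemes without a dedicated URI class (such as tcp) parse to URI::Generic.
puts info[:klass]
puts "#{info[:scheme]} #{info[:host]} #{info[:port]}"
```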

#build_fatal_error(error) ⇒ Object

Constructs a SocketDeadError whose cause is error. Uses the raise-in-rescue idiom because Ruby sets cause automatically only when an exception is raised while another one is being handled (that is, from inside a rescue block). This works regardless of the original caller’s $! state.



# File 'lib/omq/engine.rb', line 508

def build_fatal_error(error)
  raise error
rescue
  begin
    raise SocketDeadError, "#{@socket_type} socket killed: #{error.message}"
  rescue SocketDeadError => wrapped
    wrapped
  end
end
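
The idiom can be tried in isolation. The following is a minimal sketch, assuming a stand-in SketchDeadError class and a free-standing helper named wrap_with_cause (both invented for this example); it mirrors the shape of the method above rather than reproducing OMQ's own classes.

```ruby
# Stand-in for SocketDeadError, defined here so the sketch runs on its own.
class SketchDeadError < StandardError; end

# Returns a SketchDeadError whose #cause is +error+, without letting
# either exception escape to the caller.
def wrap_with_cause(error, socket_type)
  raise error
rescue StandardError
  begin
    # Raised while +error+ is being handled, so Ruby records it as the cause.
    raise SketchDeadError, "#{socket_type} socket killed: #{error.message}"
  rescue SketchDeadError => wrapped
    wrapped
  end
end

original = IOError.new("stream closed")
wrapped  = wrap_with_cause(original, :REQ)

puts wrapped.message
puts wrapped.cause.equal?(original)
```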

#capture_parent_task(parent: nil) ⇒ Object

Captures the socket’s task tree root and starts the socket-level maintenance task. If parent is given, it is used as the parent for every task spawned under this socket (connection supervisors, reconnect loops, maintenance, monitor). Otherwise the current Async task (or the shared Reactor root, for non-Async callers) is captured automatically.

Idempotent: first call wins. Subsequent calls (including from later bind/connect invocations) with a different parent are silently ignored.

Parameters:

  • parent (#async, nil) (defaults to: nil)

    optional Async parent



# File 'lib/omq/engine.rb', line 532

def capture_parent_task(parent: nil)
  task = @lifecycle.capture_parent_task(parent: parent, linger: @options.linger)

  return unless task

  Maintenance.start(@lifecycle.barrier, @options.mechanism)
end
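
The "first call wins" behaviour can be pictured with a small stand-alone sketch. ParentCapture below is a hypothetical class, not OMQ's SocketLifecycle; it assumes, as the `return unless task` guard above suggests, that the capture returns the parent only on the first call and nil afterwards.

```ruby
# Hypothetical miniature of an idempotent parent capture.
class ParentCapture
  attr_reader :parent

  def initialize
    @parent = nil
  end

  # Returns the captured parent on the first call and nil on every later
  # call, so one-time setup can be guarded with `return unless task`.
  def capture(parent)
    return nil if @parent

    @parent = parent
  end
end

capture = ParentCapture.new
puts capture.capture(:first_parent).inspect
puts capture.capture(:second_parent).inspect
puts capture.parent.inspect
```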

#close ⇒ void

This method returns an undefined value.

Closes all connections and listeners gracefully. Drains pending sends up to linger seconds, then cascades teardown through the socket-level OMQ::Engine::SocketLifecycle#barrier — every per-connection barrier is stopped as a side effect, cancelling every pump.



# File 'lib/omq/engine.rb', line 385

def close
  return unless @lifecycle.open?

  @lifecycle.start_closing!
  stop_listeners unless @connections.empty?

  if @options.linger.nil? || @options.linger > 0
    drain_send_queues(@options.linger)
  end

  @lifecycle.finish_closing!

  if @lifecycle.on_io_thread
    Reactor.untrack_linger(@options.linger)
  end

  stop_listeners
  tear_down_barrier
  emit_monitor_event(:closed)
  close_monitor_queue
end
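
The linger check in the middle of the method is easy to misread, so here it is as a tiny predicate. The helper name drain_before_close? is invented for this sketch; it only restates the condition from the code above and makes no claim about how long a nil linger actually waits.

```ruby
# Mirrors the condition guarding drain_send_queues in #close:
# a nil linger or a positive linger triggers a drain, zero skips it.
def drain_before_close?(linger)
  linger.nil? || linger > 0
end

[nil, 0, 0.5, 3].each do |linger|
  puts "linger=#{linger.inspect} drain=#{drain_before_close?(linger)}"
end
```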

#closed? ⇒ Boolean

Returns:

  • (Boolean)


# File 'lib/omq/engine.rb', line 102

def closed?           = @lifecycle.closed?

#connect(endpoint, parent: nil, **opts) ⇒ URI::Generic

Connects to an endpoint.

Parameters:

  • endpoint (String)

Returns:

  • (URI::Generic)

    parsed endpoint URI



# File 'lib/omq/engine.rb', line 214

def connect(endpoint, parent: nil, **opts)
  OMQ.freeze_for_ractors!
  capture_parent_task(parent: parent)
  validate_endpoint!(endpoint)

  if endpoint.start_with?("inproc://")
    # Inproc connect is synchronous and instant — no Dialer
    transport = transport_for(endpoint)
    transport.connect(endpoint, self, **opts)
    @dialers[endpoint] = :inproc  # sentinel for reconnect intent
  else
    transport = transport_for(endpoint)
    @dialers[endpoint] = transport.dialer(endpoint, self, **opts)
    emit_monitor_event(:connect_delayed, endpoint: endpoint)
    schedule_reconnect(endpoint, delay: 0)
  end

  URI.parse(endpoint)
end

#connection_lost(connection) ⇒ void

This method returns an undefined value.

Called when a connection is lost.

Parameters:

  • connection (Protocol::ZMTP::Connection)


# File 'lib/omq/engine.rb', line 352

def connection_lost(connection)
  @connections[connection]&.lost!
end

#connection_ready(pipe, endpoint: nil) ⇒ void

This method returns an undefined value.

Called by inproc transport with a pre-validated Pipe. Skips ZMTP handshake — just registers with routing strategy.

Parameters:



# File 'lib/omq/engine.rb', line 290

def connection_ready(pipe, endpoint: nil)
  ConnectionLifecycle.new(self, endpoint: endpoint).ready_direct!(pipe)
end

#dequeue_recv ⇒ Array<String>

Dequeues the next received message. Blocks until available.

Returns:

  • (Array<String>)

    message parts

Raises:

  • (SocketDeadError)

    if a background pump task crashed



# File 'lib/omq/engine.rb', line 300

def dequeue_recv
  raise @fatal_error if @fatal_error

  msg = routing.dequeue_recv
  raise @fatal_error if msg.nil? && @fatal_error

  msg
end
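
The two checks around the blocking dequeue cover both orderings: the error may already be set before the call, or it may arrive while the caller is blocked and be announced by a nil sentinel. The sketch below reproduces that pattern with Ruby's built-in Queue and threads; MiniReceiver and its methods are hypothetical and stand in for the engine plus its routing strategy.

```ruby
# Hypothetical miniature of the recv path, built on Ruby's Queue.
class MiniReceiver
  class DeadError < StandardError; end

  def initialize
    @queue       = Queue.new
    @fatal_error = nil
  end

  def deliver(parts)
    @queue << parts
  end

  # Records the failure, then pushes nil so a blocked reader wakes up.
  def fail!(message)
    @fatal_error = DeadError.new(message)
    @queue << nil
  end

  def dequeue
    raise @fatal_error if @fatal_error            # error set before the call

    msg = @queue.pop                              # blocks until something arrives
    raise @fatal_error if msg.nil? && @fatal_error # woken by the sentinel

    msg
  end
end

receiver = MiniReceiver.new
reader = Thread.new do
  receiver.dequeue
rescue MiniReceiver::DeadError => e
  e.message
end

receiver.fail!("pump crashed")
puts reader.value
```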

#dequeue_recv_sentinel ⇒ Object

Pushes a nil sentinel into the recv queue, unblocking a pending #dequeue_recv with a nil return value.



# File 'lib/omq/engine.rb', line 313

def dequeue_recv_sentinel
  routing.unblock_recv
end

#disconnect(endpoint) ⇒ void

This method returns an undefined value.

Disconnects from an endpoint. Closes connections to that endpoint and stops auto-reconnection for it.

Parameters:

  • endpoint (String)


# File 'lib/omq/engine.rb', line 241

def disconnect(endpoint)
  @dialers.delete(endpoint)
  close_connections_at(endpoint)
end

#emit_monitor_event(type, endpoint: nil, detail: nil) ⇒ void

This method returns an undefined value.

Emits a lifecycle event to the monitor queue, if one is attached.

Parameters:

  • type (Symbol)

    event type (e.g. :listening, :connected, :disconnected)

  • endpoint (String, nil) (defaults to: nil)

    the endpoint involved

  • detail (Hash, nil) (defaults to: nil)

    extra context



# File 'lib/omq/engine.rb', line 548

def emit_monitor_event(type, endpoint: nil, detail: nil)
  return unless @monitor_queue

  event = MonitorEvent.new type: type, endpoint: endpoint, detail: detail

  @monitor_queue << event
rescue Async::Stop, ClosedQueueError
end
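
A minimal sketch of the same emit-and-ignore-closed pattern, using Ruby's built-in Queue. SketchEvent and emit_event are invented for this example; the real MonitorEvent may carry more fields, and the Async::Stop rescue is left out because the sketch does not depend on the async gem.

```ruby
# Hypothetical event value object for the sketch.
SketchEvent = Struct.new(:type, :endpoint, :detail, keyword_init: true)

# Pushes an event if a queue is attached; a closed queue is not an error.
def emit_event(queue, type, endpoint: nil, detail: nil)
  return unless queue

  queue << SketchEvent.new(type: type, endpoint: endpoint, detail: detail)
rescue ClosedQueueError
  # The monitor has gone away: drop the event silently.
  nil
end

monitor = Queue.new
emit_event(monitor, :listening, endpoint: "tcp://127.0.0.1:5555")
puts monitor.pop.type.inspect

monitor.close
emit_event(monitor, :closed) # no exception escapes
```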

#emit_verbose_monitor_event(type, **detail) ⇒ void

This method returns an undefined value.

Emits a verbose-only monitor event (e.g. message traces). Only emitted when Socket#monitor was called with verbose: true. Uses **detail to avoid Hash allocation when verbose is off.

Parameters:

  • type (Symbol)

    event type (e.g. :message_sent, :message_received)

  • detail (Hash)

    extra context forwarded as keyword args



# File 'lib/omq/engine.rb', line 566

def emit_verbose_monitor_event(type, **detail)
  return unless @verbose_monitor
  emit_monitor_event(type, detail: detail)
end

#emit_verbose_msg_received(conn, parts) ⇒ Object

Emits a :message_received verbose event and enriches it with the on-wire (pre-decompression) byte size if conn exposes last_wire_size_in.



# File 'lib/omq/engine.rb', line 591

def emit_verbose_msg_received(conn, parts)
  return unless @verbose_monitor

  detail = { parts: parts }

  if conn.respond_to? :last_wire_size_in
    detail[:wire_size] = conn.last_wire_size_in
  end

  emit_monitor_event :message_received, detail: detail
end

#emit_verbose_msg_sent(conn, parts) ⇒ Object

Emits a :message_sent verbose event and enriches it with the on-wire (post-compression) byte size if conn exposes last_wire_size_out (installed by ZMTP-Zstd etc.).



# File 'lib/omq/engine.rb', line 575

def emit_verbose_msg_sent(conn, parts)
  return unless @verbose_monitor

  detail = { parts: parts }

  if conn.respond_to? :last_wire_size_out
    detail[:wire_size] = conn.last_wire_size_out
  end

  emit_monitor_event :message_sent, detail: detail
end
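
The optional enrichment relies on duck typing: the wire size is added only when the connection object answers to the accessor. A small stand-alone sketch, with PlainConn, SizedConn and sent_detail all invented for illustration:

```ruby
# Two hypothetical connection shapes: one without and one with a
# wire-size accessor.
PlainConn = Struct.new(:name)
SizedConn = Struct.new(:name, :last_wire_size_out)

# Builds the detail hash, adding :wire_size only when the connection
# exposes last_wire_size_out.
def sent_detail(conn, parts)
  detail = { parts: parts }
  detail[:wire_size] = conn.last_wire_size_out if conn.respond_to?(:last_wire_size_out)
  detail
end

puts sent_detail(PlainConn.new("plain"), ["hello"]).inspect
puts sent_detail(SizedConn.new("sized", 42), ["hello"]).inspect
```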

#enqueue_send(parts) ⇒ void

This method returns an undefined value.

Enqueues a message for sending. Blocks at HWM.

Parameters:

  • parts (Array<String>)

Raises:

  • (SocketDeadError)

    if a background pump task crashed



# File 'lib/omq/engine.rb', line 324

def enqueue_send(parts)
  raise @fatal_error if @fatal_error
  routing.enqueue(parts)
end

#handle_accepted(io, endpoint: nil) ⇒ void

This method returns an undefined value.

Called by a transport when an incoming connection is accepted.

Parameters:

  • io (#read, #write, #close)
  • endpoint (String, nil) (defaults to: nil)

    the endpoint this was accepted on



# File 'lib/omq/engine.rb', line 267

def handle_accepted(io, endpoint: nil)
  emit_monitor_event(:accepted, endpoint: endpoint)
  spawn_connection(io, as_server: true, endpoint: endpoint)
end

#handle_connected(io, endpoint: nil) ⇒ void

This method returns an undefined value.

Called by a transport when an outgoing connection is established.

Parameters:

  • io (#read, #write, #close)


# File 'lib/omq/engine.rb', line 278

def handle_connected(io, endpoint: nil)
  emit_monitor_event(:connected, endpoint: endpoint)
  spawn_connection(io, as_server: false, endpoint: endpoint)
end

#maybe_reconnect(endpoint) ⇒ Object

Schedules a reconnect for endpoint if auto-reconnect is enabled and the endpoint is still dialed.



# File 'lib/omq/engine.rb', line 368

def maybe_reconnect(endpoint)
  return unless endpoint && @dialers.key?(endpoint)
  return unless @lifecycle.open? && @lifecycle.reconnect_enabled

  dialer = @dialers[endpoint]

  Reconnect.schedule(dialer, @options, @lifecycle.parent_task, self)
end
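
The guard treats membership in the dialer table as the "reconnect intent": once #disconnect removes an endpoint, a later connection loss no longer schedules a retry. The sketch below models only that bookkeeping. DialTable is a hypothetical class, and the socket-open check from the real method is omitted for brevity.

```ruby
# Hypothetical miniature: a dialer table that doubles as reconnect intent.
class DialTable
  attr_accessor :reconnect_enabled

  def initialize
    @dialers           = {}
    @reconnect_enabled = true
  end

  def connect(endpoint)
    @dialers[endpoint] = :dialer # placeholder for a real dialer object
  end

  def disconnect(endpoint)
    @dialers.delete(endpoint)
  end

  # True when a lost connection to +endpoint+ should be retried.
  def reconnect?(endpoint)
    return false unless endpoint && @dialers.key?(endpoint)

    @reconnect_enabled
  end
end

table = DialTable.new
table.connect("tcp://127.0.0.1:5555")
puts table.reconnect?("tcp://127.0.0.1:5555")

table.disconnect("tcp://127.0.0.1:5555")
puts table.reconnect?("tcp://127.0.0.1:5555")
```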

#maybe_resolve_all_peers_gone ⇒ Object

Resolves all_peers_gone if we had peers and now have none. Called by ConnectionLifecycle during teardown.



# File 'lib/omq/engine.rb', line 360

def maybe_resolve_all_peers_gone
  @lifecycle.maybe_resolve_all_peers_gone(@connections)
end

#on_io_thread? ⇒ Boolean

Returns:

  • (Boolean)


# File 'lib/omq/engine.rb', line 103

def on_io_thread?     = @lifecycle.on_io_thread

#parent_task ⇒ Object



# File 'lib/omq/engine.rb', line 100

def parent_task       = @lifecycle.parent_task

#peer_connected ⇒ Object

Delegated to SocketLifecycle.



# File 'lib/omq/engine.rb', line 98

def peer_connected    = @lifecycle.peer_connected

#reconnect_enabled=(value) ⇒ Object

Enables or disables auto-reconnect for dropped connections. Delegated to SocketLifecycle. Close paths flip this to false so a lost connection doesn’t schedule a new retry after linger.

Parameters:

  • value (Boolean)


# File 'lib/omq/engine.rb', line 112

def reconnect_enabled=(value)
  @lifecycle.reconnect_enabled = value
end

#record_disconnect_reason(conn, error) ⇒ Object

Records the disconnect reason on the ConnectionLifecycle for conn, if any. Called by the recv pump rescue so the upcoming :disconnected monitor event carries an error reason, without exposing the internal connection map.



# File 'lib/omq/engine.rb', line 153

def record_disconnect_reason(conn, error)
  @connections[conn]&.record_disconnect_reason(error)
end

#routing ⇒ Routing

Returns routing strategy (created lazily on first access).

Returns:

  • (Routing)

    routing strategy (created lazily on first access)



# File 'lib/omq/engine.rb', line 75

def routing
  @routing ||= Routing.for(@socket_type).new(self)
end

#signal_fatal_error(error) ⇒ Object

Wraps an unexpected pump error as SocketDeadError and unblocks any callers waiting on the recv queue. The original error is preserved as #cause so callers can surface the real reason.

Parameters:

  • error (Exception)


# File 'lib/omq/engine.rb', line 495

def signal_fatal_error(error)
  return unless @lifecycle.open?

  @fatal_error = build_fatal_error(error)
  routing.unblock_recv rescue nil
  @lifecycle.peer_connected.resolve(nil) rescue nil
end

#spawn_conn_pump_task(conn, annotation:, &block) ⇒ Object

Spawns a per-connection pump task on the connection’s own lifecycle barrier. When any pump on the barrier exits (e.g. the send pump sees EPIPE and calls #connection_lost), OMQ::Engine::ConnectionLifecycle#tear_down! calls barrier.stop, which cancels every sibling pump for that connection. A dead peer can therefore no longer leave orphan send pumps blocked on dequeue, waiting for messages that will never be written.

Parameters:



# File 'lib/omq/engine.rb', line 467

def spawn_conn_pump_task(conn, annotation:, &block)
  lifecycle = @connections[conn]

  unless lifecycle
    return spawn_pump_task(annotation: annotation, &block)
  end

  lifecycle.barrier.async(transient: true, annotation: annotation) do
    yield
  rescue Async::Stop, Async::Cancel
    # normal shutdown / sibling tore us down
  rescue Protocol::ZMTP::Error, *CONNECTION_LOST => error
    # expected disconnect — stash reason for the :disconnected
    # monitor event, then let the lifecycle reconnect as usual
    lifecycle.record_disconnect_reason(error)
  rescue => error
    signal_fatal_error(error)
  end
end

#spawn_inproc_retry(endpoint) {|interval| ... } ⇒ Object

Spawns an inproc reconnect retry task under the socket’s parent task.

Parameters:

  • endpoint (String)

Yields:

  • (interval)

    the retry loop body



# File 'lib/omq/engine.rb', line 174

def spawn_inproc_retry(endpoint)
  ri  = @options.reconnect_interval
  ivl = ri.is_a?(Range) ? ri.begin : ri
  ann = "inproc reconnect #{endpoint}"

  @lifecycle.barrier.async(transient: true, annotation: ann) do
    yield ivl
  rescue Async::Stop, Async::Cancel
  end
end
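
The first two lines of the method pick the starting retry interval: when reconnect_interval is a Range, only its lower bound is used for inproc retries. A tiny sketch of that selection, with the helper name initial_retry_interval invented for this example:

```ruby
# Returns the interval handed to the retry loop: the lower bound of a
# Range, or the plain number itself.
def initial_retry_interval(reconnect_interval)
  reconnect_interval.is_a?(Range) ? reconnect_interval.begin : reconnect_interval
end

puts initial_retry_interval(0.1..5.0)
puts initial_retry_interval(0.25)
```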

#spawn_pump_task(annotation:, parent: Async::Task.current) { ... } ⇒ Async::Task

Spawns a transient pump task with error propagation.

Unexpected exceptions are caught and forwarded to #signal_fatal_error so blocked callers (send/recv) see the real error instead of deadlocking.

Parameters:

  • annotation (String)

    task annotation for debugging

  • parent (Async::Task, Async::Barrier) (defaults to: Async::Task.current)

    parent for the spawned task. Defaults to the current task. Routing strategies that own loose pump tasks (Radio, Channel) pass their own Async::Barrier so a single barrier.stop tears the lot down at strategy shutdown without per-task bookkeeping.

Yields:

  • the pump loop body

Returns:

  • (Async::Task)


# File 'lib/omq/engine.rb', line 445

def spawn_pump_task(annotation:, parent: Async::Task.current, &block)
  parent.async(transient: true, annotation: annotation) do
    yield
  rescue Async::Stop, Async::Cancel, Protocol::ZMTP::Error, *CONNECTION_LOST
    # normal shutdown / expected disconnect
  rescue => error
    signal_fatal_error(error)
  end
end
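
The rescue chain sorts exceptions into "expected" and "fatal" buckets. The sketch below shows the same shape without the async gem. EXPECTED_DISCONNECTS is an assumed stand-in for CONNECTION_LOST (whose real contents are not shown on this page), and run_pump is a hypothetical helper that collects fatal errors instead of calling #signal_fatal_error.

```ruby
# Assumed stand-in for CONNECTION_LOST; the real list may differ.
EXPECTED_DISCONNECTS = [EOFError, Errno::EPIPE, Errno::ECONNRESET].freeze

# Runs the pump body and classifies how it ended. Expected disconnects
# are swallowed; anything else is recorded as fatal.
def run_pump(fatal_errors)
  yield
  :finished
rescue *EXPECTED_DISCONNECTS
  :disconnected
rescue StandardError => error
  fatal_errors << error
  :fatal
end

fatal = []
puts run_pump(fatal) { :ok }.inspect
puts run_pump(fatal) { raise Errno::EPIPE }.inspect
puts run_pump(fatal) { raise ArgumentError, "bad frame" }.inspect
puts fatal.map(&:message).inspect
```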

#start_recv_pump(conn, recv_queue) {|msg| ... } ⇒ Async::Task?

Starts a recv pump for a connection, or wires the inproc fast path.

Parameters:

Yields:

  • (msg)

    optional per-message transform

Returns:

  • (Async::Task, nil)


# File 'lib/omq/engine.rb', line 337

def start_recv_pump(conn, recv_queue, &transform)
  # Spawn on the connection's lifecycle barrier so the recv pump is
  # torn down together with the rest of its sibling per-connection
  # pumps when the connection is lost.
  parent = @connections[conn]&.barrier || @lifecycle.barrier

  RecvPump.start(parent, conn, recv_queue, self, transform)
end

#stop ⇒ void

This method returns an undefined value.

Immediate hard stop: skips the linger drain and cascades teardown through the socket-level barrier. Intended for crash-path cleanup where #close’s drain is either unsafe or undesired.



# File 'lib/omq/engine.rb', line 414

def stop
  return unless @lifecycle.alive?

  @lifecycle.force_close!

  if @lifecycle.on_io_thread
    Reactor.untrack_linger(@options.linger)
  end

  stop_listeners
  tear_down_barrier
  emit_monitor_event(:closed)
  close_monitor_queue
end

#subscribe(prefix) ⇒ Object

Subscribes to a topic prefix on SUB/XSUB sockets. Delegates to the routing strategy so callers don’t have to chain through engine.routing.

Parameters:

  • prefix (String)


# File 'lib/omq/engine.rb', line 132

def subscribe(prefix)
  routing.subscribe(prefix)
end

#subscriber_joined ⇒ Object

Delegated to the routing strategy. Forwards the PUB/XPUB/SUB/XSUB subscription surface so callers don’t have to chain through engine.routing.



# File 'lib/omq/engine.rb', line 121

def subscriber_joined
  routing.subscriber_joined
end

#transport_for(endpoint) ⇒ Module

Looks up the transport module for an endpoint URI.

Parameters:

  • endpoint (String)

    endpoint URI (e.g. “tcp://…”, “inproc://…”)

Returns:

  • (Module)

    the transport module

Raises:

  • (ArgumentError)

    if the scheme is not registered



# File 'lib/omq/engine.rb', line 610

def transport_for(endpoint)
  scheme    = endpoint[/\A([^:]+):\/\//, 1]
  transport = self.class.transports[scheme]

  unless transport
    raise ArgumentError, "unsupported transport: #{endpoint}"
  end

  transport
end
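
The lookup is a regex capture followed by a hash fetch. Here is a self-contained sketch with a made-up registry; SketchTcp, SketchInproc, SKETCH_TRANSPORTS and lookup_transport are placeholders for this example and not OMQ's transport modules.

```ruby
# Placeholder transport modules for the sketch.
module SketchTcp; end
module SketchInproc; end

# Hypothetical registry keyed by URI scheme.
SKETCH_TRANSPORTS = { "tcp" => SketchTcp, "inproc" => SketchInproc }.freeze

# Extracts the scheme (everything before "://") and looks it up.
def lookup_transport(endpoint)
  scheme = endpoint[/\A([^:]+):\/\//, 1]

  SKETCH_TRANSPORTS.fetch(scheme) do
    raise ArgumentError, "unsupported transport: #{endpoint}"
  end
end

puts lookup_transport("tcp://127.0.0.1:5555")
puts lookup_transport("inproc://foo")

begin
  lookup_transport("udp://127.0.0.1:5555")
rescue ArgumentError => e
  puts e.message
end
```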

#transport_object_for(endpoint) ⇒ Dialer, ...

Returns the transport object (Dialer or Listener) for an endpoint. Used by OMQ::Engine::ConnectionLifecycle#ready! to call #wrap_connection.

Parameters:

  • endpoint (String)

Returns:

  • (Dialer, Listener, nil)


# File 'lib/omq/engine.rb', line 164

def transport_object_for(endpoint)
  @dialers[endpoint] || @listeners[endpoint]
end

#unbind(endpoint) ⇒ void

This method returns an undefined value.

Unbinds from an endpoint. Stops the listener and closes all connections that were accepted on it.

Parameters:

  • endpoint (String)


# File 'lib/omq/engine.rb', line 253

def unbind(endpoint)
  listener = @listeners.delete(endpoint) or return

  listener.stop
  close_connections_at(endpoint)
end
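
The `delete(...) or return` line makes unbinding an unknown endpoint a quiet no-op. A stand-alone sketch of that idiom follows; SketchListener and unbind_from are hypothetical, and the connection cleanup step is left out.

```ruby
# Hypothetical listener that only remembers whether it was stopped.
SketchListener = Struct.new(:stopped) do
  def stop
    self.stopped = true
  end
end

# Removes and stops the listener for +endpoint+; returns nil without
# doing anything when no listener is registered there.
def unbind_from(listeners, endpoint)
  listener = listeners.delete(endpoint) or return

  listener.stop
  listener
end

listeners = { "tcp://127.0.0.1:5555" => SketchListener.new(false) }

puts unbind_from(listeners, "tcp://127.0.0.1:9999").inspect
puts unbind_from(listeners, "tcp://127.0.0.1:5555").stopped
puts listeners.empty?
```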

#unsubscribe(prefix) ⇒ Object

Unsubscribes from a topic prefix on SUB/XSUB sockets. Delegates to the routing strategy so callers don’t have to chain through engine.routing.

Parameters:

  • prefix (String)


# File 'lib/omq/engine.rb', line 143

def unsubscribe(prefix)
  routing.unsubscribe(prefix)
end