Class: OMQ::Engine

Inherits:
Object
  • Object
show all
Defined in:
lib/omq/engine.rb,
lib/omq/engine/heartbeat.rb,
lib/omq/engine/reconnect.rb,
lib/omq/engine/recv_pump.rb,
lib/omq/engine/maintenance.rb,
lib/omq/engine/socket_lifecycle.rb,
lib/omq/engine/connection_lifecycle.rb

Overview

Per-socket orchestrator.

Manages connections, transports, and the routing strategy for one OMQ::Socket instance. Each socket type creates one Engine.

Defined Under Namespace

Modules: Heartbeat, Maintenance. Classes: ConnectionLifecycle, Reconnect, RecvPump, SocketLifecycle.

Class Attribute Summary collapse

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(socket_type, options) ⇒ Engine

Returns a new instance of Engine.

Parameters:

  • socket_type (Symbol)

    e.g. :REQ, :REP, :PAIR

  • options (Options)


60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
# File 'lib/omq/engine.rb', line 60

# Builds the engine for one socket. Only in-memory state is set up
# here; the routing strategy starts out unset.
#
# @param socket_type [Symbol] e.g. :REQ, :REP, :PAIR
# @param options [Options] socket options
def initialize(socket_type, options)
  @socket_type        = socket_type
  @options            = options
  @routing            = nil
  @connections        = {} # connection => ConnectionLifecycle
  @dialed             = Set.new # endpoints we called connect() on (reconnect intent)
  @listeners          = []
  @tasks              = []
  @lifecycle          = SocketLifecycle.new
  @last_endpoint      = nil
  @last_tcp_port      = nil
  @fatal_error        = nil
  @monitor_queue      = nil
  @verbose_monitor    = false
  # Declared up front like every other ivar, so the connection_wrapper
  # reader never touches an undeclared instance variable.
  @connection_wrapper = nil
end

Class Attribute Details

.transports ⇒ Hash{String => Module} (readonly)

Returns registered transports.

Returns:

  • (Hash{String => Module})

    registered transports



26
27
28
# File 'lib/omq/engine.rb', line 26

# Reader for the registered transports table.
def transports = @transports

Instance Attribute Details

#connection_wrapper ⇒ Object

Optional proc that wraps new connections (e.g. for serialization). Called with the raw connection; must return the (possibly wrapped) connection.



109
110
111
# File 'lib/omq/engine.rb', line 109

# Reader for the optional connection-wrapping proc.
def connection_wrapper = @connection_wrapper

#connections ⇒ Hash{Connection => ConnectionLifecycle}, ... (readonly)

Returns:

  • (Hash{Connection => ConnectionLifecycle})

    active connections

  • (Array<Async::Task>)

    background tasks (pumps, heartbeat, reconnect)

  • (SocketLifecycle)

    socket-level state + signaling



81
82
83
# File 'lib/omq/engine.rb', line 81

# Reader for the connection => ConnectionLifecycle table.
def connections = @connections

#last_endpoint ⇒ String? (readonly)

Returns last bound endpoint.

Returns:

  • (String, nil)

    last bound endpoint



49
50
51
# File 'lib/omq/engine.rb', line 49

# Reader for the last bound endpoint (nil until one is recorded).
def last_endpoint = @last_endpoint

#last_tcp_port ⇒ Integer? (readonly)

Returns last auto-selected TCP port.

Returns:

  • (Integer, nil)

    last auto-selected TCP port



54
55
56
# File 'lib/omq/engine.rb', line 54

# Reader for the last auto-selected TCP port (nil when none is recorded).
def last_tcp_port = @last_tcp_port

#lifecycle ⇒ Hash{Connection => ConnectionLifecycle}, ... (readonly)

Returns:

  • (Hash{Connection => ConnectionLifecycle})

    active connections

  • (Array<Async::Task>)

    background tasks (pumps, heartbeat, reconnect)

  • (SocketLifecycle)

    socket-level state + signaling



81
82
83
# File 'lib/omq/engine.rb', line 81

# Reader for the socket-level lifecycle object (state + signaling).
def lifecycle = @lifecycle

#monitor_queue=(value) ⇒ Object (writeonly)

Sets the attribute monitor_queue

Parameters:

  • value

    the value to set the attribute monitor_queue to.



87
88
89
# File 'lib/omq/engine.rb', line 87

# Attaches (or, with nil, detaches) the queue that monitor events are
# pushed onto. Write-only: there is no matching reader.
#
# @param value the queue to push monitor events to
def monitor_queue=(value)
  @monitor_queue = value
end

#options ⇒ Options (readonly)

Returns socket options.

Returns:



37
38
39
# File 'lib/omq/engine.rb', line 37

# Reader for the socket options.
def options = @options

#socket_type ⇒ Symbol (readonly)

Returns socket type (e.g. :REQ, :PAIR).

Returns:

  • (Symbol)

    socket type (e.g. :REQ, :PAIR)



32
33
34
# File 'lib/omq/engine.rb', line 32

# Reader for the socket type (e.g. :REQ, :PAIR).
def socket_type = @socket_type

#tasks ⇒ Hash{Connection => ConnectionLifecycle}, ... (readonly)

Returns:

  • (Hash{Connection => ConnectionLifecycle})

    active connections

  • (Array<Async::Task>)

    background tasks (pumps, heartbeat, reconnect)

  • (SocketLifecycle)

    socket-level state + signaling



81
82
83
# File 'lib/omq/engine.rb', line 81

# Reader for the list of background tasks (pumps, heartbeat, reconnect).
def tasks = @tasks

#verbose_monitor ⇒ Boolean

When true, every monitor event is also printed to stderr for debugging. Set via Socket#monitor.

Returns:

  • (Boolean)

    when true, every monitor event is also printed to stderr for debugging. Set via Socket#monitor.



92
93
94
# File 'lib/omq/engine.rb', line 92

# Reader for the verbose-monitor flag.
def verbose_monitor = @verbose_monitor

Instance Method Details

#all_peers_gone ⇒ Object



97
# File 'lib/omq/engine.rb', line 97

# Forwards to the socket lifecycle's #all_peers_gone.
def all_peers_gone
  @lifecycle.all_peers_gone
end

#barrier ⇒ Object



99
# File 'lib/omq/engine.rb', line 99

# Forwards to the socket lifecycle's #barrier.
def barrier
  @lifecycle.barrier
end

#bind(endpoint, parent: nil) ⇒ void

This method returns an undefined value.

Binds to an endpoint.

Parameters:

  • endpoint (String)

    e.g. “tcp://127.0.0.1:5555”, “inproc://foo”

Raises:

  • (ArgumentError)

    on unsupported transport



135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
# File 'lib/omq/engine.rb', line 135

# Binds the socket to +endpoint+ and starts accepting on it.
#
# On success the listener is tracked, its resolved endpoint (and TCP
# port, when the listener exposes one) is remembered, and a :listening
# monitor event is emitted. Any StandardError raised along the way is
# reported as a :bind_failed monitor event and then re-raised.
#
# @param endpoint [String] e.g. "tcp://127.0.0.1:5555", "inproc://foo"
# @param parent [#async, nil] optional Async parent, forwarded to
#   #capture_parent_task
# @return [void]
# @raise [ArgumentError] on unsupported transport
def bind(endpoint, parent: nil)
  OMQ.freeze_for_ractors!
  capture_parent_task(parent: parent)

  listener = transport_for(endpoint).bind(endpoint, self)
  start_accept_loops(listener)
  @listeners << listener

  @last_endpoint = listener.endpoint
  # Listeners without a #port reset the remembered port to nil.
  @last_tcp_port = (listener.port if listener.respond_to?(:port))

  emit_monitor_event(:listening, endpoint: listener.endpoint)
rescue => failure
  emit_monitor_event(:bind_failed, endpoint: endpoint, detail: { error: failure })
  raise
end

#build_fatal_error(error) ⇒ Object

Constructs a SocketDeadError whose cause is error. The wrapper is raised and immediately rescued so that Ruby records error as its cause; the result does not depend on the original caller’s $! state.



445
446
447
448
449
450
451
452
453
# File 'lib/omq/engine.rb', line 445

# Wraps +error+ in a SocketDeadError whose #cause is +error+.
#
# The wrapper is raised with an explicit +cause:+ and rescued right
# away: raising is what gives it a backtrace, and the explicit cause
# means the result does not depend on whatever $! happens to be in the
# caller. +error+ itself is never re-raised, so it is left untouched.
#
# @param error [Exception] the original failure
# @return [SocketDeadError] wrapper carrying +error+ as its cause
def build_fatal_error(error)
  raise SocketDeadError, "#{@socket_type} socket killed: #{error.message}", cause: error
rescue SocketDeadError => wrapped
  wrapped
end

#capture_parent_task(parent: nil) ⇒ Object

Captures the socket’s task tree root and starts the socket-level maintenance task. If parent is given, it is used as the parent for every task spawned under this socket (connection supervisors, reconnect loops, maintenance, monitor). Otherwise the current Async task (or the shared Reactor root, for non-Async callers) is captured automatically.

Idempotent: first call wins. Subsequent calls (including from later bind/connect invocations) with a different parent are silently ignored.

Parameters:

  • parent (#async, nil) (defaults to: nil)

    optional Async parent



469
470
471
472
473
474
475
# File 'lib/omq/engine.rb', line 469

# Asks the socket lifecycle to capture the parent task and, when that
# call returns a truthy task, starts the socket-level maintenance task.
# A falsy result means there is nothing to start and nil is returned.
#
# @param parent [#async, nil] optional Async parent
def capture_parent_task(parent: nil)
  captured = @lifecycle.capture_parent_task(parent: parent, linger: @options.linger)

  Maintenance.start(@lifecycle.barrier, @options.mechanism, @tasks) if captured
end

#close ⇒ void

This method returns an undefined value.

Closes all connections and listeners gracefully. Drains pending sends up to linger seconds, then cascades teardown through the socket-level OMQ::Engine::SocketLifecycle#barrier — every per-connection barrier is stopped as a side effect, cancelling every pump.



327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
# File 'lib/omq/engine.rb', line 327

# Gracefully closes the socket: optionally drains pending sends, then
# stops listeners, tears down the barrier, stops routing and announces
# :closed on the monitor. Does nothing unless the lifecycle is open.
#
# @return [void]
def close
  return unless @lifecycle.open?

  @lifecycle.start_closing!
  # With peers already attached, listeners are stopped before the
  # drain; with none they stay up until the final stop_listeners below.
  # NOTE(review): presumably so a late peer can still connect during
  # linger — confirm.
  stop_listeners unless @connections.empty?

  # Drain only when linger is nil or positive; a linger of 0 skips it.
  # NOTE(review): nil looks like "no time limit" — confirm against
  # drain_send_queues.
  if @options.linger.nil? || @options.linger > 0
    drain_send_queues(@options.linger)
  end

  @lifecycle.finish_closing!

  if @lifecycle.on_io_thread
    Reactor.untrack_linger(@options.linger)
  end

  stop_listeners
  tear_down_barrier
  # Best effort: an error while stopping routing must not abort the
  # remaining teardown steps.
  routing.stop rescue nil
  emit_monitor_event(:closed)
  close_monitor_queue
end

#closed? ⇒ Boolean

Returns:

  • (Boolean)


100
# File 'lib/omq/engine.rb', line 100

# Forwards to the socket lifecycle's #closed?.
def closed?
  @lifecycle.closed?
end

#connect(endpoint, parent: nil) ⇒ void

This method returns an undefined value.

Connects to an endpoint.

Parameters:

  • endpoint (String)


158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
# File 'lib/omq/engine.rb', line 158

# Connects to +endpoint+ and records it as dialed (reconnect intent).
#
# @param endpoint [String]
# @param parent [#async, nil] optional Async parent, forwarded to
#   #capture_parent_task
# @return [void]
def connect(endpoint, parent: nil)
  OMQ.freeze_for_ractors!
  capture_parent_task(parent: parent)
  validate_endpoint!(endpoint)
  @dialed << endpoint

  # Inproc connects synchronously and instantly.
  return transport_for(endpoint).connect(endpoint, self) if endpoint.start_with?("inproc://")

  # TCP/IPC dials in the background so the caller never blocks.
  emit_monitor_event(:connect_delayed, endpoint: endpoint)
  schedule_reconnect(endpoint, delay: 0)
end

#connection_lost(connection) ⇒ void

This method returns an undefined value.

Called when a connection is lost.

Parameters:

  • connection (Connection)


297
298
299
# File 'lib/omq/engine.rb', line 297

# Marks +connection+ as lost on its lifecycle entry. Connections that
# are not (or no longer) registered are ignored.
#
# @param connection [Connection]
# @return [void]
def connection_lost(connection)
  entry = @connections[connection]
  entry.lost! unless entry.nil?
end

#connection_ready(pipe, endpoint: nil) ⇒ void

This method returns an undefined value.

Called by inproc transport with a pre-validated DirectPipe. Skips ZMTP handshake — just registers with routing strategy.

Parameters:



233
234
235
# File 'lib/omq/engine.rb', line 233

# Registers a ready-made pipe by creating a ConnectionLifecycle for it
# and handing the pipe to #ready_direct!.
#
# @param pipe the pre-validated pipe supplied by the inproc transport
# @param endpoint [String, nil] endpoint the pipe belongs to
# @return [void]
def connection_ready(pipe, endpoint: nil)
  lifecycle = ConnectionLifecycle.new(self, endpoint: endpoint)
  lifecycle.ready_direct!(pipe)
end

#dequeue_recv ⇒ Array<String>

Dequeues the next received message. Blocks until available.

Returns:

  • (Array<String>)

    message parts

Raises:

  • if a background pump task crashed



243
244
245
246
247
248
249
250
# File 'lib/omq/engine.rb', line 243

# Takes the next received message from the routing strategy.
#
# A recorded fatal error is raised up front. It is checked again when
# the dequeue hands back nil, so a caller released by the nil sentinel
# after a crash sees the error rather than a bare nil.
#
# @return [Array<String>] message parts
# @raise the stored fatal error, if one has been recorded
def dequeue_recv
  raise @fatal_error if @fatal_error

  parts = routing.dequeue_recv
  return parts unless parts.nil? && @fatal_error

  raise @fatal_error
end

#dequeue_recv_sentinel ⇒ Object

Pushes a nil sentinel into the recv queue, unblocking a pending #dequeue_recv with a nil return value.



256
257
258
# File 'lib/omq/engine.rb', line 256

# Asks the routing strategy to unblock a pending receive.
def dequeue_recv_sentinel = routing.unblock_recv

#disconnect(endpoint) ⇒ void

This method returns an undefined value.

Disconnects from an endpoint. Closes connections to that endpoint and stops auto-reconnection for it.

Parameters:

  • endpoint (String)


182
183
184
185
# File 'lib/omq/engine.rb', line 182

# Disconnects from +endpoint+: drops it from the dialed set and closes
# the connections at that endpoint.
#
# @param endpoint [String]
# @return [void]
def disconnect(endpoint)
  # Forget the endpoint first: reconnects are only scheduled for
  # endpoints still in @dialed.
  # NOTE(review): assumes closing the connections can lead to a
  # reconnect attempt — confirm.
  @dialed.delete(endpoint)
  close_connections_at(endpoint)
end

#emit_monitor_event(type, endpoint: nil, detail: nil) ⇒ void

This method returns an undefined value.

Emits a lifecycle event to the monitor queue, if one is attached.

Parameters:

  • type (Symbol)

    event type (e.g. :listening, :connected, :disconnected)

  • endpoint (String, nil) (defaults to: nil)

    the endpoint involved

  • detail (Hash, nil) (defaults to: nil)

    extra context



485
486
487
488
489
# File 'lib/omq/engine.rb', line 485

# Pushes a MonitorEvent onto the monitor queue, if one is attached.
# A queue that is already closed, or an Async::Stop raised during the
# push, is silently ignored.
#
# @param type [Symbol] event type (e.g. :listening, :connected)
# @param endpoint [String, nil] the endpoint involved
# @param detail [Hash, nil] extra context
# @return [void]
def emit_monitor_event(type, endpoint: nil, detail: nil)
  queue = @monitor_queue
  return unless queue

  event = MonitorEvent.new(type: type, endpoint: endpoint, detail: detail)
  queue.push(event)
rescue Async::Stop, ClosedQueueError
  # Monitoring is best effort; a torn-down monitor is not an error.
end

#emit_verbose_monitor_event(type, **detail) ⇒ void

This method returns an undefined value.

Emits a verbose-only monitor event (e.g. message traces). Only emitted when Socket#monitor was called with verbose: true. Uses **detail to avoid Hash allocation when verbose is off.

Parameters:

  • type (Symbol)

    event type (e.g. :message_sent, :message_received)

  • detail (Hash)

    extra context forwarded as keyword args



500
501
502
503
# File 'lib/omq/engine.rb', line 500

# Emits a monitor event only while verbose monitoring is switched on.
#
# @param type [Symbol] event type (e.g. :message_sent)
# @param detail [Hash] extra context, collected from keyword args
# @return [void]
def emit_verbose_monitor_event(type, **detail)
  emit_monitor_event(type, detail: detail) if @verbose_monitor
end

#emit_verbose_msg_received(conn, parts) ⇒ Object

Emits a :message_received verbose event and enriches it with the on-wire (pre-decompression) byte size if conn exposes last_wire_size_in.



520
521
522
523
524
525
526
527
528
529
530
# File 'lib/omq/engine.rb', line 520

# Emits a verbose :message_received event carrying the message parts,
# plus :wire_size when +conn+ responds to #last_wire_size_in.
# Does nothing unless verbose monitoring is on.
def emit_verbose_msg_received(conn, parts)
  return unless @verbose_monitor

  info = { parts: parts }
  info[:wire_size] = conn.last_wire_size_in if conn.respond_to?(:last_wire_size_in)
  emit_monitor_event(:message_received, detail: info)
end

#emit_verbose_msg_sent(conn, parts) ⇒ Object

Emits a :message_sent verbose event and enriches it with the on-wire (post-compression) byte size if conn exposes last_wire_size_out (installed by ZMTP-Zstd etc.).



509
510
511
512
513
514
# File 'lib/omq/engine.rb', line 509

# Emits a verbose :message_sent event carrying the message parts, plus
# :wire_size when +conn+ responds to #last_wire_size_out.
# Does nothing unless verbose monitoring is on.
def emit_verbose_msg_sent(conn, parts)
  return unless @verbose_monitor

  info = { parts: parts }

  if conn.respond_to?(:last_wire_size_out)
    info[:wire_size] = conn.last_wire_size_out
  end

  emit_monitor_event(:message_sent, detail: info)
end

#enqueue_send(parts) ⇒ void

This method returns an undefined value.

Enqueues a message for sending. Blocks at HWM.

Parameters:

  • parts (Array<String>)

Raises:

  • if a background pump task crashed



267
268
269
270
# File 'lib/omq/engine.rb', line 267

# Hands +parts+ to the routing strategy for sending, unless a fatal
# error has been recorded, in which case that error is raised instead.
#
# @param parts [Array<String>]
# @return [void]
# @raise the stored fatal error, if one has been recorded
def enqueue_send(parts)
  failure = @fatal_error
  raise failure if failure

  routing.enqueue(parts)
end

#handle_accepted(io, endpoint: nil) ⇒ void

This method returns an undefined value.

Called by a transport when an incoming connection is accepted.

Parameters:

  • io (#read, #write, #close)
  • endpoint (String, nil) (defaults to: nil)

    the endpoint this was accepted on



210
211
212
213
# File 'lib/omq/engine.rb', line 210

# Called by a transport when an incoming connection is accepted.
# Emits an :accepted monitor event, then spawns the connection in the
# server role.
#
# @param io [#read, #write, #close]
# @param endpoint [String, nil] the endpoint this was accepted on
# @return [void]
def handle_accepted(io, endpoint: nil)
  emit_monitor_event(:accepted, endpoint: endpoint)
  spawn_connection(io, as_server: true, endpoint: endpoint)
end

#handle_connected(io, endpoint: nil) ⇒ void

This method returns an undefined value.

Called by a transport when an outgoing connection is established.

Parameters:

  • io (#read, #write, #close)


221
222
223
224
# File 'lib/omq/engine.rb', line 221

# Called by a transport when an outgoing connection is established.
# Emits a :connected monitor event, then spawns the connection in the
# client role (as_server: false).
#
# @param io [#read, #write, #close]
# @param endpoint [String, nil] presumably the endpoint that was
#   dialed (mirrors #handle_accepted) — TODO confirm
# @return [void]
def handle_connected(io, endpoint: nil)
  emit_monitor_event(:connected, endpoint: endpoint)
  spawn_connection(io, as_server: false, endpoint: endpoint)
end

#maybe_reconnect(endpoint) ⇒ Object

Schedules a reconnect for endpoint if auto-reconnect is enabled and the endpoint is still dialed.



313
314
315
316
317
# File 'lib/omq/engine.rb', line 313

# Schedules a reconnect for +endpoint+, but only when all of these
# hold: an endpoint was given, it is still in the dialed set, the
# socket lifecycle is open, and reconnecting is enabled. Otherwise
# nothing happens and nil is returned.
def maybe_reconnect(endpoint)
  still_wanted = endpoint && @dialed.include?(endpoint)

  if still_wanted && @lifecycle.open? && @lifecycle.reconnect_enabled
    Reconnect.schedule(endpoint, @options, @lifecycle.parent_task, self)
  end
end

#parent_task ⇒ Object



98
# File 'lib/omq/engine.rb', line 98

# Forwards to the socket lifecycle's #parent_task.
def parent_task
  @lifecycle.parent_task
end

#peer_connected ⇒ Object

Delegated to SocketLifecycle.



96
# File 'lib/omq/engine.rb', line 96

# Forwards to the socket lifecycle's #peer_connected.
def peer_connected
  @lifecycle.peer_connected
end

#reconnect_enabled=(value) ⇒ Object



101
102
103
# File 'lib/omq/engine.rb', line 101

# Turns automatic reconnection on or off by forwarding the flag to the
# socket lifecycle.
#
# @param value the new setting, passed through unchanged
def reconnect_enabled=(value)
  @lifecycle.reconnect_enabled = value
end

#resolve_all_peers_gone_if_empty ⇒ Object

Resolves `all_peers_gone` if we had peers and now have none. Called by ConnectionLifecycle during teardown.



305
306
307
# File 'lib/omq/engine.rb', line 305

# Forwards to the socket lifecycle, passing the current connection table.
def resolve_all_peers_gone_if_empty = @lifecycle.resolve_all_peers_gone_if_empty(@connections)

#routing ⇒ Routing

Returns routing strategy (created lazily on first access).

Returns:

  • (Routing)

    routing strategy (created lazily on first access)



42
43
44
# File 'lib/omq/engine.rb', line 42

# Returns the routing strategy for this socket type. The strategy is
# built on the first call and the same instance is handed back after.
#
# @return [Routing]
def routing
  return @routing if @routing

  @routing = Routing.for(@socket_type).new(self)
end

#signal_fatal_error(error) ⇒ Object

Wraps an unexpected pump error as SocketDeadError and unblocks any callers waiting on the recv queue. The original error is preserved as #cause so callers can surface the real reason.

Parameters:

  • error (Exception)


432
433
434
435
436
437
438
# File 'lib/omq/engine.rb', line 432

# Records +error+ (wrapped via #build_fatal_error) as the socket's
# fatal error and wakes up anyone who might be waiting: the recv queue
# is unblocked and the peer_connected promise is resolved with nil.
# Both wake-ups are best effort. Ignored unless the lifecycle is open.
#
# @param error [Exception]
def signal_fatal_error(error)
  return unless @lifecycle.open?

  @fatal_error = build_fatal_error(error)

  begin
    routing.unblock_recv
  rescue StandardError
    nil
  end

  begin
    @lifecycle.peer_connected.resolve(nil)
  rescue StandardError
    nil
  end
end

#spawn_conn_pump_task(conn, annotation:, &block) ⇒ Object

Spawns a per-connection pump task on the connection’s own lifecycle barrier. When any pump on the barrier exits (e.g. the send pump sees EPIPE and calls #connection_lost), OMQ::Engine::ConnectionLifecycle#tear_down! calls `barrier.stop` which cancels every sibling pump for that connection — so a dead peer can no longer leave orphan send pumps blocked on `dequeue` waiting for messages that will never be written.

Parameters:



407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
# File 'lib/omq/engine.rb', line 407

# Spawns a transient pump task on the barrier of +conn+'s lifecycle,
# running the given block as the pump body. Falls back to
# #spawn_pump_task when +conn+ has no entry in @connections.
#
# @param conn the connection the pump belongs to
# @param annotation [String] task annotation
# @yield the pump loop body
def spawn_conn_pump_task(conn, annotation:, &block)
  lifecycle = @connections[conn]
  return spawn_pump_task(annotation: annotation, &block) unless lifecycle

  lifecycle.barrier.async(transient: true, annotation: annotation) do
    yield
  rescue Async::Stop, Async::Cancel
    # normal shutdown / sibling tore us down
  rescue Protocol::ZMTP::Error, *CONNECTION_LOST => error
    # expected disconnect — stash reason for the :disconnected
    # monitor event, then let the lifecycle reconnect as usual
    lifecycle.record_disconnect_reason(error)
  rescue => error
    # anything else is unexpected: mark the whole socket as dead
    signal_fatal_error(error)
  end
end

#spawn_inproc_retry(endpoint) {|interval| ... } ⇒ Object

Spawns an inproc reconnect retry task under the socket’s parent task.

Parameters:

  • endpoint (String)

Yields:

  • (interval)

    the retry loop body



117
118
119
120
121
122
123
124
125
126
# File 'lib/omq/engine.rb', line 117

# Spawns a transient inproc reconnect task on the socket lifecycle's
# barrier and records it in @tasks. The block receives the retry
# interval: the lower bound when reconnect_interval is a Range,
# otherwise the configured value itself.
#
# @param endpoint [String]
# @yield [interval] the retry loop body
def spawn_inproc_retry(endpoint)
  interval = @options.reconnect_interval
  interval = interval.begin if interval.is_a?(Range)

  task = @lifecycle.barrier.async(transient: true, annotation: "inproc reconnect #{endpoint}") do
    yield interval
  rescue Async::Stop, Async::Cancel
  end

  @tasks << task
end

#spawn_pump_task(annotation:) { ... } ⇒ Async::Task

Spawns a transient pump task with error propagation.

Unexpected exceptions are caught and forwarded to #signal_fatal_error so blocked callers (send/recv) see the real error instead of deadlocking.

Parameters:

  • annotation (String)

    task annotation for debugging

Yields:

  • the pump loop body

Returns:

  • (Async::Task)


385
386
387
388
389
390
391
392
393
# File 'lib/omq/engine.rb', line 385

# Spawns a transient pump task under the current Async task, running
# the given block as the pump body.
#
# Stop/cancel and expected disconnect errors end the task quietly;
# any other StandardError is forwarded to #signal_fatal_error.
#
# @param annotation [String] task annotation for debugging
# @yield the pump loop body
# @return [Async::Task]
def spawn_pump_task(annotation:, &block)
  Async::Task.current.async(transient: true, annotation: annotation) do
    yield
  rescue Async::Stop, Async::Cancel, Protocol::ZMTP::Error, *CONNECTION_LOST
    # normal shutdown / expected disconnect
  rescue => error
    # unexpected failure: mark the whole socket as dead
    signal_fatal_error(error)
  end
end

#start_recv_pump(conn, recv_queue) {|msg| ... } ⇒ Async::Task?

Starts a recv pump for a connection, or wires the inproc fast path.

Parameters:

Yields:

  • (msg)

    optional per-message transform

Returns:

  • (Async::Task, nil)


280
281
282
283
284
285
286
287
288
289
# File 'lib/omq/engine.rb', line 280

# Starts a recv pump for +conn+ through RecvPump.start.
#
# The pump is parented on the connection's own lifecycle barrier when
# the connection is registered, so it is torn down together with its
# sibling per-connection pumps; otherwise the socket-level barrier is
# used. A non-nil task is also recorded in @tasks.
#
# @param conn the connection to receive from
# @param recv_queue the queue handed to the pump
# @yield [msg] optional per-message transform
# @return [Async::Task, nil]
def start_recv_pump(conn, recv_queue, &transform)
  conn_lifecycle = @connections[conn]
  parent         = conn_lifecycle&.barrier || @lifecycle.barrier

  RecvPump.start(parent, conn, recv_queue, self, transform).tap do |task|
    @tasks << task if task
  end
end

#stop ⇒ void

This method returns an undefined value.

Immediate hard stop: skips the linger drain and cascades teardown through the socket-level barrier. Intended for crash-path cleanup where #close's drain is either unsafe or undesired.



357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
# File 'lib/omq/engine.rb', line 357

# Immediate hard stop: runs the same teardown steps as the tail of
# #close but never drains the send queues. Does nothing unless the
# lifecycle reports alive?.
#
# @return [void]
def stop
  return unless @lifecycle.alive?

  # Only enter the closing state from open; finish_closing! runs
  # either way.
  @lifecycle.start_closing! if @lifecycle.open?
  @lifecycle.finish_closing!

  if @lifecycle.on_io_thread
    Reactor.untrack_linger(@options.linger)
  end

  stop_listeners
  tear_down_barrier
  # Best effort: an error while stopping routing must not abort the
  # remaining teardown steps.
  routing.stop rescue nil
  emit_monitor_event(:closed)
  close_monitor_queue
end

#transport_for(endpoint) ⇒ Module

Looks up the transport module for an endpoint URI.

Parameters:

  • endpoint (String)

    endpoint URI (e.g. “tcp://…”, “inproc://…”)

Returns:

  • (Module)

    the transport module

Raises:

  • (ArgumentError)

    if the scheme is not registered



539
540
541
542
543
544
545
546
547
548
# File 'lib/omq/engine.rb', line 539

# Looks up the transport module registered for the scheme of an
# endpoint URI (the part before "://").
#
# @param endpoint [String] endpoint URI (e.g. "tcp://...", "inproc://...")
# @return [Module] the transport module
# @raise [ArgumentError] if no transport is registered for the scheme
def transport_for(endpoint)
  scheme = endpoint[/\A([^:]+):\/\//, 1]
  found  = self.class.transports[scheme]
  return found if found

  raise ArgumentError, "unsupported transport: #{endpoint}"
end

#unbind(endpoint) ⇒ void

This method returns an undefined value.

Unbinds from an endpoint. Stops the listener and closes all connections that were accepted on it.

Parameters:

  • endpoint (String)


194
195
196
197
198
199
200
201
# File 'lib/omq/engine.rb', line 194

# Unbinds from +endpoint+: stops the matching listener, stops tracking
# it, and closes the connections at that endpoint. Does nothing when
# no listener is bound there.
#
# @param endpoint [String]
# @return [void]
def unbind(endpoint)
  target = @listeners.detect { |candidate| candidate.endpoint == endpoint }

  if target
    target.stop
    @listeners.delete(target)
    close_connections_at(endpoint)
  end
end