Class: OMQ::Engine
- Inherits:
-
Object
- Object
- OMQ::Engine
- Defined in:
- lib/omq/engine.rb,
lib/omq/engine/heartbeat.rb,
lib/omq/engine/reconnect.rb,
lib/omq/engine/recv_pump.rb,
lib/omq/engine/maintenance.rb,
lib/omq/engine/socket_lifecycle.rb,
lib/omq/engine/connection_lifecycle.rb
Overview
Per-socket orchestrator.
Manages connections, transports, and the routing strategy for one OMQ::Socket instance. Each socket type creates one Engine.
Defined Under Namespace
Modules: Heartbeat, Maintenance Classes: ConnectionLifecycle, Reconnect, RecvPump, SocketLifecycle
Class Attribute Summary collapse
-
.transports ⇒ Hash{String => Module}
readonly
Registered transports.
Instance Attribute Summary collapse
-
#connection_wrapper ⇒ Object
Optional proc that wraps new connections (e.g. for serialization).
- #connections ⇒ Hash{Connection => ConnectionLifecycle}, ... readonly
-
#last_endpoint ⇒ String?
readonly
Last bound endpoint.
-
#last_tcp_port ⇒ Integer?
readonly
Last auto-selected TCP port.
- #lifecycle ⇒ Hash{Connection => ConnectionLifecycle}, ... readonly
-
#monitor_queue ⇒ Object
writeonly
Sets the attribute monitor_queue.
-
#options ⇒ Options
readonly
Socket options.
-
#socket_type ⇒ Symbol
readonly
Socket type (e.g. :REQ, :PAIR).
- #tasks ⇒ Hash{Connection => ConnectionLifecycle}, ... readonly
-
#verbose_monitor ⇒ Boolean
When true, every monitor event is also printed to stderr for debugging.
Instance Method Summary collapse
- #all_peers_gone ⇒ Object
- #barrier ⇒ Object
-
#bind(endpoint, parent: nil) ⇒ void
Binds to an endpoint.
-
#build_fatal_error(error) ⇒ Object
Constructs a SocketDeadError whose
causeiserror. -
#capture_parent_task(parent: nil) ⇒ Object
Captures the socket’s task tree root and starts the socket-level maintenance task.
-
#close ⇒ void
Closes all connections and listeners gracefully.
- #closed? ⇒ Boolean
-
#connect(endpoint, parent: nil) ⇒ void
Connects to an endpoint.
-
#connection_lost(connection) ⇒ void
Called when a connection is lost.
-
#connection_ready(pipe, endpoint: nil) ⇒ void
Called by inproc transport with a pre-validated DirectPipe.
-
#dequeue_recv ⇒ Array<String>
Dequeues the next received message.
-
#dequeue_recv_sentinel ⇒ Object
Pushes a nil sentinel into the recv queue, unblocking a pending #dequeue_recv with a nil return value.
-
#disconnect(endpoint) ⇒ void
Disconnects from an endpoint.
-
#emit_monitor_event(type, endpoint: nil, detail: nil) ⇒ void
Emits a lifecycle event to the monitor queue, if one is attached.
-
#emit_verbose_monitor_event(type, **detail) ⇒ void
Emits a verbose-only monitor event (e.g. message traces).
-
#emit_verbose_msg_received(conn, parts) ⇒ Object
Emits a :message_received verbose event and enriches it with the on-wire (pre-decompression) byte size if
connexposeslast_wire_size_in. -
#emit_verbose_msg_sent(conn, parts) ⇒ Object
Emits a :message_sent verbose event and enriches it with the on-wire (post-compression) byte size if
connexposeslast_wire_size_out(installed by ZMTP-Zstd etc.). -
#enqueue_send(parts) ⇒ void
Enqueues a message for sending.
-
#handle_accepted(io, endpoint: nil) ⇒ void
Called by a transport when an incoming connection is accepted.
-
#handle_connected(io, endpoint: nil) ⇒ void
Called by a transport when an outgoing connection is established.
-
#initialize(socket_type, options) ⇒ Engine
constructor
A new instance of Engine.
-
#maybe_reconnect(endpoint) ⇒ Object
Schedules a reconnect for
endpointif auto-reconnect is enabled and the endpoint is still dialed. - #parent_task ⇒ Object
-
#peer_connected ⇒ Object
Delegated to SocketLifecycle.
- #reconnect_enabled=(value) ⇒ Object
-
#resolve_all_peers_gone_if_empty ⇒ Object
Resolves ‘all_peers_gone` if we had peers and now have none.
-
#routing ⇒ Routing
Routing strategy (created lazily on first access).
-
#signal_fatal_error(error) ⇒ Object
Wraps an unexpected pump error as SocketDeadError and unblocks any callers waiting on the recv queue.
-
#spawn_conn_pump_task(conn, annotation:, &block) ⇒ Object
Spawns a per-connection pump task on the connection’s own lifecycle barrier.
-
#spawn_inproc_retry(endpoint) {|interval| ... } ⇒ Object
Spawns an inproc reconnect retry task under the socket’s parent task.
-
#spawn_pump_task(annotation:) { ... } ⇒ Async::Task
Spawns a transient pump task with error propagation.
-
#start_recv_pump(conn, recv_queue) {|msg| ... } ⇒ Async::Task?
Starts a recv pump for a connection, or wires the inproc fast path.
-
#stop ⇒ void
Immediate hard stop: skips the linger drain and cascades teardown through the socket-level barrier.
-
#transport_for(endpoint) ⇒ Module
Looks up the transport module for an endpoint URI.
-
#unbind(endpoint) ⇒ void
Unbinds from an endpoint.
Constructor Details
#initialize(socket_type, options) ⇒ Engine
Returns a new instance of Engine.
60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 |
# File 'lib/omq/engine.rb', line 60 def initialize(socket_type, ) @socket_type = socket_type @options = @routing = nil @connections = {} # connection => ConnectionLifecycle @dialed = Set.new # endpoints we called connect() on (reconnect intent) @listeners = [] @tasks = [] @lifecycle = SocketLifecycle.new @last_endpoint = nil @last_tcp_port = nil @fatal_error = nil @monitor_queue = nil @verbose_monitor = false end |
Class Attribute Details
.transports ⇒ Hash{String => Module} (readonly)
Returns registered transports.
26 27 28 |
# File 'lib/omq/engine.rb', line 26 def transports @transports end |
Instance Attribute Details
#connection_wrapper ⇒ Object
Optional proc that wraps new connections (e.g. for serialization). Called with the raw connection; must return the (possibly wrapped) connection.
109 110 111 |
# File 'lib/omq/engine.rb', line 109 def connection_wrapper @connection_wrapper end |
#connections ⇒ Hash{Connection => ConnectionLifecycle}, ... (readonly)
81 82 83 |
# File 'lib/omq/engine.rb', line 81 def connections @connections end |
#last_endpoint ⇒ String? (readonly)
Returns last bound endpoint.
49 50 51 |
# File 'lib/omq/engine.rb', line 49 def last_endpoint @last_endpoint end |
#last_tcp_port ⇒ Integer? (readonly)
Returns last auto-selected TCP port.
54 55 56 |
# File 'lib/omq/engine.rb', line 54 def last_tcp_port @last_tcp_port end |
#lifecycle ⇒ Hash{Connection => ConnectionLifecycle}, ... (readonly)
81 82 83 |
# File 'lib/omq/engine.rb', line 81 def lifecycle @lifecycle end |
#monitor_queue=(value) ⇒ Object (writeonly)
Sets the attribute monitor_queue
87 88 89 |
# File 'lib/omq/engine.rb', line 87 def monitor_queue=(value) @monitor_queue = value end |
#options ⇒ Options (readonly)
Returns socket options.
37 38 39 |
# File 'lib/omq/engine.rb', line 37 def @options end |
#socket_type ⇒ Symbol (readonly)
Returns socket type (e.g. :REQ, :PAIR).
32 33 34 |
# File 'lib/omq/engine.rb', line 32 def socket_type @socket_type end |
#tasks ⇒ Hash{Connection => ConnectionLifecycle}, ... (readonly)
81 82 83 |
# File 'lib/omq/engine.rb', line 81 def tasks @tasks end |
#verbose_monitor ⇒ Boolean
Returns when true, every monitor event is also printed to stderr for debugging. Set via Socket#monitor.
92 93 94 |
# File 'lib/omq/engine.rb', line 92 def verbose_monitor @verbose_monitor end |
Instance Method Details
#all_peers_gone ⇒ Object
97 |
# File 'lib/omq/engine.rb', line 97 def all_peers_gone = @lifecycle.all_peers_gone |
#barrier ⇒ Object
99 |
# File 'lib/omq/engine.rb', line 99 def = @lifecycle. |
#bind(endpoint, parent: nil) ⇒ void
This method returns an undefined value.
Binds to an endpoint.
135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 |
# File 'lib/omq/engine.rb', line 135 def bind(endpoint, parent: nil) OMQ.freeze_for_ractors! capture_parent_task(parent: parent) transport = transport_for(endpoint) listener = transport.bind(endpoint, self) start_accept_loops(listener) @listeners << listener @last_endpoint = listener.endpoint @last_tcp_port = listener.respond_to?(:port) ? listener.port : nil emit_monitor_event(:listening, endpoint: listener.endpoint) rescue => error emit_monitor_event(:bind_failed, endpoint: endpoint, detail: { error: error }) raise end |
#build_fatal_error(error) ⇒ Object
Constructs a SocketDeadError whose cause is error. Uses the raise-in-rescue idiom because Ruby only sets cause on an exception when it is raised from inside a rescue block – works regardless of the original caller’s $! state.
445 446 447 448 449 450 451 452 453 |
# File 'lib/omq/engine.rb', line 445 def build_fatal_error(error) raise error rescue begin raise SocketDeadError, "#{@socket_type} socket killed: #{error.}" rescue SocketDeadError => wrapped wrapped end end |
#capture_parent_task(parent: nil) ⇒ Object
Captures the socket’s task tree root and starts the socket-level maintenance task. If parent is given, it is used as the parent for every task spawned under this socket (connection supervisors, reconnect loops, maintenance, monitor). Otherwise the current Async task (or the shared Reactor root, for non-Async callers) is captured automatically.
Idempotent: first call wins. Subsequent calls (including from later bind/connect invocations) with a different parent are silently ignored.
469 470 471 472 473 474 475 |
# File 'lib/omq/engine.rb', line 469 def capture_parent_task(parent: nil) task = @lifecycle.capture_parent_task(parent: parent, linger: @options.linger) return unless task Maintenance.start(@lifecycle., @options.mechanism, @tasks) end |
#close ⇒ void
This method returns an undefined value.
Closes all connections and listeners gracefully. Drains pending sends up to linger seconds, then cascades teardown through the socket-level OMQ::Engine::SocketLifecycle#barrier — every per-connection barrier is stopped as a side effect, cancelling every pump.
327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 |
# File 'lib/omq/engine.rb', line 327 def close return unless @lifecycle.open? @lifecycle.start_closing! stop_listeners unless @connections.empty? if @options.linger.nil? || @options.linger > 0 drain_send_queues(@options.linger) end @lifecycle.finish_closing! if @lifecycle.on_io_thread Reactor.untrack_linger(@options.linger) end stop_listeners routing.stop rescue nil emit_monitor_event(:closed) close_monitor_queue end |
#closed? ⇒ Boolean
100 |
# File 'lib/omq/engine.rb', line 100 def closed? = @lifecycle.closed? |
#connect(endpoint, parent: nil) ⇒ void
This method returns an undefined value.
Connects to an endpoint.
158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 |
# File 'lib/omq/engine.rb', line 158 def connect(endpoint, parent: nil) OMQ.freeze_for_ractors! capture_parent_task(parent: parent) validate_endpoint!(endpoint) @dialed.add(endpoint) if endpoint.start_with?("inproc://") # Inproc connect is synchronous and instant transport = transport_for(endpoint) transport.connect(endpoint, self) else # TCP/IPC connect in background — never blocks the caller emit_monitor_event(:connect_delayed, endpoint: endpoint) schedule_reconnect(endpoint, delay: 0) end end |
#connection_lost(connection) ⇒ void
This method returns an undefined value.
Called when a connection is lost.
297 298 299 |
# File 'lib/omq/engine.rb', line 297 def connection_lost(connection) @connections[connection]&.lost! end |
#connection_ready(pipe, endpoint: nil) ⇒ void
This method returns an undefined value.
Called by inproc transport with a pre-validated DirectPipe. Skips ZMTP handshake — just registers with routing strategy.
233 234 235 |
# File 'lib/omq/engine.rb', line 233 def connection_ready(pipe, endpoint: nil) ConnectionLifecycle.new(self, endpoint: endpoint).ready_direct!(pipe) end |
#dequeue_recv ⇒ Array<String>
Dequeues the next received message. Blocks until available.
243 244 245 246 247 248 249 250 |
# File 'lib/omq/engine.rb', line 243 def dequeue_recv raise @fatal_error if @fatal_error msg = routing.dequeue_recv raise @fatal_error if msg.nil? && @fatal_error msg end |
#dequeue_recv_sentinel ⇒ Object
Pushes a nil sentinel into the recv queue, unblocking a pending #dequeue_recv with a nil return value.
256 257 258 |
# File 'lib/omq/engine.rb', line 256 def dequeue_recv_sentinel routing.unblock_recv end |
#disconnect(endpoint) ⇒ void
This method returns an undefined value.
Disconnects from an endpoint. Closes connections to that endpoint and stops auto-reconnection for it.
182 183 184 185 |
# File 'lib/omq/engine.rb', line 182 def disconnect(endpoint) @dialed.delete(endpoint) close_connections_at(endpoint) end |
#emit_monitor_event(type, endpoint: nil, detail: nil) ⇒ void
This method returns an undefined value.
Emits a lifecycle event to the monitor queue, if one is attached.
485 486 487 488 489 |
# File 'lib/omq/engine.rb', line 485 def emit_monitor_event(type, endpoint: nil, detail: nil) return unless @monitor_queue @monitor_queue.push(MonitorEvent.new(type: type, endpoint: endpoint, detail: detail)) rescue Async::Stop, ClosedQueueError end |
#emit_verbose_monitor_event(type, **detail) ⇒ void
This method returns an undefined value.
Emits a verbose-only monitor event (e.g. message traces). Only emitted when Socket#monitor was called with verbose: true. Uses **detail to avoid Hash allocation when verbose is off.
500 501 502 503 |
# File 'lib/omq/engine.rb', line 500 def emit_verbose_monitor_event(type, **detail) return unless @verbose_monitor emit_monitor_event(type, detail: detail) end |
#emit_verbose_msg_received(conn, parts) ⇒ Object
Emits a :message_received verbose event and enriches it with the on-wire (pre-decompression) byte size if conn exposes last_wire_size_in.
520 521 522 523 524 525 526 527 528 529 530 |
# File 'lib/omq/engine.rb', line 520 def emit_verbose_msg_received(conn, parts) return unless @verbose_monitor detail = { parts: parts } if conn.respond_to?(:last_wire_size_in) detail[:wire_size] = conn.last_wire_size_in end emit_monitor_event(:message_received, detail: detail) end |
#emit_verbose_msg_sent(conn, parts) ⇒ Object
Emits a :message_sent verbose event and enriches it with the on-wire (post-compression) byte size if conn exposes last_wire_size_out (installed by ZMTP-Zstd etc.).
509 510 511 512 513 514 |
# File 'lib/omq/engine.rb', line 509 def emit_verbose_msg_sent(conn, parts) return unless @verbose_monitor detail = { parts: parts } detail[:wire_size] = conn.last_wire_size_out if conn.respond_to?(:last_wire_size_out) emit_monitor_event(:message_sent, detail: detail) end |
#enqueue_send(parts) ⇒ void
This method returns an undefined value.
Enqueues a message for sending. Blocks at HWM.
267 268 269 270 |
# File 'lib/omq/engine.rb', line 267 def enqueue_send(parts) raise @fatal_error if @fatal_error routing.enqueue(parts) end |
#handle_accepted(io, endpoint: nil) ⇒ void
This method returns an undefined value.
Called by a transport when an incoming connection is accepted.
210 211 212 213 |
# File 'lib/omq/engine.rb', line 210 def handle_accepted(io, endpoint: nil) emit_monitor_event(:accepted, endpoint: endpoint) spawn_connection(io, as_server: true, endpoint: endpoint) end |
#handle_connected(io, endpoint: nil) ⇒ void
This method returns an undefined value.
Called by a transport when an outgoing connection is established.
221 222 223 224 |
# File 'lib/omq/engine.rb', line 221 def handle_connected(io, endpoint: nil) emit_monitor_event(:connected, endpoint: endpoint) spawn_connection(io, as_server: false, endpoint: endpoint) end |
#maybe_reconnect(endpoint) ⇒ Object
Schedules a reconnect for endpoint if auto-reconnect is enabled and the endpoint is still dialed.
313 314 315 316 317 |
# File 'lib/omq/engine.rb', line 313 def maybe_reconnect(endpoint) return unless endpoint && @dialed.include?(endpoint) return unless @lifecycle.open? && @lifecycle.reconnect_enabled Reconnect.schedule(endpoint, @options, @lifecycle.parent_task, self) end |
#parent_task ⇒ Object
98 |
# File 'lib/omq/engine.rb', line 98 def parent_task = @lifecycle.parent_task |
#peer_connected ⇒ Object
Delegated to SocketLifecycle.
96 |
# File 'lib/omq/engine.rb', line 96 def peer_connected = @lifecycle.peer_connected |
#reconnect_enabled=(value) ⇒ Object
101 102 103 |
# File 'lib/omq/engine.rb', line 101 def reconnect_enabled=(value) @lifecycle.reconnect_enabled = value end |
#resolve_all_peers_gone_if_empty ⇒ Object
Resolves ‘all_peers_gone` if we had peers and now have none. Called by ConnectionLifecycle during teardown.
305 306 307 |
# File 'lib/omq/engine.rb', line 305 def resolve_all_peers_gone_if_empty @lifecycle.resolve_all_peers_gone_if_empty(@connections) end |
#routing ⇒ Routing
Returns routing strategy (created lazily on first access).
42 43 44 |
# File 'lib/omq/engine.rb', line 42 def routing @routing ||= Routing.for(@socket_type).new(self) end |
#signal_fatal_error(error) ⇒ Object
Wraps an unexpected pump error as SocketDeadError and unblocks any callers waiting on the recv queue. The original error is preserved as #cause so callers can surface the real reason.
432 433 434 435 436 437 438 |
# File 'lib/omq/engine.rb', line 432 def signal_fatal_error(error) return unless @lifecycle.open? @fatal_error = build_fatal_error(error) routing.unblock_recv rescue nil @lifecycle.peer_connected.resolve(nil) rescue nil end |
#spawn_conn_pump_task(conn, annotation:, &block) ⇒ Object
Spawns a per-connection pump task on the connection’s own lifecycle barrier. When any pump on the barrier exits (e.g. the send pump sees EPIPE and calls #connection_lost), OMQ::Engine::ConnectionLifecycle#tear_down! calls ‘barrier.stop` which cancels every sibling pump for that connection — so a dead peer can no longer leave orphan send pumps blocked on `dequeue` waiting for messages that will never be written.
407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 |
# File 'lib/omq/engine.rb', line 407 def spawn_conn_pump_task(conn, annotation:, &block) lifecycle = @connections[conn] return spawn_pump_task(annotation: annotation, &block) unless lifecycle lifecycle..async(transient: true, annotation: annotation) do yield rescue Async::Stop, Async::Cancel # normal shutdown / sibling tore us down rescue Protocol::ZMTP::Error, *CONNECTION_LOST => error # expected disconnect — stash reason for the :disconnected # monitor event, then let the lifecycle reconnect as usual lifecycle.record_disconnect_reason(error) rescue => error signal_fatal_error(error) end end |
#spawn_inproc_retry(endpoint) {|interval| ... } ⇒ Object
Spawns an inproc reconnect retry task under the socket’s parent task.
117 118 119 120 121 122 123 124 125 126 |
# File 'lib/omq/engine.rb', line 117 def spawn_inproc_retry(endpoint) ri = @options.reconnect_interval ivl = ri.is_a?(Range) ? ri.begin : ri ann = "inproc reconnect #{endpoint}" @tasks << @lifecycle..async(transient: true, annotation: ann) do yield ivl rescue Async::Stop, Async::Cancel end end |
#spawn_pump_task(annotation:) { ... } ⇒ Async::Task
Spawns a transient pump task with error propagation.
Unexpected exceptions are caught and forwarded to #signal_fatal_error so blocked callers (send/recv) see the real error instead of deadlocking.
385 386 387 388 389 390 391 392 393 |
# File 'lib/omq/engine.rb', line 385 def spawn_pump_task(annotation:, &block) Async::Task.current.async(transient: true, annotation: annotation) do yield rescue Async::Stop, Async::Cancel, Protocol::ZMTP::Error, *CONNECTION_LOST # normal shutdown / expected disconnect rescue => error signal_fatal_error(error) end end |
#start_recv_pump(conn, recv_queue) {|msg| ... } ⇒ Async::Task?
Starts a recv pump for a connection, or wires the inproc fast path.
280 281 282 283 284 285 286 287 288 289 |
# File 'lib/omq/engine.rb', line 280 def start_recv_pump(conn, recv_queue, &transform) # Spawn on the connection's lifecycle barrier so the recv pump is # torn down together with the rest of its sibling per-connection # pumps when the connection is lost. parent = @connections[conn]&. || @lifecycle. task = RecvPump.start(parent, conn, recv_queue, self, transform) @tasks << task if task task end |
#stop ⇒ void
This method returns an undefined value.
Immediate hard stop: skips the linger drain and cascades teardown through the socket-level barrier. Intended for crash-path cleanup where #close‘s drain is either unsafe or undesired.
357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 |
# File 'lib/omq/engine.rb', line 357 def stop return unless @lifecycle.alive? @lifecycle.start_closing! if @lifecycle.open? @lifecycle.finish_closing! if @lifecycle.on_io_thread Reactor.untrack_linger(@options.linger) end stop_listeners routing.stop rescue nil emit_monitor_event(:closed) close_monitor_queue end |
#transport_for(endpoint) ⇒ Module
Looks up the transport module for an endpoint URI.
539 540 541 542 543 544 545 546 547 548 |
# File 'lib/omq/engine.rb', line 539 def transport_for(endpoint) scheme = endpoint[/\A([^:]+):\/\//, 1] transport = self.class.transports[scheme] unless transport raise ArgumentError, "unsupported transport: #{endpoint}" end transport end |
#unbind(endpoint) ⇒ void
This method returns an undefined value.
Unbinds from an endpoint. Stops the listener and closes all connections that were accepted on it.
194 195 196 197 198 199 200 201 |
# File 'lib/omq/engine.rb', line 194 def unbind(endpoint) listener = @listeners.find { |l| l.endpoint == endpoint } return unless listener listener.stop @listeners.delete(listener) close_connections_at(endpoint) end |