Class: OMQ::Engine::ConnectionLifecycle
- Inherits:
-
Object
- Object
- OMQ::Engine::ConnectionLifecycle
- Defined in:
- lib/omq/engine/connection_lifecycle.rb
Overview
Owns the full arc of one connection: handshake → ready → closed.
Scope boundary: ConnectionLifecycle tracks a single peer link (one ZMTP connection or one inproc DirectPipe). SocketLifecycle owns the socket-wide state above it — first-peer/last-peer signaling, reconnect enable flag, the parent task tree, and the open → closing → closed transitions that gate close-time drain. A socket has exactly one SocketLifecycle and zero-or-more ConnectionLifecycles beneath it.
Centralizes the ordering of side effects (monitor events, routing registration, promise resolution, reconnect scheduling) so the sequence lives in one place instead of being scattered across Engine, ConnectionSetup, and close paths.
State machine:
new ──┬── :handshaking ── :ready ── :closed
└── :ready ── :closed (inproc fast path)
#lost! and #close! are idempotent — the state guard ensures side effects run exactly once even if multiple pumps race to report a lost connection.
Defined Under Namespace
Classes: InvalidTransition
Constant Summary collapse
- STATES =
%i[new handshaking ready closed].freeze
- TRANSITIONS =
{ new: %i[handshaking ready closed].freeze, handshaking: %i[ready closed].freeze, ready: %i[closed].freeze, closed: [].freeze, }.freeze
Instance Attribute Summary collapse
-
#barrier ⇒ Async::Barrier
readonly
Holds all per-connection pump tasks (send pump, recv pump, reaper, heartbeat).
- #conn ⇒ Protocol::ZMTP::Connection, ... readonly
- #endpoint ⇒ String? readonly
-
#state ⇒ Symbol
readonly
Current state.
Instance Method Summary collapse
-
#close! ⇒ Object
Transitions to :closed without scheduling a reconnect.
-
#handshake!(io, as_server:) ⇒ Protocol::ZMTP::Connection
Performs the ZMTP handshake and transitions to :ready.
-
#initialize(engine, endpoint: nil, done: nil) ⇒ ConnectionLifecycle
constructor
A new instance of ConnectionLifecycle.
-
#lost!(reason: nil) ⇒ Object
Transitions to :closed, running the full loss sequence: routing removal, monitor event, reconnect scheduling.
-
#ready_direct!(pipe) ⇒ Object
Registers an already-connected inproc pipe as :ready.
-
#record_disconnect_reason(error) ⇒ Object
Records the exception that took down a pump task so that the supervisor can surface it in the :disconnected monitor event.
Constructor Details
#initialize(engine, endpoint: nil, done: nil) ⇒ ConnectionLifecycle
Returns a new instance of ConnectionLifecycle.
70 71 72 73 74 75 76 77 78 79 80 81 82 |
# File 'lib/omq/engine/connection_lifecycle.rb', line 70 def initialize(engine, endpoint: nil, done: nil) @engine = engine @endpoint = endpoint @done = done @state = :new @conn = nil # Nest the per-connection barrier under the socket-level barrier # so every pump spawned via +@barrier.async+ is also tracked by # the socket barrier — {Engine#stop}/{Engine#close} cascade # through in one call. @barrier = Async::Barrier.new(parent: engine.) end |
Instance Attribute Details
#barrier ⇒ Async::Barrier (readonly)
Returns holds all per-connection pump tasks (send pump, recv pump, reaper, heartbeat). When the connection is torn down, #tear_down! calls ‘@barrier.stop` to take down every sibling task atomically — so the first pump to see a disconnect takes down all the others.
63 64 65 |
# File 'lib/omq/engine/connection_lifecycle.rb', line 63 def @barrier end |
#conn ⇒ Protocol::ZMTP::Connection, ... (readonly)
47 48 49 |
# File 'lib/omq/engine/connection_lifecycle.rb', line 47 def conn @conn end |
#endpoint ⇒ String? (readonly)
51 52 53 |
# File 'lib/omq/engine/connection_lifecycle.rb', line 51 def endpoint @endpoint end |
#state ⇒ Symbol (readonly)
Returns current state.
55 56 57 |
# File 'lib/omq/engine/connection_lifecycle.rb', line 55 def state @state end |
Instance Method Details
#close! ⇒ Object
Transitions to :closed without scheduling a reconnect. Used by shutdown paths (Engine#close, #disconnect, #unbind). Idempotent.
155 156 157 |
# File 'lib/omq/engine/connection_lifecycle.rb', line 155 def close! tear_down!(reconnect: false) end |
#handshake!(io, as_server:) ⇒ Protocol::ZMTP::Connection
Performs the ZMTP handshake and transitions to :ready.
91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 |
# File 'lib/omq/engine/connection_lifecycle.rb', line 91 def handshake!(io, as_server:) transition!(:handshaking) conn = Protocol::ZMTP::Connection.new io, socket_type: @engine.socket_type.to_s, identity: @engine..identity, as_server: as_server, mechanism: @engine..mechanism&.dup, max_message_size: @engine.. Async::Task.current.with_timeout(handshake_timeout) do conn.handshake! end Heartbeat.start(@barrier, conn, @engine., @engine.tasks) ready!(conn) @conn rescue Protocol::ZMTP::Error, *CONNECTION_LOST, Async::TimeoutError => error @engine.emit_monitor_event :handshake_failed, endpoint: @endpoint, detail: { error: error } conn&.close # Full tear-down with reconnect: without this, spawn_connection's # ensure-block close! sees :closed and skips maybe_reconnect, # leaving the endpoint dead. Race is exposed when a peer RSTs # mid-handshake (e.g. LINGER 0 close against an in-flight connect). tear_down!(reconnect: true) raise end |
#lost!(reason: nil) ⇒ Object
Transitions to :closed, running the full loss sequence: routing removal, monitor event, reconnect scheduling. Idempotent: a no-op if already :closed.
136 137 138 |
# File 'lib/omq/engine/connection_lifecycle.rb', line 136 def lost!(reason: nil) tear_down!(reconnect: true, reason: reason || @disconnect_reason) end |
#ready_direct!(pipe) ⇒ Object
Registers an already-connected inproc pipe as :ready. No handshake — inproc DirectPipe bypasses ZMTP entirely.
127 128 129 |
# File 'lib/omq/engine/connection_lifecycle.rb', line 127 def ready_direct!(pipe) ready!(pipe) end |
#record_disconnect_reason(error) ⇒ Object
Records the exception that took down a pump task so that the supervisor can surface it in the :disconnected monitor event. First writer wins — subsequent pumps unwinding on the same teardown don’t overwrite the original cause.
146 147 148 |
# File 'lib/omq/engine/connection_lifecycle.rb', line 146 def record_disconnect_reason(error) @disconnect_reason ||= error end |