Class: OMQ::Engine::ConnectionLifecycle

Inherits:
Object
Defined in:
lib/omq/engine/connection_lifecycle.rb

Overview

Owns the full arc of one connection: handshake → ready → closed.

Scope boundary: ConnectionLifecycle tracks a single peer link (one ZMTP connection or one inproc Pipe). SocketLifecycle owns the socket-wide state above it — first-peer/last-peer signaling, reconnect enable flag, the parent task tree, and the open → closing → closed transitions that gate close-time drain. A socket has exactly one SocketLifecycle and zero-or-more ConnectionLifecycles beneath it.

Centralizes the ordering of side effects (monitor events, routing registration, promise resolution, reconnect scheduling) so the sequence lives in one place instead of being scattered across Engine, ConnectionSetup, and close paths.

State machine:

new ──┬── :handshaking ── :ready ── :closed
      └── :ready ── :closed            (inproc fast path)

#lost! and #close! are idempotent — the state guard ensures side effects run exactly once even if multiple pumps race to report a lost connection.
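The idempotence described above can be illustrated with a minimal sketch. SketchLifecycle and its events list are hypothetical stand-ins, not the OMQ implementation; the sketch assumes the guard check and the state change run without yielding to another task, so the first caller wins and later callers return early.

```ruby
# Hypothetical stand-in for illustration only; not the OMQ class.
class SketchLifecycle
  attr_reader :state, :events

  def initialize
    @state  = :ready
    @events = [] # records side effects so they can be counted
  end

  # Loss path: tear down and ask for a reconnect.
  def lost!
    tear_down!(reconnect: true)
  end

  # Shutdown path: tear down without reconnecting.
  def close!
    tear_down!(reconnect: false)
  end

  private

  def tear_down!(reconnect:)
    return if @state == :closed # state guard: later callers are no-ops

    @state = :closed
    @events << :disconnected
    @events << :reconnect_scheduled if reconnect
  end
end

lifecycle = SketchLifecycle.new
lifecycle.lost!  # first reporter runs the side effects
lifecycle.lost!  # second reporter hits the guard
lifecycle.close! # shutdown after loss is also a no-op
```

Whichever of the two methods arrives first decides whether a reconnect is scheduled; every later call leaves the recorded events unchanged.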

Defined Under Namespace

Classes: InvalidTransition

Constant Summary

STATES =
%i[new handshaking ready closed].freeze
TRANSITIONS =
{
  new:         %i[handshaking ready closed].freeze,
  handshaking: %i[ready closed].freeze,
  ready:       %i[closed].freeze,
  closed:      [].freeze,
}.freeze
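As a rough illustration of how a table like TRANSITIONS can gate state changes, the sketch below looks up the allowed targets for the current state and raises when the requested one is missing. The TransitionSketch module, its next_state helper, and the error message are assumptions made for this example; the real transition logic is not shown on this page.

```ruby
# Hypothetical sketch; mirrors the constants above inside its own module.
module TransitionSketch
  class InvalidTransition < StandardError; end

  TRANSITIONS = {
    new:         %i[handshaking ready closed].freeze,
    handshaking: %i[ready closed].freeze,
    ready:       %i[closed].freeze,
    closed:      [].freeze,
  }.freeze

  # Returns the target state when the move is allowed, raises otherwise.
  def self.next_state(from, to)
    allowed = TRANSITIONS.fetch(from)
    raise InvalidTransition, "#{from} -> #{to}" unless allowed.include?(to)

    to
  end
end

TransitionSketch.next_state(:new, :ready)        # inproc fast path
TransitionSketch.next_state(:handshaking, :ready) # normal ZMTP path
```

Because :closed maps to an empty list, no transition out of :closed can pass the check.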

Constructor Details

#initialize(engine, endpoint: nil, done: nil, transport: nil) ⇒ ConnectionLifecycle

Returns a new instance of ConnectionLifecycle.

Parameters:

  • engine (Engine)
  • endpoint (String, nil) (defaults to: nil)
  • done (Async::Promise, nil) (defaults to: nil)

    resolved when connection is lost

  • transport (Module, nil) (defaults to: nil)

    transport module that produced io; queried for connection_class so plugins (e.g. WebSocket) can substitute their own ZMTP-shaped connection class. Falls back to Protocol::ZMTP::Connection when nil or when the transport doesn’t define connection_class.
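The fallback rule for transport can be sketched as follows. DefaultConnection stands in for Protocol::ZMTP::Connection, and WebSocketTransport, PlainTransport, and connection_class_for are hypothetical names invented for this example; only the respond_to? check mirrors the code in #handshake!.

```ruby
# Hypothetical stand-ins; none of these names come from OMQ itself.
module ResolveSketch
  DefaultConnection   = Class.new # plays the role of Protocol::ZMTP::Connection
  WebSocketConnection = Class.new # a plugin-provided, ZMTP-shaped class

  # A transport that substitutes its own connection class.
  module WebSocketTransport
    def self.connection_class
      WebSocketConnection
    end
  end

  # A transport that does not define connection_class.
  module PlainTransport; end

  # nil.respond_to?(:connection_class) is false, so a nil transport
  # falls through to the default without a separate nil check.
  def self.connection_class_for(transport)
    if transport.respond_to?(:connection_class)
      transport.connection_class
    else
      DefaultConnection
    end
  end
end

ResolveSketch.connection_class_for(ResolveSketch::WebSocketTransport)
```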



# File 'lib/omq/engine/connection_lifecycle.rb', line 75

def initialize(engine, endpoint: nil, done: nil, transport: nil)
  @engine    = engine
  @endpoint  = endpoint
  @done      = done
  @transport = transport
  @state     = :new
  @conn      = nil

  # Nest the per-connection barrier under the socket-level barrier
  # so every pump spawned via +@barrier.async+ is also tracked by
  # the socket barrier — {Engine#stop}/{Engine#close} cascade
  # through in one call.
  @barrier  = Async::Barrier.new(parent: engine.barrier)
end

Instance Attribute Details

#barrier ⇒ Async::Barrier (readonly)

Holds all per-connection pump tasks (send pump, recv pump, reaper, heartbeat). When the connection is torn down, #tear_down! calls @barrier.stop to stop every sibling task together, so the first pump to see a disconnect takes down all the others.

Returns:

  • (Async::Barrier)



# File 'lib/omq/engine/connection_lifecycle.rb', line 63

def barrier
  @barrier
end

#conn ⇒ Protocol::ZMTP::Connection, ... (readonly)

Returns:



# File 'lib/omq/engine/connection_lifecycle.rb', line 47

def conn
  @conn
end

#endpoint ⇒ String? (readonly)

Returns:

  • (String, nil)


# File 'lib/omq/engine/connection_lifecycle.rb', line 51

def endpoint
  @endpoint
end

#state ⇒ Symbol (readonly)

Returns current state.

Returns:

  • (Symbol)

    current state



# File 'lib/omq/engine/connection_lifecycle.rb', line 55

def state
  @state
end

Instance Method Details

#close! ⇒ Object

Transitions to :closed without scheduling a reconnect. Used by shutdown paths (Engine#close, #disconnect, #unbind). Idempotent.



# File 'lib/omq/engine/connection_lifecycle.rb', line 162

def close!
  tear_down!(reconnect: false)
end

#handshake!(io, as_server:) ⇒ Protocol::ZMTP::Connection

Performs the ZMTP handshake and transitions to :ready.

Parameters:

  • io (#read, #write, #close)
  • as_server (Boolean)

Returns:

  • (Protocol::ZMTP::Connection)


# File 'lib/omq/engine/connection_lifecycle.rb', line 97

def handshake!(io, as_server:)
  transition!(:handshaking)
  conn_class = @transport.respond_to?(:connection_class) ? @transport.connection_class : Protocol::ZMTP::Connection
  conn = conn_class.new io,
    socket_type:      @engine.socket_type.to_s,
    identity:         @engine.options.identity,
    as_server:        as_server,
    mechanism:        @engine.options.mechanism&.dup,
    max_message_size: @engine.options.max_message_size

  Async::Task.current.with_timeout(handshake_timeout) do
    conn.handshake!
  end

  Heartbeat.start(@barrier, conn, @engine.options)
  ready!(conn)
  @conn
rescue Protocol::ZMTP::Error, *CONNECTION_LOST, Async::TimeoutError => error
  @engine.emit_monitor_event :handshake_failed,
    endpoint: @endpoint, detail: { error: error }

  conn&.close

  # Full tear-down with reconnect: without this, spawn_connection's
  # ensure-block close! sees :closed and skips maybe_reconnect,
  # leaving the endpoint dead. Race is exposed when a peer RSTs
  # mid-handshake (e.g. LINGER 0 close against an in-flight connect).
  tear_down!(reconnect: true)
  raise
end

#lost!(reason: nil) ⇒ Object

Transitions to :closed, running the full loss sequence: routing removal, monitor event, reconnect scheduling. Idempotent: a no-op if already :closed.



# File 'lib/omq/engine/connection_lifecycle.rb', line 143

def lost!(reason: nil)
  tear_down!(reconnect: true, reason: reason || @disconnect_reason)
end

#ready_direct!(pipe) ⇒ Object

Registers an already-connected inproc pipe as :ready. No handshake — inproc Pipe bypasses ZMTP entirely.

Parameters:

  • pipe



# File 'lib/omq/engine/connection_lifecycle.rb', line 134

def ready_direct!(pipe)
  ready!(pipe)
end

#record_disconnect_reason(error) ⇒ Object

Records the exception that took down a pump task so that the supervisor can surface it in the :disconnected monitor event. First writer wins — subsequent pumps unwinding on the same teardown don’t overwrite the original cause.



# File 'lib/omq/engine/connection_lifecycle.rb', line 153

def record_disconnect_reason(error)
  @disconnect_reason ||= error
end
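
The first-writer-wins behaviour comes from Ruby's ||= operator, which assigns only while the variable is still nil or false. A small sketch follows, using a hypothetical ReasonSketch class and made-up error messages rather than the real lifecycle object:

```ruby
# Hypothetical stand-in that keeps only the method under discussion.
class ReasonSketch
  attr_reader :disconnect_reason

  def record_disconnect_reason(error)
    # Assigns only if nothing has been recorded yet.
    @disconnect_reason ||= error
  end
end

recorder = ReasonSketch.new
first    = IOError.new("stream closed")    # original cause
second   = EOFError.new("peer went away")  # later pump unwinding

recorder.record_disconnect_reason(first)
recorder.record_disconnect_reason(second)  # ignored: a reason is already set
```

After both calls the recorder still holds the first error, which is the value #lost! falls back to when no explicit reason is passed.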