Class: OMQ::Engine::ConnectionLifecycle

Inherits:
Object
Defined in:
lib/omq/engine/connection_lifecycle.rb

Overview

Owns the full arc of one connection: handshake → ready → closed.

Scope boundary: ConnectionLifecycle tracks a single peer link (one ZMTP connection or one inproc DirectPipe). SocketLifecycle owns the socket-wide state above it — first-peer/last-peer signaling, reconnect enable flag, the parent task tree, and the open → closing → closed transitions that gate close-time drain. A socket has exactly one SocketLifecycle and zero-or-more ConnectionLifecycles beneath it.

Centralizes the ordering of side effects (monitor events, routing registration, promise resolution, reconnect scheduling) so the sequence lives in one place instead of being scattered across Engine, ConnectionSetup, and close paths.

State machine:

new ──┬── :handshaking ── :ready ── :closed
      └── :ready ── :closed            (inproc fast path)

#lost! and #close! are idempotent — the state guard ensures side effects run exactly once even if multiple pumps race to report a lost connection.
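The idempotency described above can be illustrated with a toy model. This is a minimal sketch, not the library's code: ToyLifecycle and its events list are hypothetical stand-ins, and the private tear_down! here only assumes that the real one starts with a state guard.

```ruby
# Toy model (hypothetical, not OMQ code): a state guard makes teardown
# idempotent, so side effects run once no matter how many callers report.
class ToyLifecycle
  attr_reader :state, :events

  def initialize
    @state  = :ready
    @events = []
  end

  def lost!
    tear_down!(reconnect: true)
  end

  def close!
    tear_down!(reconnect: false)
  end

  private

  def tear_down!(reconnect:)
    return if @state == :closed # guard: later callers are no-ops

    @state = :closed
    @events << :disconnected
    @events << :reconnect_scheduled if reconnect
  end
end

toy = ToyLifecycle.new
3.times { toy.lost! } # e.g. send pump, recv pump and heartbeat all report
toy.close!            # already :closed, so nothing more happens
```

Whichever call arrives first decides whether a reconnect is scheduled; every later call sees :closed and returns.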

Defined Under Namespace

Classes: InvalidTransition

Constant Summary

STATES =
%i[new handshaking ready closed].freeze
TRANSITIONS =
{
  new:         %i[handshaking ready closed].freeze,
  handshaking: %i[ready closed].freeze,
  ready:       %i[closed].freeze,
  closed:      [].freeze,
}.freeze
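As an illustration of how a table like TRANSITIONS can gate state changes, here is a minimal sketch. The real transition! method is not shown on this page, so ToyStateMachine and the error message format are assumptions; only the table contents and the InvalidTransition name come from the documentation above.

```ruby
# Hypothetical guard built on the same transition table; the library's
# actual transition! may differ.
class ToyStateMachine
  class InvalidTransition < StandardError; end

  TRANSITIONS = {
    new:         %i[handshaking ready closed].freeze,
    handshaking: %i[ready closed].freeze,
    ready:       %i[closed].freeze,
    closed:      [].freeze,
  }.freeze

  attr_reader :state

  def initialize
    @state = :new
  end

  # Move to +to+ only if the table lists it as reachable from @state.
  def transition!(to)
    allowed = TRANSITIONS.fetch(@state)
    raise InvalidTransition, "#{@state} -> #{to}" unless allowed.include?(to)

    @state = to
  end
end

sm = ToyStateMachine.new
sm.transition!(:handshaking)
sm.transition!(:ready)
sm.transition!(:closed)

begin
  sm.transition!(:ready) # :closed has no outgoing edges
rescue ToyStateMachine::InvalidTransition => e
  warn "rejected: #{e.message}"
end
```

Note that the table also permits new → closed directly, which covers a connection that is closed before any handshake starts.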


Constructor Details

#initialize(engine, endpoint: nil, done: nil) ⇒ ConnectionLifecycle

Returns a new instance of ConnectionLifecycle.

Parameters:

  • engine (Engine)
  • endpoint (String, nil) (defaults to: nil)
  • done (Async::Promise, nil) (defaults to: nil)

    resolved when connection is lost



# File 'lib/omq/engine/connection_lifecycle.rb', line 70

def initialize(engine, endpoint: nil, done: nil)
  @engine   = engine
  @endpoint = endpoint
  @done     = done
  @state    = :new
  @conn     = nil

  # Nest the per-connection barrier under the socket-level barrier
  # so every pump spawned via +@barrier.async+ is also tracked by
  # the socket barrier — {Engine#stop}/{Engine#close} cascade
  # through in one call.
  @barrier  = Async::Barrier.new(parent: engine.barrier)
end
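The nesting described in the constructor comment can be pictured with a thread-based toy. This is a sketch under stated assumptions: ToyBarrier is a hypothetical stand-in, not Async::Barrier, and it models only one property, that a task started through a child barrier is also registered with the parent.

```ruby
# Toy stand-in for a task barrier (NOT Async::Barrier). Tasks started
# through a child are also tracked by its parent, so stopping either
# level reaches them.
class ToyBarrier
  attr_reader :tasks

  def initialize(parent: nil)
    @parent = parent
    @tasks  = []
  end

  # Start a task and remember it here and in every ancestor.
  def async(&block)
    task = @parent ? @parent.async(&block) : Thread.new(&block)
    @tasks << task
    task
  end

  # Stop every task tracked by this barrier.
  def stop
    @tasks.each(&:kill)
    @tasks.each(&:join)
  end
end

socket_barrier = ToyBarrier.new
conn_a = ToyBarrier.new(parent: socket_barrier)
conn_b = ToyBarrier.new(parent: socket_barrier)

conn_a.async { sleep } # e.g. a recv pump
conn_a.async { sleep } # e.g. a send pump
conn_b.async { sleep }

conn_a.stop            # tears down only connection A's tasks
socket_barrier.stop    # cascades to whatever is still running
```

Stopping the per-connection barrier leaves other connections alone, while stopping the socket-level barrier reaches every pump in one call.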

Instance Attribute Details

#barrier ⇒ Async::Barrier (readonly)

Holds all per-connection pump tasks (send pump, recv pump, reaper, heartbeat). When the connection is torn down, #tear_down! calls `@barrier.stop` to stop every sibling task together, so the first pump to see a disconnect takes down all the others.

Returns:

  • (Async::Barrier)



# File 'lib/omq/engine/connection_lifecycle.rb', line 63

def barrier
  @barrier
end

#conn ⇒ Protocol::ZMTP::Connection, ... (readonly)

Returns:

  • (Protocol::ZMTP::Connection, ...)



# File 'lib/omq/engine/connection_lifecycle.rb', line 47

def conn
  @conn
end

#endpoint ⇒ String? (readonly)

Returns:

  • (String, nil)


# File 'lib/omq/engine/connection_lifecycle.rb', line 51

def endpoint
  @endpoint
end

#state ⇒ Symbol (readonly)

Returns the current state, one of STATES.

Returns:

  • (Symbol)



# File 'lib/omq/engine/connection_lifecycle.rb', line 55

def state
  @state
end

Instance Method Details

#close! ⇒ Object

Transitions to :closed without scheduling a reconnect. Used by shutdown paths (Engine#close, #disconnect, #unbind). Idempotent.



# File 'lib/omq/engine/connection_lifecycle.rb', line 155

def close!
  tear_down!(reconnect: false)
end

#handshake!(io, as_server:) ⇒ Protocol::ZMTP::Connection

Performs the ZMTP handshake and transitions to :ready.

Parameters:

  • io (#read, #write, #close)
  • as_server (Boolean)

Returns:

  • (Protocol::ZMTP::Connection)


# File 'lib/omq/engine/connection_lifecycle.rb', line 91

def handshake!(io, as_server:)
  transition!(:handshaking)
  conn = Protocol::ZMTP::Connection.new io,
    socket_type:      @engine.socket_type.to_s,
    identity:         @engine.options.identity,
    as_server:        as_server,
    mechanism:        @engine.options.mechanism&.dup,
    max_message_size: @engine.options.max_message_size

  Async::Task.current.with_timeout(handshake_timeout) do
    conn.handshake!
  end

  Heartbeat.start(@barrier, conn, @engine.options, @engine.tasks)
  ready!(conn)
  @conn
rescue Protocol::ZMTP::Error, *CONNECTION_LOST, Async::TimeoutError => error
  @engine.emit_monitor_event :handshake_failed,
    endpoint: @endpoint, detail: { error: error }

  conn&.close

  # Full tear-down with reconnect: without this, spawn_connection's
  # ensure-block close! sees :closed and skips maybe_reconnect,
  # leaving the endpoint dead. Race is exposed when a peer RSTs
  # mid-handshake (e.g. LINGER 0 close against an in-flight connect).
  tear_down!(reconnect: true)
  raise
end
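The ordering in the rescue branch above (emit the failure event, close the transport, tear down with reconnect, then re-raise) can be sketched in isolation. This toy uses the standard library's Timeout instead of Async's with_timeout, and ToyHandshake, its log and the block argument are all hypothetical names introduced for illustration.

```ruby
require "timeout"

# Toy model of the failure path (not OMQ code): record the failure,
# close the transport, schedule a reconnect, then re-raise to the caller.
class ToyHandshake
  attr_reader :log

  def initialize(timeout:)
    @timeout = timeout
    @log     = []
  end

  # +peer+ stands in for the real handshake exchange.
  def handshake!(&peer)
    Timeout.timeout(@timeout) { peer.call }
    @log << :ready
  rescue Timeout::Error, IOError => error
    @log << [:handshake_failed, error.class]
    @log << :transport_closed
    @log << :reconnect_scheduled
    raise # the caller still sees the original error
  end
end

hs = ToyHandshake.new(timeout: 0.05)
begin
  hs.handshake! { sleep 1 } # peer never answers in time
rescue Timeout::Error
  warn "handshake timed out; reconnect already scheduled"
end
```

Doing the teardown inside the rescue, before re-raising, is what keeps an outer cleanup handler from finding the connection already closed with no reconnect scheduled.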

#lost!(reason: nil) ⇒ Object

Transitions to :closed, running the full loss sequence: routing removal, monitor event, reconnect scheduling. If reason is nil, the cause recorded by #record_disconnect_reason is used instead. Idempotent: a no-op if already :closed.



# File 'lib/omq/engine/connection_lifecycle.rb', line 136

def lost!(reason: nil)
  tear_down!(reconnect: true, reason: reason || @disconnect_reason)
end

#ready_direct!(pipe) ⇒ Object

Registers an already-connected inproc pipe as :ready. No handshake — inproc DirectPipe bypasses ZMTP entirely.

Parameters:

  • pipe: the already-connected inproc DirectPipe


# File 'lib/omq/engine/connection_lifecycle.rb', line 127

def ready_direct!(pipe)
  ready!(pipe)
end

#record_disconnect_reason(error) ⇒ Object

Records the exception that took down a pump task so that the supervisor can surface it in the :disconnected monitor event. First writer wins — subsequent pumps unwinding on the same teardown don’t overwrite the original cause.



# File 'lib/omq/engine/connection_lifecycle.rb', line 146

def record_disconnect_reason(error)
  @disconnect_reason ||= error
end
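The first-writer-wins behaviour comes from Ruby's ||= operator, which assigns only while the variable is still nil (or false). A minimal sketch, using a hypothetical ToyReasonTracker that copies the two method bodies shown on this page and exposes the chosen reason through an assumed reported reader:

```ruby
# Toy model (hypothetical): the first recorded error sticks, and lost!
# falls back to it when no explicit reason is given.
class ToyReasonTracker
  attr_reader :reported

  def record_disconnect_reason(error)
    @disconnect_reason ||= error # later writers do not overwrite
  end

  def lost!(reason: nil)
    @reported = reason || @disconnect_reason
  end
end

tracker = ToyReasonTracker.new
tracker.record_disconnect_reason(EOFError.new("peer closed"))   # first pump
tracker.record_disconnect_reason(IOError.new("stream closed"))  # ignored
tracker.lost!
warn tracker.reported.class # the first cause, EOFError
```

An explicit reason passed to lost! still takes precedence over the recorded one.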