Class: NNQ::Engine::ConnectionLifecycle

Inherits:
Object
  • Object
show all
Defined in:
lib/nnq/engine/connection_lifecycle.rb

Overview

Owns the full arc of one connection: handshake → ready → closed.

Centralizes the ordering of side effects (routing registration, teardown) so the sequence lives in one place instead of being scattered across Engine, the accept/connect paths, and the recv/send pumps.

State machine:

new  handshaking  ready  closed

#lost! and #close! are idempotent — the state guard ensures side effects run exactly once even if multiple pumps race to report a lost connection.

Defined Under Namespace

Classes: InvalidTransition

Constant Summary collapse

STATES =
%i[new handshaking ready closed].freeze
TRANSITIONS =
{
  new:         %i[handshaking ready closed].freeze,
  handshaking: %i[ready closed].freeze,
  ready:       %i[closed].freeze,
  closed:      [].freeze,
}.freeze

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(engine, endpoint:, framing:) ⇒ ConnectionLifecycle

Returns a new instance of ConnectionLifecycle.

Parameters:

  • engine (Engine)
  • endpoint (String, nil)
  • framing (Symbol)

    :tcp or :ipc



56
57
58
59
60
61
62
63
# File 'lib/nnq/engine/connection_lifecycle.rb', line 56

def initialize(engine, endpoint:, framing:)
  @engine   = engine
  @endpoint = endpoint
  @framing  = framing
  @state    = :new
  @conn     = nil
  @barrier  = Async::Barrier.new(parent: engine.barrier)
end

Instance Attribute Details

#barrierAsync::Barrier (readonly)

Returns holds all per-connection pump tasks (send pump, recv pump). When the connection is torn down, #tear_down! calls ‘@barrier.stop` to cancel every sibling task atomically.

Returns:

  • (Async::Barrier)

    holds all per-connection pump tasks (send pump, recv pump). When the connection is torn down, #tear_down! calls ‘@barrier.stop` to cancel every sibling task atomically.



50
51
52
# File 'lib/nnq/engine/connection_lifecycle.rb', line 50

def barrier
  @barrier
end

#connNNQ::Connection? (readonly)

Returns:



38
39
40
# File 'lib/nnq/engine/connection_lifecycle.rb', line 38

def conn
  @conn
end

#endpointString? (readonly)

Returns:

  • (String, nil)


41
42
43
# File 'lib/nnq/engine/connection_lifecycle.rb', line 41

def endpoint
  @endpoint
end

#stateSymbol (readonly)

Returns:

  • (Symbol)


44
45
46
# File 'lib/nnq/engine/connection_lifecycle.rb', line 44

def state
  @state
end

Instance Method Details

#close!Object

Deliberate close (engine shutdown or routing eviction). Does not trigger reconnect.



102
103
104
# File 'lib/nnq/engine/connection_lifecycle.rb', line 102

def close!
  tear_down!(reconnect: false)
end

#handshake!(io) ⇒ NNQ::Connection

Performs the SP handshake, wraps the result in NNQ::Connection, registers with the engine and routing, and transitions to :ready.

Parameters:

  • io (#read, #write, #close)

Returns:



71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
# File 'lib/nnq/engine/connection_lifecycle.rb', line 71

def handshake!(io)
  transition!(:handshaking)
  sp = Protocol::SP::Connection.new(
    io,
    protocol:         @engine.protocol,
    max_message_size: @engine.options.max_message_size,
    framing:          @framing,
  )
  Async::Task.current.with_timeout(handshake_timeout) { sp.handshake! }
  ready!(NNQ::Connection.new(sp, endpoint: @endpoint))
  @conn
rescue Protocol::SP::Error, *CONNECTION_LOST, Async::TimeoutError => error
  @engine.emit_monitor_event(:handshake_failed, endpoint: @endpoint, detail: { error: error })
  io.close rescue nil
  # Full tear-down with reconnect: without this, the endpoint
  # goes dead when a peer RSTs mid-handshake.
  tear_down!(reconnect: true)
  raise
end

#lost!Object

Unexpected loss of an established connection. Tears down and asks the engine to schedule a reconnect (if the endpoint is in the dialed set and reconnect is still enabled).



95
96
97
# File 'lib/nnq/engine/connection_lifecycle.rb', line 95

def lost!
  tear_down!(reconnect: true)
end

#start_supervisor!Object

Starts a supervisor for this connection. Must be called after all per-connection pumps (recv loop, send pump) have been spawned on the connection barrier. The supervisor blocks until the first pump exits, then runs tear_down! via lost!.

Called by Engine#handle_accepted / Engine#handle_connected after spawning the recv loop — routing’s connection_added may have already spawned send pumps during ready!, so the barrier is guaranteed non-empty by then.



116
117
118
# File 'lib/nnq/engine/connection_lifecycle.rb', line 116

def start_supervisor!
  start_supervisor unless @barrier.empty?
end