Class: ZZQ::Engine::ConnectionLifecycle

Inherits:
Object
Defined in:
lib/zzq/engine/connection_lifecycle.rb

Overview

Owns the full arc of one connection: new → handshaking → ready → closed. Centralises side-effect ordering so routing doesn’t have to.

The MQTT CONNECT/CONNACK exchange itself is NOT performed here. It is role-asymmetric (the client writes CONNECT first, the broker reads it first), so it lives in the role’s routing layer, which calls handshake_succeeded! once the exchange is done.

new → handshaking → ready → closed

#lost! / #close! are idempotent.
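
To illustrate the division of labour described above, here is a minimal sketch of how a routing layer might drive the lifecycle around its own CONNECT/CONNACK exchange. RecordingLifecycle and drive_handshake are hypothetical names introduced for this example; only the three lifecycle method names come from this class, and the block stands in for the role-specific exchange.

```ruby
# Hypothetical stand-in that only records calls. The real class is
# ZZQ::Engine::ConnectionLifecycle; nothing here touches a socket.
class RecordingLifecycle
  attr_reader :calls

  def initialize
    @calls = []
  end

  def begin_handshake!(io)
    @calls << :begin_handshake!
    io # the real method returns a Protocol::MQTT::Connection wrapping io
  end

  def handshake_succeeded!
    @calls << :handshake_succeeded!
  end

  def handshake_failed!(error:)
    @calls << :handshake_failed!
  end
end

# Hypothetical routing-layer helper. The block performs the CONNECT/CONNACK
# exchange for the current role and is assumed to raise on failure.
def drive_handshake(lifecycle, io)
  conn = lifecycle.begin_handshake!(io)
  begin
    yield conn
  rescue StandardError => e
    lifecycle.handshake_failed!(error: e)
    return nil
  end
  # Kept outside the begin/rescue so a rejection raised by
  # handshake_succeeded! is not also reported as a handshake failure.
  lifecycle.handshake_succeeded!
  conn
end
```

The exchange stays in the caller's block, so the same helper shape would serve both the client and the broker role.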

Defined Under Namespace

Classes: InvalidTransition

Constant Summary

STATES =
%i[new handshaking ready closed].freeze
TRANSITIONS =
{
  new:         %i[handshaking ready closed].freeze,
  handshaking: %i[ready closed].freeze,
  ready:       %i[closed].freeze,
  closed:      [].freeze,
}.freeze
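
The private transition! method is not shown on this page. As a rough sketch, assuming it simply checks the requested state against TRANSITIONS and raises InvalidTransition otherwise, it might behave like the following. LifecycleSketch is a hypothetical stand-in; only the table itself is copied from the source.

```ruby
# Hypothetical stand-in for the documented class.
class LifecycleSketch
  class InvalidTransition < StandardError; end

  TRANSITIONS = {
    new:         %i[handshaking ready closed].freeze,
    handshaking: %i[ready closed].freeze,
    ready:       %i[closed].freeze,
    closed:      [].freeze,
  }.freeze

  attr_reader :state

  def initialize
    @state = :new
  end

  # Assumed behaviour: move to new_state only if the table lists it as
  # reachable from the current state; otherwise raise.
  def transition!(new_state)
    allowed = TRANSITIONS.fetch(@state)
    unless allowed.include?(new_state)
      raise InvalidTransition, "#{@state} -> #{new_state}"
    end
    @state = new_state
  end
end
```

Note that the table lets :new go straight to :ready or :closed, and that :closed is terminal.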

Constructor Details

#initialize(engine, endpoint:) ⇒ ConnectionLifecycle

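Returns a new instance of ConnectionLifecycle in the :new state, with no connection yet and a per-connection Async::Barrier parented to the engine’s barrier.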



# File 'lib/zzq/engine/connection_lifecycle.rb', line 39

def initialize(engine, endpoint:)
  @engine   = engine
  @endpoint = endpoint
  @state    = :new
  @conn     = nil
  @barrier  = Async::Barrier.new(parent: engine.barrier)
end

Instance Attribute Details

#barrier ⇒ Object (readonly)

Returns the value of attribute barrier.



# File 'lib/zzq/engine/connection_lifecycle.rb', line 36

def barrier
  @barrier
end

#conn ⇒ Object (readonly)

Returns the Protocol::MQTT::Connection created by #begin_handshake!, or nil before that call.



# File 'lib/zzq/engine/connection_lifecycle.rb', line 33

def conn
  @conn
end

#endpoint ⇒ Object (readonly)

Returns the value of attribute endpoint.



# File 'lib/zzq/engine/connection_lifecycle.rb', line 34

def endpoint
  @endpoint
end

#state ⇒ Object (readonly)

Returns the value of attribute state.



# File 'lib/zzq/engine/connection_lifecycle.rb', line 35

def state
  @state
end

Instance Method Details

#begin_handshake!(io) ⇒ Object

Wraps the accepted/connected IO in a Protocol::MQTT::Connection and returns it. The handshake itself (CONNECT/CONNACK exchange) is driven by the routing layer — this is “transport ready, MQTT handshake pending”.



# File 'lib/zzq/engine/connection_lifecycle.rb', line 52

def begin_handshake!(io)
  transition!(:handshaking)
  mqtt = Protocol::MQTT::Connection.new(
    io,
    version: @engine.options.version,
    max_packet_size: @engine.options.max_packet_size,
  )
  @conn = mqtt
  mqtt
end

#close! ⇒ Object

Deliberate close (engine shutdown, kicked by broker, …).



# File 'lib/zzq/engine/connection_lifecycle.rb', line 96

def close!
  tear_down!(reconnect: false)
end

#handshake_failed!(error:) ⇒ Object

Called on handshake failure, e.g. a refused CONNACK or an RST mid-CONNECT. Emits a :handshake_failed monitor event, closes the connection if one exists, then tears down and requests a reconnect if eligible.



# File 'lib/zzq/engine/connection_lifecycle.rb', line 82

def handshake_failed!(error:)
  @engine.emit_monitor_event(:handshake_failed, endpoint: @endpoint, detail: { error: error })
  @conn&.close rescue nil
  tear_down!(reconnect: true)
end

#handshake_succeeded! ⇒ Object

Called by the routing layer after CONNECT/CONNACK succeeds.



# File 'lib/zzq/engine/connection_lifecycle.rb', line 65

def handshake_succeeded!
  @engine.connections[@conn] = self
  transition!(:ready)
  begin
    @engine.routing.connection_added(@conn) if @engine.routing.respond_to?(:connection_added)
  rescue ConnectionRejected
    @engine.emit_monitor_event(:connection_rejected, endpoint: @endpoint)
    tear_down!(reconnect: false)
    raise
  end
  @engine.lifecycle.peer_connected.resolve(@conn) unless @engine.lifecycle.peer_connected.resolved?
  @engine.emit_monitor_event(:connected, endpoint: @endpoint)
end

#lost! ⇒ Object

Unexpected loss of an established connection.



# File 'lib/zzq/engine/connection_lifecycle.rb', line 90

def lost!
  tear_down!(reconnect: true)
end
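
Both #lost! and #close! delegate to a private tear_down! that this page does not show. The overview says they are idempotent; one plausible way to get that property, sketched here under the assumption that teardown is guarded by the :closed state, is an early return. TeardownSketch and its counters are hypothetical and exist only to make the behaviour observable.

```ruby
# Hypothetical stand-in showing a state-guarded, idempotent teardown.
class TeardownSketch
  attr_reader :state, :teardown_count, :reconnect_requested

  def initialize
    @state = :ready
    @teardown_count = 0
    @reconnect_requested = false
  end

  def lost!
    tear_down!(reconnect: true)
  end

  def close!
    tear_down!(reconnect: false)
  end

  private

  # Assumed guard: once closed, later calls do nothing, so the first caller
  # decides whether a reconnect is requested.
  def tear_down!(reconnect:)
    return if @state == :closed
    @state = :closed
    @teardown_count += 1
    @reconnect_requested = reconnect
  end
end
```

With a guard like this, a supervisor calling lost! after a deliberate close! would not trigger a reconnect.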

#start_supervisor! ⇒ Object

Starts a transient supervisor task that calls #lost! as soon as any pump exits. Returns without starting one if the connection’s barrier is empty.



# File 'lib/zzq/engine/connection_lifecycle.rb', line 102

def start_supervisor!
  return if @barrier.empty?
  @engine.barrier.async(transient: true, annotation: "zzq conn supervisor") do
    @barrier.wait { |task| task.wait; break }
  rescue Async::Stop, Async::Cancel
  rescue *CONNECTION_LOST
  ensure
    lost!
  end
end
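
The "first pump to exit triggers teardown" idea can be tried without the Async gem. The sketch below swaps Async tasks for plain threads and a Queue, so it is an analogy rather than the library's mechanism; supervise, pumps and on_exit are hypothetical names.

```ruby
# Thread-based analogy of the supervisor above (not the Async API).
# pumps is an array of callables; on_exit plays the role of lost!.
def supervise(pumps, on_exit:)
  return nil if pumps.empty? # mirrors `return if @barrier.empty?`

  done = Queue.new
  threads = pumps.map do |pump|
    Thread.new do
      begin
        pump.call
      rescue StandardError
        # Any exit counts, whether clean or via an error.
      ensure
        done << true
      end
    end
  end

  Thread.new do
    done.pop               # blocks until the first pump exits
    on_exit.call           # tear down exactly once
    threads.each(&:kill)   # stop the remaining pumps
  end
end
```

The supervisor thread is returned so a caller can join it; the real method instead relies on the engine's barrier to own the task.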