Class: ZZQ::Engine::ConnectionLifecycle
- Inherits: Object
  - Object
  - ZZQ::Engine::ConnectionLifecycle
- Defined in: lib/zzq/engine/connection_lifecycle.rb
Overview
Owns the full arc of one connection: new → handshaking → ready → closed. Centralises side-effect ordering so routing doesn’t have to.
The MQTT CONNECT/CONNACK exchange itself is NOT performed here: it is role-asymmetric (client writes CONNECT first, broker reads it first) and so lives in the role’s routing layer, which calls handshake_succeeded! once the exchange is done.
#lost! / #close! are idempotent.
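The role asymmetry described above can be sketched as two small routing helpers that share one lifecycle interface. This is an illustration only: FakeLifecycle, FakeConn, client_handshake, and broker_handshake are hypothetical names, symbols stand in for real MQTT packets, and the fake begin_handshake! simply returns the IO it was given instead of wrapping it in a Protocol::MQTT::Connection.

```ruby
# Hypothetical stand-in that only records which lifecycle calls were made.
class FakeLifecycle
  attr_reader :calls

  def initialize
    @calls = []
  end

  def begin_handshake!(io)
    @calls << :begin_handshake!
    io # the real method wraps the IO; the fake hands it back unchanged
  end

  def handshake_succeeded!
    @calls << :handshake_succeeded!
  end

  def handshake_failed!(error:)
    @calls << :handshake_failed!
  end
end

# Hypothetical in-memory "connection": reads come from inbox, writes go to outbox.
class FakeConn
  attr_reader :inbox, :outbox

  def initialize(inbox)
    @inbox = inbox
    @outbox = []
  end

  def write(packet)
    @outbox << packet
  end

  def read
    @inbox.shift
  end
end

# Client role: writes CONNECT first, then waits for CONNACK.
def client_handshake(lifecycle, io)
  conn = lifecycle.begin_handshake!(io)
  conn.write(:connect)
  if conn.read == :connack
    lifecycle.handshake_succeeded!
  else
    lifecycle.handshake_failed!(error: "no CONNACK")
  end
end

# Broker role: reads CONNECT first, then answers with CONNACK.
def broker_handshake(lifecycle, io)
  conn = lifecycle.begin_handshake!(io)
  if conn.read == :connect
    conn.write(:connack)
    lifecycle.handshake_succeeded!
  else
    lifecycle.handshake_failed!(error: "no CONNECT")
  end
end

client_side = FakeLifecycle.new
client_conn = FakeConn.new([:connack])
client_handshake(client_side, client_conn)

broker_side = FakeLifecycle.new
broker_conn = FakeConn.new([:connect])
broker_handshake(broker_side, broker_conn)
```

Both helpers end by reporting the outcome to the lifecycle object, so the lifecycle itself never needs to know which side spoke first.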
Defined Under Namespace
Classes: InvalidTransition
Constant Summary
- STATES = %i[new handshaking ready closed].freeze
- TRANSITIONS = {
    new:         %i[handshaking ready closed].freeze,
    handshaking: %i[ready closed].freeze,
    ready:       %i[closed].freeze,
    closed:      [].freeze,
  }.freeze
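A minimal sketch of how a guard could consult the TRANSITIONS table before changing state. The source listings call a private transition! method but do not show its body, so the implementation below is an assumption; LifecycleSketch is a hypothetical class, and InvalidTransition is assumed here to descend from StandardError.

```ruby
# Assumed superclass; the docs only list InvalidTransition under the namespace.
class InvalidTransition < StandardError; end

STATES = %i[new handshaking ready closed].freeze

TRANSITIONS = {
  new:         %i[handshaking ready closed].freeze,
  handshaking: %i[ready closed].freeze,
  ready:       %i[closed].freeze,
  closed:      [].freeze,
}.freeze

# Hypothetical stand-in for the state-tracking part of ConnectionLifecycle.
class LifecycleSketch
  attr_reader :state

  def initialize
    @state = :new
  end

  # Moves to new_state only if the table allows it from the current state.
  def transition!(new_state)
    allowed = TRANSITIONS.fetch(@state)
    unless allowed.include?(new_state)
      raise InvalidTransition, "#{@state} -> #{new_state}"
    end
    @state = new_state
  end
end

guarded = LifecycleSketch.new
guarded.transition!(:handshaking)
guarded.transition!(:ready)
guarded.transition!(:closed)
```

Because closed maps to an empty list, any transition! out of closed raises in this sketch, which makes closed a terminal state.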
Instance Attribute Summary
- #barrier ⇒ Object (readonly): Returns the value of attribute barrier.
- #conn ⇒ Object (readonly): Protocol::MQTT::Connection.
- #endpoint ⇒ Object (readonly): Returns the value of attribute endpoint.
- #state ⇒ Object (readonly): Returns the value of attribute state.
Instance Method Summary
- #begin_handshake!(io) ⇒ Object: Wraps the accepted/connected IO in a Protocol::MQTT::Connection and returns it.
- #close! ⇒ Object: Deliberate close (engine shutdown, kicked by broker, …).
- #handshake_failed!(error:) ⇒ Object: Called on handshake failure, e.g. refused CONNACK or RST mid-CONNECT.
- #handshake_succeeded! ⇒ Object: Called by the routing layer after CONNECT/CONNACK succeeds.
- #initialize(engine, endpoint:) ⇒ ConnectionLifecycle (constructor): A new instance of ConnectionLifecycle.
- #lost! ⇒ Object: Unexpected loss of an established connection.
- #start_supervisor! ⇒ Object: Starts a supervisor task that tears down when any pump exits.
Constructor Details
#initialize(engine, endpoint:) ⇒ ConnectionLifecycle
Returns a new instance of ConnectionLifecycle.
# File 'lib/zzq/engine/connection_lifecycle.rb', line 39
def initialize(engine, endpoint:)
  @engine = engine
  @endpoint = endpoint
  @state = :new
  @conn = nil
  @barrier = Async::Barrier.new(parent: engine.)
end
Instance Attribute Details
#barrier ⇒ Object (readonly)
Returns the value of attribute barrier.
# File 'lib/zzq/engine/connection_lifecycle.rb', line 36
def barrier
  @barrier
end
#conn ⇒ Object (readonly)
Protocol::MQTT::Connection
# File 'lib/zzq/engine/connection_lifecycle.rb', line 33
def conn
  @conn
end
#endpoint ⇒ Object (readonly)
Returns the value of attribute endpoint.
# File 'lib/zzq/engine/connection_lifecycle.rb', line 34
def endpoint
  @endpoint
end
#state ⇒ Object (readonly)
Returns the value of attribute state.
# File 'lib/zzq/engine/connection_lifecycle.rb', line 35
def state
  @state
end
Instance Method Details
#begin_handshake!(io) ⇒ Object
Wraps the accepted/connected IO in a Protocol::MQTT::Connection and returns it. The handshake itself (CONNECT/CONNACK exchange) is driven by the routing layer — this is “transport ready, MQTT handshake pending”.
# File 'lib/zzq/engine/connection_lifecycle.rb', line 52
def begin_handshake!(io)
  transition!(:handshaking)
  mqtt = Protocol::MQTT::Connection.new(
    io,
    version: @engine..version,
    max_packet_size: @engine..max_packet_size,
  )
  @conn = mqtt
  mqtt
end
#close! ⇒ Object
Deliberate close (engine shutdown, kicked by broker, …).
# File 'lib/zzq/engine/connection_lifecycle.rb', line 96
def close!
  tear_down!(reconnect: false)
end
#handshake_failed!(error:) ⇒ Object
Called on handshake failure, e.g. refused CONNACK or RST mid-CONNECT. Tears down and requests a reconnect if eligible.
# File 'lib/zzq/engine/connection_lifecycle.rb', line 82
def handshake_failed!(error:)
  @engine.emit_monitor_event(:handshake_failed, endpoint: @endpoint, detail: { error: error })
  @conn&.close rescue nil
  tear_down!(reconnect: true)
end
#handshake_succeeded! ⇒ Object
Called by the routing layer after CONNECT/CONNACK succeeds.
# File 'lib/zzq/engine/connection_lifecycle.rb', line 65
def handshake_succeeded!
  @engine.connections[@conn] = self
  transition!(:ready)
  begin
    @engine.routing.connection_added(@conn) if @engine.routing.respond_to?(:connection_added)
  rescue ConnectionRejected
    @engine.emit_monitor_event(:connection_rejected, endpoint: @endpoint)
    tear_down!(reconnect: false)
    raise
  end
  @engine.lifecycle.peer_connected.resolve(@conn) unless @engine.lifecycle.peer_connected.resolved?
  @engine.emit_monitor_event(:connected, endpoint: @endpoint)
end
#lost! ⇒ Object
Unexpected loss of an established connection.
# File 'lib/zzq/engine/connection_lifecycle.rb', line 90
def lost!
  tear_down!(reconnect: true)
end
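Both #lost! and #close! delegate to a private tear_down! whose body is not shown in these listings. One plausible way to get the documented idempotence is an early return once the state is already closed; the sketch below assumes exactly that. TeardownSketch and its counters are hypothetical and exist only to make the behaviour observable.

```ruby
# Hypothetical class; the counters replace the real side effects.
class TeardownSketch
  attr_reader :state, :teardowns, :reconnect_requests

  def initialize
    @state = :ready
    @teardowns = 0
    @reconnect_requests = 0
  end

  # Unexpected loss: tear down and ask for a reconnect.
  def lost!
    tear_down!(reconnect: true)
  end

  # Deliberate close: tear down without reconnecting.
  def close!
    tear_down!(reconnect: false)
  end

  private

  # Assumed guard: once closed, repeat calls do nothing.
  def tear_down!(reconnect:)
    return if @state == :closed

    @state = :closed
    @teardowns += 1
    @reconnect_requests += 1 if reconnect
  end
end

closing = TeardownSketch.new
closing.close!
closing.lost! # no-op: the connection is already closed
```

With this guard, whichever of the two calls arrives first decides whether a reconnect is requested, and later calls cannot change that decision.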
#start_supervisor! ⇒ Object
Starts a supervisor task that tears down when any pump exits.
# File 'lib/zzq/engine/connection_lifecycle.rb', line 102
def start_supervisor!
  return if @barrier.empty?
  @engine..async(transient: true, annotation: "zzq conn supervisor") do
    @barrier.wait { |task| task.wait; break }
  rescue Async::Stop, Async::Cancel
  rescue *CONNECTION_LOST
  ensure
    lost!
  end
end
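The "tear down when any pump exits" idea can be illustrated without the Async gem. The sketch below is a stdlib-thread analogue, not the Async::Barrier implementation shown above: supervise is a hypothetical helper, pumps are plain lambdas, and a Queue signals the first exit.

```ruby
# Hypothetical helper: runs each pump in a thread and fires on_exit once,
# as soon as the first pump finishes (normally or by raising).
def supervise(pumps, &on_exit)
  return if pumps.empty? # mirrors the early return on an empty barrier

  done = Queue.new
  pumps.each do |pump|
    Thread.new do
      begin
        pump.call
      ensure
        done << :exited
      end
    end
  end

  Thread.new do
    done.pop # blocks until the first pump has exited
    on_exit.call
  end
end

release = Queue.new
events = Queue.new

pumps = [
  -> { :reader_done }, # exits immediately
  -> { release.pop },  # keeps running until released
]

supervisor = supervise(pumps) do
  events << :lost    # stands in for calling lost!
  release << :stop   # lets the remaining pump finish
end
supervisor.join
```

The supervisor reacts to the first exit only, which matches the break inside the @barrier.wait block in the listing; later exits are ignored.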