Class: OMQ::Engine::SocketLifecycle
- Inherits: Object (hierarchy: Object → OMQ::Engine::SocketLifecycle)
- Defined in: lib/omq/engine/socket_lifecycle.rb
Overview
Owns the socket-level state: `:new → :open → :closing → :closed`, the first-peer / last-peer signaling promises, the reconnect flag, and the captured parent task for the socket’s task tree.
Scope boundary: SocketLifecycle is per-socket and outlives every individual peer link. ConnectionLifecycle is per-connection and handles one handshake → ready → closed arc beneath it. Roughly: SocketLifecycle answers “is this socket open and do we have any peers?”, ConnectionLifecycle answers “is this specific peer link ready / lost?”.
Engine delegates state queries here and uses it to coordinate the ordering of close-time side effects. This consolidates six ivars (`@state`, `@peer_connected`, `@all_peers_gone`, `@reconnect_enabled`, `@parent_task`, `@on_io_thread`) into one cohesive object with explicit transitions.
Defined Under Namespace
Classes: InvalidTransition
Constant Summary
- STATES = %i[new open closing closed].freeze
- TRANSITIONS =
    {
      new:     %i[open closed].freeze,
      open:    %i[closing closed].freeze,
      closing: %i[closed].freeze,
      closed:  [].freeze,
    }.freeze
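The transition table above can be read as a lookup from the current state to the states it may move to. The following is a minimal sketch of how such a table might be enforced; `SketchLifecycle` and its public `transition!` are hypothetical stand-ins, since the body of the real `transition!` is not shown on this page.

```ruby
# Hypothetical stand-in for the lifecycle's transition check; the real
# transition! implementation is not shown in this page.
class SketchLifecycle
  class InvalidTransition < StandardError; end

  STATES = %i[new open closing closed].freeze

  TRANSITIONS = {
    new:     %i[open closed].freeze,
    open:    %i[closing closed].freeze,
    closing: %i[closed].freeze,
    closed:  [].freeze,
  }.freeze

  attr_reader :state

  def initialize
    @state = :new
  end

  # Move to new_state only if the table lists it for the current state.
  def transition!(new_state)
    allowed = TRANSITIONS.fetch(@state)
    unless allowed.include?(new_state)
      raise InvalidTransition, "#{@state} -> #{new_state}"
    end
    @state = new_state
  end
end

lifecycle = SketchLifecycle.new
lifecycle.transition!(:open)
lifecycle.transition!(:closing)
lifecycle.transition!(:closed)

begin
  lifecycle.transition!(:open) # :closed has no outgoing transitions
rescue SketchLifecycle::InvalidTransition => e
  puts "rejected: #{e.message}"
end
```

Because `:closed` maps to an empty list, it is terminal: every attempt to leave it raises in this sketch.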
Instance Attribute Summary
- #all_peers_gone ⇒ Async::Promise (readonly)
  Resolves once all peers are gone (after having had peers).
- #barrier ⇒ Async::Barrier (readonly)
  Holds every socket-scoped task (connection supervisors, reconnect loops, heartbeat, monitor, accept loops).
- #on_io_thread ⇒ Boolean (readonly)
  True if parent_task is the shared Reactor thread.
- #parent_task ⇒ Async::Task, ... (readonly)
  Root of the socket’s task tree (may be user-provided via parent: on Socket#bind / Socket#connect; falls back to the current Async task or the shared Reactor root).
- #peer_connected ⇒ Async::Promise (readonly)
  Resolves with the first connected peer.
- #reconnect_enabled ⇒ Boolean
  Whether auto-reconnect is enabled.
- #state ⇒ Symbol (readonly)
Instance Method Summary
- #alive? ⇒ Boolean
- #capture_parent_task(parent: nil, linger:) ⇒ Boolean
  Captures the socket’s task tree root.
- #closed? ⇒ Boolean
- #closing? ⇒ Boolean
- #finish_closing! ⇒ Object
  Transitions `:closing → :closed` (or `:new → :closed` for never-opened sockets).
- #initialize ⇒ SocketLifecycle (constructor)
  A new instance of SocketLifecycle.
- #open? ⇒ Boolean
- #resolve_all_peers_gone_if_empty(connections) ⇒ Object
  Resolves `all_peers_gone` if we had peers and now have none.
- #start_closing! ⇒ Object
  Transitions `:open → :closing`.
Constructor Details
#initialize ⇒ SocketLifecycle
Returns a new instance of SocketLifecycle.
    # File 'lib/omq/engine/socket_lifecycle.rb', line 72
    def initialize
      @state             = :new
      @peer_connected    = Async::Promise.new
      @all_peers_gone    = Async::Promise.new
      @reconnect_enabled = true
      @parent_task       = nil
      @on_io_thread      = false
      @barrier           = nil
    end
Instance Attribute Details
#all_peers_gone ⇒ Async::Promise (readonly)
Returns a promise that resolves once all peers are gone (after having had peers).
    # File 'lib/omq/engine/socket_lifecycle.rb', line 47
    def all_peers_gone
      @all_peers_gone
    end
#barrier ⇒ Async::Barrier (readonly)
Returns the barrier that holds every socket-scoped task (connection supervisors, reconnect loops, heartbeat, monitor, accept loops). OMQ::Engine#stop and OMQ::Engine#close call barrier.stop to cascade teardown through every per-connection barrier in one shot.
    # File 'lib/omq/engine/socket_lifecycle.rb', line 65
    def barrier
      @barrier
    end
#on_io_thread ⇒ Boolean (readonly)
Returns true if parent_task is the shared Reactor thread.
    # File 'lib/omq/engine/socket_lifecycle.rb', line 58
    def on_io_thread
      @on_io_thread
    end
#parent_task ⇒ Async::Task, ... (readonly)
Returns the root of the socket’s task tree (may be user-provided via parent: on Socket#bind / Socket#connect; falls back to the current Async task or the shared Reactor root).
    # File 'lib/omq/engine/socket_lifecycle.rb', line 54
    def parent_task
      @parent_task
    end
#peer_connected ⇒ Async::Promise (readonly)
Returns a promise that resolves with the first connected peer.
    # File 'lib/omq/engine/socket_lifecycle.rb', line 43
    def peer_connected
      @peer_connected
    end
#reconnect_enabled ⇒ Boolean
Returns whether auto-reconnect is enabled.
    # File 'lib/omq/engine/socket_lifecycle.rb', line 69
    def reconnect_enabled
      @reconnect_enabled
    end
#state ⇒ Symbol (readonly)
    # File 'lib/omq/engine/socket_lifecycle.rb', line 39
    def state
      @state
    end
Instance Method Details
#alive? ⇒ Boolean
    # File 'lib/omq/engine/socket_lifecycle.rb', line 86
    def alive? = @state == :new || @state == :open
#capture_parent_task(parent: nil, linger:) ⇒ Boolean
Captures the socket’s task tree root. Transitions `:new → :open`.
When parent is provided (any Async task/barrier/semaphore — any object that responds to #async), it is used as the root; this is the common Async idiom for letting callers place internal tasks under a caller-managed parent so teardown can be coordinated with other work. Otherwise falls back to the current Async task or the shared Reactor root for non-Async callers.
The socket-level #barrier is constructed with the captured root as its parent so every task spawned via barrier.async lives under the caller’s tree.
    # File 'lib/omq/engine/socket_lifecycle.rb', line 107
    def capture_parent_task(parent: nil, linger:)
      return false if @parent_task

      if parent
        @parent_task = parent
      elsif Async::Task.current?
        @parent_task = Async::Task.current
      else
        @parent_task  = Reactor.root_task
        @on_io_thread = true
        Reactor.track_linger(linger)
      end

      @barrier = Async::Barrier.new(parent: @parent_task)
      transition!(:open)
      true
    end
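The fallback order used when capturing the root (explicit parent, then the current task, then the shared root) and the first-call-wins guard can be illustrated without Async. The sketch below is an assumption-laden stand-in: `RootCapture`, `current_task`, and `reactor_root` are hypothetical names that take plain values in place of real tasks, and the linger tracking and barrier construction are left out.

```ruby
# Sketch only: RootCapture, current_task and reactor_root are hypothetical
# stand-ins injected as plain values; this is not the OMQ or Async API.
class RootCapture
  attr_reader :root, :on_io_thread

  def initialize(current_task: nil, reactor_root: :reactor_root)
    @current_task = current_task
    @reactor_root = reactor_root
    @root         = nil
    @on_io_thread = false
  end

  # Returns true on the first call, false once a root is already captured.
  def capture(parent: nil)
    return false if @root

    if parent
      @root = parent            # caller-managed parent wins
    elsif @current_task
      @root = @current_task     # otherwise the enclosing task
    else
      @root = @reactor_root     # last resort: the shared root
      @on_io_thread = true
    end
    true
  end
end

inside_task = RootCapture.new(current_task: :current_task)
inside_task.capture(parent: :my_barrier) # root becomes :my_barrier
inside_task.capture(parent: :other)      # ignored, the first capture wins

plain_caller = RootCapture.new
plain_caller.capture                     # falls back to the shared root
```

Only the last branch sets the IO-thread flag, which matches the documented meaning of `on_io_thread`: it is true only when the socket fell back to the shared Reactor root.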
#closed? ⇒ Boolean
    # File 'lib/omq/engine/socket_lifecycle.rb', line 85
    def closed? = @state == :closed
#closing? ⇒ Boolean
    # File 'lib/omq/engine/socket_lifecycle.rb', line 84
    def closing? = @state == :closing
#finish_closing! ⇒ Object
Transitions `:closing → :closed` (or `:new → :closed` for never-opened sockets).
    # File 'lib/omq/engine/socket_lifecycle.rb', line 132
    def finish_closing!
      transition!(:closed)
    end
#open? ⇒ Boolean
    # File 'lib/omq/engine/socket_lifecycle.rb', line 83
    def open? = @state == :open
#resolve_all_peers_gone_if_empty(connections) ⇒ Object
Resolves `all_peers_gone` if we had peers and now have none.
    # File 'lib/omq/engine/socket_lifecycle.rb', line 139
    def resolve_all_peers_gone_if_empty(connections)
      return unless @peer_connected.resolved? && connections.empty?

      @all_peers_gone.resolve(true)
    end
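The guard matters because an empty connection list means two different things: "no peer has arrived yet" and "every peer has left". A minimal sketch of that distinction follows, using a hypothetical `TinyPromise` in place of `Async::Promise` (only `resolve` and `resolved?` are modeled) and a free-standing `resolve_if_empty` helper that takes the promises as arguments.

```ruby
# TinyPromise is a hypothetical stand-in that models only resolve and
# resolved?; it is not the Async::Promise implementation.
class TinyPromise
  attr_reader :value

  def initialize
    @resolved = false
    @value    = nil
  end

  def resolved?
    @resolved
  end

  # First resolution wins; later calls are ignored.
  def resolve(value)
    return if @resolved

    @value    = value
    @resolved = true
  end
end

# Same guard as the method above, with the promises passed in explicitly.
def resolve_if_empty(peer_connected, all_peers_gone, connections)
  return unless peer_connected.resolved? && connections.empty?

  all_peers_gone.resolve(true)
end

peer_connected = TinyPromise.new
all_peers_gone = TinyPromise.new
connections    = []

# No peer has ever connected: an empty list must not count as "gone".
resolve_if_empty(peer_connected, all_peers_gone, connections)
before_first_peer = all_peers_gone.resolved?

# A peer connects, then disconnects.
connections << :peer_a
peer_connected.resolve(:peer_a)
connections.delete(:peer_a)
resolve_if_empty(peer_connected, all_peers_gone, connections)
after_last_peer = all_peers_gone.resolved?

puts "before first peer: #{before_first_peer}, after last peer: #{after_last_peer}"
```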
#start_closing! ⇒ Object
Transitions `:open → :closing`.
    # File 'lib/omq/engine/socket_lifecycle.rb', line 125
    def start_closing!
      transition!(:closing)
    end