Class: OMQ::Engine::SocketLifecycle

Inherits:
Object
Defined in:
lib/omq/engine/socket_lifecycle.rb

Overview

Owns the socket-level state: `:new → :open → :closing → :closed`, the first-peer / last-peer signaling promises, the reconnect flag, and the captured parent task for the socket’s task tree.

Scope boundary: SocketLifecycle is per-socket and outlives every individual peer link. ConnectionLifecycle is per-connection and handles one handshake → ready → closed arc beneath it. Roughly: SocketLifecycle answers “is this socket open and do we have any peers?”, ConnectionLifecycle answers “is this specific peer link ready / lost?”.

Engine delegates state queries here and uses it to coordinate the ordering of close-time side effects. This consolidates six ivars (`@state`, `@peer_connected`, `@all_peers_gone`, `@reconnect_enabled`, `@parent_task`, `@on_io_thread`) into one cohesive object with explicit transitions.

Defined Under Namespace

Classes: InvalidTransition

Constant Summary

STATES =
%i[new open closing closed].freeze
TRANSITIONS =
{
  new:     %i[open closed].freeze,
  open:    %i[closing closed].freeze,
  closing: %i[closed].freeze,
  closed:  [].freeze,
}.freeze
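
The table above reads as "current state → allowed next states". The page does not show the private transition! method, so the following is only a minimal sketch of how such a table might be enforced. LifecycleSketch and its error message format are hypothetical stand-ins, not the real OMQ::Engine::SocketLifecycle.

```ruby
# Hypothetical stand-in class; only TRANSITIONS is copied from the page.
class LifecycleSketch
  class InvalidTransition < StandardError; end

  TRANSITIONS = {
    new:     %i[open closed].freeze,
    open:    %i[closing closed].freeze,
    closing: %i[closed].freeze,
    closed:  [].freeze,
  }.freeze

  attr_reader :state

  def initialize
    @state = :new
  end

  # Assumed behavior: reject any move the table does not list as a
  # successor of the current state.
  def transition!(new_state)
    allowed = TRANSITIONS.fetch(@state)
    unless allowed.include?(new_state)
      raise InvalidTransition, "#{@state} -> #{new_state}"
    end
    @state = new_state
  end
end

sketch = LifecycleSketch.new
sketch.transition!(:open)
sketch.transition!(:closing)
sketch.transition!(:closed)

begin
  sketch.transition!(:open) # :closed has no successors
rescue LifecycleSketch::InvalidTransition => e
  puts "rejected: #{e.message}"
end
```

Because `:closed` maps to an empty list, a closed lifecycle can never be reopened in this sketch; a fresh object would be needed instead.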

Constructor Details

#initialize ⇒ SocketLifecycle

Returns a new instance of SocketLifecycle.



# File 'lib/omq/engine/socket_lifecycle.rb', line 72

def initialize
  @state             = :new
  @peer_connected    = Async::Promise.new
  @all_peers_gone    = Async::Promise.new
  @reconnect_enabled = true
  @parent_task       = nil
  @on_io_thread      = false
  @barrier           = nil
end

Instance Attribute Details

#all_peers_gone ⇒ Async::Promise (readonly)

Returns a promise that resolves once all peers are gone (after having had peers).

Returns:

  • (Async::Promise)

    resolves once all peers are gone (after having had peers)



# File 'lib/omq/engine/socket_lifecycle.rb', line 47

def all_peers_gone
  @all_peers_gone
end

#barrier ⇒ Async::Barrier (readonly)

Returns the barrier that holds every socket-scoped task (connection supervisors, reconnect loops, heartbeat, monitor, accept loops). OMQ::Engine#stop and OMQ::Engine#close call barrier.stop to cascade teardown through every per-connection barrier in one shot.

Returns:

  • (Async::Barrier)

    holds every socket-scoped task (connection supervisors, reconnect loops, heartbeat, monitor, accept loops). OMQ::Engine#stop and OMQ::Engine#close call barrier.stop to cascade teardown through every per-connection barrier in one shot.



# File 'lib/omq/engine/socket_lifecycle.rb', line 65

def barrier
  @barrier
end

#on_io_thread ⇒ Boolean (readonly)

Returns true if parent_task is the shared Reactor root task, i.e. the socket fell back to the Reactor's IO thread.

Returns:

  • (Boolean)

    true if parent_task is the shared Reactor root task



# File 'lib/omq/engine/socket_lifecycle.rb', line 58

def on_io_thread
  @on_io_thread
end

#parent_task ⇒ Async::Task, ... (readonly)

Returns the root of the socket’s task tree. It may be user-provided via parent: on Socket#bind / Socket#connect; otherwise it falls back to the current Async task or the shared Reactor root.

Returns:

  • (Async::Task, Async::Barrier, Async::Semaphore, nil)

    root of the socket’s task tree (may be user-provided via parent: on Socket#bind / Socket#connect; falls back to the current Async task or the shared Reactor root)



# File 'lib/omq/engine/socket_lifecycle.rb', line 54

def parent_task
  @parent_task
end

#peer_connected ⇒ Async::Promise (readonly)

Returns a promise that resolves with the first connected peer.

Returns:

  • (Async::Promise)

    resolves with the first connected peer



# File 'lib/omq/engine/socket_lifecycle.rb', line 43

def peer_connected
  @peer_connected
end

#reconnect_enabled ⇒ Boolean

Returns whether auto-reconnect is enabled.

Returns:

  • (Boolean)

    whether auto-reconnect is enabled



# File 'lib/omq/engine/socket_lifecycle.rb', line 69

def reconnect_enabled
  @reconnect_enabled
end

#state ⇒ Symbol (readonly)

Returns:

  • (Symbol)


# File 'lib/omq/engine/socket_lifecycle.rb', line 39

def state
  @state
end

Instance Method Details

#alive? ⇒ Boolean

Returns:

  • (Boolean)


# File 'lib/omq/engine/socket_lifecycle.rb', line 86

def alive?     = @state == :new || @state == :open

#capture_parent_task(parent: nil, linger:) ⇒ Boolean

Captures the socket’s task tree root. Transitions `:new → :open`.

When parent is provided (any Async task, barrier, or semaphore; more generally, any object that responds to #async), it is used as the root. This is the common Async idiom for letting callers place internal tasks under a caller-managed parent so teardown can be coordinated with other work. Otherwise it falls back to the current Async task or, for non-Async callers, the shared Reactor root.

The socket-level #barrier is constructed with the captured root as its parent so every task spawned via barrier.async lives under the caller’s tree.

Parameters:

  • parent (#async, nil) (defaults to: nil)

    optional Async parent

  • linger (Numeric, nil)

    used to register the Reactor linger slot when falling back to the IO thread

Returns:

  • (Boolean)

    true on first-time capture, false if already captured



# File 'lib/omq/engine/socket_lifecycle.rb', line 107

def capture_parent_task(parent: nil, linger:)
  return false if @parent_task
  if parent
    @parent_task  = parent
  elsif Async::Task.current?
    @parent_task = Async::Task.current
  else
    @parent_task  = Reactor.root_task
    @on_io_thread = true
    Reactor.track_linger(linger)
  end
  @barrier = Async::Barrier.new(parent: @parent_task)
  transition!(:open)
  true
end
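
The branch order in the listing above (explicit parent, then current Async task, then Reactor root) can be illustrated without the async gem. This is a rough sketch only: pick_parent is a hypothetical helper, and its current_task and reactor_root parameters stand in for Async::Task.current and Reactor.root_task so the example runs on plain Ruby.

```ruby
# Hypothetical helper mirroring the branch order of capture_parent_task.
# Returns [chosen_root, on_io_thread].
def pick_parent(parent:, current_task:, reactor_root:)
  if parent
    [parent, false]        # caller-managed parent always wins
  elsif current_task
    [current_task, false]  # already inside an Async task
  else
    [reactor_root, true]   # non-Async caller: fall back to the IO thread
  end
end

p pick_parent(parent: :caller_barrier, current_task: :task, reactor_root: :root)
p pick_parent(parent: nil, current_task: :task, reactor_root: :root)
p pick_parent(parent: nil, current_task: nil, reactor_root: :root)
```

Only the last case sets the flag that corresponds to on_io_thread, which matches the listing: the linger slot is registered only when the Reactor root is used.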

#closed? ⇒ Boolean

Returns:

  • (Boolean)


# File 'lib/omq/engine/socket_lifecycle.rb', line 85

def closed?    = @state == :closed

#closing? ⇒ Boolean

Returns:

  • (Boolean)


# File 'lib/omq/engine/socket_lifecycle.rb', line 84

def closing?   = @state == :closing

#finish_closing! ⇒ Object

Transitions `:closing → :closed` (or `:new → :closed` for never-opened sockets).



# File 'lib/omq/engine/socket_lifecycle.rb', line 132

def finish_closing!
  transition!(:closed)
end

#open? ⇒ Boolean

Returns:

  • (Boolean)


# File 'lib/omq/engine/socket_lifecycle.rb', line 83

def open?      = @state == :open

#resolve_all_peers_gone_if_empty(connections) ⇒ Object

Resolves `all_peers_gone` if we had peers and now have none.

Parameters:

  • connections (Hash)

    current connection map



# File 'lib/omq/engine/socket_lifecycle.rb', line 139

def resolve_all_peers_gone_if_empty(connections)
  return unless @peer_connected.resolved? && connections.empty?
  @all_peers_gone.resolve(true)
end
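
The guard in the listing above means an empty connection map alone is not enough: a peer must have connected first. The sketch below walks through that sequence. PromiseSketch is a hypothetical stand-in exposing only the two calls the listing uses (#resolve and #resolved?), not the real Async::Promise, and the check lambda is a copy of the method body for illustration.

```ruby
# Hypothetical stand-in for the promise calls used by the listing.
class PromiseSketch
  attr_reader :value

  def initialize
    @resolved = false
    @value    = nil
  end

  def resolved? = @resolved

  def resolve(value)
    return if @resolved # first resolution wins in this sketch
    @resolved = true
    @value    = value
  end
end

peer_connected = PromiseSketch.new
all_peers_gone = PromiseSketch.new

# Same condition as resolve_all_peers_gone_if_empty.
check = lambda do |connections|
  next unless peer_connected.resolved? && connections.empty?
  all_peers_gone.resolve(true)
end

connections = {}
check.call(connections)        # empty, but no peer ever connected
puts all_peers_gone.resolved?  # false

connections[:peer_a] = :link
peer_connected.resolve(:peer_a)
check.call(connections)        # one peer still present
puts all_peers_gone.resolved?  # false

connections.delete(:peer_a)
check.call(connections)        # had a peer, now none
puts all_peers_gone.resolved?  # true
```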

#start_closing! ⇒ Object

Transitions `:open → :closing`.



# File 'lib/omq/engine/socket_lifecycle.rb', line 125

def start_closing!
  transition!(:closing)
end