Class: ZZQ::Engine

Inherits:
Object
  • Object
show all
Defined in:
lib/zzq/engine.rb,
lib/zzq/engine/keepalive.rb,
lib/zzq/engine/socket_lifecycle.rb,
lib/zzq/engine/connection_lifecycle.rb

Overview

Per-socket orchestrator. Owns listeners, the connection map, the transport registry, and the socket-level state machine. Adapted from NNQ::Engine minus reactor / inproc / SP-handshake concerns.

Defined Under Namespace

Classes: ConnectionLifecycle, Keepalive, SocketLifecycle

Class Attribute Summary collapse

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(options:) {|engine| ... } ⇒ Engine

Returns a new instance of Engine.

Yield Parameters:

  • engine (Engine)

    build_routing hook; returns the role-specific routing strategy.



36
37
38
39
40
41
42
43
44
45
46
# File 'lib/zzq/engine.rb', line 36

def initialize(options:)
  @options       = options
  @connections   = {}
  @listeners     = []
  @lifecycle     = SocketLifecycle.new
  @last_endpoint = nil
  @monitor_queue = nil
  @dialed        = Set.new
  @dial_opts     = {}
  @routing       = yield(self)
end

Class Attribute Details

.transportsObject (readonly)

Returns the value of attribute transports.



21
22
23
# File 'lib/zzq/engine.rb', line 21

def transports
  @transports
end

Instance Attribute Details

#connectionsObject (readonly)

Hash=> ConnectionLifecycle



27
28
29
# File 'lib/zzq/engine.rb', line 27

def connections
  @connections
end

#dialedObject (readonly)

Returns the value of attribute dialed.



30
31
32
# File 'lib/zzq/engine.rb', line 30

def dialed
  @dialed
end

#last_endpointObject (readonly)

Returns the value of attribute last_endpoint.



29
30
31
# File 'lib/zzq/engine.rb', line 29

def last_endpoint
  @last_endpoint
end

#lifecycleObject (readonly)

Returns the value of attribute lifecycle.



28
29
30
# File 'lib/zzq/engine.rb', line 28

def lifecycle
  @lifecycle
end

#monitor_queueObject

Returns the value of attribute monitor_queue.



31
32
33
# File 'lib/zzq/engine.rb', line 31

def monitor_queue
  @monitor_queue
end

#optionsObject (readonly)

Returns the value of attribute options.



25
26
27
# File 'lib/zzq/engine.rb', line 25

def options
  @options
end

#routingObject (readonly)

Returns the value of attribute routing.



26
27
28
# File 'lib/zzq/engine.rb', line 26

def routing
  @routing
end

Instance Method Details

#all_peers_goneObject



60
# File 'lib/zzq/engine.rb', line 60

def all_peers_gone = @lifecycle.all_peers_gone

#barrierObject



57
# File 'lib/zzq/engine.rb', line 57

def barrier       = @lifecycle.barrier

#bind(endpoint, **opts) ⇒ Object

Bind (listener) — role-agnostic. The routing layer (on accept) drives the MQTT handshake.



76
77
78
79
80
81
82
83
84
85
# File 'lib/zzq/engine.rb', line 76

def bind(endpoint, **opts)
  transport = transport_for(endpoint)
  listener  = transport.bind(endpoint, self, **opts)
  listener.start_accept_loop(@lifecycle.barrier) do |io|
    handle_accepted(io, endpoint: endpoint)
  end
  @listeners << listener
  @last_endpoint = listener.endpoint
  emit_monitor_event(:listening, endpoint: @last_endpoint)
end

#capture_parent_taskObject

Must be called from inside an Async::Task.



69
70
71
# File 'lib/zzq/engine.rb', line 69

def capture_parent_task
  @lifecycle.capture_parent_task
end

#closeObject



142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
# File 'lib/zzq/engine.rb', line 142

def close
  return unless @lifecycle.alive?

  @lifecycle.start_closing!
  @listeners.each(&:stop)
  @routing.close if @routing.respond_to?(:close)
  @connections.values.each(&:close!)

  emit_monitor_event(:closed)
  @monitor_queue&.enqueue(nil)

  @lifecycle.barrier&.stop
  @lifecycle.finish_closing!
  @lifecycle.peer_connected.resolve(nil) unless @lifecycle.peer_connected.resolved?
end

#closed?Boolean

Returns:

  • (Boolean)


58
# File 'lib/zzq/engine.rb', line 58

def closed?       = @lifecycle.closed?

#connect(endpoint, **opts) ⇒ Object

Connect (dialer).



89
90
91
92
93
94
# File 'lib/zzq/engine.rb', line 89

def connect(endpoint, **opts)
  @dialed << endpoint
  @dial_opts[endpoint] = opts unless opts.empty?
  @last_endpoint = endpoint
  transport_for(endpoint).connect(endpoint, self, **opts)
end

#dial_opts_for(endpoint) ⇒ Object



97
98
99
# File 'lib/zzq/engine.rb', line 97

def dial_opts_for(endpoint)
  @dial_opts[endpoint] || {}
end

#emit_monitor_event(type, endpoint: nil, detail: nil) ⇒ Object



49
50
51
52
53
# File 'lib/zzq/engine.rb', line 49

def emit_monitor_event(type, endpoint: nil, detail: nil)
  return unless @monitor_queue
  @monitor_queue.enqueue(MonitorEvent.new(type: type, endpoint: endpoint, detail: detail))
rescue Async::Stop
end

#handle_accepted(io, endpoint:) ⇒ Object

Called by transports for each accepted connection. Hands off to routing so the broker can read CONNECT / write CONNACK.



115
116
117
118
119
120
121
122
# File 'lib/zzq/engine.rb', line 115

def handle_accepted(io, endpoint:)
  lifecycle = ConnectionLifecycle.new(self, endpoint: endpoint)
  mqtt = lifecycle.begin_handshake!(io)
  @routing.handle_accepted(mqtt, lifecycle) if @routing.respond_to?(:handle_accepted)
rescue => e
  lifecycle.handshake_failed!(error: e)
  raise unless e.is_a?(ConnectionRejected) || CONNECTION_LOST.any? { |c| e.is_a?(c) }
end

#handle_connected(io, endpoint:) ⇒ Object

Called by transports for each dialed connection. Hands off to routing so the client can write CONNECT / read CONNACK.



127
128
129
130
131
132
133
134
# File 'lib/zzq/engine.rb', line 127

def handle_connected(io, endpoint:)
  lifecycle = ConnectionLifecycle.new(self, endpoint: endpoint)
  mqtt = lifecycle.begin_handshake!(io)
  @routing.handle_connected(mqtt, lifecycle) if @routing.respond_to?(:handle_connected)
rescue => e
  lifecycle.handshake_failed!(error: e)
  raise unless e.is_a?(ConnectionRejected) || CONNECTION_LOST.any? { |c| e.is_a?(c) }
end

#handle_connection_lost(conn) ⇒ Object



159
160
161
# File 'lib/zzq/engine.rb', line 159

def handle_connection_lost(conn)
  @connections[conn]&.lost!
end

#maybe_reconnect(endpoint) ⇒ Object



102
103
104
# File 'lib/zzq/engine.rb', line 102

def maybe_reconnect(endpoint)
  # Reconnect loop not wired up yet (M4 job).
end

#parent_taskObject



56
# File 'lib/zzq/engine.rb', line 56

def parent_task   = @lifecycle.parent_task

#peer_connectedObject



59
# File 'lib/zzq/engine.rb', line 59

def peer_connected = @lifecycle.peer_connected

#resolve_all_peers_gone_if_emptyObject



63
64
65
# File 'lib/zzq/engine.rb', line 63

def resolve_all_peers_gone_if_empty
  @lifecycle.resolve_all_peers_gone_if_empty(@connections)
end

#spawn_task(annotation:, parent: @lifecycle.barrier, &block) ⇒ Object



137
138
139
# File 'lib/zzq/engine.rb', line 137

def spawn_task(annotation:, parent: @lifecycle.barrier, &block)
  parent.async(annotation: annotation, &block)
end

#transport_for(endpoint) ⇒ Object



107
108
109
110
# File 'lib/zzq/engine.rb', line 107

def transport_for(endpoint)
  scheme = endpoint[/\A([a-z+]+):\/\//i, 1] or raise Error, "no scheme: #{endpoint}"
  Engine.transports[scheme] or raise Error, "unsupported transport: #{scheme}"
end