Class: ZZQ::Engine
- Inherits:
-
Object
- Object
- ZZQ::Engine
- Defined in:
- lib/zzq/engine.rb,
lib/zzq/engine/keepalive.rb,
lib/zzq/engine/socket_lifecycle.rb,
lib/zzq/engine/connection_lifecycle.rb
Overview
Per-socket orchestrator. Owns listeners, the connection map, the transport registry, and the socket-level state machine. Adapted from NNQ::Engine minus reactor / inproc / SP-handshake concerns.
Defined Under Namespace
Classes: ConnectionLifecycle, Keepalive, SocketLifecycle
Class Attribute Summary collapse
-
.transports ⇒ Object
readonly
Returns the value of attribute transports.
Instance Attribute Summary collapse
-
#connections ⇒ Object
readonly
Hash=> ConnectionLifecycle.
-
#dialed ⇒ Object
readonly
Returns the value of attribute dialed.
-
#last_endpoint ⇒ Object
readonly
Returns the value of attribute last_endpoint.
-
#lifecycle ⇒ Object
readonly
Returns the value of attribute lifecycle.
-
#monitor_queue ⇒ Object
Returns the value of attribute monitor_queue.
-
#options ⇒ Object
readonly
Returns the value of attribute options.
-
#routing ⇒ Object
readonly
Returns the value of attribute routing.
Instance Method Summary collapse
- #all_peers_gone ⇒ Object
- #barrier ⇒ Object
-
#bind(endpoint, **opts) ⇒ Object
Bind (listener) — role-agnostic.
-
#capture_parent_task ⇒ Object
Must be called from inside an Async::Task.
- #close ⇒ Object
- #closed? ⇒ Boolean
-
#connect(endpoint, **opts) ⇒ Object
Connect (dialer).
- #dial_opts_for(endpoint) ⇒ Object
- #emit_monitor_event(type, endpoint: nil, detail: nil) ⇒ Object
-
#handle_accepted(io, endpoint:) ⇒ Object
Called by transports for each accepted connection.
-
#handle_connected(io, endpoint:) ⇒ Object
Called by transports for each dialed connection.
- #handle_connection_lost(conn) ⇒ Object
-
#initialize(options:) {|engine| ... } ⇒ Engine
constructor
A new instance of Engine.
- #maybe_reconnect(endpoint) ⇒ Object
- #parent_task ⇒ Object
- #peer_connected ⇒ Object
- #resolve_all_peers_gone_if_empty ⇒ Object
- #spawn_task(annotation:, parent: @lifecycle.barrier, &block) ⇒ Object
- #transport_for(endpoint) ⇒ Object
Constructor Details
#initialize(options:) {|engine| ... } ⇒ Engine
Returns a new instance of Engine.
36 37 38 39 40 41 42 43 44 45 46 |
# File 'lib/zzq/engine.rb', line 36 def initialize(options:) @options = @connections = {} @listeners = [] @lifecycle = SocketLifecycle.new @last_endpoint = nil @monitor_queue = nil @dialed = Set.new @dial_opts = {} @routing = yield(self) end |
Class Attribute Details
.transports ⇒ Object (readonly)
Returns the value of attribute transports.
21 22 23 |
# File 'lib/zzq/engine.rb', line 21 def transports @transports end |
Instance Attribute Details
#connections ⇒ Object (readonly)
Hash=> ConnectionLifecycle
27 28 29 |
# File 'lib/zzq/engine.rb', line 27 def connections @connections end |
#dialed ⇒ Object (readonly)
Returns the value of attribute dialed.
30 31 32 |
# File 'lib/zzq/engine.rb', line 30 def dialed @dialed end |
#last_endpoint ⇒ Object (readonly)
Returns the value of attribute last_endpoint.
29 30 31 |
# File 'lib/zzq/engine.rb', line 29 def last_endpoint @last_endpoint end |
#lifecycle ⇒ Object (readonly)
Returns the value of attribute lifecycle.
28 29 30 |
# File 'lib/zzq/engine.rb', line 28 def lifecycle @lifecycle end |
#monitor_queue ⇒ Object
Returns the value of attribute monitor_queue.
31 32 33 |
# File 'lib/zzq/engine.rb', line 31 def monitor_queue @monitor_queue end |
#options ⇒ Object (readonly)
Returns the value of attribute options.
25 26 27 |
# File 'lib/zzq/engine.rb', line 25 def @options end |
#routing ⇒ Object (readonly)
Returns the value of attribute routing.
26 27 28 |
# File 'lib/zzq/engine.rb', line 26 def routing @routing end |
Instance Method Details
#all_peers_gone ⇒ Object
60 |
# File 'lib/zzq/engine.rb', line 60 def all_peers_gone = @lifecycle.all_peers_gone |
#barrier ⇒ Object
57 |
# File 'lib/zzq/engine.rb', line 57 def = @lifecycle. |
#bind(endpoint, **opts) ⇒ Object
Bind (listener) — role-agnostic. The routing layer (on accept) drives the MQTT handshake.
76 77 78 79 80 81 82 83 84 85 |
# File 'lib/zzq/engine.rb', line 76 def bind(endpoint, **opts) transport = transport_for(endpoint) listener = transport.bind(endpoint, self, **opts) listener.start_accept_loop(@lifecycle.) do |io| handle_accepted(io, endpoint: endpoint) end @listeners << listener @last_endpoint = listener.endpoint emit_monitor_event(:listening, endpoint: @last_endpoint) end |
#capture_parent_task ⇒ Object
Must be called from inside an Async::Task.
69 70 71 |
# File 'lib/zzq/engine.rb', line 69 def capture_parent_task @lifecycle.capture_parent_task end |
#close ⇒ Object
142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 |
# File 'lib/zzq/engine.rb', line 142 def close return unless @lifecycle.alive? @lifecycle.start_closing! @listeners.each(&:stop) @routing.close if @routing.respond_to?(:close) @connections.values.each(&:close!) emit_monitor_event(:closed) @monitor_queue&.enqueue(nil) @lifecycle.&.stop @lifecycle.finish_closing! @lifecycle.peer_connected.resolve(nil) unless @lifecycle.peer_connected.resolved? end |
#closed? ⇒ Boolean
58 |
# File 'lib/zzq/engine.rb', line 58 def closed? = @lifecycle.closed? |
#connect(endpoint, **opts) ⇒ Object
Connect (dialer).
89 90 91 92 93 94 |
# File 'lib/zzq/engine.rb', line 89 def connect(endpoint, **opts) @dialed << endpoint @dial_opts[endpoint] = opts unless opts.empty? @last_endpoint = endpoint transport_for(endpoint).connect(endpoint, self, **opts) end |
#dial_opts_for(endpoint) ⇒ Object
97 98 99 |
# File 'lib/zzq/engine.rb', line 97 def dial_opts_for(endpoint) @dial_opts[endpoint] || {} end |
#emit_monitor_event(type, endpoint: nil, detail: nil) ⇒ Object
49 50 51 52 53 |
# File 'lib/zzq/engine.rb', line 49 def emit_monitor_event(type, endpoint: nil, detail: nil) return unless @monitor_queue @monitor_queue.enqueue(MonitorEvent.new(type: type, endpoint: endpoint, detail: detail)) rescue Async::Stop end |
#handle_accepted(io, endpoint:) ⇒ Object
Called by transports for each accepted connection. Hands off to routing so the broker can read CONNECT / write CONNACK.
115 116 117 118 119 120 121 122 |
# File 'lib/zzq/engine.rb', line 115 def handle_accepted(io, endpoint:) lifecycle = ConnectionLifecycle.new(self, endpoint: endpoint) mqtt = lifecycle.begin_handshake!(io) @routing.handle_accepted(mqtt, lifecycle) if @routing.respond_to?(:handle_accepted) rescue => e lifecycle.handshake_failed!(error: e) raise unless e.is_a?(ConnectionRejected) || CONNECTION_LOST.any? { |c| e.is_a?(c) } end |
#handle_connected(io, endpoint:) ⇒ Object
Called by transports for each dialed connection. Hands off to routing so the client can write CONNECT / read CONNACK.
127 128 129 130 131 132 133 134 |
# File 'lib/zzq/engine.rb', line 127 def handle_connected(io, endpoint:) lifecycle = ConnectionLifecycle.new(self, endpoint: endpoint) mqtt = lifecycle.begin_handshake!(io) @routing.handle_connected(mqtt, lifecycle) if @routing.respond_to?(:handle_connected) rescue => e lifecycle.handshake_failed!(error: e) raise unless e.is_a?(ConnectionRejected) || CONNECTION_LOST.any? { |c| e.is_a?(c) } end |
#handle_connection_lost(conn) ⇒ Object
159 160 161 |
# File 'lib/zzq/engine.rb', line 159 def handle_connection_lost(conn) @connections[conn]&.lost! end |
#maybe_reconnect(endpoint) ⇒ Object
102 103 104 |
# File 'lib/zzq/engine.rb', line 102 def maybe_reconnect(endpoint) # Reconnect loop not wired up yet (M4 job). end |
#parent_task ⇒ Object
56 |
# File 'lib/zzq/engine.rb', line 56 def parent_task = @lifecycle.parent_task |
#peer_connected ⇒ Object
59 |
# File 'lib/zzq/engine.rb', line 59 def peer_connected = @lifecycle.peer_connected |
#resolve_all_peers_gone_if_empty ⇒ Object
63 64 65 |
# File 'lib/zzq/engine.rb', line 63 def resolve_all_peers_gone_if_empty @lifecycle.resolve_all_peers_gone_if_empty(@connections) end |
#spawn_task(annotation:, parent: @lifecycle.barrier, &block) ⇒ Object
137 138 139 |
# File 'lib/zzq/engine.rb', line 137 def spawn_task(annotation:, parent: @lifecycle., &block) parent.async(annotation: annotation, &block) end |
#transport_for(endpoint) ⇒ Object
107 108 109 110 |
# File 'lib/zzq/engine.rb', line 107 def transport_for(endpoint) scheme = endpoint[/\A([a-z+]+):\/\//i, 1] or raise Error, "no scheme: #{endpoint}" Engine.transports[scheme] or raise Error, "unsupported transport: #{scheme}" end |