Class: NNQ::Engine
- Inherits: Object (Object > NNQ::Engine)
- Defined in: lib/nnq/engine.rb, lib/nnq/engine/reconnect.rb, lib/nnq/engine/socket_lifecycle.rb, lib/nnq/engine/connection_lifecycle.rb
Overview
Per-socket orchestrator. Owns the listener set, the connection map (keyed on NNQ::Connection, with per-connection ConnectionLifecycle values), the transport registry, and the socket-level state machine via SocketLifecycle.
Mirrors OMQ’s Engine in shape but is much smaller: there is no HWM bookkeeping, no mechanisms, and no heartbeat, and the monitor queue is optional (attached only via Socket#monitor).
Defined Under Namespace
Classes: ConnectionLifecycle, Reconnect, SocketLifecycle
Constant Summary
- TRANSPORTS =
{ "tcp" => Transport::TCP, "ipc" => Transport::IPC, "inproc" => Transport::Inproc, }
- CONNECTION_FAILED =
Connection errors that should trigger a reconnect retry rather than propagate. Mutable at load time so plugins (e.g. a future TLS transport) can append their own error classes; frozen on first #connect.
[ Errno::ECONNREFUSED, Errno::EHOSTUNREACH, Errno::ENETUNREACH, Errno::ENOENT, Errno::EPIPE, Errno::ETIMEDOUT, Socket::ResolutionError, ]
- CONNECTION_LOST =
Errors that indicate an established connection went away. Used by the recv loop and pumps to silently terminate (the connection lifecycle’s #lost! handler decides whether to reconnect).
[ EOFError, IOError, Errno::ECONNRESET, Errno::EPIPE, ]
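A rough sketch of how an error list like CONNECTION_FAILED can drive a retry-versus-propagate decision, and why plugins must append before the list is frozen. The constant name, the plugin error class, and classify_dial are hypothetical; Socket::ResolutionError is left out so the sketch does not need the socket library.

```ruby
# Hypothetical stand-in for NNQ::Engine::CONNECTION_FAILED.
SKETCH_CONNECTION_FAILED = [
  Errno::ECONNREFUSED, Errno::EHOSTUNREACH, Errno::ENETUNREACH,
  Errno::ENOENT, Errno::EPIPE, Errno::ETIMEDOUT
]

# Hypothetical plugin error, appended at load time (before the freeze).
class FakeTLSHandshakeError < StandardError; end
SKETCH_CONNECTION_FAILED << FakeTLSHandshakeError

# Returns :retry for listed errors; anything else propagates to the caller.
def classify_dial
  yield
  :connected
rescue *SKETCH_CONNECTION_FAILED
  :retry
end

# Mirrors "frozen on first #connect": later appends raise FrozenError.
SKETCH_CONNECTION_FAILED.freeze
```

Ruby's `rescue *list` splat is what lets a plain array of classes act as a rescue clause, so extending the array before it is frozen is enough to widen the set of retryable errors.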
Instance Attribute Summary
- #connections ⇒ Hash{NNQ::Connection => ConnectionLifecycle} readonly
- #dialed ⇒ Set<String> readonly
  Endpoints we have called #connect on; used to decide whether to schedule a reconnect after a connection is lost.
- #last_endpoint ⇒ String? readonly
- #lifecycle ⇒ SocketLifecycle readonly
- #monitor_queue ⇒ Async::Queue?
  Monitor event queue (set by Socket#monitor).
- #new_pipe ⇒ Async::Condition readonly
  Signaled when a new pipe is registered.
- #options ⇒ Options readonly
- #protocol ⇒ Integer readonly
  Our SP protocol ID (e.g. Protocols::PUSH_V0).
- #routing ⇒ Routing strategy readonly
- #verbose_monitor ⇒ Boolean
  When true, #emit_verbose_msg_sent and #emit_verbose_msg_received forward per-message traces (:message_sent / :message_received) to the monitor queue.
Instance Method Summary
- #all_peers_gone ⇒ Async::Promise
  Resolves when all peers have disconnected (edge-triggered, after at least one peer connected).
- #barrier ⇒ Async::Barrier?
- #bind(endpoint) ⇒ Object
  Binds to endpoint.
- #capture_parent_task(task, on_io_thread:) ⇒ Object
  Stores the parent Async task that long-lived NNQ fibers will attach to.
- #close ⇒ Object
  Closes the engine: stops listeners, drains the send queue subject to linger, stops routing pumps (which by now are parked on the empty queue), then tears down every connection’s lifecycle.
- #close_read ⇒ Object
  Closes only the recv side.
- #closed? ⇒ Boolean
- #connect(endpoint) ⇒ Object
  Connects to endpoint.
- #emit_monitor_event(type, endpoint: nil, detail: nil) ⇒ Object
  Emits a monitor event to the attached queue (if any).
- #emit_verbose_msg_received(body) ⇒ Object
  Emits a :message_received verbose event.
- #emit_verbose_msg_sent(body) ⇒ Object
  Emits a :message_sent verbose event.
- #handle_accepted(io, endpoint:, framing: :tcp) ⇒ Object
  Called by transports for each accepted client connection.
- #handle_connected(io, endpoint:, framing: :tcp) ⇒ Object
  Called by transports for each dialed connection.
- #handle_connection_lost(conn) ⇒ Object
  Called by routing pumps (or the recv loop) when their connection has died.
- #initialize(protocol:, options:) {|engine| ... } ⇒ Engine (constructor)
  A new instance of Engine.
- #maybe_reconnect(endpoint) ⇒ Object
  Schedules a reconnect for endpoint if auto-reconnect is enabled and the endpoint is still in the dialed set.
- #parent_task ⇒ Async::Task?
- #peer_connected ⇒ Async::Promise
  Resolves with the first connected peer.
- #reconnect_enabled ⇒ Boolean
- #reconnect_enabled=(value) ⇒ Object
  Disables or re-enables automatic reconnect.
- #resolve_all_peers_gone_if_empty ⇒ Object
  Called by ConnectionLifecycle teardown.
- #spawn_task(annotation:, parent: @lifecycle.barrier, &block) ⇒ Object
  Spawns a task under the given parent barrier (defaults to the socket-level barrier).
- #transport_for(endpoint) ⇒ Object
  Public so Reconnect can dial directly without re-deriving the transport from the URL each iteration.
Constructor Details
#initialize(protocol:, options:) {|engine| ... } ⇒ Engine
Returns a new instance of Engine.
```ruby
# File 'lib/nnq/engine.rb', line 83
def initialize(protocol:, options:)
  @protocol        = protocol
  @options         = options
  @connections     = {}
  @listeners       = []
  @lifecycle       = SocketLifecycle.new
  @last_endpoint   = nil
  @new_pipe        = Async::Condition.new
  @monitor_queue   = nil
  @verbose_monitor = false
  @dialed          = Set.new
  @routing         = yield(self)
end
```
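The `@routing = yield(self)` line means the caller supplies the routing strategy and receives the partially initialized engine to build it with, so the strategy can keep a back-reference to its engine. A minimal sketch of that pattern with hypothetical SketchEngine and SketchPushRouting classes (the protocol number is arbitrary):

```ruby
# Hypothetical engine: routing is built last, from a block that gets self.
class SketchEngine
  attr_reader :protocol, :routing

  def initialize(protocol:)
    @protocol    = protocol
    @connections = {}
    @routing     = yield(self)   # caller decides which strategy to build
  end
end

# Hypothetical strategy that needs the engine (e.g. to spawn pumps later).
class SketchPushRouting
  attr_reader :engine

  def initialize(engine)
    @engine = engine
  end
end

engine = SketchEngine.new(protocol: 0x50) { |e| SketchPushRouting.new(e) }
```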
Instance Attribute Details
#connections ⇒ Hash{NNQ::Connection => ConnectionLifecycle} (readonly)
```ruby
# File 'lib/nnq/engine.rb', line 48
def connections
  @connections
end
```
#dialed ⇒ Set<String> (readonly)
Returns endpoints we have called #connect on; used to decide whether to schedule a reconnect after a connection is lost.
```ruby
# File 'lib/nnq/engine.rb', line 66
def dialed
  @dialed
end
```
#last_endpoint ⇒ String? (readonly)
```ruby
# File 'lib/nnq/engine.rb', line 56
def last_endpoint
  @last_endpoint
end
```
#lifecycle ⇒ SocketLifecycle (readonly)
```ruby
# File 'lib/nnq/engine.rb', line 52
def lifecycle
  @lifecycle
end
```
#monitor_queue ⇒ Async::Queue?
Returns the monitor event queue (set by Socket#monitor).
```ruby
# File 'lib/nnq/engine.rb', line 70
def monitor_queue
  @monitor_queue
end
```
#new_pipe ⇒ Async::Condition (readonly)
Returns the condition that is signaled when a new pipe is registered.
```ruby
# File 'lib/nnq/engine.rb', line 60
def new_pipe
  @new_pipe
end
```
#protocol ⇒ Integer (readonly)
Returns our SP protocol id (e.g. Protocols::PUSH_V0).
```ruby
# File 'lib/nnq/engine.rb', line 36
def protocol
  @protocol
end
```
#routing ⇒ Routing strategy (readonly)
```ruby
# File 'lib/nnq/engine.rb', line 44
def routing
  @routing
end
```
#verbose_monitor ⇒ Boolean
Returns whether verbose monitoring is on. When true, #emit_verbose_msg_sent and #emit_verbose_msg_received forward per-message traces (:message_sent / :message_received) to the monitor queue. Set by Socket#monitor via its verbose: kwarg.
```ruby
# File 'lib/nnq/engine.rb', line 76
def verbose_monitor
  @verbose_monitor
end
```
Instance Method Details
#all_peers_gone ⇒ Async::Promise
Returns a promise that resolves when all peers have disconnected (edge-triggered, after at least one peer connected).
```ruby
# File 'lib/nnq/engine.rb', line 148
def all_peers_gone
  @lifecycle.all_peers_gone
end
```
#barrier ⇒ Async::Barrier?
```ruby
# File 'lib/nnq/engine.rb', line 130
def barrier
  @lifecycle.barrier
end
```
#bind(endpoint) ⇒ Object
Binds to endpoint. Synchronous: errors propagate.
```ruby
# File 'lib/nnq/engine.rb', line 190
def bind(endpoint)
  transport = transport_for(endpoint)
  listener  = transport.bind(endpoint, self)
  listener.start_accept_loop(@lifecycle.barrier) do |io, framing = :tcp|
    handle_accepted(io, endpoint: endpoint, framing: framing)
  end
  @listeners << listener
  @last_endpoint = listener.endpoint
  emit_monitor_event(:listening, endpoint: @last_endpoint)
end
```
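In #bind, the accept-loop block declares `framing = :tcp`, so a transport can yield either just the IO or the IO plus a framing symbol. A small illustration with a hypothetical listener (the `:custom` framing value is made up for the example, and the real start_accept_loop also takes a barrier argument that is omitted here):

```ruby
# Hypothetical listener: yields [io] or [io, framing] per accepted peer.
class SketchListener
  def initialize(accepted)
    @accepted = accepted
  end

  def start_accept_loop
    @accepted.each { |args| yield(*args) }
  end
end

seen = []
listener = SketchListener.new([["tcp-conn"], ["other-conn", :custom]])
listener.start_accept_loop do |io, framing = :tcp|
  seen << [io, framing]   # framing falls back to :tcp when not yielded
end
```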
#capture_parent_task(task, on_io_thread:) ⇒ Object
Stores the parent Async task that long-lived NNQ fibers will attach to. The caller (Socket) is responsible for picking the right one (the user’s current task, or Reactor.root_task).
```ruby
# File 'lib/nnq/engine.rb', line 184
def capture_parent_task(task, on_io_thread:)
  @lifecycle.capture_parent_task(task, on_io_thread: on_io_thread)
end
```
#close ⇒ Object
Closes the engine: stops listeners, drains the send queue subject to linger, stops routing pumps (which by now are parked on the empty queue), then tears down every connection’s lifecycle. Order matters — closing connections first would force mid-flush pumps to abort with IOError.
```ruby
# File 'lib/nnq/engine.rb', line 277
def close
  return unless @lifecycle.alive?

  @lifecycle.start_closing!
  @listeners.each(&:stop)
  drain_send_queue(@options.linger)
  @routing.close if @routing.respond_to?(:close)

  # Tear down each remaining connection via its lifecycle. The
  # collection mutates during iteration, so snapshot the values.
  @connections.values.each(&:close!)

  # Cascade-cancel every remaining task (reconnect loops, accept
  # loops, supervisors) in one shot.
  @lifecycle.barrier&.stop

  @lifecycle.finish_closing!
  @new_pipe.signal
  # Unblock anyone waiting on peer_connected when the socket is
  # closed before a peer ever arrived.
  @lifecycle.peer_connected.resolve(nil) unless @lifecycle.peer_connected.resolved?
  emit_monitor_event(:closed)
  close_monitor_queue
end
```
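The snapshot comment in #close refers to each lifecycle's close! removing its own entry from the connection map. A sketch of that shape, assuming close! deletes from a shared Hash (SketchLifecycle is hypothetical). Hash#values returns a fresh Array, so the loop walks the snapshot rather than the live map:

```ruby
# Hypothetical lifecycle that unregisters itself on close!.
class SketchLifecycle
  def initialize(conn, connections)
    @conn        = conn
    @connections = connections
  end

  def close!
    @connections.delete(@conn)   # mutates the map being torn down
  end
end

connections = {}
%w[a b c].each { |c| connections[c] = SketchLifecycle.new(c, connections) }

connections.values.each(&:close!)   # iterate over a snapshot, not the Hash
```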
#close_read ⇒ Object
Closes only the recv side. Buffered messages drain, then Socket#receive returns nil. Send side remains operational.
```ruby
# File 'lib/nnq/engine.rb', line 176
def close_read
  @routing.close_read if @routing.respond_to?(:close_read)
end
```
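The drain-then-nil behaviour described for #close_read resembles Ruby's core Thread::Queue after #close. This is an analogy only, not NNQ's actual receive path:

```ruby
recv_queue = Thread::Queue.new
recv_queue << "msg-1"
recv_queue << "msg-2"
recv_queue.close            # no more pushes; buffered items remain

received = []
while (msg = recv_queue.pop) # pop returns nil once closed and empty
  received << msg
end
```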
#closed? ⇒ Boolean
```ruby
# File 'lib/nnq/engine.rb', line 135
def closed?
  @lifecycle.closed?
end
```
#connect(endpoint) ⇒ Object
Connects to endpoint. Non-blocking for tcp:// and ipc:// — the actual dial happens inside a background reconnect task that retries with exponential back-off until the peer becomes reachable. Inproc connect is synchronous and instant.
```ruby
# File 'lib/nnq/engine.rb', line 206
def connect(endpoint)
  @dialed << endpoint
  @last_endpoint = endpoint

  if endpoint.start_with?("inproc://")
    transport_for(endpoint).connect(endpoint, self)
  else
    emit_monitor_event(:connect_delayed, endpoint: endpoint)
    Reconnect.schedule(endpoint, @options, @lifecycle.barrier, self, delay: 0)
  end
end
```
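A sketch of a retry loop with exponential back-off, the behaviour #connect hands off to a background reconnect task for tcp:// and ipc://. The base delay, cap, rescued error subset, and injected sleeper are assumptions for illustration; NNQ's Reconnect reads its settings from Options and is not reproduced here.

```ruby
# Hypothetical helper: retries the block until it stops raising a
# "peer not reachable yet" error, doubling the delay up to max.
def dial_with_backoff(base: 0.1, max: 2.0, sleeper: ->(seconds) { sleep(seconds) })
  delay = base
  begin
    yield
  rescue Errno::ECONNREFUSED, Errno::ETIMEDOUT
    sleeper.call(delay)
    delay = [delay * 2, max].min   # exponential growth, capped at max
    retry
  end
end

failures_left = 4
slept = []
result = dial_with_backoff(base: 1, max: 5, sleeper: ->(s) { slept << s }) do
  if failures_left.positive?
    failures_left -= 1
    raise Errno::ECONNREFUSED     # simulated peer that is not up yet
  end
  :connected
end
```

Injecting the sleeper keeps the loop testable; with base 1 and cap 5 the recorded delays are 1, 2, 4, then 5.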
#emit_monitor_event(type, endpoint: nil, detail: nil) ⇒ Object
Emits a monitor event to the attached queue (if any).
```ruby
# File 'lib/nnq/engine.rb', line 99
def emit_monitor_event(type, endpoint: nil, detail: nil)
  return unless @monitor_queue
  @monitor_queue.enqueue(MonitorEvent.new(type: type, endpoint: endpoint, detail: detail))
rescue Async::Stop
end
```
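A self-contained sketch of the monitor plumbing in #emit_monitor_event and #emit_verbose_msg_sent, with Thread::Queue standing in for Async::Queue and a Struct standing in for MonitorEvent (both substitutions are assumptions). The `rescue Async::Stop` clause is omitted because no Async reactor is involved.

```ruby
# Hypothetical stand-in for NNQ's MonitorEvent.
SketchMonitorEvent = Struct.new(:type, :endpoint, :detail, keyword_init: true)

class SketchMonitor
  attr_accessor :monitor_queue, :verbose_monitor

  def initialize
    @monitor_queue   = nil
    @verbose_monitor = false
  end

  def emit_monitor_event(type, endpoint: nil, detail: nil)
    return unless @monitor_queue   # no queue attached: silently drop
    @monitor_queue.push(SketchMonitorEvent.new(type: type, endpoint: endpoint, detail: detail))
  end

  def emit_verbose_msg_sent(body)
    return unless @verbose_monitor # return before building the detail hash
    emit_monitor_event(:message_sent, detail: { body: body })
  end
end

mon = SketchMonitor.new
mon.emit_monitor_event(:listening, endpoint: "tcp://127.0.0.1:5555") # dropped
mon.monitor_queue = Thread::Queue.new
mon.emit_verbose_msg_sent("ignored")                                 # verbose off
mon.emit_monitor_event(:listening, endpoint: "tcp://127.0.0.1:5555")
mon.verbose_monitor = true
mon.emit_verbose_msg_sent("hello")
```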
#emit_verbose_msg_received(body) ⇒ Object
Emits a :message_received verbose event. Same early-return discipline as #emit_verbose_msg_sent.
```ruby
# File 'lib/nnq/engine.rb', line 117
def emit_verbose_msg_received(body)
  return unless @verbose_monitor
  emit_monitor_event(:message_received, detail: { body: body })
end
```
#emit_verbose_msg_sent(body) ⇒ Object
Emits a :message_sent verbose event. Early-returns before allocating the detail hash so the hot send path pays nothing when verbose monitoring is off.
```ruby
# File 'lib/nnq/engine.rb', line 109
def emit_verbose_msg_sent(body)
  return unless @verbose_monitor
  emit_monitor_event(:message_sent, detail: { body: body })
end
```
#handle_accepted(io, endpoint:, framing: :tcp) ⇒ Object
Called by transports for each accepted client connection.
```ruby
# File 'lib/nnq/engine.rb', line 239
def handle_accepted(io, endpoint:, framing: :tcp)
  lifecycle = ConnectionLifecycle.new(self, endpoint: endpoint, framing: framing)
  lifecycle.handshake!(io)
  spawn_recv_loop(lifecycle.conn) if @routing.respond_to?(:enqueue) && @connections.key?(lifecycle.conn)
  lifecycle.start_supervisor!
rescue ConnectionRejected
  # routing rejected this peer (e.g. PAIR already bonded); lifecycle cleaned up
rescue => e
  warn("nnq: handshake failed for #{endpoint}: #{e.class}: #{e.message}") if $DEBUG
end
```
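Both connection handlers spawn a recv loop only when the routing strategy responds to :enqueue and the connection is present in @connections after the handshake. A sketch of that predicate with two hypothetical routing classes:

```ruby
# Hypothetical send-only strategy (PUSH-like): no #enqueue.
class SendOnlyRouting
  def send_message(_body); end
end

# Hypothetical receiving strategy (PULL-like): exposes #enqueue.
class ReceivingRouting
  def enqueue(_body); end
end

# Duck-typed check mirroring the guard on spawn_recv_loop.
def needs_recv_loop?(routing, connections, conn)
  routing.respond_to?(:enqueue) && connections.key?(conn)
end
```

The `key?` half matters when routing rejects or tears down the peer during the handshake: the lifecycle is gone from the map, so no orphaned recv loop is started.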
#handle_connected(io, endpoint:, framing: :tcp) ⇒ Object
Called by transports for each dialed connection.
```ruby
# File 'lib/nnq/engine.rb', line 252
def handle_connected(io, endpoint:, framing: :tcp)
  lifecycle = ConnectionLifecycle.new(self, endpoint: endpoint, framing: framing)
  lifecycle.handshake!(io)
  spawn_recv_loop(lifecycle.conn) if @routing.respond_to?(:enqueue) && @connections.key?(lifecycle.conn)
  lifecycle.start_supervisor!
rescue ConnectionRejected
  # unusual on connect side, but handled identically
end
```
#handle_connection_lost(conn) ⇒ Object
Called by routing pumps (or the recv loop) when their connection has died. Idempotent via the lifecycle state guard.
```ruby
# File 'lib/nnq/engine.rb', line 305
def handle_connection_lost(conn)
  @connections[conn]&.lost!
end
```
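Idempotence here comes from a state check inside the lifecycle, plus `&.` for connections already removed from the map. A sketch with a hypothetical SketchConnLifecycle whose :open/:lost states are assumptions; a plain check suffices only because callers are assumed not to run in parallel, as with fibers on one reactor.

```ruby
# Hypothetical lifecycle with a one-shot lost! transition.
class SketchConnLifecycle
  attr_reader :state

  def initialize(endpoint, on_lost)
    @endpoint = endpoint
    @on_lost  = on_lost
    @state    = :open
  end

  def lost!
    return unless @state == :open   # state guard: repeat calls are no-ops
    @state = :lost
    @on_lost.call(@endpoint)        # e.g. ask the engine to maybe reconnect
  end
end

reconnects = []
lc = SketchConnLifecycle.new("tcp://127.0.0.1:5555", ->(ep) { reconnects << ep })
connections = { conn: lc }
2.times { connections[:conn]&.lost! }   # pump and recv loop both notice
connections[:missing]&.lost!            # unknown conn: nil, nothing happens
```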
#maybe_reconnect(endpoint) ⇒ Object
Schedules a reconnect for endpoint if auto-reconnect is enabled and the endpoint is still in the dialed set. Called from the connection lifecycle’s #lost! path.
```ruby
# File 'lib/nnq/engine.rb', line 222
def maybe_reconnect(endpoint)
  return unless endpoint && @dialed.include?(endpoint)
  return unless @lifecycle.alive? && @lifecycle.reconnect_enabled
  return if endpoint.start_with?("inproc://")
  Reconnect.schedule(endpoint, @options, @lifecycle.barrier, self)
end
```
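The guards in #maybe_reconnect restated as a pure predicate, which makes the cases easy to enumerate. The function and its keyword names are hypothetical:

```ruby
require "set"

# True only for a dialed, non-inproc endpoint while the socket is alive
# and automatic reconnect is enabled.
def reconnect?(endpoint, dialed:, alive:, reconnect_enabled:)
  return false unless endpoint && dialed.include?(endpoint)
  return false unless alive && reconnect_enabled
  return false if endpoint.start_with?("inproc://")
  true
end

dialed = Set["tcp://127.0.0.1:5555", "inproc://jobs"]
```

Accepted (bound-side) peers never appear in the dialed set, so losing one of them never triggers a reconnect.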
#parent_task ⇒ Async::Task?
```ruby
# File 'lib/nnq/engine.rb', line 124
def parent_task
  @lifecycle.parent_task
end
```
#peer_connected ⇒ Async::Promise
Returns a promise that resolves with the first connected peer.
```ruby
# File 'lib/nnq/engine.rb', line 141
def peer_connected
  @lifecycle.peer_connected
end
```
#reconnect_enabled ⇒ Boolean
```ruby
# File 'lib/nnq/engine.rb', line 161
def reconnect_enabled
  @lifecycle.reconnect_enabled
end
```
#reconnect_enabled=(value) ⇒ Object
Disables or re-enables automatic reconnect. #maybe_reconnect checks this flag before scheduling a reconnect; TransientMonitor flips it before draining.
```ruby
# File 'lib/nnq/engine.rb', line 169
def reconnect_enabled=(value)
  @lifecycle.reconnect_enabled = value
end
```
#resolve_all_peers_gone_if_empty ⇒ Object
Called by ConnectionLifecycle teardown. Resolves #all_peers_gone if the connection set is now empty and we had peers.
```ruby
# File 'lib/nnq/engine.rb', line 155
def resolve_all_peers_gone_if_empty
  @lifecycle.resolve_all_peers_gone_if_empty(@connections)
end
```
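A sketch of the edge-triggered behaviour behind #all_peers_gone and #resolve_all_peers_gone_if_empty: the latch fires only after the set has held at least one peer and then emptied. A boolean stands in for Async::Promise, and SketchPeerTracker is hypothetical:

```ruby
# Hypothetical tracker; @all_peers_gone plays the role of the promise.
class SketchPeerTracker
  def initialize
    @connections    = {}
    @had_peers      = false
    @all_peers_gone = false
  end

  def all_peers_gone?
    @all_peers_gone
  end

  def add(conn)
    @connections[conn] = true
    @had_peers = true
  end

  def remove(conn)
    @connections.delete(conn)
    resolve_all_peers_gone_if_empty
  end

  # Empty-at-startup does not count: a peer must have been seen first.
  def resolve_all_peers_gone_if_empty
    @all_peers_gone = true if @had_peers && @connections.empty?
  end
end
```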
#spawn_task(annotation:, parent: @lifecycle.barrier, &block) ⇒ Object
Spawns a task under the given parent barrier (defaults to the socket-level barrier). Used by routing strategies (e.g. PUSH send pump) to attach long-lived fibers to the engine’s lifecycle. The parent barrier tracks every spawned task so teardown is a single barrier.stop call.
```ruby
# File 'lib/nnq/engine.rb', line 267
def spawn_task(annotation:, parent: @lifecycle.barrier, &block)
  parent.async(annotation: annotation, &block)
end
```
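A thread-based analogue of the barrier pattern #spawn_task relies on, where the parent tracks every spawned worker so teardown is one stop call. NNQ uses Async::Barrier with fibers; SketchBarrier and its use of Thread#kill are assumptions made so the example runs on plain Ruby:

```ruby
# Hypothetical barrier: remembers every worker it starts.
class SketchBarrier
  def initialize
    @tasks = []
    @mutex = Mutex.new
  end

  def async(annotation:, &block)
    thread = Thread.new(&block)
    thread.name = annotation            # label, like Async's annotation
    @mutex.synchronize { @tasks << thread }
    thread
  end

  # One call stops everything that was spawned under this barrier.
  def stop
    tasks = @mutex.synchronize { @tasks.dup.tap { @tasks.clear } }
    tasks.each(&:kill).each(&:join)
  end
end

def spawn_task(annotation:, parent:, &block)
  parent.async(annotation: annotation, &block)
end

barrier = SketchBarrier.new
pump    = spawn_task(annotation: "send pump", parent: barrier) { loop { sleep 0.01 } }
accept  = spawn_task(annotation: "accept loop", parent: barrier) { sleep }
barrier.stop
```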
#transport_for(endpoint) ⇒ Object
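Returns the transport registered in TRANSPORTS for endpoint’s URL scheme, raising Error if the scheme is missing or unsupported. Public so Reconnect can dial directly without re-deriving the transport from the URL each iteration.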
```ruby
# File 'lib/nnq/engine.rb', line 232
def transport_for(endpoint)
  scheme = endpoint[/\A([a-z+]+):\/\//i, 1] or raise Error, "no scheme: #{endpoint}"
  TRANSPORTS[scheme] or raise Error, "unsupported transport: #{scheme}"
end
```
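A standalone version of the scheme lookup in #transport_for, with placeholder classes in place of Transport::TCP, Transport::IPC, and Transport::Inproc (all Sketch* names are hypothetical):

```ruby
class SketchTCP; end
class SketchIPC; end
class SketchInproc; end
SketchError = Class.new(StandardError)

SKETCH_TRANSPORTS = {
  "tcp" => SketchTCP, "ipc" => SketchIPC, "inproc" => SketchInproc
}.freeze

def sketch_transport_for(endpoint)
  # Capture the scheme before "://"; nil means the URL has no scheme.
  scheme = endpoint[/\A([a-z+]+):\/\//i, 1] or raise SketchError, "no scheme: #{endpoint}"
  SKETCH_TRANSPORTS[scheme] or raise SketchError, "unsupported transport: #{scheme}"
end
```

Because the regex is case-insensitive but the Hash lookup is not, an upper-case scheme such as TCP:// passes the first check and fails the second with the unsupported-transport error.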