Class: NNQ::Engine
- Inherits: Object
  - Object
  - NNQ::Engine
- Defined in:
  - lib/nnq/engine.rb
  - lib/nnq/engine/reconnect.rb
  - lib/nnq/engine/socket_lifecycle.rb
  - lib/nnq/engine/connection_lifecycle.rb
Overview
Per-socket orchestrator. Owns the listener set, the connection map (keyed on NNQ::Connection, with per-connection ConnectionLifecycle values), the transport registry, and the socket-level state machine via SocketLifecycle.
Mirrors OMQ’s Engine in shape but is much smaller because there is no HWM bookkeeping, no mechanisms, and no heartbeat.
Defined Under Namespace
Classes: ConnectionLifecycle, Reconnect, SocketLifecycle
Constant Summary
- TRANSPORTS =
{ "tcp" => Transport::TCP, "ipc" => Transport::IPC, "inproc" => Transport::Inproc, }
- CONNECTION_FAILED =
Connection errors that should trigger a reconnect retry rather than propagate. Mutable at load time so plugins (e.g. a future TLS transport) can append their own error classes; frozen on first #connect.
[ Errno::ECONNREFUSED, Errno::EHOSTUNREACH, Errno::ENETUNREACH, Errno::ENOENT, Errno::EPIPE, Errno::ETIMEDOUT, Socket::ResolutionError, ]
- CONNECTION_LOST =
Errors that indicate an established connection went away. Used by the recv loop and pumps to silently terminate (the connection lifecycle’s #lost! handler decides whether to reconnect).
[ EOFError, IOError, Errno::ECONNRESET, Errno::EPIPE, ]
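The two lists above serve different purposes: one marks dial failures worth retrying, the other marks an established connection that went away. A minimal sketch of how such lists can drive a `rescue` clause is shown below. The module name, the helper `classify`, and the shortened lists are illustrative assumptions, not part of NNQ.

```ruby
# Illustrative stand-ins for the documented constants (shortened lists;
# the module and helper names are hypothetical, not NNQ API).
module ErrorListSketch
  CONNECTION_FAILED = [Errno::ECONNREFUSED, Errno::EHOSTUNREACH, Errno::ETIMEDOUT].freeze
  CONNECTION_LOST   = [EOFError, IOError, Errno::ECONNRESET].freeze

  # Runs the block and reports how an error, if any, would be classified.
  # Errors in neither list propagate to the caller.
  def self.classify
    yield
    :ok
  rescue *CONNECTION_FAILED
    :retry
  rescue *CONNECTION_LOST
    :lost
  end
end

refused = ErrorListSketch.classify { raise Errno::ECONNREFUSED }
eof     = ErrorListSketch.classify { raise EOFError }
clean   = ErrorListSketch.classify { 42 }
```

In this sketch `refused` is `:retry`, `eof` is `:lost`, and `clean` is `:ok`; any other exception class is re-raised unchanged.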
Instance Attribute Summary
- #connections ⇒ Hash{NNQ::Connection => ConnectionLifecycle} readonly
- #dialed ⇒ Set<String> readonly
  Endpoints we have called #connect on; used to decide whether to schedule a reconnect after a connection is lost.
- #last_endpoint ⇒ String? readonly
- #lifecycle ⇒ SocketLifecycle readonly
- #monitor_queue ⇒ Async::Queue?
  Monitor event queue (set by Socket#monitor).
- #monitor_task ⇒ Async::Task?
  The monitor consumer task, if any.
- #new_pipe ⇒ Async::Condition readonly
  Signaled when a new pipe is registered.
- #options ⇒ Options readonly
- #protocol ⇒ Integer readonly
  Our SP protocol id (e.g. Protocols::PUSH_V0).
- #routing ⇒ Routing strategy readonly
- #verbose_monitor ⇒ Boolean
  When true, #emit_verbose_msg_sent and #emit_verbose_msg_received forward per-message traces (:message_sent / :message_received) to the monitor queue.
Instance Method Summary
- #all_peers_gone ⇒ Async::Promise
  Resolves when all peers have disconnected (edge-triggered, after at least one peer connected).
- #barrier ⇒ Async::Barrier?
- #bind(endpoint) ⇒ Object
  Binds to endpoint.
- #capture_parent_task(task, on_io_thread:) ⇒ Object
  Stores the parent Async task that long-lived NNQ fibers will attach to.
- #close ⇒ Object
  Closes the engine: stops listeners, drains the send queue subject to linger, stops routing pumps (which by now are parked on the empty queue), then tears down every connection’s lifecycle.
- #close_read ⇒ Object
  Closes only the recv side.
- #closed? ⇒ Boolean
- #connect(endpoint) ⇒ Object
  Connects to endpoint.
- #emit_monitor_event(type, endpoint: nil, detail: nil) ⇒ Object
  Emits a monitor event to the attached queue (if any).
- #emit_verbose_msg_received(body) ⇒ Object
  Emits a :message_received verbose event.
- #emit_verbose_msg_sent(body) ⇒ Object
  Emits a :message_sent verbose event.
- #handle_accepted(io, endpoint:, framing: :tcp) ⇒ Object
  Called by transports for each accepted client connection.
- #handle_connected(io, endpoint:, framing: :tcp) ⇒ Object
  Called by transports for each dialed connection.
- #handle_connection_lost(conn) ⇒ Object
  Called by routing pumps (or the recv loop) when their connection has died.
- #initialize(protocol:, options:) {|engine| ... } ⇒ Engine constructor
  A new instance of Engine.
- #maybe_reconnect(endpoint) ⇒ Object
  Schedules a reconnect for endpoint if auto-reconnect is enabled and the endpoint is still in the dialed set.
- #parent_task ⇒ Async::Task?
- #peer_connected ⇒ Async::Promise
  Resolves with the first connected peer.
- #reconnect_enabled ⇒ Boolean
- #reconnect_enabled=(value) ⇒ Object
  Disables or re-enables automatic reconnect.
- #resolve_all_peers_gone_if_empty ⇒ Object
  Called by ConnectionLifecycle teardown.
- #spawn_task(annotation:, parent: @lifecycle.barrier, &block) ⇒ Object
  Spawns a task under the given parent barrier (defaults to the socket-level barrier).
- #transport_for(endpoint) ⇒ Object
  Public so Reconnect can dial directly without re-deriving the transport from the URL each iteration.
Constructor Details
#initialize(protocol:, options:) {|engine| ... } ⇒ Engine
Returns a new instance of Engine.
# File 'lib/nnq/engine.rb', line 87
def initialize(protocol:, options:)
  @protocol = protocol
  @options = options
  @connections = {}
  @listeners = []
  @lifecycle = SocketLifecycle.new
  @last_endpoint = nil
  @new_pipe = Async::Condition.new
  @monitor_queue = nil
  @verbose_monitor = false
  @dialed = Set.new
  @routing = yield(self)
end
Instance Attribute Details
#connections ⇒ Hash{NNQ::Connection => ConnectionLifecycle} (readonly)
# File 'lib/nnq/engine.rb', line 48
def connections
  @connections
end
#dialed ⇒ Set<String> (readonly)
Returns endpoints we have called #connect on; used to decide whether to schedule a reconnect after a connection is lost.
# File 'lib/nnq/engine.rb', line 66
def dialed
  @dialed
end
#last_endpoint ⇒ String? (readonly)
# File 'lib/nnq/engine.rb', line 56
def last_endpoint
  @last_endpoint
end
#lifecycle ⇒ SocketLifecycle (readonly)
# File 'lib/nnq/engine.rb', line 52
def lifecycle
  @lifecycle
end
#monitor_queue ⇒ Async::Queue?
Returns the monitor event queue (set by Socket#monitor).
# File 'lib/nnq/engine.rb', line 70
def monitor_queue
  @monitor_queue
end
#monitor_task ⇒ Async::Task?
Returns the monitor consumer task, if any.
# File 'lib/nnq/engine.rb', line 74
def monitor_task
  @monitor_task
end
#new_pipe ⇒ Async::Condition (readonly)
Returns a condition that is signaled when a new pipe is registered.
# File 'lib/nnq/engine.rb', line 60
def new_pipe
  @new_pipe
end
#protocol ⇒ Integer (readonly)
Returns our SP protocol id (e.g. Protocols::PUSH_V0).
# File 'lib/nnq/engine.rb', line 36
def protocol
  @protocol
end
#routing ⇒ Routing strategy (readonly)
# File 'lib/nnq/engine.rb', line 44
def routing
  @routing
end
#verbose_monitor ⇒ Boolean
When true, #emit_verbose_msg_sent and #emit_verbose_msg_received forward per-message traces (:message_sent / :message_received) to the monitor queue. Set by Socket#monitor via its verbose: kwarg.
# File 'lib/nnq/engine.rb', line 80
def verbose_monitor
  @verbose_monitor
end
Instance Method Details
#all_peers_gone ⇒ Async::Promise
Returns a promise that resolves when all peers have disconnected (edge-triggered, after at least one peer connected).
# File 'lib/nnq/engine.rb', line 152
def all_peers_gone
  @lifecycle.all_peers_gone
end
#barrier ⇒ Async::Barrier?
# File 'lib/nnq/engine.rb', line 134
def barrier
  @lifecycle.barrier
end
#bind(endpoint) ⇒ Object
Binds to endpoint. Synchronous: errors propagate.
# File 'lib/nnq/engine.rb', line 194
def bind(endpoint)
  transport = transport_for(endpoint)
  listener = transport.bind(endpoint, self)
  listener.start_accept_loop(@lifecycle.barrier) do |io, framing = :tcp|
    handle_accepted(io, endpoint: endpoint, framing: framing)
  end
  @listeners << listener
  @last_endpoint = listener.endpoint
  emit_monitor_event(:listening, endpoint: @last_endpoint)
end
#capture_parent_task(task, on_io_thread:) ⇒ Object
Stores the parent Async task that long-lived NNQ fibers will attach to. The caller (Socket) is responsible for picking the right one (the user’s current task, or Reactor.root_task).
# File 'lib/nnq/engine.rb', line 188
def capture_parent_task(task, on_io_thread:)
  @lifecycle.capture_parent_task(task, on_io_thread: on_io_thread)
end
#close ⇒ Object
Closes the engine: stops listeners, drains the send queue subject to linger, stops routing pumps (which by now are parked on the empty queue), then tears down every connection’s lifecycle. Order matters — closing connections first would force mid-flush pumps to abort with IOError.
# File 'lib/nnq/engine.rb', line 281
def close
  return unless @lifecycle.alive?
  @lifecycle.start_closing!
  @listeners.each(&:stop)
  drain_send_queue(@options.linger)
  @routing.close if @routing.respond_to?(:close)

  # Tear down each remaining connection via its lifecycle. The
  # collection mutates during iteration, so snapshot the values.
  @connections.values.each(&:close!)

  # Emit :closed, seal the monitor queue, and wait for the monitor
  # fiber to drain it before cancelling tasks. Without this join,
  # trailing :message_received events that the recv pump enqueued
  # just before close would be lost when the barrier.stop below
  # Async::Stops the monitor fiber mid-dequeue.
  emit_monitor_event(:closed)
  close_monitor_queue
  @monitor_task&.wait

  # Cascade-cancel every remaining task (reconnect loops, accept
  # loops, supervisors) in one shot.
  @lifecycle.barrier&.stop
  @lifecycle.finish_closing!
  @new_pipe.signal

  # Unblock anyone waiting on peer_connected when the socket is
  # closed before a peer ever arrived.
  @lifecycle.peer_connected.resolve(nil) unless @lifecycle.peer_connected.resolved?
end
#close_read ⇒ Object
Closes only the recv side. Buffered messages drain, then Socket#receive returns nil. Send side remains operational.
# File 'lib/nnq/engine.rb', line 180
def close_read
  @routing.close_read if @routing.respond_to?(:close_read)
end
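The "buffered messages drain, then receive returns nil" behaviour described for #close_read has a close analogue in Ruby's core Thread::Queue. The sketch below uses that class purely as an illustration of the semantics; it is an assumption for demonstration and says nothing about how NNQ's routing strategies buffer messages internally.

```ruby
# Thread::Queue is used here only as an analogy for a receive buffer.
buffer = Thread::Queue.new
buffer.push("first")
buffer.push("second")

# Closing the queue rejects new pushes but keeps what is already buffered.
buffer.close

received = []
# On a closed queue, pop returns buffered items and then nil once empty.
while (message = buffer.pop)
  received << message
end

after_drain = buffer.pop
```

After the loop, `received` holds both buffered messages in order and `after_drain` is nil, which is the same contract the description gives for Socket#receive after the recv side is closed.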
#closed? ⇒ Boolean
# File 'lib/nnq/engine.rb', line 139
def closed?
  @lifecycle.closed?
end
#connect(endpoint) ⇒ Object
Connects to endpoint. Non-blocking for tcp:// and ipc:// — the actual dial happens inside a background reconnect task that retries with exponential back-off until the peer becomes reachable. Inproc connect is synchronous and instant.
# File 'lib/nnq/engine.rb', line 210
def connect(endpoint)
  @dialed << endpoint
  @last_endpoint = endpoint
  if endpoint.start_with?("inproc://")
    transport_for(endpoint).connect(endpoint, self)
  else
    emit_monitor_event(:connect_delayed, endpoint: endpoint)
    Reconnect.schedule(endpoint, @options, @lifecycle.barrier, self, delay: 0)
  end
end
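The description above mentions retries with exponential back-off. As a rough illustration of what such a schedule can look like, the sketch below doubles the delay after each failed attempt up to a cap. The helper name, the millisecond units, and the parameter names `initial`, `max`, and `attempts` are all hypothetical; this page does not show NNQ's actual option names, defaults, or whether jitter is applied.

```ruby
# Hypothetical helper: returns the wait (in milliseconds) before each retry.
# The values and parameter names are illustrative, not NNQ options.
def backoff_delays_ms(initial:, max:, attempts:)
  delay = initial
  Array.new(attempts) do
    current = delay
    delay = [delay * 2, max].min # double, but never exceed the cap
    current
  end
end

delays = backoff_delays_ms(initial: 100, max: 1000, attempts: 6)
# delays => [100, 200, 400, 800, 1000, 1000]
```

Note that in the source shown above the first dial is scheduled with `delay: 0`, so a schedule like this would only apply to subsequent attempts.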
#emit_monitor_event(type, endpoint: nil, detail: nil) ⇒ Object
Emits a monitor event to the attached queue (if any).
# File 'lib/nnq/engine.rb', line 103
def emit_monitor_event(type, endpoint: nil, detail: nil)
  return unless @monitor_queue
  @monitor_queue.enqueue(MonitorEvent.new(type: type, endpoint: endpoint, detail: detail))
rescue Async::Stop
end
#emit_verbose_msg_received(body) ⇒ Object
Emits a :message_received verbose event. Same early-return discipline as #emit_verbose_msg_sent.
# File 'lib/nnq/engine.rb', line 121
def emit_verbose_msg_received(body)
  return unless @verbose_monitor
  emit_monitor_event(:message_received, detail: { body: body })
end
#emit_verbose_msg_sent(body) ⇒ Object
Emits a :message_sent verbose event. Early-returns before allocating the detail hash so the hot send path pays nothing when verbose monitoring is off.
# File 'lib/nnq/engine.rb', line 113
def emit_verbose_msg_sent(body)
  return unless @verbose_monitor
  emit_monitor_event(:message_sent, detail: { body: body })
end
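The early-return discipline can be seen in isolation with a small stand-in. The class below is hypothetical and replaces the monitor queue with a plain array; it only illustrates that the guard runs before the detail hash literal is evaluated, so nothing is built or enqueued while verbose monitoring is off.

```ruby
# Hypothetical stand-in for the engine's verbose hooks; not NNQ API.
class VerboseTraceSketch
  attr_accessor :verbose
  attr_reader :events

  def initialize
    @verbose = false
    @events = [] # stands in for the monitor queue
  end

  def emit_sent(body)
    # Guard first: the hash below is never allocated when verbose is off.
    return unless @verbose
    @events << { type: :message_sent, detail: { body: body } }
  end
end

tracer = VerboseTraceSketch.new
tracer.emit_sent("ignored")   # verbose off: no event recorded
tracer.verbose = true
tracer.emit_sent("traced")    # verbose on: one event recorded
```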
#handle_accepted(io, endpoint:, framing: :tcp) ⇒ Object
Called by transports for each accepted client connection.
# File 'lib/nnq/engine.rb', line 243
def handle_accepted(io, endpoint:, framing: :tcp)
  lifecycle = ConnectionLifecycle.new(self, endpoint: endpoint, framing: framing)
  lifecycle.handshake!(io)
  spawn_recv_loop(lifecycle.conn) if @routing.respond_to?(:enqueue) && @connections.key?(lifecycle.conn)
  lifecycle.start_supervisor!
rescue ConnectionRejected
  # routing rejected this peer (e.g. PAIR already bonded); lifecycle cleaned up
rescue => e
  warn("nnq: handshake failed for #{endpoint}: #{e.class}: #{e.message}") if $DEBUG
end
#handle_connected(io, endpoint:, framing: :tcp) ⇒ Object
Called by transports for each dialed connection.
# File 'lib/nnq/engine.rb', line 256
def handle_connected(io, endpoint:, framing: :tcp)
  lifecycle = ConnectionLifecycle.new(self, endpoint: endpoint, framing: framing)
  lifecycle.handshake!(io)
  spawn_recv_loop(lifecycle.conn) if @routing.respond_to?(:enqueue) && @connections.key?(lifecycle.conn)
  lifecycle.start_supervisor!
rescue ConnectionRejected
  # unusual on connect side, but handled identically
end
#handle_connection_lost(conn) ⇒ Object
Called by routing pumps (or the recv loop) when their connection has died. Idempotent via the lifecycle state guard.
# File 'lib/nnq/engine.rb', line 316
def handle_connection_lost(conn)
  @connections[conn]&.lost!
end
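Idempotency through a state guard can be sketched as follows. The class, its states, and the teardown counter are hypothetical stand-ins, since ConnectionLifecycle's real states are not shown on this page; the point is only that a second `lost!` call, or a call for a connection that is no longer in the map, does nothing.

```ruby
# Hypothetical lifecycle stand-in with a state guard; not the real
# ConnectionLifecycle implementation.
class LostGuardSketch
  attr_reader :state, :teardowns

  def initialize
    @state = :ready
    @teardowns = 0
  end

  def lost!
    return if @state == :closed # guard: already torn down
    @state = :closed
    @teardowns += 1
  end
end

connections = { conn_a: LostGuardSketch.new }

# Both the recv loop and a send pump may report the same dead connection.
2.times { connections[:conn_a]&.lost! }

# A connection that was never registered: safe navigation makes this a no-op.
unknown_result = connections[:conn_b]&.lost!
```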
#maybe_reconnect(endpoint) ⇒ Object
Schedules a reconnect for endpoint if auto-reconnect is enabled and the endpoint is still in the dialed set. Called from the connection lifecycle’s #lost! path.
# File 'lib/nnq/engine.rb', line 226
def maybe_reconnect(endpoint)
  return unless endpoint && @dialed.include?(endpoint)
  return unless @lifecycle.alive? && @lifecycle.reconnect_enabled
  return if endpoint.start_with?("inproc://")
  Reconnect.schedule(endpoint, @options, @lifecycle.barrier, self)
end
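The three guards in #maybe_reconnect can be read as a single predicate. The sketch below restates them as a standalone function so each condition can be exercised on its own; the function name and keyword arguments are illustrative assumptions and do not exist in NNQ.

```ruby
require "set"

# Hypothetical predicate restating the guards shown in the source above.
def reconnect_wanted?(endpoint, dialed:, alive:, reconnect_enabled:)
  return false unless endpoint && dialed.include?(endpoint) # we must have dialed it
  return false unless alive && reconnect_enabled            # socket open, reconnect on
  return false if endpoint.start_with?("inproc://")         # inproc is never retried
  true
end

dialed = Set["tcp://127.0.0.1:5555", "inproc://local"]

dialed_tcp   = reconnect_wanted?("tcp://127.0.0.1:5555", dialed: dialed, alive: true,  reconnect_enabled: true)
accepted_tcp = reconnect_wanted?("tcp://127.0.0.1:6000", dialed: dialed, alive: true,  reconnect_enabled: true)
inproc       = reconnect_wanted?("inproc://local",       dialed: dialed, alive: true,  reconnect_enabled: true)
closing      = reconnect_wanted?("tcp://127.0.0.1:5555", dialed: dialed, alive: false, reconnect_enabled: true)
```

Only `dialed_tcp` is true here: an endpoint that was accepted rather than dialed, an inproc endpoint, and a socket that is no longer alive all skip the reconnect.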
#parent_task ⇒ Async::Task?
# File 'lib/nnq/engine.rb', line 128
def parent_task
  @lifecycle.parent_task
end
#peer_connected ⇒ Async::Promise
Returns a promise that resolves with the first connected peer.
# File 'lib/nnq/engine.rb', line 145
def peer_connected
  @lifecycle.peer_connected
end
#reconnect_enabled ⇒ Boolean
# File 'lib/nnq/engine.rb', line 165
def reconnect_enabled
  @lifecycle.reconnect_enabled
end
#reconnect_enabled=(value) ⇒ Object
Disables or re-enables automatic reconnect. While disabled, #maybe_reconnect does not schedule a new dial after a connection is lost. TransientMonitor flips it before draining.
# File 'lib/nnq/engine.rb', line 173
def reconnect_enabled=(value)
  @lifecycle.reconnect_enabled = value
end
#resolve_all_peers_gone_if_empty ⇒ Object
Called by ConnectionLifecycle teardown. Resolves #all_peers_gone if the connection set is now empty and we had peers.
# File 'lib/nnq/engine.rb', line 159
def resolve_all_peers_gone_if_empty
  @lifecycle.resolve_all_peers_gone_if_empty(@connections)
end
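The "empty and we had peers" rule is what makes #all_peers_gone edge-triggered: a socket that has never seen a peer does not count as "all peers gone". A minimal sketch of that rule, using a boolean flag in place of the Async::Promise and a hypothetical class name, might look like this:

```ruby
# Hypothetical stand-in for the socket-level state; a boolean replaces the
# promise so the sketch needs no external gems.
class PeersGoneSketch
  attr_reader :all_peers_gone

  def initialize
    @had_peers = false
    @all_peers_gone = false
  end

  def peer_added
    @had_peers = true
  end

  # Resolve only when the set is empty AND at least one peer was seen before.
  def resolve_all_peers_gone_if_empty(connections)
    return unless @had_peers && connections.empty?
    @all_peers_gone = true
  end
end

state = PeersGoneSketch.new
connections = {}

state.resolve_all_peers_gone_if_empty(connections)
resolved_before_any_peer = state.all_peers_gone # still false: no peer yet

connections[:peer] = :lifecycle
state.peer_added
connections.delete(:peer)
state.resolve_all_peers_gone_if_empty(connections)
resolved_after_last_peer = state.all_peers_gone
```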
#spawn_task(annotation:, parent: @lifecycle.barrier, &block) ⇒ Object
Spawns a task under the given parent barrier (defaults to the socket-level barrier). Used by routing strategies (e.g. PUSH send pump) to attach long-lived fibers to the engine’s lifecycle. The parent barrier tracks every spawned task so teardown is a single barrier.stop call.
# File 'lib/nnq/engine.rb', line 271
def spawn_task(annotation:, parent: @lifecycle.barrier, &block)
  parent.async(annotation: annotation, &block)
end
#transport_for(endpoint) ⇒ Object
Public so Reconnect can dial directly without re-deriving the transport from the URL each iteration.
# File 'lib/nnq/engine.rb', line 236
def transport_for(endpoint)
  scheme = endpoint[/\A([a-z+]+):\/\//i, 1] or raise Error, "no scheme: #{endpoint}"
  TRANSPORTS[scheme] or raise Error, "unsupported transport: #{scheme}"
end
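The lookup can be tried outside the engine with a stand-in registry. In the sketch below the module name, the local Error class, and the symbol values are assumptions made so the example runs without NNQ loaded; the method body is copied from the source above.

```ruby
# Stand-in registry: symbols replace the Transport::* modules, and Error is a
# local class. Both are assumptions for illustration only.
module TransportLookupSketch
  Error = Class.new(StandardError)

  TRANSPORTS = {
    "tcp"    => :tcp_transport,
    "ipc"    => :ipc_transport,
    "inproc" => :inproc_transport,
  }.freeze

  def self.transport_for(endpoint)
    scheme = endpoint[/\A([a-z+]+):\/\//i, 1] or raise Error, "no scheme: #{endpoint}"
    TRANSPORTS[scheme] or raise Error, "unsupported transport: #{scheme}"
  end
end

tcp = TransportLookupSketch.transport_for("tcp://127.0.0.1:5555")

errors = ["127.0.0.1:5555", "udp://127.0.0.1:5555", "TCP://127.0.0.1:5555"].map do |endpoint|
  TransportLookupSketch.transport_for(endpoint)
rescue TransportLookupSketch::Error => e
  e.message
end
```

One detail this surfaces: the pattern is case-insensitive but the registry keys are lower-case, so with the lookup as written an endpoint such as `TCP://...` passes the scheme check and then fails as an unsupported transport.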