Class: NNQ::Engine
- Inherits:
-
Object
- Object
- NNQ::Engine
- Defined in:
- lib/nnq/engine.rb,
lib/nnq/engine/reconnect.rb,
lib/nnq/engine/socket_lifecycle.rb,
lib/nnq/engine/connection_lifecycle.rb
Overview
Per-socket orchestrator. Owns the listener set, the connection map (keyed on NNQ::Connection, with per-connection ConnectionLifecycle values), the transport registry, and the socket-level state machine via SocketLifecycle.
Mirrors OMQ’s Engine in shape but is much smaller because there’s no HWM bookkeeping, no mechanisms, no heartbeat, no monitor queue.
Defined Under Namespace
Classes: ConnectionLifecycle, Reconnect, SocketLifecycle
Class Attribute Summary collapse
-
.transports ⇒ Hash{String => Module}
readonly
Registered transports.
Instance Attribute Summary collapse
- #connections ⇒ Hash{NNQ::Connection => ConnectionLifecycle} readonly
-
#dialed ⇒ Set<String>
readonly
Endpoints we have called #connect on; used to decide whether to schedule a reconnect after a connection is lost.
- #last_endpoint ⇒ String? readonly
- #lifecycle ⇒ SocketLifecycle readonly
-
#monitor_queue ⇒ Async::Queue?
Monitor event queue (set by Socket#monitor).
-
#monitor_task ⇒ Async::Task?
The monitor consumer task, if any.
-
#new_pipe ⇒ Async::Condition
readonly
Signaled when a new pipe is registered.
- #options ⇒ Options readonly
-
#protocol ⇒ Integer
readonly
Our SP protocol id (e.g. Protocols::PUSH_V0).
- #routing ⇒ Routing strategy readonly
-
#verbose_monitor ⇒ Boolean
When true, #emit_verbose_monitor_event forwards per-message traces (:message_sent / :message_received) to the monitor queue.
Instance Method Summary collapse
-
#all_peers_gone ⇒ Async::Promise
Resolves when all peers have disconnected (edge-triggered, after at least one peer connected).
- #barrier ⇒ Async::Barrier?
-
#bind(endpoint, **opts) ⇒ Object
Binds to
endpoint. -
#capture_parent_task(task, on_io_thread:) ⇒ Object
Stores the parent Async task that long-lived NNQ fibers will attach to.
-
#close ⇒ Object
Closes the engine: stops listeners, drains the send queue subject to linger, stops routing pumps (which by now are parked on the empty queue), then tears down every connection’s lifecycle.
-
#close_read ⇒ Object
Closes only the recv side.
- #closed? ⇒ Boolean
-
#connect(endpoint, **opts) ⇒ Object
Connects to
endpoint. -
#connection_ready(conn, endpoint:) ⇒ Object
Registers an already-connected, framing-free pipe (inproc).
-
#dial_opts_for(endpoint) ⇒ Object
Transport options captured from #connect for
endpoint. -
#emit_monitor_event(type, endpoint: nil, detail: nil) ⇒ Object
Emits a monitor event to the attached queue (if any).
-
#emit_verbose_msg_received(body) ⇒ Object
Emits a :message_received verbose event.
-
#emit_verbose_msg_sent(body) ⇒ Object
Emits a :message_sent verbose event.
-
#handle_accepted(io, endpoint:, framing: :tcp) ⇒ Object
Called by transports for each accepted client connection.
-
#handle_connected(io, endpoint:, framing: :tcp) ⇒ Object
Called by transports for each dialed connection.
-
#handle_connection_lost(conn) ⇒ Object
Called by routing pumps (or the recv loop) when their connection has died.
-
#initialize(protocol:, options:) {|engine| ... } ⇒ Engine
constructor
A new instance of Engine.
-
#maybe_reconnect(endpoint) ⇒ Object
Schedules a reconnect for
endpointif auto-reconnect is enabled and the endpoint is still in the dialed set. - #parent_task ⇒ Async::Task?
-
#peer_connected ⇒ Async::Promise
Resolves with the first connected peer.
- #reconnect_enabled ⇒ Boolean
-
#reconnect_enabled=(value) ⇒ Object
Disables or re-enables automatic reconnect.
-
#resolve_all_peers_gone_if_empty ⇒ Object
Called by ConnectionLifecycle teardown.
-
#spawn_task(annotation:, parent: @lifecycle.barrier, &block) ⇒ Object
Spawns a task under the given parent barrier (defaults to the socket-level barrier).
-
#transport_for(endpoint) ⇒ Object
Public so Reconnect can dial directly without re-deriving the transport from the URL each iteration.
Constructor Details
#initialize(protocol:, options:) {|engine| ... } ⇒ Engine
Returns a new instance of Engine.
87 88 89 90 91 92 93 94 95 96 97 98 99 100 |
# File 'lib/nnq/engine.rb', line 87 def initialize(protocol:, options:) @protocol = protocol @options = @connections = {} @listeners = [] @lifecycle = SocketLifecycle.new @last_endpoint = nil @new_pipe = Async::Condition.new @monitor_queue = nil @verbose_monitor = false @dialed = Set.new @dial_opts = {} # endpoint => kwargs for transport.connect on reconnect @routing = yield(self) end |
Class Attribute Details
.transports ⇒ Hash{String => Module} (readonly)
Returns registered transports.
31 32 33 |
# File 'lib/nnq/engine.rb', line 31 def transports @transports end |
Instance Attribute Details
#connections ⇒ Hash{NNQ::Connection => ConnectionLifecycle} (readonly)
48 49 50 |
# File 'lib/nnq/engine.rb', line 48 def connections @connections end |
#dialed ⇒ Set<String> (readonly)
Returns endpoints we have called #connect on; used to decide whether to schedule a reconnect after a connection is lost.
66 67 68 |
# File 'lib/nnq/engine.rb', line 66 def dialed @dialed end |
#last_endpoint ⇒ String? (readonly)
56 57 58 |
# File 'lib/nnq/engine.rb', line 56 def last_endpoint @last_endpoint end |
#lifecycle ⇒ SocketLifecycle (readonly)
52 53 54 |
# File 'lib/nnq/engine.rb', line 52 def lifecycle @lifecycle end |
#monitor_queue ⇒ Async::Queue?
Returns monitor event queue (set by Socket#monitor).
70 71 72 |
# File 'lib/nnq/engine.rb', line 70 def monitor_queue @monitor_queue end |
#monitor_task ⇒ Async::Task?
Returns the monitor consumer task, if any.
74 75 76 |
# File 'lib/nnq/engine.rb', line 74 def monitor_task @monitor_task end |
#new_pipe ⇒ Async::Condition (readonly)
Returns signaled when a new pipe is registered.
60 61 62 |
# File 'lib/nnq/engine.rb', line 60 def new_pipe @new_pipe end |
#protocol ⇒ Integer (readonly)
Returns our SP protocol id (e.g. Protocols::PUSH_V0).
36 37 38 |
# File 'lib/nnq/engine.rb', line 36 def protocol @protocol end |
#routing ⇒ Routing strategy (readonly)
44 45 46 |
# File 'lib/nnq/engine.rb', line 44 def routing @routing end |
#verbose_monitor ⇒ Boolean
Returns when true, #emit_verbose_monitor_event forwards per-message traces (:message_sent / :message_received) to the monitor queue. Set by Socket#monitor via its verbose: kwarg.
80 81 82 |
# File 'lib/nnq/engine.rb', line 80 def verbose_monitor @verbose_monitor end |
Instance Method Details
#all_peers_gone ⇒ Async::Promise
Returns resolves when all peers have disconnected (edge-triggered, after at least one peer connected).
153 154 155 |
# File 'lib/nnq/engine.rb', line 153 def all_peers_gone @lifecycle.all_peers_gone end |
#barrier ⇒ Async::Barrier?
135 136 137 |
# File 'lib/nnq/engine.rb', line 135 def @lifecycle. end |
#bind(endpoint, **opts) ⇒ Object
Binds to endpoint. Synchronous: errors propagate.
195 196 197 198 199 200 201 202 203 204 |
# File 'lib/nnq/engine.rb', line 195 def bind(endpoint, **opts) transport = transport_for(endpoint) listener = transport.bind(endpoint, self, **opts) listener.start_accept_loop(@lifecycle.) do |io, framing = :tcp| handle_accepted(io, endpoint: endpoint, framing: framing) end @listeners << listener @last_endpoint = listener.endpoint emit_monitor_event(:listening, endpoint: @last_endpoint) end |
#capture_parent_task(task, on_io_thread:) ⇒ Object
Stores the parent Async task that long-lived NNQ fibers will attach to. The caller (Socket) is responsible for picking the right one (the user’s current task, or Reactor.root_task).
189 190 191 |
# File 'lib/nnq/engine.rb', line 189 def capture_parent_task(task, on_io_thread:) @lifecycle.capture_parent_task(task, on_io_thread: on_io_thread) end |
#close ⇒ Object
Closes the engine: stops listeners, drains the send queue subject to linger, stops routing pumps (which by now are parked on the empty queue), then tears down every connection’s lifecycle. Order matters — closing connections first would force mid-flush pumps to abort with IOError.
304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 |
# File 'lib/nnq/engine.rb', line 304 def close return unless @lifecycle.alive? @lifecycle.start_closing! @listeners.each(&:stop) drain_send_queue(@options.linger) @routing.close if @routing.respond_to?(:close) # Tear down each remaining connection via its lifecycle. The # collection mutates during iteration, so snapshot the values. @connections.values.each(&:close!) # Emit :closed, seal the monitor queue, and wait for the monitor # fiber to drain it before cancelling tasks. Without this join, # trailing :message_received events that the recv pump enqueued # just before close would be lost when the barrier.stop below # Async::Stops the monitor fiber mid-dequeue. emit_monitor_event(:closed) close_monitor_queue @monitor_task&.wait # Cascade-cancel every remaining task (reconnect loops, accept # loops, supervisors) in one shot. @lifecycle.&.stop @lifecycle.finish_closing! @new_pipe.signal # Unblock anyone waiting on peer_connected when the socket is # closed before a peer ever arrived. @lifecycle.peer_connected.resolve(nil) unless @lifecycle.peer_connected.resolved? end |
#close_read ⇒ Object
Closes only the recv side. Buffered messages drain, then Socket#receive returns nil. Send side remains operational.
181 182 183 |
# File 'lib/nnq/engine.rb', line 181 def close_read @routing.close_read if @routing.respond_to?(:close_read) end |
#closed? ⇒ Boolean
140 141 142 |
# File 'lib/nnq/engine.rb', line 140 def closed? @lifecycle.closed? end |
#connect(endpoint, **opts) ⇒ Object
Connects to endpoint. Non-blocking for tcp:// and ipc:// — the actual dial happens inside a background reconnect task that retries with exponential back-off until the peer becomes reachable. Inproc connect is synchronous and instant.
211 212 213 214 215 216 217 218 219 220 221 222 |
# File 'lib/nnq/engine.rb', line 211 def connect(endpoint, **opts) @dialed << endpoint @dial_opts[endpoint] = opts unless opts.empty? @last_endpoint = endpoint if endpoint.start_with?("inproc://") transport_for(endpoint).connect(endpoint, self, **opts) else emit_monitor_event(:connect_delayed, endpoint: endpoint) Reconnect.schedule(endpoint, @options, @lifecycle., self, delay: 0) end end |
#connection_ready(conn, endpoint:) ⇒ Object
Registers an already-connected, framing-free pipe (inproc). Skips the SP handshake entirely — Transport::Inproc::Pipe is a Ruby duck-type for Connection and has no wire protocol.
279 280 281 282 283 284 285 286 |
# File 'lib/nnq/engine.rb', line 279 def connection_ready(conn, endpoint:) lifecycle = ConnectionLifecycle.new(self, endpoint: endpoint, framing: :inproc) lifecycle.ready_direct!(conn) spawn_recv_loop(conn) if @routing.respond_to?(:enqueue) && @connections.key?(conn) lifecycle.start_supervisor! rescue ConnectionRejected # routing rejected this peer (e.g. PAIR already bonded) end |
#dial_opts_for(endpoint) ⇒ Object
228 229 230 |
# File 'lib/nnq/engine.rb', line 228 def dial_opts_for(endpoint) @dial_opts[endpoint] || {} end |
#emit_monitor_event(type, endpoint: nil, detail: nil) ⇒ Object
Emits a monitor event to the attached queue (if any).
104 105 106 107 108 |
# File 'lib/nnq/engine.rb', line 104 def emit_monitor_event(type, endpoint: nil, detail: nil) return unless @monitor_queue @monitor_queue.enqueue(MonitorEvent.new(type: type, endpoint: endpoint, detail: detail)) rescue Async::Stop end |
#emit_verbose_msg_received(body) ⇒ Object
Emits a :message_received verbose event. Same early-return discipline as #emit_verbose_msg_sent.
122 123 124 125 |
# File 'lib/nnq/engine.rb', line 122 def emit_verbose_msg_received(body) return unless @verbose_monitor emit_monitor_event(:message_received, detail: { body: body }) end |
#emit_verbose_msg_sent(body) ⇒ Object
Emits a :message_sent verbose event. Early-returns before allocating the detail hash so the hot send path pays nothing when verbose monitoring is off.
114 115 116 117 |
# File 'lib/nnq/engine.rb', line 114 def emit_verbose_msg_sent(body) return unless @verbose_monitor emit_monitor_event(:message_sent, detail: { body: body }) end |
#handle_accepted(io, endpoint:, framing: :tcp) ⇒ Object
Called by transports for each accepted client connection.
253 254 255 256 257 258 259 260 261 262 |
# File 'lib/nnq/engine.rb', line 253 def handle_accepted(io, endpoint:, framing: :tcp) lifecycle = ConnectionLifecycle.new(self, endpoint: endpoint, framing: framing) lifecycle.handshake!(io) spawn_recv_loop(lifecycle.conn) if @routing.respond_to?(:enqueue) && @connections.key?(lifecycle.conn) lifecycle.start_supervisor! rescue ConnectionRejected # routing rejected this peer (e.g. PAIR already bonded) — lifecycle cleaned up rescue => e warn("nnq: handshake failed for #{endpoint}: #{e.class}: #{e.}") if $DEBUG end |
#handle_connected(io, endpoint:, framing: :tcp) ⇒ Object
Called by transports for each dialed connection.
266 267 268 269 270 271 272 273 |
# File 'lib/nnq/engine.rb', line 266 def handle_connected(io, endpoint:, framing: :tcp) lifecycle = ConnectionLifecycle.new(self, endpoint: endpoint, framing: framing) lifecycle.handshake!(io) spawn_recv_loop(lifecycle.conn) if @routing.respond_to?(:enqueue) && @connections.key?(lifecycle.conn) lifecycle.start_supervisor! rescue ConnectionRejected # unusual on connect side, but handled identically end |
#handle_connection_lost(conn) ⇒ Object
Called by routing pumps (or the recv loop) when their connection has died. Idempotent via the lifecycle state guard.
339 340 341 |
# File 'lib/nnq/engine.rb', line 339 def handle_connection_lost(conn) @connections[conn]&.lost! end |
#maybe_reconnect(endpoint) ⇒ Object
Schedules a reconnect for endpoint if auto-reconnect is enabled and the endpoint is still in the dialed set. Called from the connection lifecycle’s ‘lost!` path.
236 237 238 239 240 241 |
# File 'lib/nnq/engine.rb', line 236 def maybe_reconnect(endpoint) return unless endpoint && @dialed.include?(endpoint) return unless @lifecycle.alive? && @lifecycle.reconnect_enabled return if endpoint.start_with?("inproc://") Reconnect.schedule(endpoint, @options, @lifecycle., self) end |
#parent_task ⇒ Async::Task?
129 130 131 |
# File 'lib/nnq/engine.rb', line 129 def parent_task @lifecycle.parent_task end |
#peer_connected ⇒ Async::Promise
Returns resolves with the first connected peer.
146 147 148 |
# File 'lib/nnq/engine.rb', line 146 def peer_connected @lifecycle.peer_connected end |
#reconnect_enabled ⇒ Boolean
166 167 168 |
# File 'lib/nnq/engine.rb', line 166 def reconnect_enabled @lifecycle.reconnect_enabled end |
#reconnect_enabled=(value) ⇒ Object
Disables or re-enables automatic reconnect. nnq has no reconnect loop yet, so this is forward-looking — TransientMonitor flips it before draining.
174 175 176 |
# File 'lib/nnq/engine.rb', line 174 def reconnect_enabled=(value) @lifecycle.reconnect_enabled = value end |
#resolve_all_peers_gone_if_empty ⇒ Object
Called by ConnectionLifecycle teardown. Resolves ‘all_peers_gone` if the connection set is now empty and we had peers.
160 161 162 |
# File 'lib/nnq/engine.rb', line 160 def resolve_all_peers_gone_if_empty @lifecycle.resolve_all_peers_gone_if_empty(@connections) end |
#spawn_task(annotation:, parent: @lifecycle.barrier, &block) ⇒ Object
Spawns a task under the given parent barrier (defaults to the socket-level barrier). Used by routing strategies (e.g. PUSH send pump) to attach long-lived fibers to the engine’s lifecycle. The parent barrier tracks every spawned task so teardown is a single barrier.stop call.
294 295 296 |
# File 'lib/nnq/engine.rb', line 294 def spawn_task(annotation:, parent: @lifecycle., &block) parent.async(annotation: annotation, &block) end |
#transport_for(endpoint) ⇒ Object
Public so Reconnect can dial directly without re-deriving the transport from the URL each iteration.
246 247 248 249 |
# File 'lib/nnq/engine.rb', line 246 def transport_for(endpoint) scheme = endpoint[/\A([a-z+]+):\/\//i, 1] or raise Error, "no scheme: #{endpoint}" Engine.transports[scheme] or raise Error, "unsupported transport: #{scheme}" end |