Class: NNQ::Engine

Inherits:
Object
Defined in:
lib/nnq/engine.rb,
lib/nnq/engine/reconnect.rb,
lib/nnq/engine/socket_lifecycle.rb,
lib/nnq/engine/connection_lifecycle.rb

Overview

Per-socket orchestrator. Owns the listener set, the connection map (keyed on NNQ::Connection, with per-connection ConnectionLifecycle values), the transport registry, and the socket-level state machine via SocketLifecycle.

Mirrors OMQ’s Engine in shape but is much smaller because there is no HWM bookkeeping, no mechanisms, and no heartbeat.

Defined Under Namespace

Classes: ConnectionLifecycle, Reconnect, SocketLifecycle

Constant Summary

TRANSPORTS =
{
  "tcp"    => Transport::TCP,
  "ipc"    => Transport::IPC,
  "inproc" => Transport::Inproc,
}
CONNECTION_FAILED =

Connection errors that should trigger a reconnect retry rather than propagate. Mutable at load time so plugins (e.g. a future TLS transport) can append their own error classes; frozen on first #connect.

[
  Errno::ECONNREFUSED,
  Errno::EHOSTUNREACH,
  Errno::ENETUNREACH,
  Errno::ENOENT,
  Errno::EPIPE,
  Errno::ETIMEDOUT,
  Socket::ResolutionError,
]
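
The load-time extension point described above can be sketched as follows. This is only an illustration under stated assumptions: `RETRYABLE`, `FakeTLSError`, and `dial_once` are hypothetical stand-ins, not NNQ names. It shows that an array constant accepts appends until it is frozen, and that `rescue *LIST` matches any class in the list.

```ruby
# Stand-in for a retryable-error list; NOT the real NNQ constant.
RETRYABLE = [Errno::ECONNREFUSED, Errno::ETIMEDOUT]

# Hypothetical error class that a plugin transport might define.
class FakeTLSError < StandardError; end

# A plugin appends its own error class at load time ...
RETRYABLE << FakeTLSError

# ... and the list is frozen once dialing starts, so later appends raise.
RETRYABLE.freeze

# Returns :retry when the block raises a listed error, else the block's value.
def dial_once
  yield
rescue *RETRYABLE
  :retry
end

dial_once { raise FakeTLSError }        # treated as retryable
dial_once { raise Errno::ECONNREFUSED } # treated as retryable
dial_once { :connected }                # passes the value through
```

Errors outside the list are not rescued and propagate to the caller, which matches the "retry rather than propagate" split the constant describes.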
CONNECTION_LOST =

Errors that indicate an established connection went away. Used by the recv loop and pumps to silently terminate (the connection lifecycle’s #lost! handler decides whether to reconnect).

[
  EOFError,
  IOError,
  Errno::ECONNRESET,
  Errno::EPIPE,
]
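
A minimal sketch of the "silently terminate" behavior, assuming a loop shaped roughly like the one described. `LOST`, `recv_loop`, and the `on_lost:` callback are hypothetical; the real recv loop and `#lost!` handler are not shown on this page.

```ruby
require "stringio"

# Stand-in list mirroring the idea of CONNECTION_LOST; not the NNQ constant.
LOST = [EOFError, IOError, Errno::ECONNRESET, Errno::EPIPE]

# Reads 4-byte chunks until the stream ends. A "lost" error ends the loop
# quietly and is handed to the on_lost callback instead of propagating.
def recv_loop(io, on_lost:)
  received = []
  loop { received << io.readpartial(4) }
rescue *LOST => e
  on_lost.call(e)
  received
end

seen   = []
chunks = recv_loop(StringIO.new("abcdefgh"), on_lost: ->(e) { seen << e.class })
# chunks holds the two chunks read before EOF; seen records EOFError
```

The caller never sees the exception; the decision about what to do next (reconnect or not) is left to whoever receives the callback.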

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(protocol:, options:) {|engine| ... } ⇒ Engine

Returns a new instance of Engine.

Parameters:

  • protocol (Integer)

    our SP protocol id (e.g. Protocols::PUSH_V0)

  • options (Options)

Yield Parameters:

  • engine (Engine)

    used by the caller to build a routing strategy with access to the engine’s connection map



# File 'lib/nnq/engine.rb', line 87

def initialize(protocol:, options:)
  @protocol        = protocol
  @options         = options
  @connections     = {}
  @listeners       = []
  @lifecycle       = SocketLifecycle.new
  @last_endpoint   = nil
  @new_pipe        = Async::Condition.new
  @monitor_queue   = nil
  @verbose_monitor = false
  @dialed          = Set.new
  @routing         = yield(self)
end
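
The block-yielding constructor can be illustrated with stand-in classes. `MiniEngine` and `MiniRouting` are hypothetical and exist only for this sketch. Because the `yield` is the last statement of the constructor, the block sees an engine whose other state is already initialized.

```ruby
# Hypothetical stand-ins; neither class is part of NNQ.
class MiniEngine
  attr_reader :connections, :routing

  def initialize
    @connections = {}
    # The block receives the engine and returns the routing strategy.
    @routing = yield(self)
  end
end

class MiniRouting
  def initialize(engine)
    @engine = engine
  end

  # Reads the engine's live connection map, not a copy.
  def peer_count
    @engine.connections.size
  end
end

engine = MiniEngine.new { |e| MiniRouting.new(e) }
engine.connections[:conn_a] = :lifecycle_a
engine.routing.peer_count # => 1
```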

Instance Attribute Details

#connections ⇒ Hash{NNQ::Connection => ConnectionLifecycle} (readonly)

Returns:

  • (Hash{NNQ::Connection => ConnectionLifecycle})

# File 'lib/nnq/engine.rb', line 48

def connections
  @connections
end

#dialed ⇒ Set<String> (readonly)

Returns endpoints we have called #connect on; used to decide whether to schedule a reconnect after a connection is lost.

Returns:

  • (Set<String>)

    endpoints we have called #connect on; used to decide whether to schedule a reconnect after a connection is lost.



# File 'lib/nnq/engine.rb', line 66

def dialed
  @dialed
end

#last_endpoint ⇒ String? (readonly)

Returns:

  • (String, nil)


# File 'lib/nnq/engine.rb', line 56

def last_endpoint
  @last_endpoint
end

#lifecycle ⇒ SocketLifecycle (readonly)

Returns:

  • (SocketLifecycle)

# File 'lib/nnq/engine.rb', line 52

def lifecycle
  @lifecycle
end

#monitor_queue ⇒ Async::Queue?

Returns monitor event queue (set by Socket#monitor).

Returns:

  • (Async::Queue, nil)

    monitor event queue (set by Socket#monitor)



# File 'lib/nnq/engine.rb', line 70

def monitor_queue
  @monitor_queue
end

#monitor_task ⇒ Async::Task?

Returns the monitor consumer task, if any.

Returns:

  • (Async::Task, nil)

    the monitor consumer task, if any



# File 'lib/nnq/engine.rb', line 74

def monitor_task
  @monitor_task
end

#new_pipe ⇒ Async::Condition (readonly)

Returns signaled when a new pipe is registered.

Returns:

  • (Async::Condition)

    signaled when a new pipe is registered



# File 'lib/nnq/engine.rb', line 60

def new_pipe
  @new_pipe
end

#options ⇒ Options (readonly)

Returns:

  • (Options)

# File 'lib/nnq/engine.rb', line 40

def options
  @options
end

#protocol ⇒ Integer (readonly)

Returns our SP protocol id (e.g. Protocols::PUSH_V0).

Returns:

  • (Integer)

    our SP protocol id (e.g. Protocols::PUSH_V0)



# File 'lib/nnq/engine.rb', line 36

def protocol
  @protocol
end

#routing ⇒ Routing strategy (readonly)

Returns:

  • (Routing strategy)

# File 'lib/nnq/engine.rb', line 44

def routing
  @routing
end

#verbose_monitor ⇒ Boolean

Returns whether per-message tracing is on. When true, #emit_verbose_msg_sent and #emit_verbose_msg_received forward per-message traces (:message_sent / :message_received) to the monitor queue. Set by Socket#monitor via its verbose: kwarg.

Returns:

  • (Boolean)

    when true, #emit_verbose_msg_sent and #emit_verbose_msg_received forward per-message traces (:message_sent / :message_received) to the monitor queue. Set by Socket#monitor via its verbose: kwarg.



# File 'lib/nnq/engine.rb', line 80

def verbose_monitor
  @verbose_monitor
end

Instance Method Details

#all_peers_gone ⇒ Async::Promise

Returns resolves when all peers have disconnected (edge-triggered, after at least one peer connected).

Returns:

  • (Async::Promise)

    resolves when all peers have disconnected (edge-triggered, after at least one peer connected)



# File 'lib/nnq/engine.rb', line 152

def all_peers_gone
  @lifecycle.all_peers_gone
end

#barrier ⇒ Async::Barrier?

Returns:

  • (Async::Barrier, nil)


# File 'lib/nnq/engine.rb', line 134

def barrier
  @lifecycle.barrier
end

#bind(endpoint) ⇒ Object

Binds to endpoint. Synchronous: errors propagate.



# File 'lib/nnq/engine.rb', line 194

def bind(endpoint)
  transport = transport_for(endpoint)
  listener  = transport.bind(endpoint, self)
  listener.start_accept_loop(@lifecycle.barrier) do |io, framing = :tcp|
    handle_accepted(io, endpoint: endpoint, framing: framing)
  end
  @listeners << listener
  @last_endpoint = listener.endpoint
  emit_monitor_event(:listening, endpoint: @last_endpoint)
end
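
The accept-loop block above declares `framing = :tcp` as a block parameter default. A small sketch of what that default does, using a hypothetical `accept_one` helper in place of a real listener (how NNQ listeners actually yield is an assumption here):

```ruby
# Hypothetical stand-in for a listener that may or may not pass a framing.
def accept_one(io, framing = nil)
  framing ? yield(io, framing) : yield(io)
end

# Listener yields only the io: the block default fills in :tcp.
accept_one(:sock) { |io, framing = :tcp| [io, framing] }       # => [:sock, :tcp]

# Listener yields an explicit framing: the default is ignored.
accept_one(:sock, :ipc) { |io, framing = :tcp| [io, framing] } # => [:sock, :ipc]
```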

#capture_parent_task(task, on_io_thread:) ⇒ Object

Stores the parent Async task that long-lived NNQ fibers will attach to. The caller (Socket) is responsible for picking the right one (the user’s current task, or Reactor.root_task).



# File 'lib/nnq/engine.rb', line 188

def capture_parent_task(task, on_io_thread:)
  @lifecycle.capture_parent_task(task, on_io_thread: on_io_thread)
end

#close ⇒ Object

Closes the engine: stops listeners, drains the send queue subject to linger, stops routing pumps (which by now are parked on the empty queue), then tears down every connection’s lifecycle. Order matters — closing connections first would force mid-flush pumps to abort with IOError.



# File 'lib/nnq/engine.rb', line 281

def close
  return unless @lifecycle.alive?

  @lifecycle.start_closing!
  @listeners.each(&:stop)
  drain_send_queue(@options.linger)
  @routing.close if @routing.respond_to?(:close)

  # Tear down each remaining connection via its lifecycle. The
  # collection mutates during iteration, so snapshot the values.
  @connections.values.each(&:close!)

  # Emit :closed, seal the monitor queue, and wait for the monitor
  # fiber to drain it before cancelling tasks. Without this join,
  # trailing :message_received events that the recv pump enqueued
  # just before close would be lost when the barrier.stop below
  # Async::Stops the monitor fiber mid-dequeue.
  emit_monitor_event(:closed)
  close_monitor_queue
  @monitor_task&.wait

  # Cascade-cancel every remaining task (reconnect loops, accept
  # loops, supervisors) in one shot.
  @lifecycle.barrier&.stop
  @lifecycle.finish_closing!
  @new_pipe.signal

  # Unblock anyone waiting on peer_connected when the socket is
  # closed before a peer ever arrived.
  @lifecycle.peer_connected.resolve(nil) unless @lifecycle.peer_connected.resolved?
end

#close_read ⇒ Object

Closes only the recv side. Buffered messages drain, then Socket#receive returns nil. Send side remains operational.



# File 'lib/nnq/engine.rb', line 180

def close_read
  @routing.close_read if @routing.respond_to?(:close_read)
end

#closed? ⇒ Boolean

Returns:

  • (Boolean)


# File 'lib/nnq/engine.rb', line 139

def closed?
  @lifecycle.closed?
end

#connect(endpoint) ⇒ Object

Connects to endpoint. Non-blocking for tcp:// and ipc:// — the actual dial happens inside a background reconnect task that retries with exponential back-off until the peer becomes reachable. Inproc connect is synchronous and instant.



# File 'lib/nnq/engine.rb', line 210

def connect(endpoint)
  @dialed << endpoint
  @last_endpoint = endpoint

  if endpoint.start_with?("inproc://")
    transport_for(endpoint).connect(endpoint, self)
  else
    emit_monitor_event(:connect_delayed, endpoint: endpoint)
    Reconnect.schedule(endpoint, @options, @lifecycle.barrier, self, delay: 0)
  end
end
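
The retry-with-back-off behavior mentioned above can be sketched roughly like this. Everything here is an assumption made for illustration: `backoff_delay`, `dial_with_retry`, the `base:`/`max:` values, and the `max_attempts:` cap are hypothetical and are not NNQ options or the actual Reconnect implementation (which, per the description, keeps retrying until the peer is reachable).

```ruby
# Hypothetical helper: delay before the n-th retry (0-based), doubling from
# `base` seconds and capped at `max` seconds.
def backoff_delay(attempt, base: 0.1, max: 5.0)
  [base * (2**attempt), max].min
end

# Runs the block, retrying on ECONNREFUSED with growing delays. The sleeper
# is injectable so the sketch can run without actually sleeping.
def dial_with_retry(max_attempts:, sleeper: ->(secs) { sleep(secs) })
  attempt = 0
  begin
    yield
  rescue Errno::ECONNREFUSED
    attempt += 1
    raise if attempt >= max_attempts
    sleeper.call(backoff_delay(attempt - 1))
    retry
  end
end

calls  = 0
slept  = []
result = dial_with_retry(max_attempts: 5, sleeper: ->(secs) { slept << secs }) do
  calls += 1
  raise Errno::ECONNREFUSED if calls < 3 # peer unreachable for two attempts
  :connected
end
# result is :connected after three calls; slept holds the two delays used
```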

#emit_monitor_event(type, endpoint: nil, detail: nil) ⇒ Object

Emits a monitor event to the attached queue (if any).



# File 'lib/nnq/engine.rb', line 103

def emit_monitor_event(type, endpoint: nil, detail: nil)
  return unless @monitor_queue
  @monitor_queue.enqueue(MonitorEvent.new(type: type, endpoint: endpoint, detail: detail))
rescue Async::Stop
end

#emit_verbose_msg_received(body) ⇒ Object

Emits a :message_received verbose event. Same early-return discipline as #emit_verbose_msg_sent.



# File 'lib/nnq/engine.rb', line 121

def emit_verbose_msg_received(body)
  return unless @verbose_monitor
  emit_monitor_event(:message_received, detail: { body: body })
end

#emit_verbose_msg_sent(body) ⇒ Object

Emits a :message_sent verbose event. Early-returns before allocating the detail hash so the hot send path pays nothing when verbose monitoring is off.



# File 'lib/nnq/engine.rb', line 113

def emit_verbose_msg_sent(body)
  return unless @verbose_monitor
  emit_monitor_event(:message_sent, detail: { body: body })
end
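
The early-return discipline can be seen in isolation with a stand-in class. `MiniMonitor` is hypothetical; it only mirrors the guard-before-allocate shape of the method above.

```ruby
# Hypothetical stand-in, not the NNQ class.
class MiniMonitor
  attr_accessor :verbose
  attr_reader :events

  def initialize
    @verbose = false
    @events  = []
  end

  def message_sent(body)
    # Guard first: the { body: body } hash below is never built while
    # verbose tracing is off.
    return unless @verbose
    @events << [:message_sent, { body: body }]
  end
end

monitor = MiniMonitor.new
monitor.message_sent("dropped") # verbose off: nothing recorded
monitor.verbose = true
monitor.message_sent("traced")  # verbose on: one event recorded
```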

#handle_accepted(io, endpoint:, framing: :tcp) ⇒ Object

Called by transports for each accepted client connection.



# File 'lib/nnq/engine.rb', line 243

def handle_accepted(io, endpoint:, framing: :tcp)
  lifecycle = ConnectionLifecycle.new(self, endpoint: endpoint, framing: framing)
  lifecycle.handshake!(io)
  spawn_recv_loop(lifecycle.conn) if @routing.respond_to?(:enqueue) && @connections.key?(lifecycle.conn)
  lifecycle.start_supervisor!
rescue ConnectionRejected
  # routing rejected this peer (e.g. PAIR already bonded) — lifecycle cleaned up
rescue => e
  warn("nnq: handshake failed for #{endpoint}: #{e.class}: #{e.message}") if $DEBUG
end

#handle_connected(io, endpoint:, framing: :tcp) ⇒ Object

Called by transports for each dialed connection.



# File 'lib/nnq/engine.rb', line 256

def handle_connected(io, endpoint:, framing: :tcp)
  lifecycle = ConnectionLifecycle.new(self, endpoint: endpoint, framing: framing)
  lifecycle.handshake!(io)
  spawn_recv_loop(lifecycle.conn) if @routing.respond_to?(:enqueue) && @connections.key?(lifecycle.conn)
  lifecycle.start_supervisor!
rescue ConnectionRejected
  # unusual on connect side, but handled identically
end

#handle_connection_lost(conn) ⇒ Object

Called by routing pumps (or the recv loop) when their connection has died. Idempotent via the lifecycle state guard.



# File 'lib/nnq/engine.rb', line 316

def handle_connection_lost(conn)
  @connections[conn]&.lost!
end
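
A sketch of what "idempotent via the lifecycle state guard" might look like. `MiniLifecycle` and its states are hypothetical; the real ConnectionLifecycle state machine is not shown on this page.

```ruby
# Hypothetical stand-in for a per-connection lifecycle.
class MiniLifecycle
  attr_reader :state, :teardowns

  def initialize
    @state     = :ready
    @teardowns = 0
  end

  # Tears down once; later calls hit the state guard and do nothing.
  def lost!
    return false if @state == :closed
    @state = :closed
    @teardowns += 1
    true
  end
end

connections = { conn: MiniLifecycle.new }
connections[:conn]&.lost!    # first report: performs the teardown
connections[:conn]&.lost!    # second report (e.g. from another pump): no-op
connections[:missing]&.lost! # unknown connection: safe navigation returns nil
```

The safe-navigation call mirrors the method above: a connection that has already been removed from the map is simply ignored.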

#maybe_reconnect(endpoint) ⇒ Object

Schedules a reconnect for endpoint if auto-reconnect is enabled, the socket is still alive, and the endpoint is still in the dialed set. inproc:// endpoints are never rescheduled. Called from the connection lifecycle’s #lost! path.



# File 'lib/nnq/engine.rb', line 226

def maybe_reconnect(endpoint)
  return unless endpoint && @dialed.include?(endpoint)
  return unless @lifecycle.alive? && @lifecycle.reconnect_enabled
  return if endpoint.start_with?("inproc://")
  Reconnect.schedule(endpoint, @options, @lifecycle.barrier, self)
end

#parent_task ⇒ Async::Task?

Returns:

  • (Async::Task, nil)


# File 'lib/nnq/engine.rb', line 128

def parent_task
  @lifecycle.parent_task
end

#peer_connected ⇒ Async::Promise

Returns resolves with the first connected peer.

Returns:

  • (Async::Promise)

    resolves with the first connected peer



# File 'lib/nnq/engine.rb', line 145

def peer_connected
  @lifecycle.peer_connected
end

#reconnect_enabled ⇒ Boolean

Returns:

  • (Boolean)


# File 'lib/nnq/engine.rb', line 165

def reconnect_enabled
  @lifecycle.reconnect_enabled
end

#reconnect_enabled=(value) ⇒ Object

Disables or re-enables automatic reconnect. #maybe_reconnect checks this flag before scheduling a retry. TransientMonitor flips it before draining.



# File 'lib/nnq/engine.rb', line 173

def reconnect_enabled=(value)
  @lifecycle.reconnect_enabled = value
end

#resolve_all_peers_gone_if_empty ⇒ Object

Called by ConnectionLifecycle teardown. Resolves #all_peers_gone if the connection set is now empty and we had peers.



# File 'lib/nnq/engine.rb', line 159

def resolve_all_peers_gone_if_empty
  @lifecycle.resolve_all_peers_gone_if_empty(@connections)
end
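
The edge-triggered condition ("empty now, and we had peers") can be sketched with a plain flag in place of a promise. `PeerTracker` is hypothetical; how SocketLifecycle actually tracks this is not shown on this page.

```ruby
# Hypothetical stand-in; uses a boolean where the real code uses a promise.
class PeerTracker
  attr_reader :gone

  def initialize
    @had_peers = false
    @gone      = false
  end

  def peer_added
    @had_peers = true
  end

  # Fires only on the transition to empty after at least one peer was seen.
  def resolve_if_empty(connections)
    return unless @had_peers && connections.empty?
    @gone = true
  end
end

tracker     = PeerTracker.new
connections = {}
tracker.resolve_if_empty(connections) # no peer yet: stays unresolved
connections[:a] = :lifecycle
tracker.peer_added
tracker.resolve_if_empty(connections) # still one peer: stays unresolved
connections.delete(:a)
tracker.resolve_if_empty(connections) # last peer left: resolves
```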

#spawn_task(annotation:, parent: @lifecycle.barrier, &block) ⇒ Object

Spawns a task under the given parent barrier (defaults to the socket-level barrier). Used by routing strategies (e.g. PUSH send pump) to attach long-lived fibers to the engine’s lifecycle. The parent barrier tracks every spawned task so teardown is a single barrier.stop call.



# File 'lib/nnq/engine.rb', line 271

def spawn_task(annotation:, parent: @lifecycle.barrier, &block)
  parent.async(annotation: annotation, &block)
end

#transport_for(endpoint) ⇒ Object

Public so Reconnect can dial directly without re-deriving the transport from the URL each iteration.



# File 'lib/nnq/engine.rb', line 236

def transport_for(endpoint)
  scheme = endpoint[/\A([a-z+]+):\/\//i, 1] or raise Error, "no scheme: #{endpoint}"
  TRANSPORTS[scheme] or raise Error, "unsupported transport: #{scheme}"
end
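
The scheme lookup can be exercised on its own with a stand-in registry. `SCHEMES`, `scheme_of`, and `lookup` are hypothetical names, the registry maps to symbols rather than the real transport modules, and `ArgumentError` stands in for the library's own Error class. The regular expression is the one from the method above.

```ruby
# Stand-in registry; the real TRANSPORTS maps to transport modules.
SCHEMES = { "tcp" => :tcp, "ipc" => :ipc, "inproc" => :inproc }.freeze

# Extracts the scheme (letters and "+") that precedes "://", or nil.
def scheme_of(endpoint)
  endpoint[/\A([a-z+]+):\/\//i, 1]
end

# Same two-step shape as transport_for: parse the scheme, then look it up.
def lookup(endpoint)
  scheme = scheme_of(endpoint) or raise ArgumentError, "no scheme: #{endpoint}"
  SCHEMES[scheme] or raise ArgumentError, "unsupported transport: #{scheme}"
end

lookup("tcp://127.0.0.1:5555") # => :tcp
lookup("inproc://jobs")        # => :inproc
scheme_of("tls+tcp://host:1")  # => "tls+tcp"
scheme_of("no-scheme-here")    # => nil
```

In this sketch the pattern is case-insensitive while the registry keys are lower-case, so an endpoint such as "TCP://host:1" parses but then fails the lookup with "unsupported transport".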