Class: NNQ::Engine

Inherits:
Object
  • Object
Defined in:
lib/nnq/engine.rb,
lib/nnq/engine/reconnect.rb,
lib/nnq/engine/socket_lifecycle.rb,
lib/nnq/engine/connection_lifecycle.rb

Overview

Per-socket orchestrator. Owns the listener set, the connection map (keyed on NNQ::Connection, with per-connection ConnectionLifecycle values), the transport registry, and the socket-level state machine via SocketLifecycle.

Mirrors OMQ’s Engine in shape but is much smaller because there’s no HWM bookkeeping, no mechanisms, and no heartbeat.

Defined Under Namespace

Classes: ConnectionLifecycle, Reconnect, SocketLifecycle

Constant Summary

TRANSPORTS =
{
  "tcp"    => Transport::TCP,
  "ipc"    => Transport::IPC,
  "inproc" => Transport::Inproc,
}
CONNECTION_FAILED =

Connection errors that should trigger a reconnect retry rather than propagate. Mutable at load time so plugins (e.g. a future TLS transport) can append their own error classes; frozen on first #connect.

[
  Errno::ECONNREFUSED,
  Errno::EHOSTUNREACH,
  Errno::ENETUNREACH,
  Errno::ENOENT,
  Errno::EPIPE,
  Errno::ETIMEDOUT,
  Socket::ResolutionError,
]
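
A minimal sketch of how a list like this can drive a retry loop, assuming a splat `rescue` over the array and a capped exponential delay. The names `SKETCH_CONNECTION_FAILED` and `sketch_dial_with_retry` are hypothetical and are not part of NNQ; the real Reconnect class may be structured differently.

```ruby
# Hypothetical stand-in for the CONNECTION_FAILED list (subset only).
SKETCH_CONNECTION_FAILED = [
  Errno::ECONNREFUSED,
  Errno::EHOSTUNREACH,
  Errno::ETIMEDOUT,
]

# Calls the block until it succeeds. Only errors in the list are retried;
# anything else propagates on the first attempt.
def sketch_dial_with_retry(max_attempts:, base_delay: 0.01, max_delay: 0.1)
  attempt = 0
  begin
    attempt += 1
    yield(attempt)
  rescue *SKETCH_CONNECTION_FAILED
    raise if attempt >= max_attempts            # give up: re-raise the last error
    sleep([base_delay * (2**(attempt - 1)), max_delay].min)
    retry
  end
end
```

Because the list is matched with a splat, appending an error class to the array before the first dial is enough to make that class retryable.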
CONNECTION_LOST =

Errors that indicate an established connection went away. Used by the recv loop and pumps to silently terminate (the connection lifecycle’s #lost! handler decides whether to reconnect).

[
  EOFError,
  IOError,
  Errno::ECONNRESET,
  Errno::EPIPE,
]
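
The silent-termination pattern described above might look roughly like the following. This is a sketch using a `StringIO` in place of a real connection; `SKETCH_CONNECTION_LOST`, `sketch_recv_loop`, and the `on_lost:` callback are hypothetical names, not NNQ API.

```ruby
require "stringio"

# Hypothetical stand-in for the CONNECTION_LOST list.
SKETCH_CONNECTION_LOST = [EOFError, IOError, Errno::ECONNRESET, Errno::EPIPE]

# Reads newline-delimited messages until the stream goes away, then hands
# the error to a callback instead of letting it propagate.
def sketch_recv_loop(io, on_lost:)
  received = []
  loop do
    received << io.readline.chomp   # raises EOFError at end of stream
  end
rescue *SKETCH_CONNECTION_LOST => e
  on_lost.call(e)                   # the callback decides what happens next
  received
end
```

The loop itself never decides whether to reconnect; it only reports the loss, which matches the division of labour the description assigns to the lifecycle handler.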

Constructor Details

#initialize(protocol:, options:) {|engine| ... } ⇒ Engine

Returns a new instance of Engine.

Parameters:

  • protocol (Integer)

    our SP protocol id (e.g. Protocols::PUSH_V0)

  • options (Options)

Yield Parameters:

  • engine (Engine)

    used by the caller to build a routing strategy with access to the engine’s connection map



# File 'lib/nnq/engine.rb', line 83

def initialize(protocol:, options:)
  @protocol        = protocol
  @options         = options
  @connections     = {}
  @listeners       = []
  @lifecycle       = SocketLifecycle.new
  @last_endpoint   = nil
  @new_pipe        = Async::Condition.new
  @monitor_queue   = nil
  @verbose_monitor = false
  @dialed          = Set.new
  @routing         = yield(self)
end
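
The constructor yields the engine so the caller can build a routing strategy that holds a reference to it. A minimal sketch of that pattern with stand-in classes follows; `SketchEngine` and `SketchRoundRobin` are hypothetical and only illustrate the block hand-off, not any real NNQ routing strategy.

```ruby
# Hypothetical engine reduced to the two pieces the pattern needs.
class SketchEngine
  attr_reader :connections, :routing

  def initialize
    @connections = {}
    @routing     = yield(self)   # the block's return value becomes the routing strategy
  end
end

# Hypothetical strategy that reads the engine's connection map lazily.
class SketchRoundRobin
  def initialize(engine)
    @engine = engine
    @cursor = 0
  end

  def next_connection
    conns = @engine.connections.keys
    return nil if conns.empty?
    conn = conns[@cursor % conns.size]
    @cursor += 1
    conn
  end
end
```

Note that the block runs last in the constructor, so the strategy sees an engine whose other instance variables are already initialized.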

Instance Attribute Details

#connections ⇒ Hash{NNQ::Connection => ConnectionLifecycle} (readonly)

Returns:

  • (Hash{NNQ::Connection => ConnectionLifecycle})

# File 'lib/nnq/engine.rb', line 48

def connections
  @connections
end

#dialed ⇒ Set<String> (readonly)

Returns endpoints we have called #connect on; used to decide whether to schedule a reconnect after a connection is lost.

Returns:

  • (Set<String>)

    endpoints we have called #connect on; used to decide whether to schedule a reconnect after a connection is lost.



# File 'lib/nnq/engine.rb', line 66

def dialed
  @dialed
end

#last_endpoint ⇒ String? (readonly)

Returns:

  • (String, nil)


# File 'lib/nnq/engine.rb', line 56

def last_endpoint
  @last_endpoint
end

#lifecycle ⇒ SocketLifecycle (readonly)

Returns:

  • (SocketLifecycle)

# File 'lib/nnq/engine.rb', line 52

def lifecycle
  @lifecycle
end

#monitor_queue ⇒ Async::Queue?

Returns monitor event queue (set by Socket#monitor).

Returns:

  • (Async::Queue, nil)

    monitor event queue (set by Socket#monitor)



# File 'lib/nnq/engine.rb', line 70

def monitor_queue
  @monitor_queue
end

#new_pipe ⇒ Async::Condition (readonly)

Returns signaled when a new pipe is registered.

Returns:

  • (Async::Condition)

    signaled when a new pipe is registered



# File 'lib/nnq/engine.rb', line 60

def new_pipe
  @new_pipe
end

#options ⇒ Options (readonly)

Returns:

  • (Options)

# File 'lib/nnq/engine.rb', line 40

def options
  @options
end

#protocol ⇒ Integer (readonly)

Returns our SP protocol id (e.g. Protocols::PUSH_V0).

Returns:

  • (Integer)

    our SP protocol id (e.g. Protocols::PUSH_V0)



# File 'lib/nnq/engine.rb', line 36

def protocol
  @protocol
end

#routing ⇒ Routing strategy (readonly)

Returns:

  • (Routing strategy)

# File 'lib/nnq/engine.rb', line 44

def routing
  @routing
end

#verbose_monitor ⇒ Boolean

Returns when true, #emit_verbose_msg_sent and #emit_verbose_msg_received forward per-message traces (:message_sent / :message_received) to the monitor queue. Set by Socket#monitor via its verbose: kwarg.

Returns:

  • (Boolean)

    when true, #emit_verbose_msg_sent and #emit_verbose_msg_received forward per-message traces (:message_sent / :message_received) to the monitor queue. Set by Socket#monitor via its verbose: kwarg.



# File 'lib/nnq/engine.rb', line 76

def verbose_monitor
  @verbose_monitor
end

Instance Method Details

#all_peers_gone ⇒ Async::Promise

Returns resolves when all peers have disconnected (edge-triggered, after at least one peer connected).

Returns:

  • (Async::Promise)

    resolves when all peers have disconnected (edge-triggered, after at least one peer connected)



# File 'lib/nnq/engine.rb', line 148

def all_peers_gone
  @lifecycle.all_peers_gone
end

#barrier ⇒ Async::Barrier?

Returns:

  • (Async::Barrier, nil)


# File 'lib/nnq/engine.rb', line 130

def barrier
  @lifecycle.barrier
end

#bind(endpoint) ⇒ Object

Binds to endpoint. Synchronous: errors propagate.



# File 'lib/nnq/engine.rb', line 190

def bind(endpoint)
  transport = transport_for(endpoint)
  listener  = transport.bind(endpoint, self)
  listener.start_accept_loop(@lifecycle.barrier) do |io, framing = :tcp|
    handle_accepted(io, endpoint: endpoint, framing: framing)
  end
  @listeners << listener
  @last_endpoint = listener.endpoint
  emit_monitor_event(:listening, endpoint: @last_endpoint)
end

#capture_parent_task(task, on_io_thread:) ⇒ Object

Stores the parent Async task that long-lived NNQ fibers will attach to. The caller (Socket) is responsible for picking the right one (the user’s current task, or Reactor.root_task).



# File 'lib/nnq/engine.rb', line 184

def capture_parent_task(task, on_io_thread:)
  @lifecycle.capture_parent_task(task, on_io_thread: on_io_thread)
end

#close ⇒ Object

Closes the engine: stops listeners, drains the send queue subject to linger, stops routing pumps (which by now are parked on the empty queue), then tears down every connection’s lifecycle. Order matters — closing connections first would force mid-flush pumps to abort with IOError.



# File 'lib/nnq/engine.rb', line 277

def close
  return unless @lifecycle.alive?

  @lifecycle.start_closing!
  @listeners.each(&:stop)
  drain_send_queue(@options.linger)
  @routing.close if @routing.respond_to?(:close)

  # Tear down each remaining connection via its lifecycle. The
  # collection mutates during iteration, so snapshot the values.
  @connections.values.each(&:close!)

  # Cascade-cancel every remaining task (reconnect loops, accept
  # loops, supervisors) in one shot.
  @lifecycle.barrier&.stop
  @lifecycle.finish_closing!
  @new_pipe.signal

  # Unblock anyone waiting on peer_connected when the socket is
  # closed before a peer ever arrived.
  @lifecycle.peer_connected.resolve(nil) unless @lifecycle.peer_connected.resolved?
  emit_monitor_event(:closed)
  close_monitor_queue
end

#close_read ⇒ Object

Closes only the recv side. Buffered messages drain, then Socket#receive returns nil. Send side remains operational.



# File 'lib/nnq/engine.rb', line 176

def close_read
  @routing.close_read if @routing.respond_to?(:close_read)
end

#closed? ⇒ Boolean

Returns:

  • (Boolean)


# File 'lib/nnq/engine.rb', line 135

def closed?
  @lifecycle.closed?
end

#connect(endpoint) ⇒ Object

Connects to endpoint. Non-blocking for tcp:// and ipc:// — the actual dial happens inside a background reconnect task that retries with exponential back-off until the peer becomes reachable. Inproc connect is synchronous and instant.



# File 'lib/nnq/engine.rb', line 206

def connect(endpoint)
  @dialed << endpoint
  @last_endpoint = endpoint

  if endpoint.start_with?("inproc://")
    transport_for(endpoint).connect(endpoint, self)
  else
    emit_monitor_event(:connect_delayed, endpoint: endpoint)
    Reconnect.schedule(endpoint, @options, @lifecycle.barrier, self, delay: 0)
  end
end

#emit_monitor_event(type, endpoint: nil, detail: nil) ⇒ Object

Emits a monitor event to the attached queue (if any).



# File 'lib/nnq/engine.rb', line 99

def emit_monitor_event(type, endpoint: nil, detail: nil)
  return unless @monitor_queue
  @monitor_queue.enqueue(MonitorEvent.new(type: type, endpoint: endpoint, detail: detail))
rescue Async::Stop
end

#emit_verbose_msg_received(body) ⇒ Object

Emits a :message_received verbose event. Same early-return discipline as #emit_verbose_msg_sent.



# File 'lib/nnq/engine.rb', line 117

def emit_verbose_msg_received(body)
  return unless @verbose_monitor
  emit_monitor_event(:message_received, detail: { body: body })
end

#emit_verbose_msg_sent(body) ⇒ Object

Emits a :message_sent verbose event. Early-returns before allocating the detail hash so the hot send path pays nothing when verbose monitoring is off.



# File 'lib/nnq/engine.rb', line 109

def emit_verbose_msg_sent(body)
  return unless @verbose_monitor
  emit_monitor_event(:message_sent, detail: { body: body })
end
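
The two guards (no queue attached, verbose flag off) can be exercised in isolation. The sketch below substitutes Ruby's built-in `Queue` for the Async queue and a `Struct` for MonitorEvent; `SketchEmitter` and `SketchMonitorEvent` are hypothetical names used only for illustration.

```ruby
# Hypothetical event record standing in for MonitorEvent.
SketchMonitorEvent = Struct.new(:type, :endpoint, :detail, keyword_init: true)

class SketchEmitter
  attr_accessor :monitor_queue, :verbose_monitor

  def initialize
    @monitor_queue   = nil
    @verbose_monitor = false
  end

  # No-op until a queue has been attached.
  def emit_monitor_event(type, endpoint: nil, detail: nil)
    return unless @monitor_queue
    @monitor_queue << SketchMonitorEvent.new(type: type, endpoint: endpoint, detail: detail)
  end

  # Returns before the { body: ... } hash is built when verbose mode is off.
  def emit_verbose_msg_sent(body)
    return unless @verbose_monitor
    emit_monitor_event(:message_sent, detail: { body: body })
  end
end
```

Putting the flag check ahead of the hash literal is what keeps the non-verbose send path allocation-free in this sketch.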

#handle_accepted(io, endpoint:, framing: :tcp) ⇒ Object

Called by transports for each accepted client connection.



# File 'lib/nnq/engine.rb', line 239

def handle_accepted(io, endpoint:, framing: :tcp)
  lifecycle = ConnectionLifecycle.new(self, endpoint: endpoint, framing: framing)
  lifecycle.handshake!(io)
  spawn_recv_loop(lifecycle.conn) if @routing.respond_to?(:enqueue) && @connections.key?(lifecycle.conn)
  lifecycle.start_supervisor!
rescue ConnectionRejected
  # routing rejected this peer (e.g. PAIR already bonded) — lifecycle cleaned up
rescue => e
  warn("nnq: handshake failed for #{endpoint}: #{e.class}: #{e.message}") if $DEBUG
end

#handle_connected(io, endpoint:, framing: :tcp) ⇒ Object

Called by transports for each dialed connection.



# File 'lib/nnq/engine.rb', line 252

def handle_connected(io, endpoint:, framing: :tcp)
  lifecycle = ConnectionLifecycle.new(self, endpoint: endpoint, framing: framing)
  lifecycle.handshake!(io)
  spawn_recv_loop(lifecycle.conn) if @routing.respond_to?(:enqueue) && @connections.key?(lifecycle.conn)
  lifecycle.start_supervisor!
rescue ConnectionRejected
  # unusual on connect side, but handled identically
end

#handle_connection_lost(conn) ⇒ Object

Called by routing pumps (or the recv loop) when their connection has died. Idempotent via the lifecycle state guard.



# File 'lib/nnq/engine.rb', line 305

def handle_connection_lost(conn)
  @connections[conn]&.lost!
end
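
A sketch of what "idempotent via the lifecycle state guard" could mean in practice, assuming the lifecycle records a terminal state on the first call and ignores later ones. `SketchConnLifecycle`, its `:ready`/`:closed` states, and `sketch_handle_connection_lost` are hypothetical; the real ConnectionLifecycle state machine is not shown on this page.

```ruby
# Hypothetical lifecycle with a single state guard.
class SketchConnLifecycle
  attr_reader :state, :teardown_count

  def initialize
    @state          = :ready
    @teardown_count = 0
  end

  def lost!
    return if @state == :closed   # second and later calls do nothing
    @state = :closed
    @teardown_count += 1
  end
end

# Mirrors the safe-navigation lookup: unknown connections are ignored.
def sketch_handle_connection_lost(connections, conn)
  connections[conn]&.lost!
end
```

With this shape, a send pump and a recv loop can both report the same dead connection without coordinating with each other.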

#maybe_reconnect(endpoint) ⇒ Object

Schedules a reconnect for endpoint if auto-reconnect is enabled and the endpoint is still in the dialed set. Called from the connection lifecycle’s #lost! path.



# File 'lib/nnq/engine.rb', line 222

def maybe_reconnect(endpoint)
  return unless endpoint && @dialed.include?(endpoint)
  return unless @lifecycle.alive? && @lifecycle.reconnect_enabled
  return if endpoint.start_with?("inproc://")
  Reconnect.schedule(endpoint, @options, @lifecycle.barrier, self)
end
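
The three guard clauses can be read as a single predicate. The sketch below restates them as a pure function so each condition can be checked on its own; `sketch_should_reconnect?` and its keyword arguments are hypothetical and stand in for the engine's instance state.

```ruby
require "set"

# Returns true only when every guard in #maybe_reconnect would pass.
def sketch_should_reconnect?(endpoint, dialed:, alive:, reconnect_enabled:)
  return false unless endpoint && dialed.include?(endpoint)   # we dialed it ourselves
  return false unless alive && reconnect_enabled              # socket open, retries allowed
  return false if endpoint.start_with?("inproc://")           # inproc is never retried
  true
end
```

One consequence worth noting: connections that arrived through #bind are never in the dialed set, so losing an accepted peer does not schedule a reconnect.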

#parent_task ⇒ Async::Task?

Returns:

  • (Async::Task, nil)


# File 'lib/nnq/engine.rb', line 124

def parent_task
  @lifecycle.parent_task
end

#peer_connected ⇒ Async::Promise

Returns resolves with the first connected peer.

Returns:

  • (Async::Promise)

    resolves with the first connected peer



# File 'lib/nnq/engine.rb', line 141

def peer_connected
  @lifecycle.peer_connected
end

#reconnect_enabled ⇒ Boolean

Returns:

  • (Boolean)


# File 'lib/nnq/engine.rb', line 161

def reconnect_enabled
  @lifecycle.reconnect_enabled
end

#reconnect_enabled=(value) ⇒ Object

Disables or re-enables automatic reconnect. #maybe_reconnect checks this flag before scheduling a retry; TransientMonitor flips it before draining.



# File 'lib/nnq/engine.rb', line 169

def reconnect_enabled=(value)
  @lifecycle.reconnect_enabled = value
end

#resolve_all_peers_gone_if_empty ⇒ Object

Called by ConnectionLifecycle teardown. Resolves #all_peers_gone if the connection set is now empty and we had peers.



# File 'lib/nnq/engine.rb', line 155

def resolve_all_peers_gone_if_empty
  @lifecycle.resolve_all_peers_gone_if_empty(@connections)
end
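
The "empty and we had peers" condition is what makes the signal edge-triggered: a socket that never had a peer does not count as having lost all of them. A minimal sketch of that rule follows, with a boolean flag in place of the promise; `SketchPeerTracker` is hypothetical, and the real SocketLifecycle may track this differently.

```ruby
# Hypothetical tracker: fires once, and only after a peer has been seen.
class SketchPeerTracker
  def initialize
    @had_peers = false
    @gone      = false
  end

  def peer_added
    @had_peers = true
  end

  def resolve_if_empty(connections)
    return if @gone                                    # already resolved
    return unless @had_peers && connections.empty?     # need a prior peer
    @gone = true
  end

  def all_peers_gone?
    @gone
  end
end
```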

#spawn_task(annotation:, parent: @lifecycle.barrier, &block) ⇒ Object

Spawns a task under the given parent barrier (defaults to the socket-level barrier). Used by routing strategies (e.g. PUSH send pump) to attach long-lived fibers to the engine’s lifecycle. The parent barrier tracks every spawned task so teardown is a single barrier.stop call.



# File 'lib/nnq/engine.rb', line 267

def spawn_task(annotation:, parent: @lifecycle.barrier, &block)
  parent.async(annotation: annotation, &block)
end

#transport_for(endpoint) ⇒ Object

Public so Reconnect can dial directly without re-deriving the transport from the URL each iteration.



# File 'lib/nnq/engine.rb', line 232

def transport_for(endpoint)
  scheme = endpoint[/\A([a-z+]+):\/\//i, 1] or raise Error, "no scheme: #{endpoint}"
  TRANSPORTS[scheme] or raise Error, "unsupported transport: #{scheme}"
end
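
The lookup can be tried outside the engine with placeholder transports. In this sketch the regex and error messages are copied from the method above, while `SKETCH_TRANSPORTS`, `SketchError`, and the symbol values are hypothetical stand-ins for TRANSPORTS, Error, and the Transport classes.

```ruby
# Hypothetical registry: symbols in place of Transport::TCP and friends.
SKETCH_TRANSPORTS = {
  "tcp"    => :tcp_transport,
  "ipc"    => :ipc_transport,
  "inproc" => :inproc_transport,
}.freeze

class SketchError < StandardError; end

# Extracts the URL scheme, then looks it up in the registry.
def sketch_transport_for(endpoint)
  scheme = endpoint[/\A([a-z+]+):\/\//i, 1] or raise SketchError, "no scheme: #{endpoint}"
  SKETCH_TRANSPORTS[scheme] or raise SketchError, "unsupported transport: #{scheme}"
end
```

The regex is case-insensitive but the registry keys are lowercase, so in this sketch an endpoint such as `TCP://host:1` parses a scheme and then fails the lookup with "unsupported transport: TCP".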