Class: NNQ::Engine

Inherits:
Object
Defined in:
lib/nnq/engine.rb,
lib/nnq/engine/reconnect.rb,
lib/nnq/engine/socket_lifecycle.rb,
lib/nnq/engine/connection_lifecycle.rb

Overview

Per-socket orchestrator. Owns the listener set, the connection map (keyed on NNQ::Connection, with per-connection ConnectionLifecycle values), the transport registry, and the socket-level state machine via SocketLifecycle.

Mirrors OMQ’s Engine in shape but is much smaller: there’s no HWM bookkeeping, no mechanisms, and no heartbeat, and the monitor queue is optional (nil until Socket#monitor attaches one).

Defined Under Namespace

Classes: ConnectionLifecycle, Reconnect, SocketLifecycle

Constructor Details

#initialize(protocol:, options:) {|engine| ... } ⇒ Engine

Returns a new instance of Engine.

Parameters:

  • protocol (Integer)

    our SP protocol id (e.g. Protocols::PUSH_V0)

  • options (Options)

Yield Parameters:

  • engine (Engine)

    used by the caller to build a routing strategy with access to the engine’s connection map



# File 'lib/nnq/engine.rb', line 87

def initialize(protocol:, options:)
  @protocol        = protocol
  @options         = options
  @connections     = {}
  @listeners       = []
  @lifecycle       = SocketLifecycle.new
  @last_endpoint   = nil
  @new_pipe        = Async::Condition.new
  @monitor_queue   = nil
  @verbose_monitor = false
  @dialed          = Set.new
  @dial_opts       = {} # endpoint => kwargs for transport.connect on reconnect
  @routing         = yield(self)
end
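The constructor yields itself before @routing is assigned, so the block can hand the engine to a routing strategy that later reads the live connection map. Below is a minimal pure-Ruby sketch of that pattern that assumes nothing from NNQ. MiniEngine and RoundRobinRouting are hypothetical stand-ins, and the round-robin policy is only an illustration.

```ruby
# MiniEngine and RoundRobinRouting are hypothetical stand-ins, not NNQ classes.
class MiniEngine
  attr_reader :connections, :routing

  def initialize
    @connections = {}
    # Yield self so the block can build a strategy bound to this engine;
    # the block's return value becomes the routing strategy.
    @routing = yield(self)
  end
end

class RoundRobinRouting
  def initialize(engine)
    @engine = engine
    @index  = 0
  end

  # Reads the engine's connection map on every call, so connections
  # registered after construction are picked up too.
  def next_connection
    conns = @engine.connections.keys
    return nil if conns.empty?

    conn = conns[@index % conns.size]
    @index += 1
    conn
  end
end

engine = MiniEngine.new { |e| RoundRobinRouting.new(e) }
engine.connections[:a] = :lifecycle_a
engine.connections[:b] = :lifecycle_b
```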

Class Attribute Details

.transports ⇒ Hash{String => Module} (readonly)

Returns the registered transports, keyed by URL scheme (the part before "://").

Returns:

  • (Hash{String => Module})

    registered transports



# File 'lib/nnq/engine.rb', line 31

def transports
  @transports
end

Instance Attribute Details

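#connections ⇒ Hash{NNQ::Connection => ConnectionLifecycle} (readonly)

Returns the connection map.

Returns:

  • (Hash{NNQ::Connection => ConnectionLifecycle})

    the connection map, keyed on NNQ::Connection with per-connection ConnectionLifecycle values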

# File 'lib/nnq/engine.rb', line 48

def connections
  @connections
end

#dialed ⇒ Set<String> (readonly)

Returns endpoints we have called #connect on; used to decide whether to schedule a reconnect after a connection is lost.

Returns:

  • (Set<String>)

    endpoints we have called #connect on; used to decide whether to schedule a reconnect after a connection is lost.



# File 'lib/nnq/engine.rb', line 66

def dialed
  @dialed
end

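#last_endpoint ⇒ String? (readonly)

Returns the endpoint of the most recent #connect call, or of the most recent #bind as reported by its listener.

Returns:

  • (String, nil)

    the most recent connect or bind endpoint, or nil if neither has been called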

# File 'lib/nnq/engine.rb', line 56

def last_endpoint
  @last_endpoint
end

#lifecycle ⇒ SocketLifecycle (readonly)

Returns:

  • (SocketLifecycle)

    the socket-level state machine

# File 'lib/nnq/engine.rb', line 52

def lifecycle
  @lifecycle
end

#monitor_queue ⇒ Async::Queue?

Returns the monitor event queue (set by Socket#monitor).

Returns:

  • (Async::Queue, nil)

    monitor event queue (set by Socket#monitor)



# File 'lib/nnq/engine.rb', line 70

def monitor_queue
  @monitor_queue
end

#monitor_task ⇒ Async::Task?

Returns the monitor consumer task, if any.

Returns:

  • (Async::Task, nil)

    the monitor consumer task, if any



# File 'lib/nnq/engine.rb', line 74

def monitor_task
  @monitor_task
end

#new_pipe ⇒ Async::Condition (readonly)

Returns a condition that is signaled when a new pipe is registered (and once more on #close, so that waiters wake up).

Returns:

  • (Async::Condition)

    signaled when a new pipe is registered



# File 'lib/nnq/engine.rb', line 60

def new_pipe
  @new_pipe
end

#options ⇒ Options (readonly)

Returns:

  • (Options)

    the socket options passed to #initialize

# File 'lib/nnq/engine.rb', line 40

def options
  @options
end

#protocol ⇒ Integer (readonly)

Returns our SP protocol id (e.g. Protocols::PUSH_V0).

Returns:

  • (Integer)

    our SP protocol id (e.g. Protocols::PUSH_V0)



# File 'lib/nnq/engine.rb', line 36

def protocol
  @protocol
end

#routing ⇒ Routing strategy (readonly)

Returns:

  • (Routing strategy)

    the routing strategy returned by the block given to #initialize

# File 'lib/nnq/engine.rb', line 44

def routing
  @routing
end

#verbose_monitor ⇒ Boolean

Returns whether per-message traces are forwarded to the monitor queue. When true, #emit_verbose_msg_sent and #emit_verbose_msg_received enqueue :message_sent / :message_received events. Set by Socket#monitor via its verbose: kwarg.

Returns:

  • (Boolean)

    when true, #emit_verbose_msg_sent and #emit_verbose_msg_received forward per-message traces (:message_sent / :message_received) to the monitor queue

# File 'lib/nnq/engine.rb', line 80

def verbose_monitor
  @verbose_monitor
end

Instance Method Details

#all_peers_gone ⇒ Async::Promise

Returns a promise that resolves when all peers have disconnected (edge-triggered: only after at least one peer has connected).

Returns:

  • (Async::Promise)

    resolves when all peers have disconnected (edge-triggered, after at least one peer connected)



# File 'lib/nnq/engine.rb', line 153

def all_peers_gone
  @lifecycle.all_peers_gone
end

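#barrier ⇒ Async::Barrier?

Returns the socket-level barrier under which long-lived tasks (accept loops, reconnect loops, routing pumps) are spawned.

Returns:

  • (Async::Barrier, nil)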

# File 'lib/nnq/engine.rb', line 135

def barrier
  @lifecycle.barrier
end

#bind(endpoint, **opts) ⇒ Object

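Binds to endpoint: starts the listener's accept loop under the socket barrier, records the listener's endpoint as #last_endpoint, and emits a :listening monitor event. Synchronous: errors propagate to the caller.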



# File 'lib/nnq/engine.rb', line 195

def bind(endpoint, **opts)
  transport = transport_for(endpoint)
  listener  = transport.bind(endpoint, self, **opts)
  listener.start_accept_loop(@lifecycle.barrier) do |io, framing = :tcp|
    handle_accepted(io, endpoint: endpoint, framing: framing)
  end
  @listeners << listener
  @last_endpoint = listener.endpoint
  emit_monitor_event(:listening, endpoint: @last_endpoint)
end
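The accept-loop block in #bind declares framing = :tcp, so a transport that yields only the socket gets TCP framing by default. A sketch of that Ruby block-parameter behaviour, using a hypothetical FakeListener (the real start_accept_loop also takes the barrier) and a made-up :other framing value:

```ruby
# FakeListener is hypothetical; it only mimics how a listener yields peers.
class FakeListener
  def initialize(accepted)
    @accepted = accepted # each entry is [io] or [io, framing]
  end

  # Yields each accepted peer, with or without an explicit framing.
  def start_accept_loop
    @accepted.each { |args| yield(*args) }
  end
end

handled  = []
listener = FakeListener.new([[:io1], [:io2, :other]])

# Same block signature as in Engine#bind: framing falls back to :tcp
# when the transport yields only the io object.
listener.start_accept_loop do |io, framing = :tcp|
  handled << [io, framing]
end
```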

#capture_parent_task(task, on_io_thread:) ⇒ Object

Stores the parent Async task that long-lived NNQ fibers will attach to. The caller (Socket) is responsible for picking the right one (the user’s current task, or Reactor.root_task).



# File 'lib/nnq/engine.rb', line 189

def capture_parent_task(task, on_io_thread:)
  @lifecycle.capture_parent_task(task, on_io_thread: on_io_thread)
end

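#close ⇒ Object

Closes the engine: stops listeners, drains the send queue subject to linger, stops routing pumps (which by now are parked on the empty queue), then tears down every connection’s lifecycle. Finally it emits :closed, lets the monitor fiber drain its queue, and cancels all remaining tasks via the barrier. Order matters: closing connections first would force mid-flush pumps to abort with IOError. Calling #close on an engine that is no longer alive is a no-op.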



# File 'lib/nnq/engine.rb', line 304

def close
  return unless @lifecycle.alive?

  @lifecycle.start_closing!
  @listeners.each(&:stop)
  drain_send_queue(@options.linger)
  @routing.close if @routing.respond_to?(:close)

  # Tear down each remaining connection via its lifecycle. The
  # collection mutates during iteration, so snapshot the values.
  @connections.values.each(&:close!)

  # Emit :closed, seal the monitor queue, and wait for the monitor
  # fiber to drain it before cancelling tasks. Without this join,
  # trailing :message_received events that the recv pump enqueued
  # just before close would be lost when the barrier.stop below
  # Async::Stops the monitor fiber mid-dequeue.
  emit_monitor_event(:closed)
  close_monitor_queue
  @monitor_task&.wait

  # Cascade-cancel every remaining task (reconnect loops, accept
  # loops, supervisors) in one shot.
  @lifecycle.barrier&.stop
  @lifecycle.finish_closing!
  @new_pipe.signal

  # Unblock anyone waiting on peer_connected when the socket is
  # closed before a peer ever arrived.
  @lifecycle.peer_connected.resolve(nil) unless @lifecycle.peer_connected.resolved?
end
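Why draining precedes connection teardown can be seen in a small model. FakeConn and drain are hypothetical; they only mimic a pump that raises IOError when its stream is already closed, which is the failure the ordering in #close avoids.

```ruby
# Hypothetical connection: writes fail once the stream is closed.
class FakeConn
  attr_reader :written

  def initialize
    @written = []
    @closed  = false
  end

  def write(msg)
    raise IOError, "closed stream" if @closed
    @written << msg
  end

  def close!
    @closed = true
  end
end

# Hypothetical pump: flushes every queued message to conn.
def drain(queue, conn)
  conn.write(queue.shift) until queue.empty?
end

# Order used by Engine#close: drain first, then close the connection.
good_conn = FakeConn.new
drain(%w[a b c], good_conn)
good_conn.close!

# Reversed order: the flush aborts with IOError.
bad_conn = FakeConn.new
bad_conn.close!
error = begin
  drain(%w[a b c], bad_conn)
  nil
rescue IOError => e
  e
end
```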

#close_read ⇒ Object

Closes only the recv side. Buffered messages drain, then Socket#receive returns nil. Send side remains operational.



# File 'lib/nnq/engine.rb', line 181

def close_read
  @routing.close_read if @routing.respond_to?(:close_read)
end

#closed? ⇒ Boolean

Returns:

  • (Boolean)


# File 'lib/nnq/engine.rb', line 140

def closed?
  @lifecycle.closed?
end

#connect(endpoint, **opts) ⇒ Object

Connects to endpoint. For tcp:// and ipc:// the call is non-blocking: it emits :connect_delayed, and the actual dial happens inside a background reconnect task that retries with exponential back-off until the peer becomes reachable. Inproc connect is synchronous and instant.



# File 'lib/nnq/engine.rb', line 211

def connect(endpoint, **opts)
  @dialed << endpoint
  @dial_opts[endpoint] = opts unless opts.empty?
  @last_endpoint = endpoint

  if endpoint.start_with?("inproc://")
    transport_for(endpoint).connect(endpoint, self, **opts)
  else
    emit_monitor_event(:connect_delayed, endpoint: endpoint)
    Reconnect.schedule(endpoint, @options, @lifecycle.barrier, self, delay: 0)
  end
end
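A pure-Ruby model of the bookkeeping in #connect and #dial_opts_for. ConnectModel is hypothetical: its scheduled and dialed_now arrays stand in for Reconnect.schedule and the inproc transport call, and connect_timeout is a made-up kwarg used only to show that options are captured for later re-dials.

```ruby
require "set"

# Hypothetical model of Engine#connect; no real dialing happens here.
class ConnectModel
  attr_reader :dialed, :scheduled, :dialed_now

  def initialize
    @dialed     = Set.new
    @dial_opts  = {}
    @scheduled  = [] # stands in for Reconnect.schedule(..., delay: 0)
    @dialed_now = [] # stands in for a synchronous inproc connect
  end

  def connect(endpoint, **opts)
    @dialed << endpoint
    @dial_opts[endpoint] = opts unless opts.empty?
    if endpoint.start_with?("inproc://")
      @dialed_now << endpoint
    else
      @scheduled << [endpoint, 0]
    end
  end

  # Empty hash for endpoints connected without extra options.
  def dial_opts_for(endpoint)
    @dial_opts[endpoint] || {}
  end
end

m = ConnectModel.new
m.connect("inproc://work")
m.connect("tcp://127.0.0.1:5555", connect_timeout: 1)
```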

#connection_ready(conn, endpoint:) ⇒ Object

Registers an already-connected, framing-free pipe (inproc), skipping the SP handshake entirely: Transport::Inproc::Pipe is a Ruby duck type for Connection and has no wire protocol.



# File 'lib/nnq/engine.rb', line 279

def connection_ready(conn, endpoint:)
  lifecycle = ConnectionLifecycle.new(self, endpoint: endpoint, framing: :inproc)
  lifecycle.ready_direct!(conn)
  spawn_recv_loop(conn) if @routing.respond_to?(:enqueue) && @connections.key?(conn)
  lifecycle.start_supervisor!
rescue ConnectionRejected
  # routing rejected this peer (e.g. PAIR already bonded)
end

#dial_opts_for(endpoint) ⇒ Object

Transport options captured from #connect for endpoint. Used by Reconnect to re-dial with the original kwargs. Empty hash for endpoints connected without extra options.



# File 'lib/nnq/engine.rb', line 228

def dial_opts_for(endpoint)
  @dial_opts[endpoint] || {}
end

#emit_monitor_event(type, endpoint: nil, detail: nil) ⇒ Object

Emits a monitor event to the attached queue (if any).



# File 'lib/nnq/engine.rb', line 104

def emit_monitor_event(type, endpoint: nil, detail: nil)
  return unless @monitor_queue
  @monitor_queue.enqueue(MonitorEvent.new(type: type, endpoint: endpoint, detail: detail))
rescue Async::Stop
end
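A sketch of the no-op-until-attached behaviour of #emit_monitor_event. It swaps Thread::Queue in for Async::Queue so it runs without a reactor, and it assumes MonitorEvent is a keyword Struct; NNQ's actual MonitorEvent may be defined differently, and Emitter is hypothetical.

```ruby
# Assumption: a keyword Struct stands in for NNQ's MonitorEvent.
MonitorEvent = Struct.new(:type, :endpoint, :detail, keyword_init: true)

class Emitter
  attr_accessor :monitor_queue

  # Drops events until a queue is attached, like Engine#emit_monitor_event.
  def emit_monitor_event(type, endpoint: nil, detail: nil)
    return unless @monitor_queue
    @monitor_queue << MonitorEvent.new(type: type, endpoint: endpoint, detail: detail)
  end
end

emitter = Emitter.new
emitter.emit_monitor_event(:listening, endpoint: "tcp://127.0.0.1:5555") # dropped
emitter.monitor_queue = Thread::Queue.new
emitter.emit_monitor_event(:closed)
```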

#emit_verbose_msg_received(body) ⇒ Object

Emits a :message_received verbose event. Same early-return discipline as #emit_verbose_msg_sent.



# File 'lib/nnq/engine.rb', line 122

def emit_verbose_msg_received(body)
  return unless @verbose_monitor
  emit_monitor_event(:message_received, detail: { body: body })
end

#emit_verbose_msg_sent(body) ⇒ Object

Emits a :message_sent verbose event. Early-returns before allocating the detail hash so the hot send path pays nothing when verbose monitoring is off.



# File 'lib/nnq/engine.rb', line 114

def emit_verbose_msg_sent(body)
  return unless @verbose_monitor
  emit_monitor_event(:message_sent, detail: { body: body })
end
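The verbose gate can be modelled with a counter standing in for the hash allocation on the hot path. VerboseGate and build_detail are hypothetical; the point is only that the early return runs before the { body: } hash is built.

```ruby
# Hypothetical model: `built` counts detail hashes allocated.
class VerboseGate
  attr_accessor :verbose_monitor
  attr_reader :events, :built

  def initialize
    @verbose_monitor = false
    @events = []
    @built  = 0
  end

  def emit_verbose_msg_sent(body)
    return unless @verbose_monitor # bail out before building the hash
    @events << [:message_sent, build_detail(body)]
  end

  private

  def build_detail(body)
    @built += 1
    { body: body }
  end
end

gate = VerboseGate.new
1_000.times { gate.emit_verbose_msg_sent("payload") } # verbose off: no hashes
gate.verbose_monitor = true
gate.emit_verbose_msg_sent("traced")
```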

#handle_accepted(io, endpoint:, framing: :tcp) ⇒ Object

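Called by transports for each accepted client connection. Runs the SP handshake, spawns a recv loop if the routing strategy receives, and starts the connection supervisor. Rejected peers and handshake failures are swallowed; failures are logged only when $DEBUG is set.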



# File 'lib/nnq/engine.rb', line 253

def handle_accepted(io, endpoint:, framing: :tcp)
  lifecycle = ConnectionLifecycle.new(self, endpoint: endpoint, framing: framing)
  lifecycle.handshake!(io)
  spawn_recv_loop(lifecycle.conn) if @routing.respond_to?(:enqueue) && @connections.key?(lifecycle.conn)
  lifecycle.start_supervisor!
rescue ConnectionRejected
  # routing rejected this peer (e.g. PAIR already bonded) — lifecycle cleaned up
rescue => e
  warn("nnq: handshake failed for #{endpoint}: #{e.class}: #{e.message}") if $DEBUG
end

#handle_connected(io, endpoint:, framing: :tcp) ⇒ Object

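Called by transports for each dialed connection. Unlike #handle_accepted, it rescues only ConnectionRejected; other handshake errors propagate to the caller.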



# File 'lib/nnq/engine.rb', line 266

def handle_connected(io, endpoint:, framing: :tcp)
  lifecycle = ConnectionLifecycle.new(self, endpoint: endpoint, framing: framing)
  lifecycle.handshake!(io)
  spawn_recv_loop(lifecycle.conn) if @routing.respond_to?(:enqueue) && @connections.key?(lifecycle.conn)
  lifecycle.start_supervisor!
rescue ConnectionRejected
  # unusual on connect side, but handled identically
end
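All three registration paths (#handle_accepted, #handle_connected, #connection_ready) guard the recv loop the same way: the routing strategy must respond to #enqueue, and the connection must have made it into the connection map. A sketch of that duck-typed check, with hypothetical routing classes and a standalone predicate:

```ruby
# Hypothetical routing stand-ins, not NNQ classes: only a strategy that
# accepts inbound messages implements #enqueue.
class SendOnlyRouting
  def send_message(_msg); end
end

class ReceivingRouting
  def enqueue(msg)
    (@inbox ||= []) << msg
  end
end

# Same condition the engine checks before spawn_recv_loop.
def spawn_recv_loop?(routing, connections, conn)
  routing.respond_to?(:enqueue) && connections.key?(conn)
end

connections = { conn_a: :lifecycle_a }
```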

#handle_connection_lost(conn) ⇒ Object

Called by routing pumps (or the recv loop) when their connection has died. Idempotent via the lifecycle state guard.



# File 'lib/nnq/engine.rb', line 339

def handle_connection_lost(conn)
  @connections[conn]&.lost!
end
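Idempotence here comes from a state check inside the lifecycle, plus the safe-navigation call on the map lookup. A minimal model, assuming a hypothetical GuardedLifecycle with made-up :ready/:closed states:

```ruby
# Hypothetical lifecycle: only the first #lost! call runs teardown.
class GuardedLifecycle
  attr_reader :state, :teardowns

  def initialize
    @state     = :ready
    @teardowns = 0
  end

  def lost!
    return if @state == :closed # repeated calls are no-ops
    @state = :closed
    @teardowns += 1
  end
end

connections = { conn: GuardedLifecycle.new }
lifecycle   = connections[:conn]

# Send pump and recv loop both report the same dead connection.
2.times { connections[:conn]&.lost! }
# A connection already removed from the map is a safe no-op.
connections[:gone]&.lost!
```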

#maybe_reconnect(endpoint) ⇒ Object

Schedules a reconnect for endpoint if the socket is alive, auto-reconnect is enabled, and the endpoint is still in the dialed set; inproc endpoints are never reconnected. Called from the connection lifecycle’s #lost! path.



# File 'lib/nnq/engine.rb', line 236

def maybe_reconnect(endpoint)
  return unless endpoint && @dialed.include?(endpoint)
  return unless @lifecycle.alive? && @lifecycle.reconnect_enabled
  return if endpoint.start_with?("inproc://")
  Reconnect.schedule(endpoint, @options, @lifecycle.barrier, self)
end
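The guards in #maybe_reconnect can be read as a pure predicate. This sketch lifts them into a hypothetical standalone method, with alive and reconnect_enabled passed in as plain booleans instead of being read from SocketLifecycle:

```ruby
require "set"

# Hypothetical predicate mirroring Engine#maybe_reconnect's early returns.
def reconnect?(endpoint, dialed:, alive:, reconnect_enabled:)
  return false unless endpoint && dialed.include?(endpoint) # only endpoints we dialed
  return false unless alive && reconnect_enabled            # socket state gates
  return false if endpoint.start_with?("inproc://")         # inproc never reconnects
  true
end

dialed = Set["tcp://10.0.0.1:5555", "inproc://x"]
```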

#parent_task ⇒ Async::Task?

Returns the parent task captured by #capture_parent_task, if any.

Returns:

  • (Async::Task, nil)

# File 'lib/nnq/engine.rb', line 129

def parent_task
  @lifecycle.parent_task
end

#peer_connected ⇒ Async::Promise

Returns a promise that resolves with the first connected peer, or with nil if the engine is closed before any peer arrives.

Returns:

  • (Async::Promise)

    resolves with the first connected peer



# File 'lib/nnq/engine.rb', line 146

def peer_connected
  @lifecycle.peer_connected
end

#reconnect_enabled ⇒ Boolean

Returns whether automatic reconnect is enabled.

Returns:

  • (Boolean)

# File 'lib/nnq/engine.rb', line 166

def reconnect_enabled
  @lifecycle.reconnect_enabled
end

#reconnect_enabled=(value) ⇒ Object

Disables or re-enables automatic reconnect. #maybe_reconnect checks this flag before scheduling a reconnect after a connection is lost, so while it is false no new reconnect is scheduled. TransientMonitor flips it before draining.



# File 'lib/nnq/engine.rb', line 174

def reconnect_enabled=(value)
  @lifecycle.reconnect_enabled = value
end

#resolve_all_peers_gone_if_empty ⇒ Object

Called by ConnectionLifecycle teardown. Resolves #all_peers_gone if the connection set is now empty and there had been at least one peer.



# File 'lib/nnq/engine.rb', line 160

def resolve_all_peers_gone_if_empty
  @lifecycle.resolve_all_peers_gone_if_empty(@connections)
end
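Edge-triggered here means the promise fires on the transition to an empty set after peers existed, not merely because the set is empty. A sketch with a hypothetical PeerTracker, using a boolean in place of the Async::Promise:

```ruby
# Hypothetical tracker: `all_peers_gone` stands in for the promise.
class PeerTracker
  attr_reader :all_peers_gone

  def initialize
    @connections    = {}
    @had_peers      = false
    @all_peers_gone = false
  end

  def add(conn)
    @connections[conn] = true
    @had_peers = true
  end

  def remove(conn)
    @connections.delete(conn)
    resolve_all_peers_gone_if_empty
  end

  # Fires only once the set is empty *and* at least one peer existed.
  def resolve_all_peers_gone_if_empty
    @all_peers_gone = true if @had_peers && @connections.empty?
  end
end

tracker = PeerTracker.new
tracker.resolve_all_peers_gone_if_empty # no peers yet: stays unresolved
before = tracker.all_peers_gone
tracker.add(:a)
tracker.add(:b)
tracker.remove(:a)
midway = tracker.all_peers_gone # one peer left
tracker.remove(:b)
```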

#spawn_task(annotation:, parent: @lifecycle.barrier, &block) ⇒ Object

Spawns a task under the given parent barrier (defaults to the socket-level barrier). Used by routing strategies (e.g. PUSH send pump) to attach long-lived fibers to the engine’s lifecycle. The parent barrier tracks every spawned task so teardown is a single barrier.stop call.



# File 'lib/nnq/engine.rb', line 294

def spawn_task(annotation:, parent: @lifecycle.barrier, &block)
  parent.async(annotation: annotation, &block)
end
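A thread-based sketch of the single-stop teardown that #spawn_task relies on. MiniBarrier is hypothetical and uses Thread#kill where Async::Barrier#stop would cancel fibers; only the tracking pattern is the same.

```ruby
# Hypothetical stand-in for Async::Barrier: tracks every spawned task.
class MiniBarrier
  def initialize
    @tasks = []
  end

  def async(annotation:, &block)
    thread = Thread.new(&block)
    thread.name = annotation
    @tasks << thread
    thread
  end

  # One call tears down every tracked task.
  def stop
    @tasks.each(&:kill)
    @tasks.each(&:join)
    @tasks.clear
  end

  def size
    @tasks.size
  end
end

# Mirrors Engine#spawn_task, with the parent passed explicitly.
def spawn_task(annotation:, parent:, &block)
  parent.async(annotation: annotation, &block)
end

barrier = MiniBarrier.new
pump    = spawn_task(annotation: "send pump", parent: barrier) { loop { sleep 0.01 } }
reader  = spawn_task(annotation: "recv loop", parent: barrier) { loop { sleep 0.01 } }
tracked = barrier.size
barrier.stop
```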

#transport_for(endpoint) ⇒ Object

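Resolves the transport module for endpoint from its URL scheme via Engine.transports, raising Error when the scheme is missing or unregistered. Public so Reconnect can dial directly without re-deriving the transport from the URL on each iteration.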



# File 'lib/nnq/engine.rb', line 246

def transport_for(endpoint)
  scheme = endpoint[/\A([a-z+]+):\/\//i, 1] or raise Error, "no scheme: #{endpoint}"
  Engine.transports[scheme] or raise Error, "unsupported transport: #{scheme}"
end
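A standalone sketch of the scheme lookup in #transport_for, using the same regex. The registry contents, TcpTransport, InprocTransport, and TransportError are placeholders (NNQ raises its own Error), not NNQ's actual transports.

```ruby
# Placeholder error and transport modules for illustration only.
class TransportError < StandardError; end
module TcpTransport; end
module InprocTransport; end

TRANSPORTS = { "tcp" => TcpTransport, "inproc" => InprocTransport }.freeze

def transport_for(endpoint)
  # Same pattern as Engine#transport_for: letters and "+" before "://".
  scheme = endpoint[/\A([a-z+]+):\/\//i, 1] or raise TransportError, "no scheme: #{endpoint}"
  TRANSPORTS[scheme] or raise TransportError, "unsupported transport: #{scheme}"
end
```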