Class: NNQ::Routing::RepRaw
- Inherits:
-
Object
- Object
- NNQ::Routing::RepRaw
- Includes:
- Backtrace
- Defined in:
- lib/nnq/routing/rep_raw.rb
Overview
Raw REP: bypasses the cooked state machine. The incoming backtrace header is split off once at parse time and handed to the caller alongside the live Connection as ‘[pipe, header, body]`. Replies go back via `send(body, to:, header:)` which writes the caller-supplied header verbatim — no cooked pending/echo logic, no single-in-flight constraint.
Constant Summary
Constants included from Backtrace
Instance Method Summary collapse
- #close ⇒ Object
- #close_read ⇒ Object
-
#enqueue(wire_bytes, conn) ⇒ Object
Called by the engine recv loop.
-
#initialize(engine) ⇒ RepRaw
constructor
A new instance of RepRaw.
- #preview_body(wire) ⇒ Object
-
#receive ⇒ Array?
- pipe, header, body
-
or nil on close.
-
#send(body, to:, header:) ⇒ Object
Sends
bodywith the caller-suppliedheaderback toto(a Connection handed out by a prior #receive).
Methods included from Backtrace
#parse_backtrace, too_many_hops?
Constructor Details
#initialize(engine) ⇒ RepRaw
Returns a new instance of RepRaw.
19 20 21 22 |
# File 'lib/nnq/routing/rep_raw.rb', line 19 def initialize(engine) @engine = engine @recv_queue = Async::LimitedQueue.new(engine..recv_hwm) end |
Instance Method Details
#close ⇒ Object
59 60 61 |
# File 'lib/nnq/routing/rep_raw.rb', line 59 def close @recv_queue.enqueue(nil) end |
#close_read ⇒ Object
64 65 66 |
# File 'lib/nnq/routing/rep_raw.rb', line 64 def close_read @recv_queue.enqueue(nil) end |
#enqueue(wire_bytes, conn) ⇒ Object
Called by the engine recv loop.
52 53 54 55 56 |
# File 'lib/nnq/routing/rep_raw.rb', line 52 def enqueue(wire_bytes, conn) header, payload = parse_backtrace(wire_bytes) return unless header # malformed / over-TTL — drop @recv_queue.enqueue([conn, header, payload]) end |
#preview_body(wire) ⇒ Object
45 46 47 48 |
# File 'lib/nnq/routing/rep_raw.rb', line 45 def preview_body(wire) _, payload = parse_backtrace(wire) payload || wire end |
#receive ⇒ Array?
Returns [pipe, header, body] or nil on close.
26 27 28 |
# File 'lib/nnq/routing/rep_raw.rb', line 26 def receive @recv_queue.dequeue end |
#send(body, to:, header:) ⇒ Object
Sends body with the caller-supplied header back to to (a Connection handed out by a prior #receive). Silent drop if the target is closed or the header would push total hops over MAX_HOPS.
35 36 37 38 39 40 41 42 |
# File 'lib/nnq/routing/rep_raw.rb', line 35 def send(body, to:, header:) return if to.closed? return if Backtrace.too_many_hops?(header) to.(body, header: header) @engine.emit_verbose_msg_sent(body) rescue ClosedError # peer went away between receive and send — drop end |