Class: NNQ::Routing::ReqRaw

Inherits:
Object
Includes:
Backtrace
Defined in:
lib/nnq/routing/req_raw.rb

Overview

Raw REQ: bypasses the cooked single-in-flight request-id state machine. Sends are fire-and-forget round-robin with a caller-supplied header (typically `[id | 0x80000000].pack("N")`); replies land in a bounded queue and are delivered as `[pipe, header, body]` tuples so the app can correlate by header verbatim without ever parsing or slicing bytes.
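As a minimal sketch of the correlation scheme described above, the snippet below builds the typical header and matches a reply tuple back to its request by comparing header bytes verbatim. The helpers `make_header` and `header_id`, the `pending` hash, and the hand-built reply tuple are hypothetical names used for illustration only; they are not part of NNQ.

```ruby
# Build a 4-byte big-endian header with the high bit set,
# following the "typically" form given in the overview.
def make_header(id)
  [id | 0x80000000].pack("N")
end

# Hypothetical helper: recover the numeric id from a header.
# The app does not need this to correlate; it is shown for completeness.
def header_id(header)
  header.unpack1("N") & 0x7FFFFFFF
end

pending = {}                 # header bytes => application context
hdr = make_header(42)
pending[hdr] = :my_request

# A reply arrives as a [pipe, header, body] tuple (hand-built here).
reply = [:some_pipe, hdr.dup, "pong"]
_pipe, reply_header, body = reply

# Correlate by header verbatim: a plain hash lookup on the raw bytes.
context = pending.delete(reply_header)
```

Because the header is used as an opaque hash key, the application never has to slice the reply to find out which request it answers.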

Constant Summary

Constants included from Backtrace

Backtrace::MAX_HOPS

Instance Method Summary

Methods included from Backtrace

#parse_backtrace, #too_many_hops?

Constructor Details

#initialize(engine) ⇒ ReqRaw

Returns a new instance of ReqRaw.



# File 'lib/nnq/routing/req_raw.rb', line 19

def initialize(engine)
  @engine     = engine
  @next_idx   = 0
  @recv_queue = Async::LimitedQueue.new(engine.options.recv_hwm)
end

Instance Method Details

#close ⇒ Object



# File 'lib/nnq/routing/req_raw.rb', line 51

def close
  @recv_queue.enqueue(nil)
end

#close_read ⇒ Object



# File 'lib/nnq/routing/req_raw.rb', line 56

def close_read
  @recv_queue.enqueue(nil)
end

#enqueue(wire_bytes, conn) ⇒ Object



# File 'lib/nnq/routing/req_raw.rb', line 44

def enqueue(wire_bytes, conn)
  header, payload = parse_backtrace(wire_bytes)
  return unless header
  @recv_queue.enqueue([conn, header, payload])
end
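`parse_backtrace` comes from the `Backtrace` mixin and its source is not shown on this page. The sketch below is only a guess at the kind of split it performs. It assumes the nanomsg SP REQ/REP convention in which the header is a stack of 32-bit big-endian words ending at the first word with the high bit set. The name `parse_backtrace_sketch` and the `MAX_HOPS` value of 8 are assumptions, not the library's actual code.

```ruby
MAX_HOPS = 8 # assumed value; the real Backtrace::MAX_HOPS is not shown here

# Hypothetical stand-in for Backtrace#parse_backtrace.
# Returns [header, payload], or nil when no terminating word is found.
def parse_backtrace_sketch(wire)
  offset = 0
  hops = 0
  while offset + 4 <= wire.bytesize
    word = wire.byteslice(offset, 4).unpack1("N")
    offset += 4
    if (word & 0x80000000) != 0
      # High bit set: this word ends the header; the rest is the body.
      header  = wire.byteslice(0, offset)
      payload = wire.byteslice(offset, wire.bytesize - offset)
      return [header, payload]
    end
    hops += 1
    return nil if hops > MAX_HOPS # too many routing hops: reject
  end
  nil # ran out of bytes before finding a terminating word
end

wire = "\x00\x00\x00\x07\x80\x00\x00\x2Ahello".b
header, payload = parse_backtrace_sketch(wire)
```

With a `nil` return, `header, payload = ...` leaves `header` as `nil`, which is the case the `return unless header` guard in `#enqueue` silently drops.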

#preview_body(wire) ⇒ Object



# File 'lib/nnq/routing/req_raw.rb', line 33

def preview_body(wire)
  _, payload = parse_backtrace(wire)
  payload || wire
end

#receive ⇒ Object



# File 'lib/nnq/routing/req_raw.rb', line 39

def receive
  @recv_queue.dequeue
end
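Since `#close` and `#close_read` both enqueue `nil`, a consumer can treat a `nil` result from `#receive` as an end-of-stream marker. The sketch below illustrates that pattern with the standard library's `Thread::SizedQueue` standing in for `Async::LimitedQueue`, so it runs without the async gem. The queue size, pipe symbols, and messages are made up for illustration.

```ruby
# Stand-in for the bounded receive queue (the class itself uses
# Async::LimitedQueue sized by engine.options.recv_hwm).
queue = Thread::SizedQueue.new(4)

# Two replies in the [pipe, header, body] shape that #enqueue produces.
queue.push([:pipe_a, "\x80\x00\x00\x01".b, "first"])
queue.push([:pipe_b, "\x80\x00\x00\x02".b, "second"])
queue.push(nil) # what #close and #close_read put on the queue

bodies = []
# Drain until the nil sentinel; nil is falsy, so the loop ends there.
while (msg = queue.pop)
  _pipe, _header, body = msg
  bodies << body
end
```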

#send(body, header:) ⇒ Object



# File 'lib/nnq/routing/req_raw.rb', line 26

def send(body, header:)
  conn = pick_peer
  conn.send_message(body, header: header)
  @engine.emit_verbose_msg_sent(body)
end
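`pick_peer` is not listed on this page. Given the overview's "round-robin" wording and the `@next_idx` counter set in the constructor, one plausible shape is a rotating index over the connected peers. The `RoundRobin` class below is a hypothetical, self-contained sketch of that idea, not the library's implementation.

```ruby
# Hypothetical round-robin selector, for illustration only.
class RoundRobin
  def initialize(peers)
    @peers    = peers
    @next_idx = 0
  end

  # Return the next peer in rotation and advance the index.
  def pick
    raise "no peers connected" if @peers.empty?

    peer = @peers[@next_idx % @peers.size]
    @next_idx = (@next_idx + 1) % @peers.size
    peer
  end
end

rr = RoundRobin.new(%i[a b c])
picks = 4.times.map { rr.pick }
```

Taking the index modulo the current peer count on every pick keeps the selection in range even if the peer list shrinks between calls.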