Class: NNQ::Routing::RespondentRaw

Inherits:
Object
  • Object
show all
Includes:
Backtrace
Defined in:
lib/nnq/routing/respondent_raw.rb

Overview

Raw RESPONDENT: mirror of RepRaw for the survey pattern. No survey-window state, no pending slot — the app receives ‘[pipe, header, body]` tuples and chooses whether (and when) to reply via `send(body, to:, header:)`.

Constant Summary

Constants included from Backtrace

Backtrace::MAX_HOPS

Instance Method Summary collapse

Methods included from Backtrace

#parse_backtrace, too_many_hops?

Constructor Details

#initialize(engine) ⇒ RespondentRaw

Returns a new instance of RespondentRaw.



17
18
19
20
# File 'lib/nnq/routing/respondent_raw.rb', line 17

def initialize(engine)
  @engine     = engine
  @recv_queue = Async::LimitedQueue.new(engine.options.recv_hwm)
end

Instance Method Details

#closeObject



50
51
52
# File 'lib/nnq/routing/respondent_raw.rb', line 50

def close
  @recv_queue.enqueue(nil)
end

#close_readObject



55
56
57
# File 'lib/nnq/routing/respondent_raw.rb', line 55

def close_read
  @recv_queue.enqueue(nil)
end

#enqueue(wire_bytes, conn) ⇒ Object



43
44
45
46
47
# File 'lib/nnq/routing/respondent_raw.rb', line 43

def enqueue(wire_bytes, conn)
  header, payload = parse_backtrace(wire_bytes)
  return unless header
  @recv_queue.enqueue([conn, header, payload])
end

#preview_body(wire) ⇒ Object



37
38
39
40
# File 'lib/nnq/routing/respondent_raw.rb', line 37

def preview_body(wire)
  _, payload = parse_backtrace(wire)
  payload || wire
end

#receiveObject



23
24
25
# File 'lib/nnq/routing/respondent_raw.rb', line 23

def receive
  @recv_queue.dequeue
end

#send(body, to:, header:) ⇒ Object



28
29
30
31
32
33
34
# File 'lib/nnq/routing/respondent_raw.rb', line 28

def send(body, to:, header:)
  return if to.closed?
  return if Backtrace.too_many_hops?(header)
  to.send_message(body, header: header)
  @engine.emit_verbose_msg_sent(body)
rescue ClosedError
end