Class: NatsAsync::Message

Inherits:
Object
  • Object
show all
Defined in:
lib/nats_async/message.rb

Constant Summary collapse

ACK =
"+ACK"
NAK =
"-NAK"
TERM =
"+TERM"
WPI =
"+WPI"

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(subject:, sid:, reply:, data:, connector:, headers: {}) ⇒ Message

Returns a new instance of Message.



28
29
30
31
32
33
34
35
36
# File 'lib/nats_async/message.rb', line 28

# Stores the pieces of a received message on the new instance.
#
# @param subject [Object] kept as-is; exposed through #subject
# @param sid [Object] kept as-is; exposed through #sid
# @param reply [Object] kept as-is; exposed through #reply
# @param data [Object] kept as-is; exposed through #data
# @param connector [Object] kept as-is in @connector
# @param headers [Object] passed through Headers.wrap before being stored
def initialize(subject:, sid:, reply:, data:, connector:, headers: {})
  @subject, @sid, @reply, @data = subject, sid, reply, data
  @headers = Headers.wrap(headers)
  @connector = connector
  # A freshly built message has not been acknowledged yet.
  @acked = false
end

Instance Attribute Details

#data ⇒ Object (readonly)

Returns the value of attribute data.



25
26
27
# File 'lib/nats_async/message.rb', line 25

# @return [Object] the value held in @data
def data = @data

#headers ⇒ Object (readonly) Also known as: header

Returns the value of attribute headers.



25
26
27
# File 'lib/nats_async/message.rb', line 25

# @return [Object] the value held in @headers
def headers = @headers

#reply ⇒ Object (readonly)

Returns the value of attribute reply.



25
26
27
# File 'lib/nats_async/message.rb', line 25

# @return [Object] the value held in @reply
def reply = @reply

#sid ⇒ Object (readonly)

Returns the value of attribute sid.



25
26
27
# File 'lib/nats_async/message.rb', line 25

# @return [Object] the value held in @sid
def sid = @sid

#subject ⇒ Object (readonly)

Returns the value of attribute subject.



25
26
27
# File 'lib/nats_async/message.rb', line 25

# @return [Object] the value held in @subject
def subject = @subject

Instance Method Details

#ack(**_params) ⇒ Object



38
# File 'lib/nats_async/message.rb', line 38

# Hands the ACK payload to finalize_ack!.
#
# @param _params [Hash] extra keyword arguments; accepted and ignored
# @return [Object] whatever finalize_ack! returns
def ack(**_params)
  finalize_ack!(ACK)
end

#ack_sync(timeout: 0.5, **_params) ⇒ Object



40
# File 'lib/nats_async/message.rb', line 40

# Hands the ACK payload to finalize_ack! together with a timeout.
#
# @param timeout [Object] forwarded as the timeout keyword (0.5 when omitted)
# @param _params [Hash] extra keyword arguments; accepted and ignored
# @return [Object] whatever finalize_ack! returns
def ack_sync(timeout: 0.5, **_params)
  finalize_ack!(ACK, timeout: timeout)
end

#ackable? ⇒ Boolean

Returns:

  • (Boolean)


54
55
56
# File 'lib/nats_async/message.rb', line 54

# Tells whether the message carries a non-blank reply value.
#
# @return [Boolean] false when reply.to_s is empty (e.g. nil or ""), true otherwise
def ackable?
  return false if reply.to_s.empty?

  true
end

#acked? ⇒ Boolean

Returns:

  • (Boolean)


58
59
60
# File 'lib/nats_async/message.rb', line 58

# @return [Boolean] the current value of @acked (set to false by the constructor)
def acked? = @acked

#in_progress(timeout: nil, **_params) ⇒ Object



49
50
51
52
# File 'lib/nats_async/message.rb', line 49

# Publishes the WPI payload for this message.
#
# Unlike #ack, #nak and #term this does not go through finalize_ack!; it
# calls ensure_reply! and then publish_ack directly.
#
# @param timeout [Object, nil] forwarded to publish_ack as the timeout keyword
# @param _params [Hash] extra keyword arguments; accepted and ignored
# @return [Object] whatever publish_ack returns
def in_progress(timeout: nil, **_params)
  # NOTE(review): presumably raises when the message has no reply subject — confirm in ensure_reply!
  ensure_reply!
  publish_ack(WPI, timeout: timeout)
end

#metadata ⇒ Object



62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
# File 'lib/nats_async/message.rb', line 62

# Extracts delivery metadata from a reply subject that starts with "$JS.ACK.".
#
# The reply is split on "." and read positionally:
# * 12 or more tokens: token 2 is the domain ("_" is reported as ""), and the
#   stream/consumer/counter fields start at token 4.
# * 9 to 11 tokens: no domain (reported as ""), fields start at token 2.
#
# NOTE(review): an 11-token reply is parsed with the short layout — confirm
# that no 11-token long-form reply exists upstream.
#
# @return [Hash, nil] nil when reply is nil, does not start with "$JS.ACK.",
#   or has fewer than 9 tokens; otherwise a hash with the keys :stream,
#   :consumer, :delivered, :sequence ({stream:, consumer:}), :timestamp_ns,
#   :pending and :domain. Numeric fields are converted with to_i.
def metadata
  return unless reply&.start_with?("$JS.ACK.")

  tokens = reply.split(".")
  return if tokens.size < 9

  if tokens.size >= 12
    domain = tokens[2] == "_" ? "" : tokens[2]
    base = 4
  else
    domain = ""
    base = 2
  end

  # Both layouts share the same field order; only the starting index differs.
  stream, consumer = tokens[base, 2]
  delivered, stream_seq, consumer_seq, timestamp_ns, pending = tokens[base + 2, 5].map(&:to_i)

  {
    stream: stream,
    consumer: consumer,
    delivered: delivered,
    sequence: {stream: stream_seq, consumer: consumer_seq},
    timestamp_ns: timestamp_ns,
    pending: pending,
    domain: domain
  }
end

#nak(delay: nil, timeout: nil, **_params) ⇒ Object



42
43
44
45
# File 'lib/nats_async/message.rb', line 42

# Hands a NAK payload to finalize_ack!.
#
# When delay is truthy the payload is NAK followed by a space and the JSON
# object {"delay": delay}; otherwise the payload is NAK alone.
#
# @param delay [Object, nil] value embedded in the JSON suffix when truthy
# @param timeout [Object, nil] forwarded to finalize_ack! as the timeout keyword
# @param _params [Hash] extra keyword arguments; accepted and ignored
# @return [Object] whatever finalize_ack! returns
def nak(delay: nil, timeout: nil, **_params)
  body =
    if delay
      options = {delay: delay}
      "#{NAK} #{options.to_json}"
    else
      NAK
    end
  finalize_ack!(body, timeout: timeout)
end

#term(timeout: nil, **_params) ⇒ Object



47
# File 'lib/nats_async/message.rb', line 47

# Hands the TERM payload to finalize_ack!.
#
# @param timeout [Object, nil] forwarded to finalize_ack! as the timeout keyword
# @param _params [Hash] extra keyword arguments; accepted and ignored
# @return [Object] whatever finalize_ack! returns
def term(timeout: nil, **_params)
  finalize_ack!(TERM, timeout: timeout)
end