Class: NatsAsync::Message

Inherits:
Object
  • Object
show all
Defined in:
lib/nats_async/message.rb

Constant Summary collapse

ACK =
"+ACK"
NAK =
"-NAK"
TERM =
"+TERM"
WPI =
"+WPI"

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(subject:, sid:, reply:, data:, connector:, headers: {}, status: nil, description: nil) ⇒ Message

Returns a new instance of Message.



28
29
30
31
32
33
34
35
36
37
38
# File 'lib/nats_async/message.rb', line 28

# Builds a message from its parts and the connector it arrived on.
#
# @param subject [Object] stored unchanged, exposed through #subject
# @param sid [Object] stored unchanged, exposed through #sid
# @param reply [Object] stored unchanged, exposed through #reply
# @param data [Object] stored unchanged, exposed through #data
# @param connector [Object] kept in @connector for later use
# @param headers [Object] handed to Headers.wrap; the result is stored
# @param status [Object, nil] stored unchanged, exposed through #status
# @param description [Object, nil] stored unchanged, exposed through #description
def initialize(subject:, sid:, reply:, data:, connector:, headers: {}, status: nil, description: nil)
  # Where the message came from and where a response would go.
  @connector = connector
  @subject = subject
  @sid = sid
  @reply = reply

  # Payload plus headers; whatever Headers.wrap returns is what #headers exposes.
  @data = data
  @headers = Headers.wrap(headers)

  # Optional status information supplied by the caller.
  @status = status
  @description = description

  # A freshly built message always starts out with acked? == false.
  @acked = false
end

Instance Attribute Details

#data ⇒ Object (readonly)

Returns the value of attribute data.



25
26
27
# File 'lib/nats_async/message.rb', line 25

# Reader for the payload stored by the constructor.
#
# @return [Object] the current value of @data
def data = @data

#description ⇒ Object (readonly)

Returns the value of attribute description.



25
26
27
# File 'lib/nats_async/message.rb', line 25

# Reader for the description stored by the constructor (nil by default).
#
# @return [Object, nil] the current value of @description
def description = @description

#headers ⇒ Object (readonly) Also known as: header

Returns the value of attribute headers.



25
26
27
# File 'lib/nats_async/message.rb', line 25

# Reader for the headers; the constructor stores the result of Headers.wrap here.
#
# @return [Object] the current value of @headers
def headers = @headers

#reply ⇒ Object (readonly)

Returns the value of attribute reply.



25
26
27
# File 'lib/nats_async/message.rb', line 25

# Reader for the reply subject stored by the constructor.
#
# @return [Object] the current value of @reply
def reply = @reply

#sid ⇒ Object (readonly)

Returns the value of attribute sid.



25
26
27
# File 'lib/nats_async/message.rb', line 25

# Reader for the sid stored by the constructor.
#
# @return [Object] the current value of @sid
def sid = @sid

#status ⇒ Object (readonly)

Returns the value of attribute status.



25
26
27
# File 'lib/nats_async/message.rb', line 25

# Reader for the status stored by the constructor (nil by default).
#
# @return [Object, nil] the current value of @status
def status = @status

#subject ⇒ Object (readonly)

Returns the value of attribute subject.



25
26
27
# File 'lib/nats_async/message.rb', line 25

# Reader for the subject stored by the constructor.
#
# @return [Object] the current value of @subject
def subject = @subject

Instance Method Details

#ack(**_params) ⇒ Object



40
# File 'lib/nats_async/message.rb', line 40

# Acknowledges the message by passing the ACK payload to finalize_ack!.
# No timeout is forwarded; use #ack_sync to supply one.
#
# @param _params [Hash] any keyword arguments are accepted and ignored
# @return [Object] whatever finalize_ack! returns
def ack(**_params)
  finalize_ack!(ACK)
end

#ack_sync(timeout: 0.5, **_params) ⇒ Object



42
# File 'lib/nats_async/message.rb', line 42

# Acknowledges the message like #ack, but forwards a timeout to finalize_ack!.
#
# @param timeout [Numeric] passed through as +timeout:+ (defaults to 0.5)
# @param _params [Hash] any other keyword arguments are accepted and ignored
# @return [Object] whatever finalize_ack! returns
def ack_sync(timeout: 0.5, **_params)
  finalize_ack!(ACK, timeout: timeout)
end

#ackable? ⇒ Boolean

Returns:

  • (Boolean)


56
57
58
# File 'lib/nats_async/message.rb', line 56

# Tells whether the message carries a reply subject.
#
# @return [Boolean] false when #reply is nil or an empty string, true otherwise
def ackable?
  reply_subject = reply.to_s
  !reply_subject.empty?
end

#acked? ⇒ Boolean

Returns:

  • (Boolean)


60
61
62
# File 'lib/nats_async/message.rb', line 60

# Reports the acknowledgement flag; the constructor initialises it to false.
#
# @return [Boolean] the current value of @acked
def acked? = @acked

#in_progress(timeout: nil, **_params) ⇒ Object



51
52
53
54
# File 'lib/nats_async/message.rb', line 51

# Publishes the WPI payload for this message.
#
# Runs ensure_reply! first, then calls publish_ack directly; unlike #ack,
# #nak and #term it does not go through finalize_ack!.
# NOTE(review): ensure_reply! presumably rejects messages without a reply
# subject - confirm against its definition.
#
# @param timeout [Numeric, nil] forwarded to publish_ack as +timeout:+
# @param _params [Hash] any other keyword arguments are accepted and ignored
# @return [Object] whatever publish_ack returns
def in_progress(timeout: nil, **_params)
  ensure_reply!

  publish_options = {timeout: timeout}
  publish_ack(WPI, **publish_options)
end

#metadata ⇒ Object



64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
# File 'lib/nats_async/message.rb', line 64

# Extracts delivery metadata from the reply subject.
#
# Only reply subjects beginning with "$JS.ACK." are parsed. The subject is
# split on "." and the layout is chosen by token count:
#
# * 12 or more tokens: token 2 is the domain ("_" is reported as ""), token 3
#   is skipped, and the seven metadata fields start at token 4.
# * 9 to 11 tokens: there is no domain (reported as "") and the seven
#   metadata fields start at token 2.
#
# The seven fields are, in order: stream, consumer, delivered, stream
# sequence, consumer sequence, timestamp and pending. All but the first two
# are converted with String#to_i, so non-numeric tokens become 0.
#
# @return [Hash, nil] a hash with the keys :stream, :consumer, :delivered,
#   :sequence ({stream:, consumer:}), :timestamp_ns, :pending and :domain;
#   nil when reply is nil, lacks the "$JS.ACK." prefix, or has fewer than
#   9 tokens
def metadata
  return unless reply&.start_with?("$JS.ACK.")

  tokens = reply.split(".")
  return if tokens.size < 9

  if tokens.size >= 12
    domain = tokens[2] == "_" ? "" : tokens[2]
    first_field = 4
  else
    domain = ""
    first_field = 2
  end

  # Both layouts carry the same seven fields; only their starting offset differs.
  stream, consumer, *counters = tokens[first_field, 7]
  delivered, stream_seq, consumer_seq, timestamp_ns, pending = counters.map(&:to_i)

  {
    stream: stream,
    consumer: consumer,
    delivered: delivered,
    sequence: {stream: stream_seq, consumer: consumer_seq},
    timestamp_ns: timestamp_ns,
    pending: pending,
    domain: domain
  }
end

#nak(delay: nil, timeout: nil, **_params) ⇒ Object



44
45
46
47
# File 'lib/nats_async/message.rb', line 44

# Negatively acknowledges the message via finalize_ack!.
#
# Without a delay the payload is the bare NAK constant. With a delay, the
# payload is NAK followed by a space and the JSON object {"delay": <delay>};
# the delay value is passed through unchanged.
#
# @param delay [Object, nil] when truthy, embedded in the payload as JSON
# @param timeout [Numeric, nil] forwarded to finalize_ack! as +timeout:+
# @param _params [Hash] any other keyword arguments are accepted and ignored
# @return [Object] whatever finalize_ack! returns
def nak(delay: nil, timeout: nil, **_params)
  payload =
    if delay
      delay_json = {delay: delay}.to_json
      "#{NAK} #{delay_json}"
    else
      NAK
    end

  finalize_ack!(payload, timeout: timeout)
end

#term(timeout: nil, **_params) ⇒ Object



49
# File 'lib/nats_async/message.rb', line 49

# Terminates the message by passing the TERM payload to finalize_ack!.
#
# @param timeout [Numeric, nil] forwarded to finalize_ack! as +timeout:+
# @param _params [Hash] any other keyword arguments are accepted and ignored
# @return [Object] whatever finalize_ack! returns
def term(timeout: nil, **_params)
  finalize_ack!(TERM, timeout: timeout)
end