Class: NatsAsync::JetStream

Inherits:
Object
  • Object
show all
Defined in:
lib/nats_async/jetstream.rb

Defined Under Namespace

Classes: ConsumerError, Error, NotFound, PublishAck, PullSubscription

Instance Method Summary collapse

Constructor Details

#initialize(client) ⇒ JetStream

Returns a new instance of JetStream.



100
101
102
# File 'lib/nats_async/jetstream.rb', line 100

# Builds a JetStream helper around an existing client.
#
# @param client [Object] stored in @client; the instance methods later call
#   +request+ and +js_api_subject+ on it
def initialize(client)
  @client = client
end

Instance Method Details

#account_info ⇒ Object



119
120
121
# File 'lib/nats_async/jetstream.rb', line 119

# Issues the JetStream "INFO" API request.
#
# @return [Object] whatever +api_request+ returns for the "INFO" call
def account_info
  api_request("INFO")
end

#add_consumer(stream, config = nil, **options) ⇒ Object



161
162
163
164
165
166
167
168
# File 'lib/nats_async/jetstream.rb', line 161

# Creates a consumer on +stream+ via the "CONSUMER.CREATE" API subject.
#
# The consumer name (as resolved by +consumer_name+ from the merged config)
# is appended to the API subject only when one is present.
#
# @param stream [String] stream the consumer belongs to
# @param config [Hash, nil] consumer configuration, combined with +options+
#   through +merge_config+
# @param options [Hash] extra configuration entries given as keywords
# @return [Object] the result of +api_request_subject+
# @raise [ConsumerError] when the request raises Error; message, code,
#   err_code and description are copied from the original error
def add_consumer(stream, config = nil, **options)
  merged = merge_config(config, options)
  name = consumer_name(merged)

  subject_parts = ["CONSUMER.CREATE", stream]
  subject_parts << name if name
  subject = @client.js_api_subject(*subject_parts)

  api_request_subject(subject, {stream_name: stream, config: merged})
rescue Error => error
  raise ConsumerError.new(
    error.message,
    code: error.code,
    err_code: error.err_code,
    description: error.description
  )
end

#add_consumer?(stream, config = nil, **options) ⇒ Boolean

Returns:

  • (Boolean)


170
171
172
173
174
175
176
177
# File 'lib/nats_async/jetstream.rb', line 170

# Creates the consumer unless a consumer with the resolved name already
# exists on +stream+.
#
# When the merged config yields no consumer name, no existence check is made
# and the consumer is always created.
#
# @param stream [String] stream the consumer belongs to
# @param config [Hash, nil] consumer configuration, combined with +options+
#   through +merge_config+
# @param options [Hash] extra configuration entries given as keywords
# @return [Boolean] true when +add_consumer+ was called, false when the
#   consumer was already present
def add_consumer?(stream, config = nil, **options)
  merged = merge_config(config, options)
  name = consumer_name(merged)
  already_present = name && consumer_exists?(stream, name)

  if already_present
    false
  else
    add_consumer(stream, merged)
    true
  end
end

#add_stream(name, config = nil, **options) ⇒ Object



130
131
132
133
# File 'lib/nats_async/jetstream.rb', line 130

# Creates a stream through the "STREAM.CREATE" API request.
#
# The +name+ argument is written into the merged config under :name,
# replacing any :name entry supplied by the caller.
#
# @param name [String] stream name
# @param config [Hash, nil] stream configuration, combined with +options+
#   through +merge_config+
# @param options [Hash] extra configuration entries given as keywords
# @return [Object] the result of +api_request+
def add_stream(name, config = nil, **options)
  stream_config = merge_config(config, options)
  stream_config = stream_config.merge(name: name)
  api_request("STREAM.CREATE", name, stream_config)
end

#add_stream?(name, config = nil, **options) ⇒ Boolean

Returns:

  • (Boolean)


135
136
137
138
139
140
# File 'lib/nats_async/jetstream.rb', line 135

# Creates the stream unless +stream_exists?+ reports it is already there.
#
# @param name [String] stream name
# @param config [Hash, nil] stream configuration forwarded to +add_stream+
# @param options [Hash] extra configuration entries forwarded as keywords
# @return [Boolean] true when +add_stream+ was called, false when the stream
#   already existed
def add_stream?(name, config = nil, **options)
  if stream_exists?(name)
    false
  else
    add_stream(name, config, **options)
    true
  end
end

#available? ⇒ Boolean

Returns:

  • (Boolean)


112
113
114
115
116
117
# File 'lib/nats_async/jetstream.rb', line 112

# Reports whether JetStream answers the account-info request.
#
# The probe call is what makes the +rescue+ reachable: without it the method
# would return true unconditionally.
#
# @return [Boolean] true when +account_info+ completes, false when it
#   raises Error
def available?
  account_info
  true
rescue Error
  false
end

#consumer_exists?(stream, consumer) ⇒ Boolean

Returns:

  • (Boolean)


154
155
156
157
158
159
# File 'lib/nats_async/jetstream.rb', line 154

# Checks for a consumer by attempting to fetch its info.
#
# @param stream [String] stream the consumer belongs to
# @param consumer [String] consumer name
# @return [Boolean] false when +consumer_info+ raises NotFound, true when it
#   returns normally; any other error propagates
def consumer_exists?(stream, consumer)
  begin
    consumer_info(stream, consumer)
  rescue NotFound
    return false
  end

  true
end

#consumer_info(stream, consumer) ⇒ Object



146
147
148
149
150
151
152
# File 'lib/nats_async/jetstream.rb', line 146

# Fetches consumer details through the "CONSUMER.INFO" API request.
#
# @param stream [String] stream the consumer belongs to
# @param consumer [String] consumer name
# @return [Object] the result of +api_request+
# @raise [Error] re-raised unchanged unless +not_found?+ recognises it, in
#   which case the error built by +not_found_error+ is raised instead
def consumer_info(stream, consumer)
  api_request("CONSUMER.INFO", stream, consumer)
rescue Error => error
  # Anything that is not a "not found" condition goes back out untouched.
  raise unless not_found?(error)

  raise not_found_error(error)
end

#delete_consumer(stream, consumer) ⇒ Object



179
180
181
# File 'lib/nats_async/jetstream.rb', line 179

# Deletes a consumer through the "CONSUMER.DELETE" API request, sending an
# empty hash as the final argument.
#
# @param stream [String] stream the consumer belongs to
# @param consumer [String] consumer name
# @return [Object] the result of +api_request+
def delete_consumer(stream, consumer)
  empty_payload = {}
  api_request("CONSUMER.DELETE", stream, consumer, empty_payload)
end

#delete_stream(name) ⇒ Object



142
143
144
# File 'lib/nats_async/jetstream.rb', line 142

# Deletes a stream through the "STREAM.DELETE" API request, sending an empty
# hash as the final argument.
#
# @param name [String] stream name
# @return [Object] the result of +api_request+
def delete_stream(name)
  empty_payload = {}
  api_request("STREAM.DELETE", name, empty_payload)
end

#publish(subject, payload = "", headers: nil, timeout: 2) ⇒ Object



183
184
185
186
187
188
189
190
191
# File 'lib/nats_async/jetstream.rb', line 183

# Publishes a message as a request and interprets the reply as a publish
# acknowledgement.
#
# The reply data is parsed as JSON with symbolized keys. A truthy :error
# entry is handed to +raise_api_error+; otherwise a PublishAck is built from
# the :stream, :seq and :duplicate entries.
#
# @param subject [String] subject to publish on
# @param payload [String] message body
# @param headers [Hash, nil] forwarded to the client's +request+
# @param timeout [Numeric] forwarded to the client's +request+
# @return [PublishAck]
# @raise [Error] when the reply data is not valid JSON
def publish(subject, payload = "", headers: nil, timeout: 2)
  pending = @client.request(subject, payload, timeout: timeout, headers: headers)
  reply = pending.wait
  ack = JSON.parse(reply.data, symbolize_names: true)

  raise_api_error(subject, ack[:error]) if ack[:error]

  PublishAck.new(
    stream: ack[:stream],
    seq: ack[:seq],
    duplicate: ack[:duplicate]
  )
rescue JSON::ParserError => parse_error
  raise Error, "JetStream publish returned invalid JSON for #{subject}: #{parse_error.message}"
end

#pull_subscribe(subject, stream:, durable: nil, consumer: nil, config: {}, create: true) ⇒ Object

Raises:

  • (ArgumentError)


193
194
195
196
197
198
199
200
201
202
203
204
205
# File 'lib/nats_async/jetstream.rb', line 193

# Builds a PullSubscription for a consumer on +stream+, optionally creating
# the consumer first.
#
# The config is passed through +merge_config+ and then filled in where
# entries are missing: :filter_subject from +subject+, :ack_policy as
# "explicit", :durable_name from +durable+ and :name from +consumer+.
#
# @param subject [String] default filter subject
# @param stream [String] stream the consumer belongs to
# @param durable [String, nil] default for config[:durable_name]
# @param consumer [String, nil] default for config[:name]
# @param config [Hash] consumer configuration
# @param create [Boolean] when truthy, +add_consumer?+ is called before the
#   subscription is built
# @return [PullSubscription]
# @raise [ArgumentError] when +consumer_name+ yields nothing for the config
def pull_subscribe(subject, stream:, durable: nil, consumer: nil, config: {}, create: true)
  settings = merge_config(config, {})
  settings[:filter_subject] ||= subject
  settings[:ack_policy] ||= "explicit"
  settings[:durable_name] ||= durable if durable
  settings[:name] ||= consumer if consumer

  # A distinct local name keeps the consumer_name helper unshadowed.
  resolved_name = consumer_name(settings)
  if resolved_name.to_s.empty?
    raise ArgumentError, "durable, consumer, config[:name], or config[:durable_name] is required"
  end

  add_consumer?(stream, settings) if create
  PullSubscription.new(client: @client, stream: stream, consumer: resolved_name)
end

#stream_exists?(name) ⇒ Boolean

Returns:

  • (Boolean)


123
124
125
126
127
128
# File 'lib/nats_async/jetstream.rb', line 123

# Checks for a stream by attempting to fetch its info.
#
# @param name [String] stream name
# @return [Boolean] false when +stream_info+ raises NotFound, true when it
#   returns normally; any other error propagates
def stream_exists?(name)
  begin
    stream_info(name)
  rescue NotFound
    return false
  end

  true
end

#stream_info(name) ⇒ Object



104
105
106
107
108
109
110
# File 'lib/nats_async/jetstream.rb', line 104

# Fetches stream details through the "STREAM.INFO" API request.
#
# @param name [String] stream name
# @return [Object] the result of +api_request+
# @raise [Error] re-raised unchanged unless +not_found?+ recognises it, in
#   which case the error built by +not_found_error+ is raised instead
def stream_info(name)
  api_request("STREAM.INFO", name)
rescue Error => error
  # Anything that is not a "not found" condition goes back out untouched.
  raise unless not_found?(error)

  raise not_found_error(error)
end