Class: NatsAsync::JetStream
- Inherits:
-
Object
- Object
- NatsAsync::JetStream
show all
- Defined in:
- lib/nats_async/jetstream.rb
Defined Under Namespace
Classes: ConsumerError, Error, NotFound, PublishAck, PullSubscription
Instance Method Summary
collapse
-
#account_info ⇒ Object
-
#add_consumer(stream, config = nil, **options) ⇒ Object
-
#add_consumer?(stream, config = nil, **options) ⇒ Boolean
-
#add_stream(name, config = nil, **options) ⇒ Object
-
#add_stream?(name, config = nil, **options) ⇒ Boolean
-
#available? ⇒ Boolean
-
#consumer_exists?(stream, consumer) ⇒ Boolean
-
#consumer_info(stream, consumer) ⇒ Object
-
#delete_consumer(stream, consumer) ⇒ Object
-
#delete_stream(name) ⇒ Object
-
#initialize(client) ⇒ JetStream
constructor
A new instance of JetStream.
-
#publish(subject, payload = "", headers: nil, timeout: 2) ⇒ Object
-
#pull_subscribe(subject, stream:, durable: nil, consumer: nil, config: {}, create: true) ⇒ Object
-
#stream_exists?(name) ⇒ Boolean
-
#stream_info(name) ⇒ Object
Constructor Details
#initialize(client) ⇒ JetStream
Returns a new instance of JetStream.
100
101
102
|
# File 'lib/nats_async/jetstream.rb', line 100
def initialize(client)
@client = client
end
|
Instance Method Details
#account_info ⇒ Object
119
120
121
|
# File 'lib/nats_async/jetstream.rb', line 119
def account_info
api_request("INFO")
end
|
#add_consumer(stream, config = nil, **options) ⇒ Object
161
162
163
164
165
166
167
168
169
170
|
# File 'lib/nats_async/jetstream.rb', line 161
def add_consumer(stream, config = nil, **options)
config = merge_config(config, options)
config[:ack_wait] = seconds_to_nanoseconds(config[:ack_wait]) if config[:ack_wait]
config[:inactive_threshold] = seconds_to_nanoseconds(config[:inactive_threshold]) if config[:inactive_threshold]
consumer = consumer_name(config)
subject = consumer ? @client.js_api_subject("CONSUMER.CREATE", stream, consumer) : @client.js_api_subject("CONSUMER.CREATE", stream)
api_request_subject(subject, {stream_name: stream, config: config})
rescue Error => e
raise ConsumerError.new(e.message, code: e.code, err_code: e.err_code, description: e.description)
end
|
#add_consumer?(stream, config = nil, **options) ⇒ Boolean
172
173
174
175
176
177
178
179
|
# File 'lib/nats_async/jetstream.rb', line 172
def add_consumer?(stream, config = nil, **options)
config = merge_config(config, options)
consumer = consumer_name(config)
return false if consumer && consumer_exists?(stream, consumer)
add_consumer(stream, config)
true
end
|
#add_stream(name, config = nil, **options) ⇒ Object
130
131
132
133
|
# File 'lib/nats_async/jetstream.rb', line 130
def add_stream(name, config = nil, **options)
config = merge_config(config, options).merge(name: name)
api_request("STREAM.CREATE", name, config)
end
|
#add_stream?(name, config = nil, **options) ⇒ Boolean
135
136
137
138
139
140
|
# File 'lib/nats_async/jetstream.rb', line 135
def add_stream?(name, config = nil, **options)
return false if stream_exists?(name)
add_stream(name, config, **options)
true
end
|
#available? ⇒ Boolean
112
113
114
115
116
117
|
# File 'lib/nats_async/jetstream.rb', line 112
def available?
account_info
true
rescue Error
false
end
|
#consumer_exists?(stream, consumer) ⇒ Boolean
154
155
156
157
158
159
|
# File 'lib/nats_async/jetstream.rb', line 154
def consumer_exists?(stream, consumer)
consumer_info(stream, consumer)
true
rescue NotFound
false
end
|
#consumer_info(stream, consumer) ⇒ Object
146
147
148
149
150
151
152
|
# File 'lib/nats_async/jetstream.rb', line 146
def consumer_info(stream, consumer)
api_request("CONSUMER.INFO", stream, consumer)
rescue Error => e
raise not_found_error(e) if not_found?(e)
raise
end
|
#delete_consumer(stream, consumer) ⇒ Object
181
182
183
|
# File 'lib/nats_async/jetstream.rb', line 181
def delete_consumer(stream, consumer)
api_request("CONSUMER.DELETE", stream, consumer, {})
end
|
#delete_stream(name) ⇒ Object
142
143
144
|
# File 'lib/nats_async/jetstream.rb', line 142
def delete_stream(name)
api_request("STREAM.DELETE", name, {})
end
|
#publish(subject, payload = "", headers: nil, timeout: 2) ⇒ Object
185
186
187
188
189
190
191
192
193
|
# File 'lib/nats_async/jetstream.rb', line 185
def publish(subject, payload = "", headers: nil, timeout: 2)
message = @client.request(subject, payload, timeout: timeout, headers: ).wait
result = JSON.parse(message.data, symbolize_names: true)
raise_api_error(subject, result[:error]) if result[:error]
PublishAck.new(stream: result[:stream], seq: result[:seq], duplicate: result[:duplicate])
rescue JSON::ParserError => e
raise Error, "JetStream publish returned invalid JSON for #{subject}: #{e.message}"
end
|
#pull_subscribe(subject, stream:, durable: nil, consumer: nil, config: {}, create: true) ⇒ Object
195
196
197
198
199
200
201
202
203
204
205
206
207
|
# File 'lib/nats_async/jetstream.rb', line 195
def pull_subscribe(subject, stream:, durable: nil, consumer: nil, config: {}, create: true)
config = merge_config(config, {})
config[:filter_subject] ||= subject
config[:ack_policy] ||= "explicit"
config[:durable_name] ||= durable if durable
config[:name] ||= consumer if consumer
consumer_name = consumer_name(config)
raise ArgumentError, "durable, consumer, config[:name], or config[:durable_name] is required" if consumer_name.to_s.empty?
add_consumer?(stream, config) if create
PullSubscription.new(client: @client, stream: stream, consumer: consumer_name)
end
|
#stream_exists?(name) ⇒ Boolean
123
124
125
126
127
128
|
# File 'lib/nats_async/jetstream.rb', line 123
def stream_exists?(name)
stream_info(name)
true
rescue NotFound
false
end
|
#stream_info(name) ⇒ Object
104
105
106
107
108
109
110
|
# File 'lib/nats_async/jetstream.rb', line 104
def stream_info(name)
api_request("STREAM.INFO", name)
rescue Error => e
raise not_found_error(e) if not_found?(e)
raise
end
|