Class: NatsAsync::JetStream
- Inherits:
-
Object
- Object
- NatsAsync::JetStream
- Defined in:
- lib/nats_async/jetstream.rb
Defined Under Namespace
Classes: ConsumerError, Error, NotFound, PublishAck, PullSubscription
Instance Method Summary collapse
- #account_info ⇒ Object
- #add_consumer(stream, config = nil, **options) ⇒ Object
- #add_consumer?(stream, config = nil, **options) ⇒ Boolean
- #add_stream(name, config = nil, **options) ⇒ Object
- #add_stream?(name, config = nil, **options) ⇒ Boolean
- #available? ⇒ Boolean
- #consumer_exists?(stream, consumer) ⇒ Boolean
- #consumer_info(stream, consumer) ⇒ Object
- #delete_consumer(stream, consumer) ⇒ Object
- #delete_stream(name) ⇒ Object
-
#initialize(client) ⇒ JetStream
constructor
A new instance of JetStream.
- #publish(subject, payload = "", headers: nil, timeout: 2) ⇒ Object
- #pull_subscribe(subject, stream:, durable: nil, consumer: nil, config: {}, create: true) ⇒ Object
- #stream_exists?(name) ⇒ Boolean
- #stream_info(name) ⇒ Object
Constructor Details
#initialize(client) ⇒ JetStream
Returns a new instance of JetStream.
100 101 102 |
# File 'lib/nats_async/jetstream.rb', line 100 def initialize(client) @client = client end |
Instance Method Details
#account_info ⇒ Object
119 120 121 |
# File 'lib/nats_async/jetstream.rb', line 119 def account_info api_request("INFO") end |
#add_consumer(stream, config = nil, **options) ⇒ Object
161 162 163 164 165 166 167 168 |
# File 'lib/nats_async/jetstream.rb', line 161 def add_consumer(stream, config = nil, **) config = merge_config(config, ) consumer = consumer_name(config) subject = consumer ? @client.js_api_subject("CONSUMER.CREATE", stream, consumer) : @client.js_api_subject("CONSUMER.CREATE", stream) api_request_subject(subject, {stream_name: stream, config: config}) rescue Error => e raise ConsumerError.new(e., code: e.code, err_code: e.err_code, description: e.description) end |
#add_consumer?(stream, config = nil, **options) ⇒ Boolean
170 171 172 173 174 175 176 177 |
# File 'lib/nats_async/jetstream.rb', line 170 def add_consumer?(stream, config = nil, **) config = merge_config(config, ) consumer = consumer_name(config) return false if consumer && consumer_exists?(stream, consumer) add_consumer(stream, config) true end |
#add_stream(name, config = nil, **options) ⇒ Object
130 131 132 133 |
# File 'lib/nats_async/jetstream.rb', line 130 def add_stream(name, config = nil, **) config = merge_config(config, ).merge(name: name) api_request("STREAM.CREATE", name, config) end |
#add_stream?(name, config = nil, **options) ⇒ Boolean
135 136 137 138 139 140 |
# File 'lib/nats_async/jetstream.rb', line 135 def add_stream?(name, config = nil, **) return false if stream_exists?(name) add_stream(name, config, **) true end |
#available? ⇒ Boolean
112 113 114 115 116 117 |
# File 'lib/nats_async/jetstream.rb', line 112 def available? account_info true rescue Error false end |
#consumer_exists?(stream, consumer) ⇒ Boolean
154 155 156 157 158 159 |
# File 'lib/nats_async/jetstream.rb', line 154 def consumer_exists?(stream, consumer) consumer_info(stream, consumer) true rescue NotFound false end |
#consumer_info(stream, consumer) ⇒ Object
146 147 148 149 150 151 152 |
# File 'lib/nats_async/jetstream.rb', line 146 def consumer_info(stream, consumer) api_request("CONSUMER.INFO", stream, consumer) rescue Error => e raise not_found_error(e) if not_found?(e) raise end |
#delete_consumer(stream, consumer) ⇒ Object
179 180 181 |
# File 'lib/nats_async/jetstream.rb', line 179 def delete_consumer(stream, consumer) api_request("CONSUMER.DELETE", stream, consumer, {}) end |
#delete_stream(name) ⇒ Object
142 143 144 |
# File 'lib/nats_async/jetstream.rb', line 142 def delete_stream(name) api_request("STREAM.DELETE", name, {}) end |
#publish(subject, payload = "", headers: nil, timeout: 2) ⇒ Object
183 184 185 186 187 188 189 190 191 |
# File 'lib/nats_async/jetstream.rb', line 183 def publish(subject, payload = "", headers: nil, timeout: 2) = @client.request(subject, payload, timeout: timeout, headers: headers).wait result = JSON.parse(.data, symbolize_names: true) raise_api_error(subject, result[:error]) if result[:error] PublishAck.new(stream: result[:stream], seq: result[:seq], duplicate: result[:duplicate]) rescue JSON::ParserError => e raise Error, "JetStream publish returned invalid JSON for #{subject}: #{e.}" end |
#pull_subscribe(subject, stream:, durable: nil, consumer: nil, config: {}, create: true) ⇒ Object
193 194 195 196 197 198 199 200 201 202 203 204 205 |
# File 'lib/nats_async/jetstream.rb', line 193 def pull_subscribe(subject, stream:, durable: nil, consumer: nil, config: {}, create: true) config = merge_config(config, {}) config[:filter_subject] ||= subject config[:ack_policy] ||= "explicit" config[:durable_name] ||= durable if durable config[:name] ||= consumer if consumer consumer_name = consumer_name(config) raise ArgumentError, "durable, consumer, config[:name], or config[:durable_name] is required" if consumer_name.to_s.empty? add_consumer?(stream, config) if create PullSubscription.new(client: @client, stream: stream, consumer: consumer_name) end |
#stream_exists?(name) ⇒ Boolean
123 124 125 126 127 128 |
# File 'lib/nats_async/jetstream.rb', line 123 def stream_exists?(name) stream_info(name) true rescue NotFound false end |
#stream_info(name) ⇒ Object
104 105 106 107 108 109 110 |
# File 'lib/nats_async/jetstream.rb', line 104 def stream_info(name) api_request("STREAM.INFO", name) rescue Error => e raise not_found_error(e) if not_found?(e) raise end |