Class: Cosmo::Client
- Inherits:
-
Object
- Object
- Cosmo::Client
- Defined in:
- lib/cosmo/client.rb
Instance Attribute Summary collapse
-
#js ⇒ Object
readonly
Returns the value of attribute js.
-
#nc ⇒ Object
readonly
Returns the value of attribute nc.
Class Method Summary collapse
Instance Method Summary collapse
- #close ⇒ Object
- #consumer_info(stream_name, consumer_name) ⇒ Object
- #create_stream(name, config) ⇒ Object
- #delete_message(name, seq) ⇒ Object
- #delete_stream(name, params = {}) ⇒ Object
- #get_message(name, **options) ⇒ Object
-
#initialize(nats_url: ENV.fetch("NATS_URL", "nats://localhost:4222")) ⇒ Client
constructor
A new instance of Client.
- #kv(name, **options) ⇒ Object
- #list_consumers(stream_name) ⇒ Object
- #list_streams ⇒ Object
- #publish(subject, payload, **params) ⇒ Object
- #purge(stream_name, subject) ⇒ Object
- #stream_info(name) ⇒ Object
- #subscribe(subject, consumer_name, config) ⇒ Object
Constructor Details
#initialize(nats_url: ENV.fetch("NATS_URL", "nats://localhost:4222")) ⇒ Client
Returns a new instance of Client.
14 15 16 17 18 19 |
# File 'lib/cosmo/client.rb', line 14 def initialize(nats_url: ENV.fetch("NATS_URL", "nats://localhost:4222")) Logger.debug "Connecting to NATS server at #{nats_url}..." @nc = NATS.connect(nats_url) Logger.debug "Connection established" @js = @nc.jetstream end |
Instance Attribute Details
#js ⇒ Object (readonly)
Returns the value of attribute js.
12 13 14 |
# File 'lib/cosmo/client.rb', line 12 def js @js end |
#nc ⇒ Object (readonly)
Returns the value of attribute nc.
12 13 14 |
# File 'lib/cosmo/client.rb', line 12 def nc @nc end |
Class Method Details
Instance Method Details
#close ⇒ Object
83 84 85 |
# File 'lib/cosmo/client.rb', line 83 def close nc.close end |
#consumer_info(stream_name, consumer_name) ⇒ Object
55 56 57 |
# File 'lib/cosmo/client.rb', line 55 def consumer_info(stream_name, consumer_name) js.consumer_info(stream_name, consumer_name) end |
#create_stream(name, config) ⇒ Object
33 34 35 |
# File 'lib/cosmo/client.rb', line 33 def create_stream(name, config) js.add_stream(name: name, **config) end |
#delete_message(name, seq) ⇒ Object
63 64 65 66 |
# File 'lib/cosmo/client.rb', line 63 def (name, seq) response = nc.request("$JS.API.STREAM.MSG.DELETE.#{name}", JSON.dump({ seq: seq })) Utils::Json.parse(response.data, symbolize_names: false) end |
#delete_stream(name, params = {}) ⇒ Object
37 38 39 |
# File 'lib/cosmo/client.rb', line 37 def delete_stream(name, params = {}) js.delete_stream(name, params) end |
#get_message(name, **options) ⇒ Object
59 60 61 |
# File 'lib/cosmo/client.rb', line 59 def (name, **) js.get_msg(name, **) end |
#kv(name, **options) ⇒ Object
77 78 79 80 81 |
# File 'lib/cosmo/client.rb', line 77 def kv(name, **) js.key_value(name) rescue NATS::KeyValue::BucketNotFoundError js.create_key_value({ bucket: name }.merge()) end |
#list_consumers(stream_name) ⇒ Object
49 50 51 52 53 |
# File 'lib/cosmo/client.rb', line 49 def list_consumers(stream_name) response = nc.request("$JS.API.CONSUMER.LIST.#{stream_name}", "") data = Utils::Json.parse(response.data, symbolize_names: false) data["consumers"] end |
#list_streams ⇒ Object
41 42 43 44 45 46 47 |
# File 'lib/cosmo/client.rb', line 41 def list_streams response = nc.request("$JS.API.STREAM.LIST", "") data = Utils::Json.parse(response.data, symbolize_names: false) return [] if data.nil? || data["streams"].nil? data["streams"].filter_map { _1.dig("config", "name") } end |
#publish(subject, payload, **params) ⇒ Object
21 22 23 |
# File 'lib/cosmo/client.rb', line 21 def publish(subject, payload, **params) js.publish(subject, payload, **params) end |
#purge(stream_name, subject) ⇒ Object
68 69 70 71 72 73 74 75 |
# File 'lib/cosmo/client.rb', line 68 def purge(stream_name, subject) payload = subject ? Utils::Json.dump({ filter: subject }) : "" response = @nc.request("$JS.API.STREAM.PURGE.#{stream_name}", payload) result = Utils::Json.parse(response.data, default: {}, symbolize_names: false) raise NATS::JetStream::Error, result.dig("error", "description") if result["error"] result["purged"] # number of messages purged end |
#stream_info(name) ⇒ Object
29 30 31 |
# File 'lib/cosmo/client.rb', line 29 def stream_info(name) js.stream_info(name) end |
#subscribe(subject, consumer_name, config) ⇒ Object
25 26 27 |
# File 'lib/cosmo/client.rb', line 25 def subscribe(subject, consumer_name, config) js.pull_subscribe(subject, consumer_name, config: config) end |