Class: Cosmo::Client
- Inherits:
-
Object
- Object
- Cosmo::Client
- Defined in:
- lib/cosmo/client.rb
Instance Attribute Summary collapse
-
#js ⇒ Object
readonly
Returns the value of attribute js.
-
#nc ⇒ Object
readonly
Returns the value of attribute nc.
Class Method Summary collapse
Instance Method Summary collapse
- #close ⇒ Object
- #consumer_info(stream_name, consumer_name) ⇒ Object
- #create_stream(name, config) ⇒ Object
-
#cron_subjects_in_stream(stream_name, filter) ⇒ Array<String>
Return all subjects in
stream_namethat matchfilterusing NATS’s subjects_filter on STREAM.INFO (requires NATS ≥ 2.9). - #delete_message(name, seq) ⇒ Object
- #delete_stream(name, params = {}) ⇒ Object
- #get_message(name, **options) ⇒ Object
-
#initialize(nats_url: ENV.fetch("NATS_URL", "nats://localhost:4222")) ⇒ Client
constructor
A new instance of Client.
- #kv(name, allow_msg_ttl: false, **options) ⇒ Object
- #list_consumers(stream_name) ⇒ Object
- #list_streams ⇒ Object
- #pause_stream(name) ⇒ Object
- #publish(subject, payload, **params) ⇒ Object
- #purge(stream_name, subject) ⇒ Object
-
#setup_stream(name, config) ⇒ Object
Create/update a stream, falling back to create when there’s no stream.
- #stream_info(name) ⇒ Object
- #stream_paused?(name) ⇒ Boolean
- #subscribe(subject, consumer_name, config) ⇒ Object
- #unpause_stream(name) ⇒ Object
- #update_stream(name, config) ⇒ Object
Constructor Details
#initialize(nats_url: ENV.fetch("NATS_URL", "nats://localhost:4222")) ⇒ Client
Returns a new instance of Client.
14 15 16 17 18 19 |
# File 'lib/cosmo/client.rb', line 14 def initialize(nats_url: ENV.fetch("NATS_URL", "nats://localhost:4222")) Logger.debug "Connecting to NATS server at #{nats_url}..." @nc = NATS.connect(nats_url) Logger.debug "Connection established" @js = @nc.jetstream end |
Instance Attribute Details
#js ⇒ Object (readonly)
Returns the value of attribute js.
12 13 14 |
# File 'lib/cosmo/client.rb', line 12 def js @js end |
#nc ⇒ Object (readonly)
Returns the value of attribute nc.
12 13 14 |
# File 'lib/cosmo/client.rb', line 12 def nc @nc end |
Class Method Details
Instance Method Details
#close ⇒ Object
128 129 130 |
# File 'lib/cosmo/client.rb', line 128 def close nc.close end |
#consumer_info(stream_name, consumer_name) ⇒ Object
100 101 102 |
# File 'lib/cosmo/client.rb', line 100 def consumer_info(stream_name, consumer_name) js.consumer_info(stream_name, consumer_name) end |
#create_stream(name, config) ⇒ Object
33 34 35 |
# File 'lib/cosmo/client.rb', line 33 def create_stream(name, config) js.add_stream(name: name, **config) end |
#cron_subjects_in_stream(stream_name, filter) ⇒ Array<String>
Return all subjects in stream_name that match filter using NATS’s subjects_filter on STREAM.INFO (requires NATS ≥ 2.9).
57 58 59 60 61 62 63 64 |
# File 'lib/cosmo/client.rb', line 57 def cron_subjects_in_stream(stream_name, filter) payload = Utils::Json.dump({ subjects_filter: filter }) resp = nc.request("$JS.API.STREAM.INFO.#{stream_name}", payload) data = Utils::Json.parse(resp.data, symbolize_names: false) (data&.dig("state", "subjects") || {}).keys rescue StandardError [] end |
#delete_message(name, seq) ⇒ Object
108 109 110 111 |
# File 'lib/cosmo/client.rb', line 108 def (name, seq) response = nc.request("$JS.API.STREAM.MSG.DELETE.#{name}", JSON.dump({ seq: seq })) Utils::Json.parse(response.data, symbolize_names: false) end |
#delete_stream(name, params = {}) ⇒ Object
37 38 39 |
# File 'lib/cosmo/client.rb', line 37 def delete_stream(name, params = {}) js.delete_stream(name, params) end |
#get_message(name, **options) ⇒ Object
104 105 106 |
# File 'lib/cosmo/client.rb', line 104 def (name, **) js.get_msg(name, **) end |
#kv(name, allow_msg_ttl: false, **options) ⇒ Object
122 123 124 125 126 |
# File 'lib/cosmo/client.rb', line 122 def kv(name, allow_msg_ttl: false, **) js.key_value(name) rescue NATS::KeyValue::BucketNotFoundError allow_msg_ttl ? create_kv_with_msg_ttl(name, **) : js.create_key_value({ bucket: name }.merge()) end |
#list_consumers(stream_name) ⇒ Object
94 95 96 97 98 |
# File 'lib/cosmo/client.rb', line 94 def list_consumers(stream_name) response = nc.request("$JS.API.CONSUMER.LIST.#{stream_name}", "") data = Utils::Json.parse(response.data, default: {}, symbolize_names: false) Array(data["consumers"]) end |
#list_streams ⇒ Object
66 67 68 69 70 71 72 |
# File 'lib/cosmo/client.rb', line 66 def list_streams response = nc.request("$JS.API.STREAM.LIST", "") data = Utils::Json.parse(response.data, symbolize_names: false) return [] if data.nil? || data["streams"].nil? data["streams"] end |
#pause_stream(name) ⇒ Object
74 75 76 77 78 79 |
# File 'lib/cosmo/client.rb', line 74 def pause_stream(name) config = stream_info(name).config.to_h config[:metadata] ||= {} config[:metadata][:"_cosmo.paused"] = "true" update_stream(name, config) end |
#publish(subject, payload, **params) ⇒ Object
21 22 23 |
# File 'lib/cosmo/client.rb', line 21 def publish(subject, payload, **params) js.publish(subject, payload, **params) end |
#purge(stream_name, subject) ⇒ Object
113 114 115 116 117 118 119 120 |
# File 'lib/cosmo/client.rb', line 113 def purge(stream_name, subject) payload = subject ? Utils::Json.dump({ filter: subject }) : "" response = @nc.request("$JS.API.STREAM.PURGE.#{stream_name}", payload) result = Utils::Json.parse(response.data, default: {}, symbolize_names: false) raise NATS::JetStream::Error, result.dig("error", "description") if result["error"] result["purged"] # number of messages purged end |
#setup_stream(name, config) ⇒ Object
Create/update a stream, falling back to create when there’s no stream.
48 49 50 51 52 |
# File 'lib/cosmo/client.rb', line 48 def setup_stream(name, config) update_stream(name, config) rescue NATS::JetStream::Error::StreamNotFound create_stream(name, config) end |
#stream_info(name) ⇒ Object
29 30 31 |
# File 'lib/cosmo/client.rb', line 29 def stream_info(name) js.stream_info(name) end |
#stream_paused?(name) ⇒ Boolean
88 89 90 91 92 |
# File 'lib/cosmo/client.rb', line 88 def stream_paused?(name) stream_info(name).config.&.[](:"_cosmo.paused") == "true" rescue NATS::IO::Timeout false end |
#subscribe(subject, consumer_name, config) ⇒ Object
25 26 27 |
# File 'lib/cosmo/client.rb', line 25 def subscribe(subject, consumer_name, config) js.pull_subscribe(subject, consumer_name, config: config) end |
#unpause_stream(name) ⇒ Object
81 82 83 84 85 86 |
# File 'lib/cosmo/client.rb', line 81 def unpause_stream(name) config = stream_info(name).config.to_h config[:metadata] ||= {} config[:metadata].delete(:"_cosmo.paused") update_stream(name, config) end |
#update_stream(name, config) ⇒ Object
41 42 43 |
# File 'lib/cosmo/client.rb', line 41 def update_stream(name, config) js.update_stream(name: name, **config) end |