Class: Cosmo::API::KV
- Inherits:
-
Object
- Object
- Cosmo::API::KV
- Defined in:
- lib/cosmo/api/kv.rb
Instance Attribute Summary collapse
-
#kv ⇒ Object
readonly
Returns the value of attribute kv.
Instance Method Summary collapse
- #clean ⇒ Object
- #count ⇒ Object (also: #size)
- #delete(key) ⇒ Object
- #get(key) ⇒ Object
-
#initialize(name, options = nil) ⇒ KV
constructor
A new instance of KV.
- #keys(subject = nil, limit: 25) ⇒ Object
- #purge(key) ⇒ Object
-
#set(key, value, ttl: nil) ⇒ Object
rubocop:disable Metrics/AbcSize, Metrics/MethodLength.
Constructor Details
Instance Attribute Details
#kv ⇒ Object (readonly)
Returns the value of attribute kv.
6 7 8 |
# File 'lib/cosmo/api/kv.rb', line 6 def kv @kv end |
Instance Method Details
#clean ⇒ Object
73 74 75 |
# File 'lib/cosmo/api/kv.rb', line 73 def clean Client.instance.purge("KV_#{@name}", ">") end |
#count ⇒ Object Also known as: size
77 78 79 80 81 |
# File 'lib/cosmo/api/kv.rb', line 77 def count keys.size rescue NATS::KeyValue::NoKeysFoundError, NATS::JetStream::Error::NotFound 0 end |
#delete(key) ⇒ Object
52 53 54 |
# File 'lib/cosmo/api/kv.rb', line 52 def delete(key) kv.delete(key) end |
#get(key) ⇒ Object
46 47 48 49 50 |
# File 'lib/cosmo/api/kv.rb', line 46 def get(key) kv.get(key) rescue NATS::KeyValue::KeyNotFoundError # nop end |
#keys(subject = nil, limit: 25) ⇒ Object
56 57 58 59 60 61 62 63 64 65 66 67 |
# File 'lib/cosmo/api/kv.rb', line 56 def keys(subject = nil, limit: 25) results = [] watcher = kv.watch(subject || ">", ignore_deletes: true, meta_only: true) watcher.each do |entry| break unless entry results << entry.key break if results.size >= limit end watcher.stop results end |
#purge(key) ⇒ Object
69 70 71 |
# File 'lib/cosmo/api/kv.rb', line 69 def purge(key) kv.purge(key) end |
#set(key, value, ttl: nil) ⇒ Object
rubocop:disable Metrics/AbcSize, Metrics/MethodLength
14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 |
# File 'lib/cosmo/api/kv.rb', line 14 def set(key, value, ttl: nil) # rubocop:disable Metrics/AbcSize, Metrics/MethodLength return kv.put(key, value.to_s) unless ttl # Pass ttl: (seconds) to set a per-message expiry. # Raises `NATS::KeyValue::KeyWrongLastSequenceError` when the key is live. begin value = value.to_s put = lambda do |last_seq:| headers = { "Nats-Expected-Last-Subject-Sequence" => last_seq.to_s, "Nats-TTL" => "#{ttl.to_i}s" } Client.instance.js.publish("$KV.#{@name}.#{key}", value, header: headers) rescue NATS::JetStream::Error::APIError => e raise NATS::KeyValue::KeyWrongLastSequenceError, e.description if e.err_code == 10_071 raise end put.call(last_seq: 0) kv.send(:_get, key) # fetch the created entry to get its revision rescue NATS::KeyValue::KeyWrongLastSequenceError # `kv.get` converts KeyDeletedError → KeyNotFoundError, hiding tombstone info. # Use private _get instead — it raises KeyDeletedError with the entry's revision begin kv.send(:_get, key) rescue NATS::KeyValue::KeyDeletedError => e put.call(last_seq: e.entry.revision) return kv.send(:_get, key) end raise end end |