Class: MQKV::Store

Inherits:
Object
  • Object
show all
Defined in:
lib/mqkv/store.rb

Defined Under Namespace

Classes: CacheEntry, WatchHandle

Constant Summary collapse

TOMBSTONE_HEADER =
"__mqkv_deleted__"
EXPIRES_HEADER =
"__mqkv_expires_at__"
CONSUME_PREFETCH =

Stream consumers must ack to advance the broker’s flow-control window: with manual acks and a prefetch of N, delivery stalls after N outstanding messages. Acking every half-window keeps deliveries flowing while still batching (multiple: true).

256
CONSUME_ACK_BATCH =
CONSUME_PREFETCH / 2

Instance Method Summary collapse

Constructor Details

#initialize(url, prefix: "mqkv", read_timeout: 0.5, connect_timeout: nil, confirm: true, cache_watchers: true, logger: nil) ⇒ Store

Returns a new instance of Store.



30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
# File 'lib/mqkv/store.rb', line 30

def initialize(url, prefix: "mqkv", read_timeout: 0.5, connect_timeout: nil, confirm: true, cache_watchers: true, logger: nil)
  @url = url
  @prefix = prefix
  @read_timeout = read_timeout
  @connect_timeout = connect_timeout
  @confirm = confirm
  @cache_watchers_enabled = cache_watchers
  @logger = logger
  @mutex = Mutex.new
  @connection = nil
  @declared_streams = Set.new
  @cache = nil
  @cache_mutex = Mutex.new
  @cache_watchers = {}
end

Instance Method Details

#cached?(key) ⇒ Boolean

Whether the key is present in the in-memory cache. A tombstoned key still counts as cached (cached_get returns nil for it). Returns false if no preload has happened yet (no cache exists).

Returns:

  • (Boolean)


94
95
96
97
98
# File 'lib/mqkv/store.rb', line 94

def cached?(key)
  return false unless @cache

  @cache_mutex.synchronize { @cache.key?(key) }
end

#cached_get(key) ⇒ Object

Cache-only read. Returns the cached value, or nil if the key has been tombstoned or isn’t in the cache at all. Never falls through to the stream — intended for callers that want to decouple read latency from the broker (e.g. an HTTP handler that should show a placeholder when the cache isn’t ready yet rather than block on AMQP). Use ‘cached?(key)` to distinguish “tombstoned” from “not cached”.



82
83
84
85
86
87
88
89
# File 'lib/mqkv/store.rb', line 82

def cached_get(key)
  return nil unless @cache

  @cache_mutex.synchronize do
    return @cache[key].current_value if @cache.key?(key)
  end
  nil
end

#closeObject



197
198
199
200
201
202
203
204
205
# File 'lib/mqkv/store.rb', line 197

def close
  log(:info) { "at=close" }
  stop_cache_watchers
  @mutex.synchronize do
    @connection&.close
    @connection = nil
    @declared_streams.clear
  end
end

#delete(key) ⇒ Object



100
101
102
103
104
105
106
107
108
109
110
111
# File 'lib/mqkv/store.rb', line 100

def delete(key)
  name = queue_name(key)
  ensure_stream(name)
  publish_tombstone(name)
  log(:debug) { "at=delete key=#{key} queue=#{name}" }
  if @cache
    @cache_mutex.synchronize { @cache[key] = CacheEntry.new(value: nil, expires_at: nil) }
    log(:debug) { "at=delete key=#{key} cache=tombstoned" }
    start_cache_watcher(key)
  end
  nil
end

#exists?(key) ⇒ Boolean

Returns:

  • (Boolean)


113
114
115
# File 'lib/mqkv/store.rb', line 113

def exists?(key)
  !get(key).nil?
end

#get(key) ⇒ Object



62
63
64
65
66
67
68
69
70
71
72
73
# File 'lib/mqkv/store.rb', line 62

def get(key)
  if @cache
    @cache_mutex.synchronize do
      if @cache.key?(key)
        log(:debug) { "at=get key=#{key} source=cache" }
        return @cache[key].current_value
      end
    end
  end
  log(:debug) { "at=get key=#{key} source=stream" }
  resolve_current(consume_stream(queue_name(key), offset: "last"))
end

#history(key, limit: 10) ⇒ Object



117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
# File 'lib/mqkv/store.rb', line 117

def history(key, limit: 10)
  values = []
  consume_stream(queue_name(key), offset: "first") do |msg|
    if tombstone?(msg)
      values.clear
    else
      values << msg.body
      # Only the last `limit` values can ever be returned, so drop
      # older ones as we scan — memory stays O(limit) instead of
      # holding every body in the stream at once.
      values.shift if values.size > limit
    end
  end
  values
end

#preload(*keys, max_messages: 10_000) ⇒ Object



133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
# File 'lib/mqkv/store.rb', line 133

def preload(*keys, max_messages: 10_000)
  @cache_mutex.synchronize { @cache ||= {} }
  keys.each do |key|
    # Only the last message decides the cache entry; stream the
    # scan so a long history never sits in memory all at once.
    last = nil
    count = consume_stream(queue_name(key), offset: "first", max_messages: max_messages) do |msg|
      last = msg
    end
    entry = last ? msg_to_cache_entry(last) : CacheEntry.new(value: nil, expires_at: nil)
    @cache_mutex.synchronize { @cache[key] = entry }
    log(:debug) { "at=preload key=#{key} messages=#{count} value=#{entry.current_value.nil? ? "nil" : "present"}" }
    start_cache_watcher(key)
  end
end

#preload_latest(*keys) ⇒ Object

Like ‘preload`, but reads from `x-stream-offset: last` so only the current value is consumed (plus anything appended during the brief `read_timeout` window). Use this when keys live in streams with long histories and callers only care about the latest value — `preload`’s ‘offset: “first”` would otherwise drain up to `max_messages` of accumulated history per key on every boot.



155
156
157
158
159
160
161
162
163
164
# File 'lib/mqkv/store.rb', line 155

def preload_latest(*keys)
  @cache_mutex.synchronize { @cache ||= {} }
  keys.each do |key|
    messages = consume_stream(queue_name(key), offset: "last")
    entry = resolve_entry(messages)
    @cache_mutex.synchronize { @cache[key] = entry }
    log(:debug) { "at=preload_latest key=#{key} value=#{entry.current_value.nil? ? "nil" : "present"}" }
    start_cache_watcher(key)
  end
end

#purge!Object



185
186
187
188
189
190
191
192
193
194
195
# File 'lib/mqkv/store.rb', line 185

def purge!
  stop_cache_watchers
  conn = @mutex.synchronize { @connection }
  return unless conn && !conn.closed?

  streams = @mutex.synchronize { @declared_streams.to_a }
  conn.with_channel do |ch|
    streams.each { |name| ch.queue_delete(name) }
  end
  @mutex.synchronize { @declared_streams.clear }
end

#set(key, value, ttl: nil) ⇒ Object



46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
# File 'lib/mqkv/store.rb', line 46

def set(key, value, ttl: nil)
  name = queue_name(key)
  max_age = ttl ? "#{ttl.ceil}s" : nil
  ensure_stream(name, max_age: max_age)
  expires_at = ttl ? Process.clock_gettime(Process::CLOCK_REALTIME) + ttl : nil
  headers = expires_at ? { EXPIRES_HEADER => expires_at } : nil
  publish(name, value.to_s, headers: headers)
  log(:debug) { "at=set key=#{key} queue=#{name}" }
  if @cache
    @cache_mutex.synchronize { @cache[key] = CacheEntry.new(value: value.to_s, expires_at: expires_at) }
    log(:debug) { "at=set key=#{key} cache=updated" }
    start_cache_watcher(key)
  end
  nil
end

#unwatch(handle) ⇒ Object



180
181
182
183
# File 'lib/mqkv/store.rb', line 180

def unwatch(handle)
  handle.channel.basic_cancel(handle.consumer_tag)
  handle.channel.close
end

#watch(key, &block) ⇒ Object



166
167
168
169
170
171
172
173
174
175
176
177
178
# File 'lib/mqkv/store.rb', line 166

def watch(key, &block)
  name = queue_name(key)
  ensure_stream(name)
  ch = connection.channel
  ch.basic_qos(256)
  consume_ok = ch.basic_consume(name, no_ack: false,
                                arguments: { "x-stream-offset" => "next" },
                                worker_threads: 1) do |msg|
    msg.ack
    block.call(msg.body) unless tombstone?(msg)
  end
  WatchHandle.new(channel: ch, consumer_tag: consume_ok.consumer_tag)
end