Class: MQKV::Store
- Inherits:
-
Object
show all
- Defined in:
- lib/mqkv/store.rb
Defined Under Namespace
Classes: CacheEntry, WatchHandle
Constant Summary
collapse
"__mqkv_deleted__"
"__mqkv_expires_at__"
- CONSUME_PREFETCH =
Stream consumers must ack to advance the broker’s flow-control window: with manual acks and a prefetch of N, delivery stalls after N outstanding messages. Acking every half-window keeps deliveries flowing while still batching (multiple: true).
256
- CONSUME_ACK_BATCH =
CONSUME_PREFETCH / 2
Instance Method Summary
collapse
-
#cached?(key) ⇒ Boolean
Whether the key is present in the in-memory cache.
-
#cached_get(key) ⇒ Object
-
#close ⇒ Object
-
#delete(key) ⇒ Object
-
#exists?(key) ⇒ Boolean
-
#get(key) ⇒ Object
-
#history(key, limit: 10) ⇒ Object
-
#initialize(url, prefix: "mqkv", read_timeout: 0.5, connect_timeout: nil, confirm: true, cache_watchers: true, logger: nil) ⇒ Store
constructor
-
#preload(*keys, max_messages: 10_000) ⇒ Object
-
#preload_latest(*keys) ⇒ Object
Like ‘preload`, but reads from `x-stream-offset: last` so only the current value is consumed (plus anything appended during the brief `read_timeout` window).
-
#purge! ⇒ Object
-
#set(key, value, ttl: nil) ⇒ Object
-
#unwatch(handle) ⇒ Object
-
#watch(key, &block) ⇒ Object
Constructor Details
#initialize(url, prefix: "mqkv", read_timeout: 0.5, connect_timeout: nil, confirm: true, cache_watchers: true, logger: nil) ⇒ Store
Returns a new instance of Store.
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
|
# File 'lib/mqkv/store.rb', line 30
def initialize(url, prefix: "mqkv", read_timeout: 0.5, connect_timeout: nil, confirm: true, cache_watchers: true, logger: nil)
@url = url
@prefix = prefix
@read_timeout = read_timeout
@connect_timeout = connect_timeout
@confirm = confirm
@cache_watchers_enabled = cache_watchers
@logger = logger
@mutex = Mutex.new
@connection = nil
@declared_streams = Set.new
@cache = nil
@cache_mutex = Mutex.new
@cache_watchers = {}
end
|
Instance Method Details
#cached?(key) ⇒ Boolean
Whether the key is present in the in-memory cache. A tombstoned key still counts as cached (cached_get returns nil for it). Returns false if no preload has happened yet (no cache exists).
94
95
96
97
98
|
# File 'lib/mqkv/store.rb', line 94
def cached?(key)
return false unless @cache
@cache_mutex.synchronize { @cache.key?(key) }
end
|
#cached_get(key) ⇒ Object
Cache-only read. Returns the cached value, or nil if the key has been tombstoned or isn’t in the cache at all. Never falls through to the stream — intended for callers that want to decouple read latency from the broker (e.g. an HTTP handler that should show a placeholder when the cache isn’t ready yet rather than block on AMQP). Use ‘cached?(key)` to distinguish “tombstoned” from “not cached”.
82
83
84
85
86
87
88
89
|
# File 'lib/mqkv/store.rb', line 82
def cached_get(key)
return nil unless @cache
@cache_mutex.synchronize do
return @cache[key].current_value if @cache.key?(key)
end
nil
end
|
#close ⇒ Object
197
198
199
200
201
202
203
204
205
|
# File 'lib/mqkv/store.rb', line 197
def close
log(:info) { "at=close" }
stop_cache_watchers
@mutex.synchronize do
@connection&.close
@connection = nil
@declared_streams.clear
end
end
|
#delete(key) ⇒ Object
100
101
102
103
104
105
106
107
108
109
110
111
|
# File 'lib/mqkv/store.rb', line 100
def delete(key)
name = queue_name(key)
ensure_stream(name)
publish_tombstone(name)
log(:debug) { "at=delete key=#{key} queue=#{name}" }
if @cache
@cache_mutex.synchronize { @cache[key] = CacheEntry.new(value: nil, expires_at: nil) }
log(:debug) { "at=delete key=#{key} cache=tombstoned" }
start_cache_watcher(key)
end
nil
end
|
#exists?(key) ⇒ Boolean
113
114
115
|
# File 'lib/mqkv/store.rb', line 113
def exists?(key)
!get(key).nil?
end
|
#get(key) ⇒ Object
62
63
64
65
66
67
68
69
70
71
72
73
|
# File 'lib/mqkv/store.rb', line 62
def get(key)
if @cache
@cache_mutex.synchronize do
if @cache.key?(key)
log(:debug) { "at=get key=#{key} source=cache" }
return @cache[key].current_value
end
end
end
log(:debug) { "at=get key=#{key} source=stream" }
resolve_current(consume_stream(queue_name(key), offset: "last"))
end
|
#history(key, limit: 10) ⇒ Object
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
|
# File 'lib/mqkv/store.rb', line 117
def history(key, limit: 10)
values = []
consume_stream(queue_name(key), offset: "first") do |msg|
if tombstone?(msg)
values.clear
else
values << msg.body
values.shift if values.size > limit
end
end
values
end
|
#preload(*keys, max_messages: 10_000) ⇒ Object
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
|
# File 'lib/mqkv/store.rb', line 133
def preload(*keys, max_messages: 10_000)
@cache_mutex.synchronize { @cache ||= {} }
keys.each do |key|
last = nil
count = consume_stream(queue_name(key), offset: "first", max_messages: max_messages) do |msg|
last = msg
end
entry = last ? msg_to_cache_entry(last) : CacheEntry.new(value: nil, expires_at: nil)
@cache_mutex.synchronize { @cache[key] = entry }
log(:debug) { "at=preload key=#{key} messages=#{count} value=#{entry.current_value.nil? ? "nil" : "present"}" }
start_cache_watcher(key)
end
end
|
#preload_latest(*keys) ⇒ Object
Like ‘preload`, but reads from `x-stream-offset: last` so only the current value is consumed (plus anything appended during the brief `read_timeout` window). Use this when keys live in streams with long histories and callers only care about the latest value — `preload`’s ‘offset: “first”` would otherwise drain up to `max_messages` of accumulated history per key on every boot.
155
156
157
158
159
160
161
162
163
164
|
# File 'lib/mqkv/store.rb', line 155
def preload_latest(*keys)
@cache_mutex.synchronize { @cache ||= {} }
keys.each do |key|
messages = consume_stream(queue_name(key), offset: "last")
entry = resolve_entry(messages)
@cache_mutex.synchronize { @cache[key] = entry }
log(:debug) { "at=preload_latest key=#{key} value=#{entry.current_value.nil? ? "nil" : "present"}" }
start_cache_watcher(key)
end
end
|
#purge! ⇒ Object
185
186
187
188
189
190
191
192
193
194
195
|
# File 'lib/mqkv/store.rb', line 185
def purge!
stop_cache_watchers
conn = @mutex.synchronize { @connection }
return unless conn && !conn.closed?
streams = @mutex.synchronize { @declared_streams.to_a }
conn.with_channel do |ch|
streams.each { |name| ch.queue_delete(name) }
end
@mutex.synchronize { @declared_streams.clear }
end
|
#set(key, value, ttl: nil) ⇒ Object
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
|
# File 'lib/mqkv/store.rb', line 46
def set(key, value, ttl: nil)
name = queue_name(key)
max_age = ttl ? "#{ttl.ceil}s" : nil
ensure_stream(name, max_age: max_age)
expires_at = ttl ? Process.clock_gettime(Process::CLOCK_REALTIME) + ttl : nil
= expires_at ? { EXPIRES_HEADER => expires_at } : nil
publish(name, value.to_s, headers: )
log(:debug) { "at=set key=#{key} queue=#{name}" }
if @cache
@cache_mutex.synchronize { @cache[key] = CacheEntry.new(value: value.to_s, expires_at: expires_at) }
log(:debug) { "at=set key=#{key} cache=updated" }
start_cache_watcher(key)
end
nil
end
|
#unwatch(handle) ⇒ Object
180
181
182
183
|
# File 'lib/mqkv/store.rb', line 180
def unwatch(handle)
handle.channel.basic_cancel(handle.consumer_tag)
handle.channel.close
end
|
#watch(key, &block) ⇒ Object
166
167
168
169
170
171
172
173
174
175
176
177
178
|
# File 'lib/mqkv/store.rb', line 166
def watch(key, &block)
name = queue_name(key)
ensure_stream(name)
ch = connection.channel
ch.basic_qos(256)
consume_ok = ch.basic_consume(name, no_ack: false,
arguments: { "x-stream-offset" => "next" },
worker_threads: 1) do |msg|
msg.ack
block.call(msg.body) unless tombstone?(msg)
end
WatchHandle.new(channel: ch, consumer_tag: consume_ok.consumer_tag)
end
|