Class: MQKV::Store
- Inherits:
-
Object
show all
- Defined in:
- lib/mqkv/store.rb
Defined Under Namespace
Classes: CacheEntry, WatchHandle
Constant Summary
collapse
"__mqkv_deleted__"
EXPIRES_HEADER = "__mqkv_expires_at__"
Instance Method Summary
collapse
Constructor Details
#initialize(url, prefix: "mqkv", read_timeout: 0.5, confirm: true, logger: nil) ⇒ Store
Returns a new instance of Store.
23
24
25
26
27
28
29
30
31
32
33
34
35
|
# File 'lib/mqkv/store.rb', line 23
# Records the configuration and sets up empty in-memory state. Nothing here
# talks to the broker: every line is a plain instance-variable assignment.
#
# @param url [Object] stored in @url (presumably the broker URL — confirm against the connection code)
# @param prefix [String] stored in @prefix
# @param read_timeout [Numeric] stored in @read_timeout
# @param confirm [Boolean] stored in @confirm
# @param logger [Object, nil] stored in @logger
def initialize(url, prefix: "mqkv", read_timeout: 0.5, confirm: true, logger: nil)
  # Caller-supplied settings, kept exactly as given.
  @url = url
  @prefix = prefix
  @read_timeout = read_timeout
  @confirm = confirm
  @logger = logger

  # Connection bookkeeping: no connection yet, and no streams declared.
  @mutex = Mutex.new
  @connection = nil
  @declared_streams = Set.new

  # Cache bookkeeping: the cache stays nil (disabled) until something creates it.
  @cache_mutex = Mutex.new
  @cache = nil
  @cache_watchers = {}
end
|
Instance Method Details
#close ⇒ Object
138
139
140
141
142
143
144
145
146
|
# File 'lib/mqkv/store.rb', line 138
# Shuts the store down: logs the event, stops the cache watchers, and then,
# while holding the connection mutex, closes the connection (if there is
# one), forgets it, and empties the set of declared streams.
#
# @return [Object] whatever the synchronized block yields (the emptied stream set)
def close
  log(:info) { "at=close" }
  stop_cache_watchers

  @mutex.synchronize do
    conn = @connection
    conn.close unless conn.nil?
    @connection = nil
    @declared_streams.clear
  end
end
|
#delete(key) ⇒ Object
66
67
68
69
70
71
72
73
74
75
76
77
|
# File 'lib/mqkv/store.rb', line 66
# Deletes a key by publishing a tombstone to its stream. When the cache is
# enabled, the cached entry is replaced with an empty one and a cache
# watcher is started for the key.
#
# @param key [Object] key to delete
# @return [nil]
def delete(key)
  stream = queue_name(key)
  ensure_stream(stream)
  publish_tombstone(stream)
  log(:debug) { "at=delete key=#{key} queue=#{stream}" }
  return nil unless @cache

  # Mirror the tombstone locally so cached reads see the deletion right away.
  tombstone = CacheEntry.new(value: nil, expires_at: nil)
  @cache_mutex.synchronize { @cache[key] = tombstone }
  log(:debug) { "at=delete key=#{key} cache=tombstoned" }
  start_cache_watcher(key)
  nil
end
|
#exists?(key) ⇒ Boolean
79
80
81
|
# File 'lib/mqkv/store.rb', line 79
# Reports whether the key currently resolves to a value.
#
# @param key [Object] key to look up via #get
# @return [Boolean] false only when #get returns nil
def exists?(key)
  current = get(key)
  !current.nil?
end
|
#get(key) ⇒ Object
53
54
55
56
57
58
59
60
61
62
63
64
|
# File 'lib/mqkv/store.rb', line 53
# Returns the current value for a key.
#
# When the cache is enabled and holds an entry for the key, the entry's
# current value is returned without reading the stream. Otherwise the
# stream is consumed from offset "last" and the result is resolved.
#
# @param key [Object] key to read
# @return [Object, nil] the cached entry's current value, or whatever
#   resolve_current yields for the stream read
def get(key)
  if @cache
    @cache_mutex.synchronize do
      next unless @cache.key?(key)

      log(:debug) { "at=get key=#{key} source=cache" }
      return @cache[key].current_value
    end
  end

  # Cache disabled or no entry for this key: fall back to the stream.
  log(:debug) { "at=get key=#{key} source=stream" }
  stream = queue_name(key)
  latest = consume_stream(stream, offset: "last")
  resolve_current(latest)
end
|
#history(key, limit: 10) ⇒ Object
83
84
85
86
87
88
89
90
91
92
93
94
|
# File 'lib/mqkv/store.rb', line 83
# Returns the most recent values written to a key since its last deletion.
#
# The stream is read from offset "first". Each tombstone discards
# everything collected before it, so only bodies published after the
# latest tombstone survive; the final `limit` of those are returned.
#
# @param key [Object] key whose stream is replayed
# @param limit [Integer] maximum number of values to return
# @return [Array] message bodies, oldest first
def history(key, limit: 10)
  surviving = consume_stream(queue_name(key), offset: "first").each_with_object([]) do |message, kept|
    next kept.clear if tombstone?(message)

    kept << message.body
  end
  surviving.last(limit)
end
|
#preload(*keys, max_messages: 10_000) ⇒ Object
96
97
98
99
100
101
102
103
104
105
|
# File 'lib/mqkv/store.rb', line 96
# Enables the cache (creating it if needed) and fills it for the given keys.
#
# For each key the stream is read from offset "first", the messages are
# resolved into a cache entry, the entry is stored under the cache mutex,
# and a cache watcher is started for the key.
#
# @param keys [Array] keys to load into the cache
# @param max_messages [Integer] passed through to consume_stream for each key
# @return [Array] the keys that were given
def preload(*keys, max_messages: 10_000)
  @cache_mutex.synchronize { @cache ||= {} }

  keys.each do |key|
    stream = queue_name(key)
    backlog = consume_stream(stream, offset: "first", max_messages: max_messages)
    resolved = resolve_entry(backlog)
    @cache_mutex.synchronize { @cache.store(key, resolved) }
    log(:debug) do
      presence = resolved.current_value.nil? ? "nil" : "present"
      "at=preload key=#{key} messages=#{backlog.size} value=#{presence}"
    end
    start_cache_watcher(key)
  end
end
|
#purge! ⇒ Object
126
127
128
129
130
131
132
133
134
135
136
|
# File 'lib/mqkv/store.rb', line 126
# Deletes every stream queue this store has declared.
#
# Cache watchers are always stopped first. If there is no connection, or
# the connection is already closed, nothing else happens. Otherwise each
# declared stream is deleted over a temporary channel and the
# declared-stream set is emptied.
#
# @return [Object, nil] nil when there is no open connection; otherwise the
#   value of the final synchronized block (the emptied stream set)
def purge!
  stop_cache_watchers

  conn = @mutex.synchronize { @connection }
  return if !conn || conn.closed?

  # Snapshot the names under the lock so the broker calls run without it.
  doomed = @mutex.synchronize { @declared_streams.to_a }
  conn.with_channel do |channel|
    doomed.each { |stream| channel.queue_delete(stream) }
  end

  @mutex.synchronize { @declared_streams.clear }
end
|
#set(key, value, ttl: nil) ⇒ Object
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
|
# File 'lib/mqkv/store.rb', line 37
# Writes a value for a key by publishing it to the key's stream.
#
# The value is published as a string (`value.to_s`). With a TTL, the stream
# is declared with a max age of the TTL rounded up to whole seconds, and
# the message carries an EXPIRES_HEADER header holding the wall-clock
# expiry time (CLOCK_REALTIME seconds plus the TTL). When the cache is
# enabled, the cached entry is updated and a cache watcher is started.
#
# @param key [Object] key to write
# @param value [#to_s] value to store
# @param ttl [Numeric, nil] time to live in seconds; nil means no expiry
# @return [nil]
def set(key, value, ttl: nil)
  name = queue_name(key)
  max_age = ttl ? "#{ttl.ceil}s" : nil
  ensure_stream(name, max_age: max_age)
  expires_at = ttl ? Process.clock_gettime(Process::CLOCK_REALTIME) + ttl : nil
  # Only attach headers when there is an expiry to record.
  headers = expires_at ? { EXPIRES_HEADER => expires_at } : nil
  publish(name, value.to_s, headers: headers)
  log(:debug) { "at=set key=#{key} queue=#{name}" }
  if @cache
    @cache_mutex.synchronize { @cache[key] = CacheEntry.new(value: value.to_s, expires_at: expires_at) }
    log(:debug) { "at=set key=#{key} cache=updated" }
    start_cache_watcher(key)
  end
  nil
end
|
#unwatch(handle) ⇒ Object
121
122
123
124
|
# File 'lib/mqkv/store.rb', line 121
# Stops a watcher created by #watch: cancels the handle's consumer on its
# channel, then closes that channel.
#
# @param handle [WatchHandle] handle exposing `channel` and `consumer_tag`
# @return [Object] whatever the channel's close returns
def unwatch(handle)
  channel = handle.channel
  channel.basic_cancel(handle.consumer_tag)
  channel.close
end
|
#watch(key, &block) ⇒ Object
107
108
109
110
111
112
113
114
115
116
117
118
119
|
# File 'lib/mqkv/store.rb', line 107
# Subscribes to future writes on a key.
#
# A dedicated channel is opened with a prefetch of 256 and a consumer is
# attached to the key's stream at offset "next", so only messages published
# after this call are delivered. Each message is acknowledged before the
# block runs; tombstones are acknowledged but not passed to the block.
#
# @param key [Object] key to watch
# @yieldparam body [Object] body of each non-tombstone message
# @return [WatchHandle] handle holding the channel and consumer tag; pass it to #unwatch
# @raise [ArgumentError] if no block is given
def watch(key, &block)
  # Fail fast: without a block every delivery would blow up in the consumer
  # worker, after a channel and consumer had already been created.
  raise ArgumentError, "watch requires a block" unless block

  name = queue_name(key)
  ensure_stream(name)
  ch = connection.channel
  ch.basic_qos(256)
  consume_ok = ch.basic_consume(name, no_ack: false,
                                      arguments: { "x-stream-offset" => "next" },
                                      worker_threads: 1) do |msg|
    # Ack first so a raising callback does not leave the message unacknowledged.
    msg.ack
    block.call(msg.body) unless tombstone?(msg)
  end
  WatchHandle.new(channel: ch, consumer_tag: consume_ok.consumer_tag)
end
|