Class: MQKV::Store

Inherits:
Object
  • Object
show all
Defined in:
lib/mqkv/store.rb

Defined Under Namespace

Classes: CacheEntry, WatchHandle

Constant Summary collapse

# Header name for delete markers. NOTE(review): no method visible here reads
# or writes it — presumably publish_tombstone sets it and tombstone? checks
# for it; confirm against those helpers.
TOMBSTONE_HEADER =
"__mqkv_deleted__"
# Header name under which #set publishes the absolute expiry of a value
# written with a ttl (CLOCK_REALTIME seconds plus the ttl).
EXPIRES_HEADER =
"__mqkv_expires_at__"

Instance Method Summary collapse

Constructor Details

#initialize(url, prefix: "mqkv", read_timeout: 0.5, confirm: true, logger: nil) ⇒ Store

Returns a new instance of Store.



23
24
25
26
27
28
29
30
31
32
33
34
35
# File 'lib/mqkv/store.rb', line 23

# Records the configuration and sets up empty in-memory state. Nothing is
# opened here: the connection slot starts out nil.
#
# @param url [Object] connection target, stored untouched (presumably a
#   broker URL string — confirm against the connection code)
# @param prefix [String] stored as-is; NOTE(review): presumably the
#   namespace queue_name prepends to keys — confirm
# @param read_timeout [Numeric] stored as-is; its unit and use are not
#   visible in this method
# @param confirm [Boolean] stored as-is; NOTE(review): looks like a
#   publisher-confirm toggle — confirm
# @param logger [Object, nil] optional logger, nil by default
def initialize(url, prefix: "mqkv", read_timeout: 0.5, confirm: true, logger: nil)
  # Caller-supplied settings.
  @url = url
  @prefix = prefix
  @logger = logger
  @confirm = confirm
  @read_timeout = read_timeout

  # Connection state; #close and #purge! touch both under @mutex.
  @connection = nil
  @declared_streams = Set.new
  @mutex = Mutex.new

  # Read cache; stays nil until #preload creates the hash. Accesses to it
  # go through @cache_mutex.
  @cache = nil
  @cache_mutex = Mutex.new
  # Not used by any method visible here — presumably per-key watcher
  # bookkeeping; confirm against start_cache_watcher.
  @cache_watchers = {}
end

Instance Method Details

#close ⇒ Object



138
139
140
141
142
143
144
145
146
# File 'lib/mqkv/store.rb', line 138

# Shuts the store down: logs, stops the cache watchers, closes the
# connection (if there is one) and forgets which streams were declared.
#
# The state reset runs in an +ensure+ so that a failing connection close
# (for example on an already-broken socket) cannot leave a dead connection
# object and a stale declared-streams set behind; the error still
# propagates to the caller.
#
# @return [Set] the now-empty set of declared streams (incidental; kept
#   for backward compatibility)
# @raise whatever the underlying connection's +close+ raises
def close
  log(:info) { "at=close" }
  stop_cache_watchers
  @mutex.synchronize do
    begin
      @connection&.close
    ensure
      # Always drop the handle, even when closing it blew up.
      @connection = nil
      @declared_streams.clear
    end
    @declared_streams
  end
end

#delete(key) ⇒ Object



66
67
68
69
70
71
72
73
74
75
76
77
# File 'lib/mqkv/store.rb', line 66

# Marks +key+ as deleted by publishing a tombstone to its stream.
#
# When the cache is active (+@cache+ is set) the key's cache slot is
# overwritten with an entry holding no value and no expiry, and
# start_cache_watcher is invoked for the key.
#
# @param key [Object] key to delete; queue_name maps it to a stream name
# @return [nil]
def delete(key)
  stream = queue_name(key)
  ensure_stream(stream)
  publish_tombstone(stream)
  log(:debug) { "at=delete key=#{key} queue=#{stream}" }
  return unless @cache

  tombstone = CacheEntry.new(value: nil, expires_at: nil)
  @cache_mutex.synchronize { @cache.store(key, tombstone) }
  log(:debug) { "at=delete key=#{key} cache=tombstoned" }
  start_cache_watcher(key)
  nil
end

#exists?(key) ⇒ Boolean

Returns:

  • (Boolean)


79
80
81
# File 'lib/mqkv/store.rb', line 79

# Tells whether +key+ currently resolves to a value.
#
# Delegates to #get, so any non-nil result (an empty string included)
# counts as existing.
#
# @param key [Object] key to look up
# @return [Boolean] false only when #get returns nil
def exists?(key)
  current = get(key)
  !current.nil?
end

#get(key) ⇒ Object



53
54
55
56
57
58
59
60
61
62
63
64
# File 'lib/mqkv/store.rb', line 53

# Looks up the current value for +key+.
#
# If the cache is active and holds an entry for the key, the answer is
# that entry's +current_value+ and the stream is not read. Otherwise the
# key's stream is consumed from the "last" offset and resolve_current
# decides the result.
#
# @param key [Object] key to read; queue_name maps it to a stream name
# @return [Object, nil] the cached entry's current value, or whatever
#   resolve_current returns for the consumed messages
def get(key)
  if @cache
    @cache_mutex.synchronize do
      next unless @cache.key?(key)

      log(:debug) { "at=get key=#{key} source=cache" }
      # Leaves the method from inside the lock; synchronize releases it.
      return @cache[key].current_value
    end
  end

  log(:debug) { "at=get key=#{key} source=stream" }
  stream = queue_name(key)
  latest = consume_stream(stream, offset: "last")
  resolve_current(latest)
end

#history(key, limit: 10) ⇒ Object



83
84
85
86
87
88
89
90
91
92
93
94
# File 'lib/mqkv/store.rb', line 83

# Returns the most recent values written to +key+ since its last deletion.
#
# The key's stream is consumed from the "first" offset. Message bodies are
# collected in order, and every tombstone wipes what was collected so far,
# so only writes after the latest tombstone survive.
#
# @param key [Object] key whose stream is replayed
# @param limit [Integer] maximum number of trailing values to return
# @return [Array] up to +limit+ message bodies, oldest first
def history(key, limit: 10)
  replay = consume_stream(queue_name(key), offset: "first")
  surviving = replay.each_with_object([]) do |msg, bodies|
    tombstone?(msg) ? bodies.clear : bodies << msg.body
  end
  surviving.last(limit)
end

#preload(*keys, max_messages: 10_000) ⇒ Object



96
97
98
99
100
101
102
103
104
105
# File 'lib/mqkv/store.rb', line 96

# Switches the cache on (creating it on first use) and fills it for each
# of the given keys.
#
# Per key: the stream is consumed from the "first" offset (bounded by
# +max_messages+), resolve_entry turns the messages into a cache entry,
# the entry is stored under the cache lock, and start_cache_watcher is
# invoked for the key.
#
# @param keys [Array] keys to load, passed as individual arguments
# @param max_messages [Integer] forwarded to consume_stream as the
#   per-key message cap
# @return [Array] the keys that were passed in
def preload(*keys, max_messages: 10_000)
  @cache_mutex.synchronize { @cache = {} if @cache.nil? }

  keys.each do |key|
    stream = queue_name(key)
    replay = consume_stream(stream, offset: "first", max_messages: max_messages)
    resolved = resolve_entry(replay)
    @cache_mutex.synchronize { @cache.store(key, resolved) }
    log(:debug) do
      "at=preload key=#{key} messages=#{replay.size} value=#{resolved.current_value.nil? ? "nil" : "present"}"
    end
    start_cache_watcher(key)
  end
end

#purge! ⇒ Object



126
127
128
129
130
131
132
133
134
135
136
# File 'lib/mqkv/store.rb', line 126

# Deletes every stream this store has declared on the broker.
#
# Cache watchers are stopped first. If there is no connection, or it is
# already closed, nothing else happens. Otherwise each declared stream is
# deleted over a temporary channel and the declared-streams set is
# emptied.
#
# NOTE(review): @cache itself is not touched here — verify whether
# stop_cache_watchers clears it, otherwise #get could keep serving purged
# values.
#
# @return [Set, nil] the emptied declared-streams set, or nil when there
#   was no open connection
def purge!
  stop_cache_watchers

  conn = @mutex.synchronize { @connection }
  return if !conn || conn.closed?

  # Snapshot under the lock, then talk to the broker without holding it.
  doomed = @mutex.synchronize { @declared_streams.to_a }
  conn.with_channel do |channel|
    doomed.each do |stream|
      channel.queue_delete(stream)
    end
  end

  @mutex.synchronize { @declared_streams.clear }
end

#set(key, value, ttl: nil) ⇒ Object



37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
# File 'lib/mqkv/store.rb', line 37

# Writes +value+ (stringified with +to_s+) as the newest message on the
# key's stream.
#
# With a +ttl+, the stream is ensured with a max age of the ttl rounded up
# to whole seconds ("<n>s"), and the message carries an EXPIRES_HEADER
# holding the wall-clock time (CLOCK_REALTIME seconds) at which it
# expires. Without a ttl neither is set.
#
# When the cache is active, the key's cache slot is replaced with the new
# value and expiry, and start_cache_watcher is invoked for the key.
#
# @param key [Object] key to write; queue_name maps it to a stream name
# @param value [#to_s] value to store
# @param ttl [Numeric, nil] lifetime in seconds, or nil for no expiry
# @return [nil]
def set(key, value, ttl: nil)
  stream = queue_name(key)
  retention = ("#{ttl.ceil}s" if ttl)
  ensure_stream(stream, max_age: retention)

  deadline = (Process.clock_gettime(Process::CLOCK_REALTIME) + ttl if ttl)
  metadata = ({ EXPIRES_HEADER => deadline } if deadline)
  publish(stream, value.to_s, headers: metadata)
  log(:debug) { "at=set key=#{key} queue=#{stream}" }
  return unless @cache

  @cache_mutex.synchronize do
    @cache.store(key, CacheEntry.new(value: value.to_s, expires_at: deadline))
  end
  log(:debug) { "at=set key=#{key} cache=updated" }
  start_cache_watcher(key)
  nil
end

#unwatch(handle) ⇒ Object



121
122
123
124
# File 'lib/mqkv/store.rb', line 121

# Stops a subscription created by #watch: cancels the handle's consumer
# and closes the handle's channel.
#
# The channel is closed in an +ensure+ so it is released even when the
# cancel fails (for example because the consumer tag is no longer known);
# the cancel error still propagates.
#
# @param handle [WatchHandle] object exposing +channel+ and
#   +consumer_tag+, as returned by #watch
# @return [nil]
# @raise whatever the channel's +basic_cancel+ or +close+ raises
def unwatch(handle)
  channel = handle.channel
  begin
    channel.basic_cancel(handle.consumer_tag)
  ensure
    channel.close
  end
  nil
end

#watch(key, &block) ⇒ Object



107
108
109
110
111
112
113
114
115
116
117
118
119
# File 'lib/mqkv/store.rb', line 107

# Subscribes +block+ to new values written to +key+.
#
# A dedicated channel is opened with a prefetch of 256 and a consumer is
# registered on the key's stream with the "x-stream-offset" argument set
# to "next". Each delivery is acknowledged before the callback runs, and
# tombstones are acknowledged but not passed to the callback.
#
# Fails fast when no block is given; without this check the consumer
# would only break on the first delivery, with a NoMethodError on nil. If
# setting up the consumer fails, the freshly opened channel is closed
# before the error is re-raised so it does not leak.
#
# @param key [Object] key to watch; queue_name maps it to a stream name
# @yieldparam body [Object] body of each non-tombstone message
# @return [WatchHandle] handle carrying the channel and consumer tag;
#   pass it to #unwatch to stop the subscription
# @raise [ArgumentError] when called without a block
def watch(key, &block)
  raise ArgumentError, "watch requires a block" unless block

  name = queue_name(key)
  ensure_stream(name)
  ch = connection.channel
  begin
    ch.basic_qos(256)
    consume_ok = ch.basic_consume(name, no_ack: false,
                                  arguments: { "x-stream-offset" => "next" },
                                  worker_threads: 1) do |msg|
      # Ack first: a raising callback must not cause redelivery.
      msg.ack
      block.call(msg.body) unless tombstone?(msg)
    end
    WatchHandle.new(channel: ch, consumer_tag: consume_ok.consumer_tag)
  rescue StandardError
    # Subscription never became usable; release the channel we opened.
    ch.close
    raise
  end
end