Class: EventMeter::Stores::Rollup::Redis
Constant Summary
collapse
- DEFAULT_ROLLUP_TTL =
31 * 24 * 60 * 60
- MIN_FIELD_SCRIPT =
<<~LUA
local current = redis.call("hget", KEYS[1], ARGV[1])
local current_number = tonumber(current)
local value = tonumber(ARGV[2])
if value == nil then
return redis.error_reply("ERR event_meter rollup min value must be numeric")
end
if current == false or current_number == nil or current_number > value then
redis.call("hset", KEYS[1], ARGV[1], ARGV[2])
end
return 1
LUA
- MAX_FIELD_SCRIPT =
<<~LUA
local current = redis.call("hget", KEYS[1], ARGV[1])
local current_number = tonumber(current)
local value = tonumber(ARGV[2])
if value == nil then
return redis.error_reply("ERR event_meter rollup max value must be numeric")
end
if current == false or current_number == nil or current_number < value then
redis.call("hset", KEYS[1], ARGV[1], ARGV[2])
end
return 1
LUA
- SET_MAX_SCRIPT =
<<~LUA
local current = redis.call("get", KEYS[1])
local current_number = tonumber(current)
local value = tonumber(ARGV[1])
if value == nil then
return redis.error_reply("ERR event_meter state value must be numeric")
end
if current == false or current_number == nil or current_number < value then
redis.call("set", KEYS[1], ARGV[1])
end
return 1
LUA
EventMeter::Stores::RedisLock::LOCK_REFRESH_RATIO
Instance Attribute Summary collapse
Instance Method Summary
collapse
-
#apply(batch) ⇒ Object
-
#cleanup_history(before:, events:, interval_state:) ⇒ Object
-
#cleanup_watermark(key) ⇒ Object
-
#ensure_definition(definition) ⇒ Object
-
#for_report(name:, version:) ⇒ Object
-
#forget_processed_ids(ids) ⇒ Object
-
#get(key) ⇒ Object
-
#hgetall_many(keys) ⇒ Object
-
#initialize(redis:, namespace:, lock_redis: nil, report_name: nil, version: nil, lock_scope: nil, rollup_ttl: DEFAULT_ROLLUP_TTL) ⇒ Redis
constructor
-
#keys_matching(pattern, limit: nil) ⇒ Object
-
#processed_ids(ids) ⇒ Object
-
#report_definition(name:, version:) ⇒ Object
-
#with_lock(ttl:) ⇒ Object
-
#write_cleanup_watermark(key, value) ⇒ Object
Constructor Details
#initialize(redis:, namespace:, lock_redis: nil, report_name: nil, version: nil, lock_scope: nil, rollup_ttl: DEFAULT_ROLLUP_TTL) ⇒ Redis
Returns a new instance of Redis.
68
69
70
71
72
73
74
75
76
|
# File 'lib/event_meter/stores/rollup/redis.rb', line 68
# Builds a Redis-backed rollup store.
#
# @param redis [Object] client kept as the main connection
# @param namespace [Object] run through normalize_namespace before being stored
# @param lock_redis [Object, nil] client used for locking; the main client is reused when omitted
# @param report_name [#to_s, nil] stored as its to_s form, or nil when nil
# @param version [#to_i, nil] stored as its to_i form, or nil when nil
# @param lock_scope [Object, nil] stored as given
# @param rollup_ttl [Object] validated through positive_integer; defaults to DEFAULT_ROLLUP_TTL
def initialize(redis:, namespace:, lock_redis: nil, report_name: nil, version: nil, lock_scope: nil, rollup_ttl: DEFAULT_ROLLUP_TTL)
  # Connections: locking shares the data connection unless a dedicated one is supplied.
  @redis = redis
  @lock_redis = lock_redis ? lock_redis : redis

  @namespace = normalize_namespace(namespace)

  # Report scoping is optional; nil stays nil instead of becoming "" or 0.
  @report_name = report_name.nil? ? nil : report_name.to_s
  @version = version.nil? ? nil : version.to_i
  @lock_scope = lock_scope

  @rollup_ttl = positive_integer(rollup_ttl, "rollup_ttl")
end
|
Instance Attribute Details
#lock_redis ⇒ Object
Returns the value of attribute lock_redis.
66
67
68
|
# File 'lib/event_meter/stores/rollup/redis.rb', line 66
# Reader for @lock_redis. The constructor sets it to the lock_redis argument,
# falling back to the main redis client when that argument is nil or false.
def lock_redis
@lock_redis
end
|
#lock_scope ⇒ Object
Returns the value of attribute lock_scope.
66
67
68
|
# File 'lib/event_meter/stores/rollup/redis.rb', line 66
# Reader for @lock_scope. The constructor stores the lock_scope argument as
# given (nil by default); #for_report passes an
# "<event name>:<version key>" string built from the Keys helpers.
def lock_scope
@lock_scope
end
|
#namespace ⇒ Object
Returns the value of attribute namespace.
66
67
68
|
# File 'lib/event_meter/stores/rollup/redis.rb', line 66
# Reader for @namespace, i.e. the constructor's namespace argument after it
# has been passed through normalize_namespace.
def namespace
@namespace
end
|
#redis ⇒ Object
Returns the value of attribute redis.
66
67
68
|
# File 'lib/event_meter/stores/rollup/redis.rb', line 66
# Reader for @redis, the client object handed to the constructor. Every
# data command in this class (get/set/multi/pipelined/scan_each) goes through it.
def redis
@redis
end
|
#report_name ⇒ Object
Returns the value of attribute report_name.
66
67
68
|
# File 'lib/event_meter/stores/rollup/redis.rb', line 66
# Reader for @report_name: the constructor's report_name argument converted
# with to_s, or nil when no report name was given.
def report_name
@report_name
end
|
#rollup_ttl ⇒ Object
Returns the value of attribute rollup_ttl.
66
67
68
|
# File 'lib/event_meter/stores/rollup/redis.rb', line 66
# Reader for @rollup_ttl, the constructor's rollup_ttl argument after
# validation by positive_integer. #apply passes this value to EXPIRE for
# state keys and processed-entry keys.
def rollup_ttl
@rollup_ttl
end
|
#version ⇒ Object
Returns the value of attribute version.
66
67
68
|
# File 'lib/event_meter/stores/rollup/redis.rb', line 66
# Reader for @version: the constructor's version argument converted with
# to_i, or nil when no version was given.
def version
@version
end
|
Instance Method Details
#apply(batch) ⇒ Object
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
|
# File 'lib/event_meter/stores/rollup/redis.rb', line 130
# Writes one batch inside a single MULTI transaction.
#
# Commands are queued in this order:
# 1. every rollup in batch.rollups, via apply_rollup;
# 2. for each state update, SET_MAX_SCRIPT followed by EXPIRE with rollup_ttl;
# 3. for each entry id, a processed marker holding the current UTC timestamp
#    (microsecond precision), followed by EXPIRE with rollup_ttl.
#
# ensure_scoped! runs before the transaction is opened.
#
# @param batch [#rollups, #state_updates, #entry_ids]
# @return [Object] whatever redis.multi returns
def apply(batch)
  ensure_scoped!

  redis.multi do |tx|
    batch.rollups.each do |rollup_key, rollup|
      apply_rollup(tx, rollup_key, rollup)
    end

    batch.state_updates.each do |state_key, state_value|
      tx.eval(SET_MAX_SCRIPT, keys: [state_key], argv: [state_value])
      tx.expire(state_key, rollup_ttl)
    end

    # One timestamp is shared by every marker written for this batch.
    stamp = Time.now.utc.iso8601(6)
    batch.entry_ids.each do |entry_id|
      marker = processed_key(entry_id)
      tx.set(marker, stamp)
      tx.expire(marker, rollup_ttl)
    end
  end
end
|
#cleanup_history(before:, events:, interval_state:) ⇒ Object
188
189
190
191
192
193
194
195
196
|
# File 'lib/event_meter/stores/rollup/redis.rb', line 188
# Removes history older than +before+ and reports how much was deleted.
#
# The events argument is turned into a filter with event_filter, then the
# rollup, interval-state and processed-entry cleanups run in that order.
# Interval state is only cleaned when +interval_state+ is truthy; otherwise
# its count is reported as 0.
#
# @param before [Object] cutoff forwarded to each cleanup helper
# @param events [Object] forwarded to event_filter
# @param interval_state [Boolean] whether to clean interval state too
# @return [Hash{Symbol=>Object}] :rollup_keys_deleted,
#   :interval_state_keys_deleted and :processed_entries_deleted
def cleanup_history(before:, events:, interval_state:)
  filter = event_filter(events)

  rollups_removed = cleanup_rollups(before, filter)
  interval_removed =
    if interval_state
      cleanup_interval_state(before, filter)
    else
      0
    end
  processed_removed = cleanup_processed_entries(before, filter)

  {
    rollup_keys_deleted: rollups_removed,
    interval_state_keys_deleted: interval_removed,
    processed_entries_deleted: processed_removed
  }
end
|
#cleanup_watermark(key) ⇒ Object
176
177
178
|
# File 'lib/event_meter/stores/rollup/redis.rb', line 176
# Reads the cleanup watermark stored under +key+.
#
# The key is passed to redis.get exactly as given; no namespace prefix is
# added here. Returns whatever redis.get returns for that key.
def cleanup_watermark(key)
redis.get(key)
end
|
#ensure_definition(definition) ⇒ Object
93
94
95
96
97
98
99
100
101
102
103
104
|
# File 'lib/event_meter/stores/rollup/redis.rb', line 93
# Registers a report definition, or checks it against the one already stored.
#
# The definition is serialised as JSON under definition_key(name, version).
# When nothing is stored yet it is written with NX, then read back, so the
# comparison always runs against the value that is actually in Redis.
#
# @param definition [#name, #version, #to_h]
# @return [Object] whatever ensure_same_definition! returns
def ensure_definition(definition)
  key = definition_key(definition.name, definition.version)
  payload = JSON.generate(definition.to_h)

  existing = redis.get(key)
  unless existing
    redis.set(key, payload, nx: true)
    existing = redis.get(key)
  end

  ensure_same_definition!(existing, definition)
end
|
#for_report(name:, version:) ⇒ Object
78
79
80
81
82
83
84
85
86
87
88
89
90
91
|
# File 'lib/event_meter/stores/rollup/redis.rb', line 78
# Returns a new store of the same class, scoped to one report version.
#
# The copy shares this store's redis, lock_redis, namespace and rollup_ttl.
# Its lock scope is "<Keys.event_name(name)>:<Keys.version_key(version)>".
#
# @param name [#to_s] report name
# @param version [#to_i] report version
# @return [Object] a new instance of self.class
def for_report(name:, version:)
  report = name.to_s
  number = version.to_i
  scope = "#{Keys.event_name(report)}:#{Keys.version_key(number)}"

  options = {
    redis: redis,
    lock_redis: lock_redis,
    namespace: namespace,
    report_name: report,
    version: number,
    lock_scope: scope,
    rollup_ttl: rollup_ttl
  }
  self.class.new(**options)
end
|
#forget_processed_ids(ids) ⇒ Object
124
125
126
127
128
|
# File 'lib/event_meter/stores/rollup/redis.rb', line 124
# Drops the processed markers for the given entry ids.
#
# ensure_scoped! runs first; each id is then mapped through processed_key
# and the resulting keys are handed to delete_keys.
#
# @param ids [Enumerable] entry ids
# @return [Object] whatever delete_keys returns
def forget_processed_ids(ids)
  ensure_scoped!
  delete_keys(ids.map { |entry_id| processed_key(entry_id) })
end
|
#get(key) ⇒ Object
172
173
174
|
# File 'lib/event_meter/stores/rollup/redis.rb', line 172
# Plain GET passthrough: returns whatever redis.get returns for +key+.
# The key is used exactly as given; no namespace prefix is added here.
def get(key)
redis.get(key)
end
|
#hgetall_many(keys) ⇒ Object
150
151
152
153
154
155
156
|
# File 'lib/event_meter/stores/rollup/redis.rb', line 150
# Fetches several hashes in one pipelined round trip.
#
# An empty key list short-circuits to [] without touching Redis.
#
# @param keys [Array] hash keys, used exactly as given
# @return [Array] the value of redis.pipelined after one HGETALL per key is queued
def hgetall_many(keys)
  if keys.empty?
    []
  else
    redis.pipelined do |pipeline|
      keys.each do |hash_key|
        pipeline.hgetall(hash_key)
      end
    end
  end
end
|
#keys_matching(pattern, limit: nil) ⇒ Object
158
159
160
161
162
163
164
165
166
167
168
169
170
|
# File 'lib/event_meter/stores/rollup/redis.rb', line 158
# Scans Redis for keys matching +pattern+ and returns them sorted.
#
# The scan uses namespace_glob(pattern) as its MATCH argument, and each
# candidate is checked again with key_matches? before it is accepted.
#
# Redis SCAN may return the same key more than once during a full
# iteration, so accepted keys are de-duplicated while scanning. Repeats
# therefore never appear in the result and never count toward +limit+.
#
# @param pattern [Object] pattern forwarded to namespace_glob and key_matches?
# @param limit [Integer, nil] stop scanning once this many distinct keys are
#   collected; validated with positive_integer when given
# @return [Array] distinct matching keys in sorted order
def keys_matching(pattern, limit: nil)
  limit = positive_integer(limit, "limit") if limit

  # A Hash gives O(1) membership checks without needing `require "set"`.
  seen = {}
  redis.scan_each(match: namespace_glob(pattern)) do |key|
    next if seen.key?(key)
    next unless key_matches?(key, pattern)

    seen[key] = true
    break if limit && seen.size >= limit
  end

  seen.keys.sort
end
|
#processed_ids(ids) ⇒ Object
113
114
115
116
117
118
119
120
121
122
|
# File 'lib/event_meter/stores/rollup/redis.rb', line 113
# Returns the subset of +ids+ that already have a processed marker.
#
# ensure_scoped! runs first. An empty id list returns [] without contacting
# Redis; otherwise one GET per id is pipelined and every id whose reply is
# truthy is kept, in the original order.
#
# @param ids [Array] entry ids
# @return [Array] ids whose processed key currently holds a value
def processed_ids(ids)
  ensure_scoped!
  return [] if ids.empty?

  replies = redis.pipelined do |pipeline|
    ids.each do |entry_id|
      pipeline.get(processed_key(entry_id))
    end
  end

  ids.zip(replies).filter_map { |entry_id, reply| reply && entry_id }
end
|
#report_definition(name:, version:) ⇒ Object
106
107
108
109
110
111
|
# File 'lib/event_meter/stores/rollup/redis.rb', line 106
# Loads the stored definition for a report version.
#
# @param name [Object] forwarded to definition_key
# @param version [Object] forwarded to definition_key
# @return [Object, nil] the parsed JSON payload; nil when nothing is stored
#   or when a JSON::ParserError / TypeError is raised while loading it
def report_definition(name:, version:)
  raw = redis.get(definition_key(name, version))
  return raw unless raw

  JSON.parse(raw)
rescue JSON::ParserError, TypeError
  nil
end
|
#with_lock(ttl:) ⇒ Object
184
185
186
|
# File 'lib/event_meter/stores/rollup/redis.rb', line 184
# Runs the caller's block inside with_redis_lock, using lock_key and the
# given +ttl+. The block is invoked with no arguments, and the return value
# is whatever with_redis_lock returns.
#
# NOTE(review): with_redis_lock and lock_key are not visible in this excerpt —
# presumably they come from the RedisLock mixin; confirm acquisition and
# refresh behaviour there.
def with_lock(ttl:)
with_redis_lock(lock_key, ttl: ttl) { yield }
end
|
#write_cleanup_watermark(key, value) ⇒ Object
180
181
182
|
# File 'lib/event_meter/stores/rollup/redis.rb', line 180
# Stores +value+ as the cleanup watermark under +key+.
#
# The key is used exactly as given (no namespace prefix is added here), and
# this call sets no expiry. Returns whatever redis.set returns.
def write_cleanup_watermark(key, value)
redis.set(key, value)
end
|