Class: EventMeter::Stores::Rollup::Redis

Inherits:
Object
  • Object
show all
Includes:
CleanupHelpers, Namespace, EventMeter::Stores::RedisLock
Defined in:
lib/event_meter/stores/rollup/redis.rb

Constant Summary collapse

# Default for the +rollup_ttl:+ argument of #initialize: 31 days expressed
# in seconds (31 days * 24 h * 60 min * 60 s).
DEFAULT_ROLLUP_TTL =
31 * 24 * 60 * 60
# Lua script that keeps a running minimum in a hash field.
#
# KEYS[1] is the hash key, ARGV[1] the field name, ARGV[2] the candidate value.
# The script replies with an error when ARGV[2] is not numeric. Otherwise it
# writes ARGV[2] (the original string, not the parsed number) when the field
# is missing, holds a non-numeric value, or holds a number greater than the
# candidate; in every non-error case it returns 1.
MIN_FIELD_SCRIPT =
<<~LUA
  local current = redis.call("hget", KEYS[1], ARGV[1])
  local current_number = tonumber(current)
  local value = tonumber(ARGV[2])

  if value == nil then
    return redis.error_reply("ERR event_meter rollup min value must be numeric")
  end

  if current == false or current_number == nil or current_number > value then
    redis.call("hset", KEYS[1], ARGV[1], ARGV[2])
  end

  return 1
LUA
# Lua script that keeps a running maximum in a hash field.
#
# KEYS[1] is the hash key, ARGV[1] the field name, ARGV[2] the candidate value.
# The script replies with an error when ARGV[2] is not numeric. Otherwise it
# writes ARGV[2] (the original string, not the parsed number) when the field
# is missing, holds a non-numeric value, or holds a number smaller than the
# candidate; in every non-error case it returns 1.
MAX_FIELD_SCRIPT =
<<~LUA
  local current = redis.call("hget", KEYS[1], ARGV[1])
  local current_number = tonumber(current)
  local value = tonumber(ARGV[2])

  if value == nil then
    return redis.error_reply("ERR event_meter rollup max value must be numeric")
  end

  if current == false or current_number == nil or current_number < value then
    redis.call("hset", KEYS[1], ARGV[1], ARGV[2])
  end

  return 1
LUA
# Lua script that keeps a running maximum in a plain string key.
#
# KEYS[1] is the key and ARGV[1] the candidate value. The script replies with
# an error when ARGV[1] is not numeric. Otherwise it stores ARGV[1] when the
# key is missing, holds a non-numeric value, or holds a number smaller than
# the candidate; in every non-error case it returns 1. #apply evaluates this
# script once per state update.
SET_MAX_SCRIPT =
<<~LUA
  local current = redis.call("get", KEYS[1])
  local current_number = tonumber(current)
  local value = tonumber(ARGV[1])

  if value == nil then
    return redis.error_reply("ERR event_meter state value must be numeric")
  end

  if current == false or current_number == nil or current_number < value then
    redis.call("set", KEYS[1], ARGV[1])
  end

  return 1
LUA

Constants included from EventMeter::Stores::RedisLock

EventMeter::Stores::RedisLock::LOCK_REFRESH_RATIO

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(redis:, namespace:, lock_redis: nil, report_name: nil, version: nil, lock_scope: nil, rollup_ttl: DEFAULT_ROLLUP_TTL) ⇒ Redis

Returns a new instance of Redis.



68
69
70
71
72
73
74
75
76
# File 'lib/event_meter/stores/rollup/redis.rb', line 68

# Builds a store around the given Redis client.
#
# @param redis the client used for every data command issued by this store
# @param namespace passed through normalize_namespace before being kept
# @param lock_redis client exposed as #lock_redis; falls back to +redis+ when omitted
# @param report_name stored as a String when given, otherwise nil
# @param version stored as an Integer (via to_i) when given, otherwise nil
# @param lock_scope stored unchanged
# @param rollup_ttl validated with positive_integer under the label "rollup_ttl"
def initialize(redis:, namespace:, lock_redis: nil, report_name: nil, version: nil, lock_scope: nil, rollup_ttl: DEFAULT_ROLLUP_TTL)
  # Plain pass-through values are grouped; nothing here can raise.
  @redis, @lock_redis, @lock_scope = redis, (lock_redis || redis), lock_scope

  # Conversions and validations keep their original relative order so the
  # same error surfaces first when several arguments are invalid.
  @namespace = normalize_namespace(namespace)
  @report_name = report_name&.to_s
  @version = version&.to_i
  @rollup_ttl = positive_integer(rollup_ttl, "rollup_ttl")
end

Instance Attribute Details

#lock_redis ⇒ Object (readonly)

Returns the value of attribute lock_redis.



66
67
68
# File 'lib/event_meter/stores/rollup/redis.rb', line 66

# Returns @lock_redis, which #initialize sets to the +lock_redis:+ argument
# or, when that argument is nil, to the +redis:+ client.
def lock_redis
  @lock_redis
end

#lock_scope ⇒ Object (readonly)

Returns the value of attribute lock_scope.



66
67
68
# File 'lib/event_meter/stores/rollup/redis.rb', line 66

# Returns @lock_scope, the +lock_scope:+ argument stored unchanged by
# #initialize. #for_report supplies it as "<event name>:<version key>" built
# through Keys; it is nil when the constructor is called without one.
def lock_scope
  @lock_scope
end

#namespace ⇒ Object (readonly)

Returns the value of attribute namespace.



66
67
68
# File 'lib/event_meter/stores/rollup/redis.rb', line 66

# Returns @namespace, i.e. the result of normalize_namespace applied to the
# +namespace:+ argument given to #initialize.
def namespace
  @namespace
end

#redis ⇒ Object (readonly)

Returns the value of attribute redis.



66
67
68
# File 'lib/event_meter/stores/rollup/redis.rb', line 66

# Returns @redis, the client passed as +redis:+ to #initialize. The data
# methods of this class (get, set, multi, pipelined, scan_each) go through it.
def redis
  @redis
end

#report_name ⇒ Object (readonly)

Returns the value of attribute report_name.



66
67
68
# File 'lib/event_meter/stores/rollup/redis.rb', line 66

# Returns @report_name: the +report_name:+ argument converted with to_s, or
# nil when no report name was given to #initialize.
def report_name
  @report_name
end

#rollup_ttl ⇒ Object (readonly)

Returns the value of attribute rollup_ttl.



66
67
68
# File 'lib/event_meter/stores/rollup/redis.rb', line 66

# Returns @rollup_ttl, the +rollup_ttl:+ argument after validation by
# positive_integer in #initialize (default DEFAULT_ROLLUP_TTL). #apply passes
# it to +expire+ for the state and processed-entry keys it writes.
def rollup_ttl
  @rollup_ttl
end

#version ⇒ Object (readonly)

Returns the value of attribute version.



66
67
68
# File 'lib/event_meter/stores/rollup/redis.rb', line 66

# Returns @version: the +version:+ argument converted with to_i, or nil when
# no version was given to #initialize.
def version
  @version
end

Instance Method Details

#apply(batch) ⇒ Object



130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
# File 'lib/event_meter/stores/rollup/redis.rb', line 130

# Writes one batch inside a single +redis.multi+ block.
#
# Calls ensure_scoped! first, then queues, in this order:
# 1. every entry of +batch.rollups+ via apply_rollup;
# 2. for every entry of +batch.state_updates+, SET_MAX_SCRIPT followed by an
#    +expire+ of rollup_ttl on the same key;
# 3. for every id in +batch.entry_ids+, a marker at processed_key(id) holding
#    the current UTC time (ISO-8601, microsecond precision), again followed
#    by an +expire+ of rollup_ttl.
#
# @param batch object responding to +rollups+, +state_updates+ and +entry_ids+
# @return whatever +redis.multi+ returns
def apply(batch)
  ensure_scoped!

  redis.multi do |tx|
    batch.rollups.each do |rollup_key, rollup|
      apply_rollup(tx, rollup_key, rollup)
    end

    batch.state_updates.each do |state_key, state_value|
      tx.eval(SET_MAX_SCRIPT, keys: [state_key], argv: [state_value])
      tx.expire(state_key, rollup_ttl)
    end

    # One timestamp is shared by every marker written for this batch.
    stamp = Time.now.utc.iso8601(6)
    batch.entry_ids.each do |entry_id|
      marker_key = processed_key(entry_id)
      tx.set(marker_key, stamp)
      tx.expire(marker_key, rollup_ttl)
    end
  end
end

#cleanup_history(before:, events:, interval_state:) ⇒ Object



188
189
190
191
192
193
194
195
196
# File 'lib/event_meter/stores/rollup/redis.rb', line 188

# Runs the history cleanup passes and reports how much each one removed.
#
# The filter is built once from +events+ via event_filter and shared by all
# passes. Rollups are cleaned first, then interval state (only when
# +interval_state+ is truthy, otherwise that count is 0), then processed
# entries.
#
# @param before cutoff forwarded unchanged to each cleanup helper
# @param events input for event_filter
# @param interval_state whether cleanup_interval_state should run
# @return [Hash] counts keyed by :rollup_keys_deleted,
#   :interval_state_keys_deleted and :processed_entries_deleted
def cleanup_history(before:, events:, interval_state:)
  filter = event_filter(events)

  rollups_removed = cleanup_rollups(before, filter)
  intervals_removed =
    if interval_state
      cleanup_interval_state(before, filter)
    else
      0
    end
  processed_removed = cleanup_processed_entries(before, filter)

  {
    rollup_keys_deleted: rollups_removed,
    interval_state_keys_deleted: intervals_removed,
    processed_entries_deleted: processed_removed
  }
end

#cleanup_watermark(key) ⇒ Object



176
177
178
# File 'lib/event_meter/stores/rollup/redis.rb', line 176

# Reads the cleanup watermark stored under +key+.
#
# The key is handed to +redis.get+ exactly as given; this method adds no
# namespacing of its own.
#
# @param key the Redis key to read
# @return the value returned by +redis.get+
def cleanup_watermark(key)
  redis.get(key)
end

#ensure_definition(definition) ⇒ Object



93
94
95
96
97
98
99
100
101
102
103
104
# File 'lib/event_meter/stores/rollup/redis.rb', line 93

# Makes sure the stored definition for +definition+ agrees with the one given.
#
# The key comes from definition_key(name, version) and the payload is the
# JSON form of +definition.to_h+. When nothing is stored yet the payload is
# written with +nx: true+ and the key is read back, so whatever value ends up
# stored is the one that gets checked. The check itself is delegated to
# ensure_same_definition!.
#
# @param definition object responding to +name+, +version+ and +to_h+
# @return whatever ensure_same_definition! returns
def ensure_definition(definition)
  key = definition_key(definition.name, definition.version)
  payload = JSON.generate(definition.to_h)

  current = redis.get(key)
  unless current
    redis.set(key, payload, nx: true)
    current = redis.get(key)
  end

  ensure_same_definition!(current, definition)
end

#for_report(name:, version:) ⇒ Object



78
79
80
81
82
83
84
85
86
87
88
89
90
91
# File 'lib/event_meter/stores/rollup/redis.rb', line 78

# Returns a new store of the same class bound to one report and version.
#
# The copy reuses this store's redis, lock_redis, namespace and rollup_ttl.
# +name+ is converted with to_s and +version+ with to_i; the lock scope is
# "<Keys.event_name(name)>:<Keys.version_key(version)>".
#
# @param name report name
# @param version report version
# @return a new instance of +self.class+
def for_report(name:, version:)
  report = name.to_s
  revision = version.to_i
  scope = "#{Keys.event_name(report)}:#{Keys.version_key(revision)}"

  self.class.new(
    redis: redis,
    lock_redis: lock_redis,
    namespace: namespace,
    report_name: report,
    version: revision,
    lock_scope: scope,
    rollup_ttl: rollup_ttl
  )
end

#forget_processed_ids(ids) ⇒ Object



124
125
126
127
128
# File 'lib/event_meter/stores/rollup/redis.rb', line 124

# Removes the processed markers for the given entry ids.
#
# Calls ensure_scoped! first, maps every id through processed_key and hands
# the resulting keys to delete_keys.
#
# @param ids collection of entry ids (must respond to +map+)
# @return whatever delete_keys returns
def forget_processed_ids(ids)
  ensure_scoped!
  delete_keys(ids.map { |entry_id| processed_key(entry_id) })
end

#get(key) ⇒ Object



172
173
174
# File 'lib/event_meter/stores/rollup/redis.rb', line 172

# Reads a single key straight from Redis.
#
# The key is passed to +redis.get+ exactly as given; this method adds no
# namespacing of its own.
#
# @param key the Redis key to read
# @return the value returned by +redis.get+
def get(key)
  redis.get(key)
end

#hgetall_many(keys) ⇒ Object



150
151
152
153
154
155
156
# File 'lib/event_meter/stores/rollup/redis.rb', line 150

# Fetches several hashes in one pipelined call.
#
# An empty +keys+ collection short-circuits to [] without touching Redis.
# Otherwise one +hgetall+ is queued per key, in the order given.
#
# @param keys collection of hash keys (must respond to +empty?+ and +each+)
# @return [] for empty input, otherwise whatever +redis.pipelined+ returns
def hgetall_many(keys)
  if keys.empty?
    []
  else
    redis.pipelined do |pipeline|
      keys.each do |hash_key|
        pipeline.hgetall(hash_key)
      end
    end
  end
end

#keys_matching(pattern, limit: nil) ⇒ Object



158
159
160
161
162
163
164
165
166
167
168
169
170
# File 'lib/event_meter/stores/rollup/redis.rb', line 158

# Collects keys that match +pattern+ by scanning Redis.
#
# The scan uses namespace_glob(pattern) as its MATCH argument, and every
# yielded key is re-checked with key_matches? before it is kept. When
# +limit+ is given it is validated with positive_integer and the scan stops
# as soon as that many keys have been kept. Sorting happens after collection,
# so with a limit the result is the sorted set of the first matches the scan
# produced.
#
# @param pattern pattern handed to namespace_glob and key_matches?
# @param limit optional maximum number of keys to collect
# @return [Array] the collected keys, sorted
def keys_matching(pattern, limit: nil)
  limit &&= positive_integer(limit, "limit")
  matched = []

  redis.scan_each(match: namespace_glob(pattern)) do |candidate|
    if key_matches?(candidate, pattern)
      matched << candidate
      break if limit && matched.length >= limit
    end
  end

  matched.sort
end

#processed_ids(ids) ⇒ Object



113
114
115
116
117
118
119
120
121
122
# File 'lib/event_meter/stores/rollup/redis.rb', line 113

# Returns the subset of +ids+ that already have a processed marker.
#
# Calls ensure_scoped! first. An empty +ids+ collection returns [] without
# touching Redis. Otherwise one +get+ per processed_key(id) is pipelined, and
# each id whose reply is truthy is kept, in input order.
#
# @param ids collection of entry ids
# @return [Array] ids whose processed key currently holds a value
def processed_ids(ids)
  ensure_scoped!
  return [] if ids.empty?

  replies = redis.pipelined do |pipeline|
    ids.each do |entry_id|
      pipeline.get(processed_key(entry_id))
    end
  end

  # `reply && id` yields the id only when a marker was found.
  ids.zip(replies).filter_map { |entry_id, reply| reply && entry_id }
end

#report_definition(name:, version:) ⇒ Object



106
107
108
109
110
111
# File 'lib/event_meter/stores/rollup/redis.rb', line 106

# Loads the stored definition for a report version.
#
# Reads the value at definition_key(name, version) and parses it as JSON.
# A missing value is returned as-is (nil) without parsing. A
# JSON::ParserError or TypeError raised anywhere in the method is swallowed
# and nil is returned instead.
#
# @param name report name
# @param version report version
# @return the parsed JSON document, or nil when absent or unparseable
def report_definition(name:, version:)
  key = definition_key(name, version)
  raw = redis.get(key)
  raw ? JSON.parse(raw) : raw
rescue JSON::ParserError, TypeError
  nil
end

#with_lock(ttl:) ⇒ Object



184
185
186
# File 'lib/event_meter/stores/rollup/redis.rb', line 184

# Runs the caller's block inside with_redis_lock, keyed by lock_key.
#
# The caller's block is invoked through a bare +yield+, so it receives no
# arguments regardless of what with_redis_lock passes to its own block.
# NOTE(review): with_redis_lock presumably comes from the included
# EventMeter::Stores::RedisLock module; confirm there how +ttl+ is
# interpreted and what happens when the lock cannot be acquired.
#
# @param ttl forwarded unchanged as +ttl:+ to with_redis_lock
# @return whatever with_redis_lock returns
def with_lock(ttl:)
  with_redis_lock(lock_key, ttl: ttl) { yield }
end

#write_cleanup_watermark(key, value) ⇒ Object



180
181
182
# File 'lib/event_meter/stores/rollup/redis.rb', line 180

# Stores +value+ as the cleanup watermark under +key+.
#
# Both arguments go to +redis.set+ exactly as given; no expiry is attached
# and no namespacing is added by this method.
#
# @param key the Redis key to write
# @param value the watermark to store
# @return the value returned by +redis.set+
def write_cleanup_watermark(key, value)
  redis.set(key, value)
end