Class: EventMeter::Stores::Stream::Redis

Inherits:
Object
  • Object
show all
Includes:
Namespace, RedisLock
Defined in:
lib/event_meter/stores/stream/redis.rb

Constant Summary

Constants included from RedisLock

RedisLock::LOCK_REFRESH_RATIO

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(redis:, namespace:, lock_redis: nil, redis_read_limit: nil) ⇒ Redis

Returns a new instance of Redis.



15
16
17
18
19
20
21
# File 'lib/event_meter/stores/stream/redis.rb', line 15

# Sets up the store's clients, namespace, read limit, and read bookkeeping.
#
# @param redis [Object] client used for the stream commands (xadd/xrange/xdel)
# @param namespace [Object] raw namespace; stored after normalize_namespace
# @param lock_redis [Object, nil] optional separate client; falls back to +redis+
# @param redis_read_limit [Object, nil] raw limit; stored after
#   normalize_redis_read_limit
def initialize(redis:, namespace:, lock_redis: nil, redis_read_limit: nil)
  @redis = redis
  # No dedicated lock client supplied => reuse the main client.
  @lock_redis = lock_redis || @redis
  @namespace = normalize_namespace(namespace)
  @redis_read_limit = normalize_redis_read_limit(redis_read_limit)
  # IDs returned by the most recent #read; consumed and cleared by #delete.
  @read_ids = []
end

Instance Attribute Details

#lock_redis ⇒ Object (readonly)

Returns the value of attribute lock_redis.



13
14
15
# File 'lib/event_meter/stores/stream/redis.rb', line 13

# Returns @lock_redis, which the constructor sets to the +lock_redis:+
# argument or, when that is nil/false, to the main +redis+ client.
def lock_redis
  @lock_redis
end

#namespace ⇒ Object (readonly)

Returns the value of attribute namespace.



13
14
15
# File 'lib/event_meter/stores/stream/redis.rb', line 13

# Returns @namespace, the value the constructor obtained from
# normalize_namespace(namespace).
def namespace
  @namespace
end

#redis ⇒ Object (readonly)

Returns the value of attribute redis.



13
14
15
# File 'lib/event_meter/stores/stream/redis.rb', line 13

# Returns @redis, the client passed to the constructor; #append, #read and
# #delete issue their xadd/xrange/xdel calls through it.
def redis
  @redis
end

#redis_read_limit ⇒ Object (readonly)

Returns the value of attribute redis_read_limit.



13
14
15
# File 'lib/event_meter/stores/stream/redis.rb', line 13

# Returns @redis_read_limit, the constructor's result of
# normalize_redis_read_limit(redis_read_limit). When truthy, #read passes it
# to xrange as +count:+; otherwise #read sends no count option.
def redis_read_limit
  @redis_read_limit
end

Instance Method Details

#append(payload) ⇒ Object



23
24
25
26
# File 'lib/event_meter/stores/stream/redis.rb', line 23

# Appends one event to the stream selected by the payload's "name" entry.
#
# @param payload [#to_h] event data; its hash form must contain the string
#   key "name"
# @return [Object] whatever the client's xadd call returns
# @raise [KeyError] when the hash form has no "name" key
def append(payload)
  event = payload.to_h
  key = stream_key(event.fetch("name"))
  # The whole event is stored as a single JSON-encoded "payload" field.
  entry = { "payload" => event.to_json }
  redis.xadd(key, entry)
end

#delete ⇒ Object



39
40
41
42
43
44
# File 'lib/event_meter/stores/stream/redis.rb', line 39

# Removes the entries remembered by the last #read from their stream.
#
# Does nothing when no read key is recorded or the recorded ID list is empty.
# The recorded key and IDs are reset whether or not the xdel call succeeds.
#
# @return [Object, nil] the client's xdel result, or nil when nothing was sent
def delete
  return unless @read_key
  return if @read_ids.empty?

  redis.xdel(@read_key, *@read_ids)
ensure
  # Always forget the previous read, even if xdel raised.
  @read_ids = []
  @read_key = nil
end

#read(name:) ⇒ Object



28
29
30
31
32
33
34
35
36
37
# File 'lib/event_meter/stores/stream/redis.rb', line 28

# Reads entries from the stream for +name+ and remembers them for #delete.
#
# The stream key and the returned entry IDs are recorded only after the
# xrange call succeeds, so a failed read leaves the previously recorded
# key/ID pair intact instead of pairing a new key with stale IDs.
#
# @param name [Object] event name, turned into a stream key via stream_key
# @return [Array<Array>] one [id, parsed_payload] pair per entry, where
#   parsed_payload is whatever parse_payload returns for the entry's fields
def read(name:)
  range_options = redis_read_limit ? { count: redis_read_limit } : {}
  key = stream_key(name)
  rows = redis.xrange(key, "-", "+", **range_options)

  # Commit key and IDs together; #delete relies on them matching.
  @read_key = key
  @read_ids = rows.map(&:first)

  rows.map { |id, fields| [id, parse_payload(fields)] }
end

#with_lock(ttl:) ⇒ Object



46
47
48
# File 'lib/event_meter/stores/stream/redis.rb', line 46

# Runs the caller's block inside with_redis_lock, keyed by lock_key.
#
# @param ttl [Object] passed through unchanged as with_redis_lock's +ttl:+
# @yield the caller's block, invoked without arguments
# @return [Object] whatever with_redis_lock returns
def with_lock(ttl:)
  key = lock_key
  with_redis_lock(key, ttl: ttl) do
    yield
  end
end