Class: Smith::PersistenceAdapters::RedisStore
- Inherits: Object
  - Object
  - Smith::PersistenceAdapters::RedisStore
- Defined in: lib/smith/persistence_adapters/redis_store.rb
Constant Summary
- TRANSIENT_ERROR_NAMES =
Names of the Redis errors treated as transient. The list is deliberately narrow: non-transient errors (CommandError, etc.) propagate immediately. Each name is resolved to a class only if that class is loaded, so Smith does not require redis at load time. Redis::BaseConnectionError, when loaded, covers the connection and timeout errors.
%w[ Redis::BaseConnectionError Redis::TimeoutError Redis::CannotConnectError Redis::ConnectionError ].freeze
Class Method Summary
- .transient_errors ⇒ Object
Instance Method Summary
- #delete(key) ⇒ Object
- #fetch(key) ⇒ Object
- #initialize(redis:, namespace: "smith") ⇒ RedisStore (constructor)
  A new instance of RedisStore.
- #last_heartbeat(key) ⇒ Object
- #record_heartbeat(key, ttl: Smith.config.persistence_ttl) ⇒ Object
- #store(key, payload, ttl: Smith.config.persistence_ttl) ⇒ Object
- #store_versioned(key, payload, expected_version:, ttl: Smith.config.persistence_ttl) ⇒ Object
  Optimistic locking via Redis WATCH/MULTI/EXEC.
Constructor Details
#initialize(redis:, namespace: "smith") ⇒ RedisStore
Returns a new instance of RedisStore.
    # File 'lib/smith/persistence_adapters/redis_store.rb', line 27

    def initialize(redis:, namespace: "smith")
      @redis_source = redis
      @namespace = namespace
    end
Class Method Details
.transient_errors ⇒ Object
    # File 'lib/smith/persistence_adapters/redis_store.rb', line 19

    def self.transient_errors
      TRANSIENT_ERROR_NAMES.filter_map do |name|
        Object.const_get(name)
      rescue NameError
        nil
      end + [Errno::ECONNREFUSED, Errno::ETIMEDOUT, Errno::EPIPE]
    end
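The name-based lookup can be tried in isolation. The following is a minimal sketch, not part of Smith: `NAMES`, `resolve_transient_errors`, and `transient?` are hypothetical names introduced for illustration. It shows that when the redis gem is not loaded, the Redis names are skipped and only the `Errno` classes remain, and that the resulting array can be splatted into a `rescue` clause.

```ruby
# Same names as TRANSIENT_ERROR_NAMES; copied here so the sketch is standalone.
NAMES = %w[
  Redis::BaseConnectionError
  Redis::TimeoutError
  Redis::CannotConnectError
  Redis::ConnectionError
].freeze

# Resolve each name to a class if it is loaded; skip it otherwise.
def resolve_transient_errors(names)
  resolved = names.filter_map do |name|
    Object.const_get(name)
  rescue NameError
    nil # constant not loaded (for example, redis was never required)
  end
  resolved + [Errno::ECONNREFUSED, Errno::ETIMEDOUT, Errno::EPIPE]
end

# Hypothetical helper: reports whether an error matches the resolved list.
def transient?(error, errors)
  raise error
rescue *errors
  true
rescue StandardError
  false
end

errors = resolve_transient_errors(NAMES)
puts transient?(Errno::ECONNREFUSED.new, errors)
puts transient?(ArgumentError.new("bad input"), errors)
```

Resolving the names on every call, rather than once at load time, means a redis gem that is required later is still picked up.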
Instance Method Details
#delete(key) ⇒ Object
    # File 'lib/smith/persistence_adapters/redis_store.rb', line 48

    def delete(key)
      Retry.with_retries(operation: :delete, transient: self.class.transient_errors) do
        client.del(namespaced(key), namespaced_heartbeat(key))
      end
    end
#fetch(key) ⇒ Object
    # File 'lib/smith/persistence_adapters/redis_store.rb', line 42

    def fetch(key)
      Retry.with_retries(operation: :fetch, transient: self.class.transient_errors) do
        client.get(namespaced(key))
      end
    end
#last_heartbeat(key) ⇒ Object
    # File 'lib/smith/persistence_adapters/redis_store.rb', line 65

    def last_heartbeat(key)
      Retry.with_retries(operation: :last_heartbeat, transient: self.class.transient_errors) do
        raw = client.get(namespaced_heartbeat(key))
        next nil if raw.nil?
        Time.parse(raw).utc
      rescue ArgumentError
        nil
      end
    end
#record_heartbeat(key, ttl: Smith.config.persistence_ttl) ⇒ Object
    # File 'lib/smith/persistence_adapters/redis_store.rb', line 54

    def record_heartbeat(key, ttl: Smith.config.persistence_ttl)
      Retry.with_retries(operation: :record_heartbeat, transient: self.class.transient_errors) do
        iso = Time.now.utc.iso8601
        if ttl
          client.set(namespaced_heartbeat(key), iso, ex: ttl)
        else
          client.set(namespaced_heartbeat(key), iso)
        end
      end
    end
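The heartbeat is written as an ISO 8601 string and read back with `Time.parse`, with unparseable values mapped to `nil`. The sketch below reproduces that round trip without Redis and adds a staleness check on top. `parse_heartbeat` and `stale?` are hypothetical helpers, not part of Smith, and the threshold is an arbitrary example value.

```ruby
require "time" # provides Time#iso8601 and Time.parse

# Mirrors the parsing in last_heartbeat: nil for a missing or malformed value.
def parse_heartbeat(raw)
  return nil if raw.nil?

  Time.parse(raw).utc
rescue ArgumentError
  nil
end

# Hypothetical helper: a heartbeat is stale when it is absent, unreadable,
# or older than the given threshold in seconds.
def stale?(raw, now:, threshold:)
  beat = parse_heartbeat(raw)
  beat.nil? || (now - beat) > threshold
end

now = Time.utc(2024, 1, 1, 12, 0, 0)
raw = (now - 30).iso8601 # the string form record_heartbeat would store

puts stale?(raw, now: now, threshold: 60)
puts stale?(raw, now: now, threshold: 10)
puts stale?("not a timestamp", now: now, threshold: 60)
```

Because `iso8601` is called without a fraction argument, the stored value has one-second resolution, so thresholds finer than a second are not meaningful.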
#store(key, payload, ttl: Smith.config.persistence_ttl) ⇒ Object
    # File 'lib/smith/persistence_adapters/redis_store.rb', line 32

    def store(key, payload, ttl: Smith.config.persistence_ttl)
      Retry.with_retries(operation: :store, transient: self.class.transient_errors) do
        if ttl
          client.set(namespaced(key), payload, ex: ttl)
        else
          client.set(namespaced(key), payload)
        end
      end
    end
#store_versioned(key, payload, expected_version:, ttl: Smith.config.persistence_ttl) ⇒ Object
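Optimistic locking via Redis WATCH/MULTI/EXEC. Raises Smith::PersistenceVersionConflict when the stored version differs from expected_version, or when EXEC is aborted because WATCH detected a concurrent write. A key with no stored value is written without a version check.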
    # File 'lib/smith/persistence_adapters/redis_store.rb', line 79

    def store_versioned(key, payload, expected_version:, ttl: Smith.config.persistence_ttl)
      Retry.with_retries(operation: :store_versioned, transient: self.class.transient_errors) do
        namespaced_key = namespaced(key)
        result = client.watch(namespaced_key) do
          current = client.get(namespaced_key)
          if current && (current_version = parse_version(current)) != expected_version
            client.unwatch
            raise Smith::PersistenceVersionConflict.new(
              key: key, expected: expected_version, actual: current_version
            )
          end
          client.multi do |tx|
            if ttl
              tx.set(namespaced_key, payload, ex: ttl)
            else
              tx.set(namespaced_key, payload)
            end
          end
        end
        if result.nil?
          raise Smith::PersistenceVersionConflict.new(
            key: key, expected: expected_version, actual: :concurrent
          )
        end
        result
      end
    end
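On the caller side, a version conflict is usually handled by re-reading the value and trying again. The sketch below illustrates that loop against an in-memory stand-in rather than Redis. Everything in it is an assumption made for illustration: `MemoryVersionedStore`, `VersionConflict`, and `increment_counter` are hypothetical, and the version is assumed to live in a JSON `"version"` field, which the source does not confirm (`parse_version` is not shown).

```ruby
require "json"

# Hypothetical stand-in for Smith::PersistenceVersionConflict.
class VersionConflict < StandardError; end

# In-memory stand-in that mimics the compare-then-write contract of
# store_versioned. It is not the Redis implementation.
class MemoryVersionedStore
  def initialize
    @data = {}
    @lock = Mutex.new
  end

  def fetch(key)
    @lock.synchronize { @data[key] }
  end

  def store_versioned(key, payload, expected_version:)
    @lock.synchronize do
      current = @data[key]
      # Assumption: the version is stored in a JSON "version" field.
      if current && JSON.parse(current)["version"] != expected_version
        raise VersionConflict, "stale version for #{key}"
      end
      @data[key] = payload
    end
  end
end

# Caller-side loop: re-read and retry when another writer got there first.
def increment_counter(store, key, attempts: 3)
  attempts.times do
    raw = store.fetch(key)
    state = raw ? JSON.parse(raw) : { "version" => 0, "count" => 0 }
    updated = { "version" => state["version"] + 1, "count" => state["count"] + 1 }
    begin
      store.store_versioned(key, JSON.generate(updated), expected_version: state["version"])
      return updated
    rescue VersionConflict
      next # someone else wrote in between; read again
    end
  end
  raise VersionConflict, "gave up on #{key} after #{attempts} attempts"
end

store = MemoryVersionedStore.new
increment_counter(store, "workflow:1")
increment_counter(store, "workflow:1")
puts store.fetch("workflow:1")
```

Bounding the number of attempts keeps a hot key from turning into an unbounded retry loop; the right limit depends on how contended the key is.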