Class: Magick::Adapters::Registry
- Inherits:
-
Object
- Object
- Magick::Adapters::Registry
- Defined in:
- lib/magick/adapters/registry.rb
Constant Summary collapse
- CACHE_INVALIDATION_CHANNEL =
'magick:cache:invalidate'- LOCAL_WRITE_TTL =
seconds to ignore self-invalidation after a local write
2.0- FEATURE_NAME_PATTERN =
Accept only conservative feature identifiers coming off the wire. Anything outside this alphabet (newlines, spaces, Unicode punctuation, 200-char garbage) is a sign of a malformed or malicious publisher and must not be fed back into Magick.features / feature.reload.
/\A[a-zA-Z0-9_\-.:]{1,120}\z/.freeze
Instance Method Summary collapse
- #all_features ⇒ Object
- #delete(feature_name) ⇒ Object
-
#ensure_subscriber! ⇒ Object
Restart the Pub/Sub subscriber after a fork.
- #exists?(feature_name) ⇒ Boolean
- #get(feature_name, key) ⇒ Object
-
#get_all_data(feature_name) ⇒ Object
Load all keys for a single feature in one call instead of N separate get() calls.
-
#initialize(memory_adapter, redis_adapter = nil, active_record_adapter: nil, circuit_breaker: nil, async: false, primary: nil) ⇒ Registry
constructor
A new instance of Registry.
-
#invalidate_cache(feature_name) ⇒ Object
Explicitly trigger cache invalidation for a feature This is useful for targeting updates that need immediate cache invalidation Invalidates memory cache in current process AND publishes to Redis for other processes.
-
#preload! ⇒ Object
Bulk load ALL features into memory cache in minimal queries.
-
#publish_cache_invalidation(feature_name) ⇒ Object
Publish cache invalidation message to Redis Pub/Sub (without deleting local memory cache) This is useful when you’ve just updated the cache and want to notify other processes but keep the local memory cache intact.
-
#redis_available? ⇒ Boolean
Check if Redis adapter is available.
-
#redis_client ⇒ Object
Get Redis client (public method for use by other classes).
- #set(feature_name, key, value) ⇒ Object
-
#set_all_data(feature_name, data_hash) ⇒ Object
Bulk set multiple keys for a feature in one call (1 query instead of N).
-
#shutdown(timeout: 5) ⇒ Object
Gracefully terminate the Pub/Sub subscriber thread and its Redis connection.
- #stopping? ⇒ Boolean
Constructor Details
#initialize(memory_adapter, redis_adapter = nil, active_record_adapter: nil, circuit_breaker: nil, async: false, primary: nil) ⇒ Registry
Returns a new instance of Registry.
16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 |
# File 'lib/magick/adapters/registry.rb', line 16 def initialize(memory_adapter, redis_adapter = nil, active_record_adapter: nil, circuit_breaker: nil, async: false, primary: nil) @memory_adapter = memory_adapter @redis_adapter = redis_adapter @active_record_adapter = active_record_adapter @circuit_breaker = circuit_breaker || Magick::CircuitBreaker.new @async = async @primary = primary || :memory # :memory, :redis, or :active_record @subscriber_thread = nil @subscriber = nil @refresh_thread = nil @last_reload_times = {} # Track last reload time per feature for debouncing @local_writes = {} # Track recent local writes to skip self-invalidation @reload_mutex = Mutex.new @stopping = false @shutdown_mutex = Mutex.new @owner_pid = Process.pid # Only start Pub/Sub subscriber if Redis is available # In memory-only mode, each process has isolated cache (no cross-process invalidation) start_cache_invalidation_subscriber if redis_adapter end |
Instance Method Details
#all_features ⇒ Object
175 176 177 178 179 180 181 |
# File 'lib/magick/adapters/registry.rb', line 175 def all_features features = [] features += memory_adapter.all_features if memory_adapter features += redis_adapter.all_features if redis_adapter features += active_record_adapter.all_features if active_record_adapter features.uniq end |
#delete(feature_name) ⇒ Object
145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 |
# File 'lib/magick/adapters/registry.rb', line 145 def delete(feature_name) memory_adapter&.delete(feature_name) if redis_adapter begin redis_adapter.delete(feature_name) # Publish cache invalidation message publish_cache_invalidation(feature_name) rescue AdapterError # Continue even if Redis fails end end return unless active_record_adapter begin active_record_adapter.delete(feature_name) rescue AdapterError # Continue even if Active Record fails end end |
#ensure_subscriber! ⇒ Object
Restart the Pub/Sub subscriber after a fork. The subscriber thread is not carried into child processes, so a worker inheriting a stale reference must re-create its own subscription. Safe to call on every request; it only does work when Process.pid changes.
42 43 44 45 46 47 48 49 50 51 52 53 54 55 |
# File 'lib/magick/adapters/registry.rb', line 42 def ensure_subscriber! return if @owner_pid == Process.pid @shutdown_mutex.synchronize do return if @owner_pid == Process.pid @subscriber_thread = nil @subscriber = nil @owner_pid = Process.pid @stopping = false end start_cache_invalidation_subscriber if redis_adapter end |
#exists?(feature_name) ⇒ Boolean
167 168 169 170 171 172 173 |
# File 'lib/magick/adapters/registry.rb', line 167 def exists?(feature_name) return true if memory_adapter&.exists?(feature_name) return true if redis_adapter&.exists?(feature_name) == true return true if active_record_adapter&.exists?(feature_name) == true false end |
#get(feature_name, key) ⇒ Object
78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 |
# File 'lib/magick/adapters/registry.rb', line 78 def get(feature_name, key) # Try memory first (fastest) - no Redis calls needed thanks to Pub/Sub invalidation value = memory_adapter.get(feature_name, key) if memory_adapter return value unless value.nil? # Fall back to Redis if available if redis_adapter begin value = redis_adapter.get(feature_name, key) if !value.nil? && memory_adapter memory_adapter.set(feature_name, key, value) return value end rescue StandardError, AdapterError # Redis failed, continue to next adapter end end # Fall back to Active Record if available if active_record_adapter begin value = active_record_adapter.get(feature_name, key) memory_adapter.set(feature_name, key, value) if !value.nil? && memory_adapter return value rescue StandardError, AdapterError nil end end nil end |
#get_all_data(feature_name) ⇒ Object
Load all keys for a single feature in one call instead of N separate get() calls
184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 |
# File 'lib/magick/adapters/registry.rb', line 184 def get_all_data(feature_name) # Try memory first if memory_adapter data = memory_adapter.get_all_data(feature_name) return data unless data.nil? || data.empty? end # Fall back to Redis if redis_adapter begin data = redis_adapter.get_all_data(feature_name) if data && !data.empty? memory_adapter.set_all_data(feature_name, data) if memory_adapter return data end rescue StandardError, AdapterError # Redis failed, continue end end # Fall back to Active Record if active_record_adapter begin data = active_record_adapter.get_all_data(feature_name) if data && !data.empty? memory_adapter.set_all_data(feature_name, data) if memory_adapter return data end rescue StandardError, AdapterError # AR failed end end {} end |
#invalidate_cache(feature_name) ⇒ Object
Explicitly trigger cache invalidation for a feature This is useful for targeting updates that need immediate cache invalidation Invalidates memory cache in current process AND publishes to Redis for other processes
293 294 295 296 297 298 299 |
# File 'lib/magick/adapters/registry.rb', line 293 def invalidate_cache(feature_name) # Invalidate memory cache in current process immediately memory_adapter&.delete(feature_name) # Publish to Redis Pub/Sub to invalidate cache in other processes publish_cache_invalidation(feature_name) end |
#preload! ⇒ Object
Bulk load ALL features into memory cache in minimal queries. Call this after configuration to warm the cache.
222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 |
# File 'lib/magick/adapters/registry.rb', line 222 def preload! all_data = {} # Load from ActiveRecord first (source of truth for persistence) if active_record_adapter begin all_data = active_record_adapter.load_all_features_data rescue StandardError, AdapterError # AR failed, try Redis end end # Merge/override with Redis data (more up-to-date than AR in most setups) if redis_adapter begin redis_data = redis_adapter.load_all_features_data redis_data.each do |feature_name, data| all_data[feature_name] ||= {} all_data[feature_name].merge!(data) end rescue StandardError, AdapterError # Redis failed, use what we have from AR end end # Populate memory cache in bulk if memory_adapter && !all_data.empty? all_data.each do |feature_name, data| memory_adapter.set_all_data(feature_name, data) end end all_data end |
#publish_cache_invalidation(feature_name) ⇒ Object
Publish cache invalidation message to Redis Pub/Sub (without deleting local memory cache) This is useful when you’ve just updated the cache and want to notify other processes but keep the local memory cache intact
316 317 318 319 320 321 322 323 324 325 326 |
# File 'lib/magick/adapters/registry.rb', line 316 def publish_cache_invalidation(feature_name) return unless redis_adapter begin redis_client = redis_adapter.client redis_client&.publish(CACHE_INVALIDATION_CHANNEL, feature_name.to_s) rescue StandardError => e # Silently fail - cache invalidation is best effort warn "Failed to publish cache invalidation: #{e.}" if defined?(Rails) && Rails.env.development? end end |
#redis_available? ⇒ Boolean
Check if Redis adapter is available
302 303 304 |
# File 'lib/magick/adapters/registry.rb', line 302 def redis_available? !redis_adapter.nil? end |
#redis_client ⇒ Object
Get Redis client (public method for use by other classes)
307 308 309 310 311 |
# File 'lib/magick/adapters/registry.rb', line 307 def redis_client return nil unless redis_adapter redis_adapter.client end |
#set(feature_name, key, value) ⇒ Object
110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 |
# File 'lib/magick/adapters/registry.rb', line 110 def set(feature_name, key, value) # Update memory first (always synchronous) memory_adapter&.set(feature_name, key, value) # Record local write so the subscriber skips self-invalidation record_local_write(feature_name) # Update Redis if available if redis_adapter update_redis = proc do circuit_breaker.call do redis_adapter.set(feature_name, key, value) end rescue AdapterError => e warn "Failed to update Redis: #{e.}" if defined?(Rails) && Rails.env.development? end if @async && defined?(Thread) spawn_async_write(feature_name, update_redis) else update_redis.call publish_cache_invalidation(feature_name) end end # Always update Active Record if available (as fallback/persistence layer) return unless active_record_adapter begin active_record_adapter.set(feature_name, key, value) rescue AdapterError => e warn "Failed to update Active Record: #{e.}" if defined?(Rails) && Rails.env.development? end end |
#set_all_data(feature_name, data_hash) ⇒ Object
Bulk set multiple keys for a feature in one call (1 query instead of N)
258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 |
# File 'lib/magick/adapters/registry.rb', line 258 def set_all_data(feature_name, data_hash) memory_adapter&.set_all_data(feature_name, data_hash) # Record local write so the subscriber skips self-invalidation record_local_write(feature_name) if redis_adapter update_redis = proc do circuit_breaker.call do redis_adapter.set_all_data(feature_name, data_hash) end rescue AdapterError => e warn "Failed to bulk update Redis: #{e.}" if defined?(Rails) && Rails.env.development? end if @async && defined?(Thread) spawn_async_write(feature_name, update_redis) else update_redis.call publish_cache_invalidation(feature_name) end end if active_record_adapter begin active_record_adapter.set_all_data(feature_name, data_hash) rescue AdapterError => e warn "Failed to bulk update Active Record: #{e.}" if defined?(Rails) && Rails.env.development? end end end |
#shutdown(timeout: 5) ⇒ Object
Gracefully terminate the Pub/Sub subscriber thread and its Redis connection. Without this, Ruby/Puma shutdown waits on the blocking ‘subscribe` call.
59 60 61 62 63 64 65 66 67 68 69 70 71 72 |
# File 'lib/magick/adapters/registry.rb', line 59 def shutdown(timeout: 5) @shutdown_mutex.synchronize do return if @stopping @stopping = true end close_subscriber_connection(@subscriber) terminate_subscriber_thread(@subscriber_thread, timeout) @subscriber = nil @subscriber_thread = nil true end |
#stopping? ⇒ Boolean
74 75 76 |
# File 'lib/magick/adapters/registry.rb', line 74 def stopping? @stopping == true end |