Class: Pgbus::DedupCache
- Inherits:
-
Object
- Object
- Pgbus::DedupCache
- Defined in:
- lib/pgbus/dedup_cache.rb
Overview
Thread-safe in-memory dedup cache with TTL. Sits in front of the pgbus_processed_events table to avoid hitting the database for recently-seen (event_id, handler_class) pairs.
Inspired by LavinMQ’s built-in message deduplication with LRU + TTL.
Constant Summary collapse
- DEFAULT_MAX_SIZE =
10_000- DEFAULT_TTL =
5 minutes
300
Instance Method Summary collapse
- #clear! ⇒ Object
-
#initialize(max_size: DEFAULT_MAX_SIZE, ttl: DEFAULT_TTL) ⇒ DedupCache
constructor
A new instance of DedupCache.
-
#mark!(key) ⇒ Object
Mark a key as seen.
-
#seen?(key) ⇒ Boolean
Returns true if this key was already seen (duplicate).
- #size ⇒ Object
Constructor Details
#initialize(max_size: DEFAULT_MAX_SIZE, ttl: DEFAULT_TTL) ⇒ DedupCache
Returns a new instance of DedupCache.
15 16 17 18 19 20 |
# File 'lib/pgbus/dedup_cache.rb', line 15 def initialize(max_size: DEFAULT_MAX_SIZE, ttl: DEFAULT_TTL) @max_size = max_size @ttl = ttl @cache = Concurrent::Map.new @insertion_order = Concurrent::Array.new end |
Instance Method Details
#clear! ⇒ Object
50 51 52 53 |
# File 'lib/pgbus/dedup_cache.rb', line 50 def clear! @cache.clear @insertion_order.clear end |
#mark!(key) ⇒ Object
Mark a key as seen. Call this after successfully claiming idempotency in the database so future lookups skip the DB.
38 39 40 41 42 43 44 |
# File 'lib/pgbus/dedup_cache.rb', line 38 def mark!(key) already_present = @cache.key?(key) evict_oldest if !already_present && @cache.size >= @max_size @cache[key] = monotonic_now @insertion_order << key unless already_present end |
#seen?(key) ⇒ Boolean
Returns true if this key was already seen (duplicate). Returns false if it’s new (first time seen — caller should proceed).
24 25 26 27 28 29 30 31 32 33 34 |
# File 'lib/pgbus/dedup_cache.rb', line 24 def seen?(key) entry = @cache[key] return false unless entry if expired?(entry) evict(key) return false end true end |
#size ⇒ Object
46 47 48 |
# File 'lib/pgbus/dedup_cache.rb', line 46 def size @cache.size end |