Class: Pgbus::DedupCache

Inherits:
Object
  • Object
show all
Defined in:
lib/pgbus/dedup_cache.rb

Overview

Thread-safe in-memory dedup cache with TTL. Sits in front of the pgbus_processed_events table to avoid hitting the database for recently-seen (event_id, handler_class) pairs.

Inspired by LavinMQ’s built-in message deduplication with LRU + TTL.

Constant Summary collapse

DEFAULT_MAX_SIZE =
10_000
DEFAULT_TTL =

5 minutes

300

Instance Method Summary collapse

Constructor Details

#initialize(max_size: DEFAULT_MAX_SIZE, ttl: DEFAULT_TTL) ⇒ DedupCache

Returns a new instance of DedupCache.



15
16
17
18
19
20
# File 'lib/pgbus/dedup_cache.rb', line 15

def initialize(max_size: DEFAULT_MAX_SIZE, ttl: DEFAULT_TTL)
  @max_size = max_size
  @ttl = ttl
  @cache = Concurrent::Map.new
  @insertion_order = Concurrent::Array.new
end

Instance Method Details

#clear!Object



50
51
52
53
# File 'lib/pgbus/dedup_cache.rb', line 50

def clear!
  @cache.clear
  @insertion_order.clear
end

#mark!(key) ⇒ Object

Mark a key as seen. Call this after successfully claiming idempotency in the database so future lookups skip the DB.



38
39
40
41
42
43
44
# File 'lib/pgbus/dedup_cache.rb', line 38

def mark!(key)
  already_present = @cache.key?(key)
  evict_oldest if !already_present && @cache.size >= @max_size

  @cache[key] = monotonic_now
  @insertion_order << key unless already_present
end

#seen?(key) ⇒ Boolean

Returns true if this key was already seen (duplicate). Returns false if it’s new (first time seen — caller should proceed).

Returns:

  • (Boolean)


24
25
26
27
28
29
30
31
32
33
34
# File 'lib/pgbus/dedup_cache.rb', line 24

def seen?(key)
  entry = @cache[key]
  return false unless entry

  if expired?(entry)
    evict(key)
    return false
  end

  true
end

#sizeObject



46
47
48
# File 'lib/pgbus/dedup_cache.rb', line 46

def size
  @cache.size
end