Class: ActiveJob::Temporal::RateLimiters::MemoryBucketStore

Inherits:
Object
  • Object
show all
Defined in:
lib/activejob/temporal/rate_limiters/memory.rb

Defined Under Namespace

Classes: Bucket

Constant Summary collapse

SWEEP_INTERVAL =
1.0

Instance Method Summary collapse

Constructor Details

#initialize ⇒ MemoryBucketStore

Returns a new instance of MemoryBucketStore.



14
15
16
17
18
# File 'lib/activejob/temporal/rate_limiters/memory.rb', line 14

# Sets up an empty store: no sweep has been scheduled yet, a fresh mutex
# guards bucket bookkeeping, and the key => bucket map starts out empty.
def initialize
  @next_sweep_at = nil
  @buckets_mutex = Mutex.new
  @buckets_by_key = Concurrent::Map.new
end

Instance Method Details

#acquire(keys, now) ⇒ Object



20
21
22
23
24
25
26
27
28
# File 'lib/activejob/temporal/rate_limiters/memory.rb', line 20

# Looks up (via +bucket_for+) one bucket per distinct key and registers a
# reference on each, all while holding +@buckets_mutex+.
#
# Keys are de-duplicated and visited in sorted order.
# NOTE(review): the sort presumably gives callers a stable ordering for
# later per-bucket locking -- confirm against +synchronize_buckets+.
#
# @param keys [Enumerable] bucket keys; must be mutually comparable for +sort+
# @param now [Object] current time, forwarded to +bucket_for+
# @return [Array<Array>] +[key, bucket]+ pairs in sorted key order
def acquire(keys, now)
  @buckets_mutex.synchronize do
    ordered_keys = keys.uniq.sort
    ordered_keys.map do |bucket_key|
      held = bucket_for(bucket_key, now)
      held.references += 1
      [bucket_key, held]
    end
  end
end

#prune_expired_timestamps(bucket, interval, now) ⇒ Object



50
51
52
53
54
55
# File 'lib/activejob/temporal/rate_limiters/memory.rb', line 50

# Drops, in place, every leading timestamp that is at or before
# +now - interval+, stopping at the first entry that is newer (or falsy).
#
# NOTE(review): only the front of the list is inspected, so this assumes
# +bucket.timestamps+ is ordered oldest-first -- confirm with the writer side.
#
# @param bucket [#timestamps] bucket whose timestamp list is trimmed
# @param interval [Numeric] window length subtracted from +now+
# @param now [Numeric] current time
# @return [Object] the same (mutated) +bucket.timestamps+ collection
def prune_expired_timestamps(bucket, interval, now)
  cutoff = now - interval
  bucket.timestamps.tap do |stamps|
    loop do
      oldest = stamps.first
      break unless oldest && oldest <= cutoff

      stamps.shift
    end
  end
end

#release(bucket_entries, now) ⇒ Object



30
31
32
33
34
35
36
37
# File 'lib/activejob/temporal/rate_limiters/memory.rb', line 30

# Gives back the references taken on each bucket and, while still holding
# +@buckets_mutex+, removes any key => bucket pair that +evictable_bucket?+
# reports as evictable.
#
# +delete_pair+ is used so the mapping is only removed when the key still
# points at this exact bucket.
#
# @param bucket_entries [Enumerable<Array>] +[key, bucket]+ pairs
# @param now [Object] current time, forwarded to +evictable_bucket?+
# @return [Object] the result of iterating +bucket_entries+
def release(bucket_entries, now)
  @buckets_mutex.synchronize do
    bucket_entries.each do |bucket_key, held|
      held.references -= 1
      next unless evictable_bucket?(bucket_key, held, now)

      @buckets_by_key.delete_pair(bucket_key, held)
    end
  end
end

#sweep_if_due(now) ⇒ Object



57
58
59
60
61
62
63
64
65
66
67
68
# File 'lib/activejob/temporal/rate_limiters/memory.rb', line 57

# Runs a sweep when +sweep_due?(now)+ says one is due; otherwise does nothing.
#
# A sweep acquires bucket entries via +acquire_sweep_bucket_entries+, prunes
# each bucket's expired timestamps inside +synchronize+, and always hands the
# entries back through +release+ -- even if pruning raises.
#
# The element at index 1 of each key is passed to
# +prune_expired_timestamps+ as the interval.
#
# @param now [Object] current time
# @return [Object, nil] +nil+ when no sweep is due, otherwise whatever
#   +synchronize+ returns
def sweep_if_due(now)
  return unless sweep_due?(now)

  held_entries = acquire_sweep_bucket_entries
  begin
    synchronize(held_entries) do
      held_entries.each do |bucket_key, held|
        interval = bucket_key.fetch(1)
        prune_expired_timestamps(held, interval, now)
      end
    end
  ensure
    # Guard kept so a nil acquisition result is never passed to release.
    release(held_entries, now) if held_entries
  end
end

#synchronize(bucket_entries) ⇒ Object



39
40
41
# File 'lib/activejob/temporal/rate_limiters/memory.rb', line 39

# Extracts the last element of every entry (the bucket in a +[key, bucket]+
# pair) and delegates to +synchronize_buckets+, forwarding the caller's block.
#
# @param bucket_entries [Enumerable<Array>] +[key, bucket]+ pairs
# @return [Object] whatever +synchronize_buckets+ returns
def synchronize(bucket_entries, &block)
  buckets = bucket_entries.map { |entry| entry.last }
  synchronize_buckets(buckets, &block)
end

#touch(bucket_entries, now) ⇒ Object



43
44
45
46
47
48
# File 'lib/activejob/temporal/rate_limiters/memory.rb', line 43

# Stamps +now+ as +last_touched_at+ on every bucket that still holds at
# least one timestamp; buckets with no timestamps are left untouched.
#
# @param bucket_entries [Enumerable<Array>] entries whose element at index 1
#   is the bucket
# @param now [Object] value written to +last_touched_at+
# @return [Object] the result of iterating +bucket_entries+
def touch(bucket_entries, now)
  bucket_entries.each do |entry|
    active = entry.fetch(1)
    next if active.timestamps.empty?

    active.last_touched_at = now
  end
end