Class: ActiveJob::Temporal::RateLimiters::MemoryBucketStore
- Inherits:
-
Object
- Object
- ActiveJob::Temporal::RateLimiters::MemoryBucketStore
- Defined in:
- lib/activejob/temporal/rate_limiters/memory.rb
Defined Under Namespace
Classes: Bucket
Constant Summary collapse
- SWEEP_INTERVAL =
1.0
Instance Method Summary collapse
- #acquire(keys, now) ⇒ Object
-
#initialize ⇒ MemoryBucketStore
constructor
A new instance of MemoryBucketStore.
- #prune_expired_timestamps(bucket, interval, now) ⇒ Object
- #release(bucket_entries, now) ⇒ Object
- #sweep_if_due(now) ⇒ Object
- #synchronize(bucket_entries) ⇒ Object
- #touch(bucket_entries, now) ⇒ Object
Constructor Details
#initialize ⇒ MemoryBucketStore
Returns a new instance of MemoryBucketStore.
14 15 16 17 18 |
# File 'lib/activejob/temporal/rate_limiters/memory.rb', line 14 def initialize @buckets_by_key = Concurrent::Map.new @buckets_mutex = Mutex.new @next_sweep_at = nil end |
Instance Method Details
#acquire(keys, now) ⇒ Object
20 21 22 23 24 25 26 27 28 |
# File 'lib/activejob/temporal/rate_limiters/memory.rb', line 20 def acquire(keys, now) @buckets_mutex.synchronize do keys.uniq.sort.map do |key| bucket = bucket_for(key, now) bucket.references += 1 [key, bucket] end end end |
#prune_expired_timestamps(bucket, interval, now) ⇒ Object
50 51 52 53 54 55 |
# File 'lib/activejob/temporal/rate_limiters/memory.rb', line 50 def (bucket, interval, now) cutoff = now - interval = bucket. .shift while .first && .first <= cutoff end |
#release(bucket_entries, now) ⇒ Object
30 31 32 33 34 35 36 37 |
# File 'lib/activejob/temporal/rate_limiters/memory.rb', line 30 def release(bucket_entries, now) @buckets_mutex.synchronize do bucket_entries.each do |key, bucket| bucket.references -= 1 @buckets_by_key.delete_pair(key, bucket) if evictable_bucket?(key, bucket, now) end end end |
#sweep_if_due(now) ⇒ Object
57 58 59 60 61 62 63 64 65 66 67 68 |
# File 'lib/activejob/temporal/rate_limiters/memory.rb', line 57 def sweep_if_due(now) return unless sweep_due?(now) bucket_entries = acquire_sweep_bucket_entries synchronize(bucket_entries) do bucket_entries.each do |key, bucket| (bucket, key.fetch(1), now) end end ensure release(bucket_entries, now) if bucket_entries end |
#synchronize(bucket_entries) ⇒ Object
39 40 41 |
# File 'lib/activejob/temporal/rate_limiters/memory.rb', line 39 def synchronize(bucket_entries, &) synchronize_buckets(bucket_entries.map(&:last), &) end |
#touch(bucket_entries, now) ⇒ Object
43 44 45 46 47 48 |
# File 'lib/activejob/temporal/rate_limiters/memory.rb', line 43 def touch(bucket_entries, now) bucket_entries.each do |entry| bucket = entry.fetch(1) bucket.last_touched_at = now unless bucket..empty? end end |