Class: Async::Limiter::Timing::LeakyBucket
- Inherits: Object
- Defined in: lib/async/limiter/timing/leaky_bucket.rb
Overview
Leaky bucket timing strategy that smooths traffic flow.
This strategy maintains a “bucket” that gradually “leaks” capacity over time, enforcing a steady output rate regardless of input bursts.
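To make the bucket metaphor concrete, here is a minimal standalone sketch. It does not use the library: `SketchBucket`, `leak`, `try_acquire`, and the explicit `now` argument are hypothetical names chosen for illustration. The drain rule (the level falls by rate × elapsed seconds and is floored at zero) is an assumption, because the private `leak_bucket` helper is not shown on this page.

```ruby
# Standalone illustration of a leaky bucket; not the library's implementation.
class SketchBucket
  attr_reader :level

  def initialize(rate, capacity)
    @rate = rate.to_f          # units drained per second
    @capacity = capacity.to_f  # most units the bucket can hold
    @level = 0.0
    @last_leak = 0.0           # timestamps are passed in, so the sketch is deterministic
  end

  # Assumed drain rule: level falls by rate * elapsed seconds, never below zero.
  def leak(now)
    elapsed = now - @last_leak
    @level = [0.0, @level - elapsed * @rate].max
    @last_leak = now
  end

  # Try to add `cost` units at time `now`; returns true if they fit.
  def try_acquire(now, cost = 1)
    leak(now)
    return false if @level + cost > @capacity

    @level += cost
    true
  end
end

bucket = SketchBucket.new(2, 3)                  # drains 2 units/second, holds 3 units
burst = 5.times.map { bucket.try_acquire(0.0) }  # five requests at the same instant
later = bucket.try_acquire(0.5)                  # half a second later, 1 unit has drained
```

In this sketch the first three requests of the burst fit and the last two are refused; the request half a second later fits again because one unit has drained in the meantime.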
Instance Attribute Summary
- #capacity ⇒ Object (readonly)
  Maximum bucket capacity.
- #rate ⇒ Object (readonly)
  Leak rate in units per second.
Instance Method Summary
- #acquire(cost = 1) ⇒ Object
  Record that a task was acquired (adds cost to bucket level).
- #advance_time(seconds) ⇒ Object
  Simulate time advancement for testing purposes.
- #can_acquire?(cost = 1, current_time = Clock.now) ⇒ Boolean
  Check if a task can be acquired with the given cost.
- #initialize(rate, capacity, initial_level: 0) ⇒ LeakyBucket (constructor)
  Initialize a leaky bucket timing strategy.
- #level ⇒ Object
  Get current bucket level (for debugging/monitoring).
- #level=(new_level) ⇒ Object
  Set bucket level (for testing purposes).
- #maximum_cost ⇒ Object
  Maximum cost this timing strategy can support.
- #statistics ⇒ Object
  Get current timing strategy statistics.
- #wait(mutex, deadline = nil, cost = 1) ⇒ Object
  Wait for bucket to have capacity.
Constructor Details
#initialize(rate, capacity, initial_level: 0) ⇒ LeakyBucket
Initialize a leaky bucket timing strategy.
    # File 'lib/async/limiter/timing/leaky_bucket.rb', line 27
    def initialize(rate, capacity, initial_level: 0)
      raise ArgumentError, "rate must be non-negative" unless rate >= 0
      raise ArgumentError, "capacity must be positive" unless capacity.positive?

      @rate = rate
      @capacity = capacity
      @level = initial_level.to_f
      @last_leak = Clock.now
    end
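The two guard clauses above can be exercised in isolation. The sketch below copies them into a hypothetical helper, `validate_bucket_arguments`, so it runs without the library; `outcome` is likewise an illustrative name.

```ruby
# Copies the two guard clauses from the constructor shown above.
def validate_bucket_arguments(rate, capacity)
  raise ArgumentError, "rate must be non-negative" unless rate >= 0
  raise ArgumentError, "capacity must be positive" unless capacity.positive?

  true
end

# Returns :ok when the arguments pass, otherwise the error message.
def outcome(rate, capacity)
  validate_bucket_arguments(rate, capacity)
  :ok
rescue ArgumentError => error
  error.message
end

results = [[10, 5], [0, 5], [-1, 5], [10, 0]].map { |rate, capacity| outcome(rate, capacity) }
```

Note that a rate of 0 passes validation while a capacity of 0 does not.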
Instance Attribute Details
#capacity ⇒ Object (readonly)
Maximum bucket capacity.
    # File 'lib/async/limiter/timing/leaky_bucket.rb', line 21
    attr_reader :capacity
#rate ⇒ Object (readonly)
Leak rate in units per second.
    # File 'lib/async/limiter/timing/leaky_bucket.rb', line 18
    attr_reader :rate
Instance Method Details
#acquire(cost = 1) ⇒ Object
Record that a task was acquired (adds cost to bucket level).
    # File 'lib/async/limiter/timing/leaky_bucket.rb', line 54
    def acquire(cost = 1)
      leak_bucket
      @level += cost
    end
#advance_time(seconds) ⇒ Object
Simulate time advancement for testing purposes.
    # File 'lib/async/limiter/timing/leaky_bucket.rb', line 110
    def advance_time(seconds)
      @last_leak -= seconds
      leak_bucket
    end
#can_acquire?(cost = 1, current_time = Clock.now) ⇒ Boolean
Check if a task can be acquired with the given cost.
    # File 'lib/async/limiter/timing/leaky_bucket.rb', line 47
    def can_acquire?(cost = 1, current_time = Clock.now)
      leak_bucket(current_time)
      @level + cost <= @capacity
    end
#level ⇒ Object
Get current bucket level (for debugging/monitoring).
    # File 'lib/async/limiter/timing/leaky_bucket.rb', line 96
    def level
      leak_bucket
      @level
    end
#level=(new_level) ⇒ Object
Set bucket level (for testing purposes).
    # File 'lib/async/limiter/timing/leaky_bucket.rb', line 103
    def level=(new_level)
      @level = new_level.to_f
      @last_leak = Clock.now
    end
#maximum_cost ⇒ Object
Maximum cost this timing strategy can support.
    # File 'lib/async/limiter/timing/leaky_bucket.rb', line 39
    def maximum_cost
      @capacity
    end
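One way to see why the capacity acts as the cost ceiling is to apply the comparison from #can_acquire? to an empty bucket. The helper `fits_when_empty?` is a hypothetical name, and the sketch assumes the level never drops below zero, which depends on the private `leak_bucket` helper not shown here.

```ruby
# The comparison from #can_acquire?, applied to an empty bucket (level 0.0).
def fits_when_empty?(cost, capacity)
  0.0 + cost <= capacity
end

at_limit = fits_when_empty?(5, 5)    # cost equal to capacity
over_limit = fits_when_empty?(6, 5)  # cost above capacity
```

Under that assumption, a cost equal to the capacity fits only when the bucket is completely empty, and a larger cost never fits.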
#statistics ⇒ Object
Get current timing strategy statistics.
    # File 'lib/async/limiter/timing/leaky_bucket.rb', line 117
    def statistics
      leak_bucket

      {
        name: "LeakyBucket",
        current_level: @level,
        maximum_capacity: @capacity,
        leak_rate: @rate,
        available_capacity: @capacity - @level,
        utilization_percentage: (@level / @capacity) * 100
      }
    end
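The two derived fields can be checked by hand. The sketch below recomputes them for sample values with a hypothetical helper, `derived_statistics`; the level is passed as a Float because the class stores `@level` via `to_f`.

```ruby
# Recomputes the derived fields of the statistics hash for a given level and capacity.
def derived_statistics(level, capacity)
  {
    available_capacity: capacity - level,
    utilization_percentage: (level / capacity) * 100
  }
end

quarter_full = derived_statistics(2.5, 10)
overfilled = derived_statistics(15.0, 10)
```

Because #acquire adds the cost without checking capacity, a caller that skips #can_acquire? can push the level past the capacity; in that case the available capacity is negative and the utilization exceeds 100, as in the `overfilled` case.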
#wait(mutex, deadline = nil, cost = 1) ⇒ Object
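Wait until the bucket has capacity for the given cost. Returns true once the cost can be acquired, or false if the deadline has expired or the required wait would exceed the time remaining. The mutex is released while sleeping.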
    # File 'lib/async/limiter/timing/leaky_bucket.rb', line 64
    def wait(mutex, deadline = nil, cost = 1)
      # Loop until we can acquire or deadline expires:
      until can_acquire?(cost, Clock.now)
        # Check deadline before each wait:
        return false if deadline&.expired?

        # Calculate how long to wait for bucket to leak enough for this cost:
        needed_capacity = (@level + cost) - @capacity
        wait_time = needed_capacity / @rate.to_f

        # Should be able to acquire now:
        return true if wait_time <= 0

        # Check if wait would exceed deadline:
        remaining = deadline&.remaining
        if remaining && wait_time > remaining
          # Would exceed deadline:
          return false
        end

        # Wait for the required time (or remaining time if deadline specified):
        actual_wait = remaining ? [wait_time, remaining].min : wait_time

        # Release mutex during sleep:
        mutex.sleep(actual_wait)
      end

      return true
    end
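The arithmetic and branch order of one loop iteration can be traced with plain numbers. The helpers `wait_time_for` and `plan_wait` are hypothetical names for this sketch; it leaves out the mutex, the `deadline&.expired?` check, and the surrounding `until` loop, and passes the remaining seconds directly instead of a deadline object.

```ruby
# Mirrors the arithmetic in #wait: overflow beyond capacity divided by leak rate.
def wait_time_for(level, cost, capacity, rate)
  needed_capacity = (level + cost) - capacity
  needed_capacity / rate.to_f
end

# Mirrors the branch order in #wait for a single iteration.
# `remaining` is the number of seconds left before the deadline, or nil for no deadline.
def plan_wait(wait_time, remaining = nil)
  return :acquire_now if wait_time <= 0
  return :give_up if remaining && wait_time > remaining

  [:sleep, wait_time]
end

full  = wait_time_for(10.0, 1, 10, 2)  # bucket full, 1 unit over, draining 2 units/second
room  = wait_time_for(4.0, 1, 10, 2)   # plenty of room, so the result is negative
stuck = wait_time_for(10.0, 1, 10, 0)  # rate 0: the bucket never drains

plans = [plan_wait(full), plan_wait(full, 0.1), plan_wait(room)]
```

With a full bucket the computed wait is 0.5 seconds, which is slept in full when there is no deadline and abandoned when only 0.1 seconds remain. With a rate of 0, which the constructor accepts, the division yields `Float::INFINITY`.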