Class: Async::Limiter::Timing::LeakyBucket

Inherits:
Object
  • Object
show all
Defined in:
lib/async/limiter/timing/leaky_bucket.rb

Overview

Leaky bucket timing strategy that smooths traffic flow.

This strategy maintains a “bucket” that gradually “leaks” capacity over time, enforcing a steady output rate regardless of input bursts.

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(rate, capacity, initial_level: 0) ⇒ LeakyBucket

Initialize a leaky bucket timing strategy.

Raises:

  • (ArgumentError)


27
28
29
30
31
32
33
34
35
# File 'lib/async/limiter/timing/leaky_bucket.rb', line 27

# Set up a leaky bucket with the given leak rate and capacity.
#
# @param rate [Numeric] leak rate in units per second; must satisfy `rate >= 0`
# @param capacity [Numeric] maximum bucket capacity; must be positive
# @param initial_level [Numeric] starting bucket level, stored as a Float
# @raise [ArgumentError] if rate is negative or capacity is not positive
def initialize(rate, capacity, initial_level: 0)
	unless rate >= 0
		raise ArgumentError, "rate must be non-negative"
	end
	
	unless capacity.positive?
		raise ArgumentError, "capacity must be positive"
	end
	
	@rate, @capacity = rate, capacity
	
	# The level is always kept as a Float; the leak timestamp starts at "now":
	@level = initial_level.to_f
	@last_leak = Clock.now
end

Instance Attribute Details

#capacity ⇒ Object (readonly)

Returns the value of attribute capacity.



21
22
23
# File 'lib/async/limiter/timing/leaky_bucket.rb', line 21

# Reader for the bucket capacity.
#
# @return [Object] the value held in @capacity
def capacity
	@capacity
end

#rate ⇒ Object (readonly)

Leak rate in units per second.



18
# File 'lib/async/limiter/timing/leaky_bucket.rb', line 18

# Leak rate in units per second (read-only).
attr_reader :rate

#capacity ⇒ Object (readonly)

Maximum bucket capacity.



21
# File 'lib/async/limiter/timing/leaky_bucket.rb', line 21

# Maximum bucket capacity (read-only).
attr_reader :capacity

#rate ⇒ Object (readonly)

Returns the value of attribute rate.



18
19
20
# File 'lib/async/limiter/timing/leaky_bucket.rb', line 18

# Reader for the leak rate.
#
# @return [Object] the value held in @rate
def rate
	@rate
end

Instance Method Details

#acquire(cost = 1) ⇒ Object

Record that a task was acquired (adds cost to bucket level).



54
55
56
57
# File 'lib/async/limiter/timing/leaky_bucket.rb', line 54

# Record an acquisition by adding its cost to the bucket level.
#
# @param cost [Numeric] amount to add to the bucket (defaults to 1)
# @return [Numeric] the bucket level after the cost has been added
def acquire(cost = 1)
	# Bring the level up to date first (presumably applies elapsed leakage):
	leak_bucket
	
	@level = @level + cost
end

#advance_time(seconds) ⇒ Object

Simulate time advancement for testing purposes.



110
111
112
113
# File 'lib/async/limiter/timing/leaky_bucket.rb', line 110

# Pretend that the given number of seconds has elapsed (testing aid).
#
# @param seconds [Numeric] how far to move the last-leak timestamp into the past
# @return [Object] whatever leak_bucket returns
def advance_time(seconds)
	# Backdating the timestamp makes the next leak see `seconds` more elapsed time:
	@last_leak = @last_leak - seconds
	leak_bucket
end

#can_acquire?(cost = 1, current_time = Clock.now) ⇒ Boolean

Check if a task can be acquired with the given cost.

Returns:

  • (Boolean)


47
48
49
50
# File 'lib/async/limiter/timing/leaky_bucket.rb', line 47

# Report whether adding the given cost would still fit within the capacity.
#
# @param cost [Numeric] cost of the prospective acquisition (defaults to 1)
# @param current_time [Object] timestamp handed to leak_bucket (defaults to Clock.now)
# @return [Boolean] true when level plus cost does not exceed the capacity
def can_acquire?(cost = 1, current_time = Clock.now)
	# Refresh the level as of current_time before comparing:
	leak_bucket(current_time)
	
	projected_level = @level + cost
	projected_level <= @capacity
end

#level ⇒ Object

Get current bucket level (for debugging/monitoring).



96
97
98
99
# File 'lib/async/limiter/timing/leaky_bucket.rb', line 96

# Current bucket level, intended for debugging and monitoring.
#
# @return [Numeric] the level as it stands after leak_bucket has run
def level
	# Update the bucket before reading so the reported value is current:
	leak_bucket
	
	@level
end

#level=(new_level) ⇒ Object

Set bucket level (for testing purposes).



103
104
105
106
# File 'lib/async/limiter/timing/leaky_bucket.rb', line 103

# Overwrite the bucket level (testing aid).
#
# @param new_level [#to_f] the level to store; converted with to_f
def level=(new_level)
	# Store the level as a Float:
	@level = new_level.to_f
	
	# Restart the leak timestamp from the current clock reading:
	@last_leak = Clock.now
end

#maximum_cost ⇒ Object

Maximum cost this timing strategy can support.



39
40
41
# File 'lib/async/limiter/timing/leaky_bucket.rb', line 39

# Largest cost this timing strategy can support.
#
# @return [Numeric] the bucket capacity
def maximum_cost
	# The ceiling is simply the bucket capacity:
	@capacity
end

#statistics ⇒ Object

Get current timing strategy statistics.



117
118
119
120
121
122
123
124
125
126
127
128
# File 'lib/async/limiter/timing/leaky_bucket.rb', line 117

# Snapshot of the current state of this timing strategy.
#
# @return [Hash] with keys :name, :current_level, :maximum_capacity, :leak_rate,
#   :available_capacity and :utilization_percentage
def statistics
	# Update the bucket so the snapshot reflects the current level:
	leak_bucket
	
	headroom = @capacity - @level
	utilization = (@level / @capacity) * 100
	
	{
		name: "LeakyBucket",
		current_level: @level,
		maximum_capacity: @capacity,
		leak_rate: @rate,
		available_capacity: headroom,
		utilization_percentage: utilization
	}
end

#wait(mutex, deadline = nil, cost = 1) ⇒ Object

Wait for bucket to have capacity.



64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
# File 'lib/async/limiter/timing/leaky_bucket.rb', line 64

# Block until the bucket can take the given cost, or give up at the deadline.
#
# @param mutex [#sleep] object whose sleep(duration) is called to wait
# @param deadline [#expired?, #remaining, nil] optional time limit; nil means no limit
# @param cost [Numeric] cost of the acquisition being waited for (defaults to 1)
# @return [Boolean] true once the cost fits, false if the deadline rules it out
def wait(mutex, deadline = nil, cost = 1)
	# Re-check the bucket after every sleep:
	until can_acquire?(cost, Clock.now)
		# An already-expired deadline ends the wait immediately:
		return false if deadline&.expired?
		
		# Time for the excess over capacity to leak away at the configured rate:
		excess = (@level + cost) - @capacity
		delay = excess / @rate.to_f
		
		# Nothing to wait for, so the acquisition should succeed now:
		return true if delay <= 0
		
		time_left = deadline&.remaining
		
		if time_left
			# Give up rather than sleep past the deadline:
			return false if delay > time_left
			
			# Never sleep longer than the time the deadline still allows:
			delay = [delay, time_left].min
		end
		
		# Sleep via the mutex (per the original comment, this releases it meanwhile):
		mutex.sleep(delay)
	end
	
	true
end