Class: Philiprehberger::TokenBucket::Bucket

Inherits:
Object
Defined in:
lib/philiprehberger/token_bucket.rb

Overview

A thread-safe token bucket rate limiter.

Constructor Details

#initialize(capacity:, refill_rate:, strategy: :smooth) ⇒ Bucket

Returns a new instance of Bucket.

Parameters:

  • capacity (Numeric)

    maximum number of tokens

  • refill_rate (Numeric)

    tokens added per second

  • strategy (Symbol) (defaults to: :smooth)

    :smooth (continuous) or :interval (burst refill)

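Raises:

  • (Error)

    if capacity or refill_rate is not positive, or if strategy is not one of the known strategies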

# File 'lib/philiprehberger/token_bucket.rb', line 18

def initialize(capacity:, refill_rate:, strategy: :smooth)
  raise Error, 'capacity must be positive' unless capacity.positive?
  raise Error, 'refill_rate must be positive' unless refill_rate.positive?
  raise Error, "unknown strategy: #{strategy}" unless STRATEGIES.include?(strategy)

  @capacity = capacity.to_f
  @refill_rate = refill_rate.to_f
  @strategy = strategy
  @tokens = @capacity
  @last_refill = now
  @refill_interval = @capacity / @refill_rate
  @mutex = Mutex.new
end
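
The private refill helper is not shown on this page, so the following is only a sketch of how the two strategies could differ. It assumes :smooth adds tokens in proportion to elapsed time and :interval tops the bucket up once every capacity / refill_rate seconds (the value stored in @refill_interval). Both function names are hypothetical.

```ruby
# Hypothetical helpers; the gem's private refill method may differ.

# :smooth (assumed): tokens accrue continuously, capped at capacity.
def smooth_refill(tokens, capacity, refill_rate, elapsed)
  [tokens + (elapsed * refill_rate), capacity].min
end

# :interval (assumed): nothing is added until a whole interval of
# capacity / refill_rate seconds has passed, then the bucket is topped up.
def interval_refill(tokens, capacity, refill_rate, elapsed)
  interval = capacity / refill_rate
  elapsed >= interval ? capacity : tokens
end

puts smooth_refill(0.0, 10.0, 5.0, 1.0)   # 5.0: half a bucket after 1 s
puts interval_refill(0.0, 10.0, 5.0, 1.0) # 0.0: the interval is 2 s
puts interval_refill(0.0, 10.0, 5.0, 2.0) # 10.0: full again
```

Under these assumptions, :smooth spreads permits evenly over time, while :interval allows a burst of up to capacity tokens at the start of each interval.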

Instance Attribute Details

#capacity ⇒ Object (readonly)

Returns the value of attribute capacity.



# File 'lib/philiprehberger/token_bucket.rb', line 13

def capacity
  @capacity
end

#refill_rate ⇒ Object (readonly)

Returns the value of attribute refill_rate.



# File 'lib/philiprehberger/token_bucket.rb', line 13

def refill_rate
  @refill_rate
end

#strategy ⇒ Object (readonly)

Returns the value of attribute strategy.



# File 'lib/philiprehberger/token_bucket.rb', line 13

def strategy
  @strategy
end

Instance Method Details

#available ⇒ Float

Return the number of currently available tokens.

Returns:

  • (Float)

    available tokens



# File 'lib/philiprehberger/token_bucket.rb', line 107

def available
  @mutex.synchronize do
    refill
    @tokens
  end
end

#drain ⇒ self

Drain all tokens from the bucket.

Returns:

  • (self)


# File 'lib/philiprehberger/token_bucket.rb', line 128

def drain
  @mutex.synchronize do
    @tokens = 0.0
  end
  self
end

#full? ⇒ Boolean

Check whether the bucket is at full capacity.

Returns:

  • (Boolean)


# File 'lib/philiprehberger/token_bucket.rb', line 149

def full?
  @mutex.synchronize do
    refill
    @tokens >= @capacity
  end
end

#reset ⇒ self

Reset the bucket to full capacity.

Returns:

  • (self)


# File 'lib/philiprehberger/token_bucket.rb', line 138

def reset
  @mutex.synchronize do
    @tokens = @capacity
    @last_refill = now
  end
  self
end

#stats ⇒ Hash{Symbol => Float, Symbol}

Return a frozen snapshot of the bucket’s current state.

The snapshot reflects state at call time: the bucket is refilled before :available is read, so tokens accrued since the last access are included.

Returns:

  • (Hash{Symbol => Float, Symbol})

    frozen hash with keys :available, :capacity, :refill_rate, :strategy



# File 'lib/philiprehberger/token_bucket.rb', line 163

def stats
  @mutex.synchronize do
    refill
    {
      available: @tokens,
      capacity: @capacity,
      refill_rate: @refill_rate,
      strategy: @strategy
    }.freeze
  end
end
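
Because the returned hash is frozen, callers cannot modify it in place. The sketch below uses a hand-built stand-in for the snapshot (the values are illustrative, not output from the gem) to show what happens on mutation and how to derive a new hash instead.

```ruby
# Stand-in for the hash returned by Bucket#stats; values are illustrative.
SNAPSHOT = { available: 3.5, capacity: 10.0, refill_rate: 2.0, strategy: :smooth }.freeze

begin
  SNAPSHOT[:available] = 0.0
rescue FrozenError => e
  puts "snapshot is read-only: #{e.class}"
end

# Hash#merge returns a new, unfrozen hash and leaves the snapshot untouched.
WITH_USED = SNAPSHOT.merge(used: SNAPSHOT[:capacity] - SNAPSHOT[:available])
puts WITH_USED[:used] # 6.5
```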

#take(n = 1) ⇒ void

This method returns an undefined value.

Take n tokens, blocking until they are available.

Parameters:

  • n (Numeric) (defaults to: 1)

    number of tokens to take

Raises:

  • (Error)

    if n exceeds capacity



# File 'lib/philiprehberger/token_bucket.rb', line 37

def take(n = 1)
  raise Error, "cannot take #{n} tokens from bucket with capacity #{@capacity}" if n > @capacity

  loop do
    wait = nil
    @mutex.synchronize do
      refill
      if @tokens >= n
        @tokens -= n
        return
      end
      wait = compute_wait_time(n)
    end
    sleep(wait)
  end
end

#take_wait_timeout(n = 1, timeout:) ⇒ Boolean

Take n tokens, blocking up to timeout seconds waiting for them.

Releases the internal mutex while sleeping so other threads may take or refill in the meantime. Uses a monotonic clock to track the deadline, and re-checks availability after each sleep.

Parameters:

  • n (Numeric) (defaults to: 1)

    number of tokens to take

  • timeout (Float)

    maximum seconds to wait

Returns:

  • (Boolean)

    true when tokens were acquired

Raises:

  • (Error)

    if n exceeds capacity, or if the timeout elapses before tokens are available



# File 'lib/philiprehberger/token_bucket.rb', line 80

def take_wait_timeout(n = 1, timeout:)
  raise Error, "cannot take #{n} tokens from bucket with capacity #{@capacity}" if n > @capacity

  deadline = now + timeout.to_f

  loop do
    wait = nil
    @mutex.synchronize do
      refill
      if @tokens >= n
        @tokens -= n
        return true
      end
      wait = compute_wait_time(n)
    end

    remaining = deadline - now
    raise Error, "timed out waiting for #{n} tokens after #{timeout}s" if remaining <= 0

    sleep_for = [wait, remaining].min
    sleep(sleep_for) if sleep_for.positive?
  end
end
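
The deadline pattern used above can be sketched in isolation. The private now helper is not shown on this page; the example assumes it reads a monotonic clock via Process.clock_gettime. The names monotonic_now and wait_until are hypothetical, and unlike take_wait_timeout, this sketch returns false on timeout instead of raising.

```ruby
# Hypothetical stand-in for the private `now` helper (assumed monotonic).
def monotonic_now
  Process.clock_gettime(Process::CLOCK_MONOTONIC)
end

# Poll the block until it returns a truthy value or `timeout` seconds elapse.
# Returns true on success, false once the deadline has passed.
def wait_until(timeout:, step: 0.01)
  deadline = monotonic_now + timeout.to_f
  loop do
    return true if yield

    remaining = deadline - monotonic_now
    return false if remaining <= 0

    # Never sleep past the deadline.
    sleep([step, remaining].min)
  end
end

ready_at = monotonic_now + 0.05
puts wait_until(timeout: 1.0) { monotonic_now >= ready_at } # condition met in time
puts wait_until(timeout: 0.02) { false }                    # gives up at the deadline
```

A monotonic clock is unaffected by wall-clock adjustments such as NTP corrections, so the computed deadline cannot jump forward or backward while a thread is waiting.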

#try_take(n = 1) ⇒ Boolean

Try to take n tokens without blocking.

Parameters:

  • n (Numeric) (defaults to: 1)

    number of tokens to take

Returns:

  • (Boolean)

    true if tokens were taken, false otherwise



# File 'lib/philiprehberger/token_bucket.rb', line 58

def try_take(n = 1)
  @mutex.synchronize do
    refill
    if @tokens >= n
      @tokens -= n
      true
    else
      false
    end
  end
end
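
To illustrate the non-blocking behavior without depending on the gem, the sketch below uses MiniBucket, a hypothetical single-threaded stand-in with an injectable clock. It assumes smooth (continuous) refill and omits the mutex, so it is not a substitute for the real class.

```ruby
# MiniBucket is a hypothetical stand-in used only to make the example
# runnable; it assumes smooth refill and is not thread-safe.
class MiniBucket
  def initialize(capacity:, refill_rate:, clock:)
    @capacity = capacity.to_f
    @refill_rate = refill_rate.to_f
    @clock = clock
    @tokens = @capacity
    @last_refill = @clock.call
  end

  # Take n tokens if available; never waits.
  def try_take(n = 1)
    refill
    return false if @tokens < n

    @tokens -= n
    true
  end

  private

  # Add tokens for the time elapsed since the last refill, capped at capacity.
  def refill
    current = @clock.call
    @tokens = [@tokens + ((current - @last_refill) * @refill_rate), @capacity].min
    @last_refill = current
  end
end

time = 0.0
bucket = MiniBucket.new(capacity: 2, refill_rate: 1, clock: -> { time })

puts bucket.try_take # true  (2 tokens -> 1)
puts bucket.try_take # true  (1 token  -> 0)
puts bucket.try_take # false (empty; returns immediately)
time += 1.0          # one simulated second passes
puts bucket.try_take # true  (one token has been refilled)
```

A false result leaves the token count unchanged, so callers can reject or defer the request and try again later.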

#wait_time(n = 1) ⇒ Float

Calculate how long to wait for n tokens to become available.

Parameters:

  • n (Numeric) (defaults to: 1)

    number of tokens needed

Returns:

  • (Float)

    seconds to wait (0.0 if tokens are already available)



# File 'lib/philiprehberger/token_bucket.rb', line 118

def wait_time(n = 1)
  @mutex.synchronize do
    refill
    compute_wait_time(n)
  end
end
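
The private compute_wait_time helper is not shown on this page. As a rough sketch, assuming smooth refill, the wait could be the token deficit divided by the refill rate; estimated_wait is a hypothetical name, and the gem's actual calculation (particularly for :interval) may differ.

```ruby
# Hypothetical helper; assumes smooth refill at `refill_rate` tokens per second.
def estimated_wait(tokens, n, refill_rate)
  deficit = n - tokens
  deficit.positive? ? deficit / refill_rate.to_f : 0.0
end

puts estimated_wait(0.5, 2, 4) # 0.375: 1.5 missing tokens at 4 per second
puts estimated_wait(3.0, 2, 4) # 0.0: enough tokens are already available
```

The value returned by wait_time is a snapshot: another thread may take tokens before the caller acts on it, so it is an estimate and not a reservation.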