Class: XeroKiwi::Throttle::RedisTokenBucket

Inherits:
Object
  • Object
show all
Defined in:
lib/xero_kiwi/throttle/redis_token_bucket.rb

Overview

Redis-backed token bucket, keyed per tenant. One Ruby instance is shared across threads and (via Redis) across processes, so N Sidekiq workers hitting the same Xero tenant cooperatively share a bucket.

Two buckets per tenant — minute and (optionally) day — modelled as Redis hashes with ‘tokens` and `last_refill_ms` fields. All bucket math runs inside a Lua script so the read-modify-write is atomic server-side; doing it in Ruby with separate GET/SET calls would race and leak tokens.

bucket = XeroKiwi::Throttle::RedisTokenBucket.new(
  redis:      Redis.new,
  per_minute: 55,        # Xero's default is 60; leave headroom.
  per_day:    4_900,     # optional. Xero's default is 5,000.
  max_wait:   30.0       # cap on how long acquire may block.
)

Constant Summary collapse

DEFAULT_NAMESPACE =
"xero_kiwi:throttle"
MINUTE_MS =
60_000
DAY_MS =
86_400_000
POLL_MS =

Extra ms we sleep past a refill-time hint to avoid a busy loop.

1_000
LUA_SCRIPT =

Lua script. Input ARGV: now_ms, capacities… (minute, day?), window_ms… (minute, day?). KEYS: bucket hash keys in the same order as capacities.

Returns: { failed_bucket_index, wait_ms }

{0, 0} = granted everywhere (decrements committed)
{i, N} = bucket i (1-indexed) is empty; no decrements committed; wait N ms for it

Granting is all-or-nothing across buckets: if any bucket is empty we roll back, so a day-limit failure doesn’t burn a minute token.

<<~LUA
  local now_ms = tonumber(ARGV[1])
  local n = #KEYS
  local new_tokens = {}

  for i = 1, n do
    local capacity  = tonumber(ARGV[1 + i])
    local window_ms = tonumber(ARGV[1 + n + i])
    local refill_per_ms = capacity / window_ms

    local data = redis.call("HMGET", KEYS[i], "tokens", "last_refill_ms")
    local tokens         = tonumber(data[1]) or capacity
    local last_refill_ms = tonumber(data[2]) or now_ms

    local elapsed = now_ms - last_refill_ms
    if elapsed < 0 then elapsed = 0 end
    tokens = math.min(capacity, tokens + elapsed * refill_per_ms)

    if tokens < 1 then
      local shortfall = 1 - tokens
      local wait_ms = math.ceil(shortfall / refill_per_ms)
      return { i, wait_ms }
    end

    new_tokens[i] = tokens - 1
  end

  for i = 1, n do
    local window_ms = tonumber(ARGV[1 + n + i])
    redis.call("HSET", KEYS[i], "tokens", new_tokens[i], "last_refill_ms", now_ms)
    redis.call("PEXPIRE", KEYS[i], window_ms * 2)
  end

  return { 0, 0 }
LUA
LUA_SHA =
Digest::SHA1.hexdigest(LUA_SCRIPT)
DEFAULT_CLOCK =
-> { (Process.clock_gettime(Process::CLOCK_REALTIME) * 1000).to_i }
DEFAULT_SLEEPER =
->(seconds) { Kernel.sleep(seconds) }

Instance Method Summary collapse

Constructor Details

#initialize(redis:, per_minute:, per_day: nil, namespace: DEFAULT_NAMESPACE, max_wait: 30.0, logger: nil, clock: DEFAULT_CLOCK, sleeper: DEFAULT_SLEEPER) ⇒ RedisTokenBucket

Returns a new instance of RedisTokenBucket.

Raises:

  • (ArgumentError)


80
81
82
83
84
85
86
87
88
89
90
91
92
93
# File 'lib/xero_kiwi/throttle/redis_token_bucket.rb', line 80

def initialize(redis:, per_minute:, per_day: nil, namespace: DEFAULT_NAMESPACE,
               max_wait: 30.0, logger: nil, clock: DEFAULT_CLOCK, sleeper: DEFAULT_SLEEPER)
  raise ArgumentError, "per_minute must be > 0" unless per_minute.to_i.positive?
  raise ArgumentError, "per_day must be > 0 when given" if per_day && !per_day.to_i.positive?

  @redis      = redis
  @per_minute = per_minute.to_i
  @per_day    = per_day&.to_i
  @namespace  = namespace
  @max_wait   = max_wait.to_f
  @logger     = logger
  @clock      = clock
  @sleeper    = sleeper
end

Instance Method Details

#acquire(key) ⇒ Object

Blocks until a token is available in every configured bucket. Fails open on Redis errors (logs and returns) so a dead Redis can’t stop the app — the reactive retry layer still catches any resulting 429s.



98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
# File 'lib/xero_kiwi/throttle/redis_token_bucket.rb', line 98

def acquire(key)
  raise ArgumentError, "key is required" if key.nil? || key.to_s.empty?

  waited_ms = 0

  loop do
    failed, wait_ms = evaluate(key)
    return if failed.zero?

    waited_ms = handle_failure(failed, wait_ms, waited_ms)
  end
rescue Redis::BaseError => e
  log_redis_failure(e)
  nil
end