Class: DispatchPolicy::Gates::Throttle

Inherits:
DispatchPolicy::Gate
Defined in:
lib/dispatch_policy/gates/throttle.rb

Overview

Token bucket throttle gate.

Persists state in partitions.gate_state under the "throttle" key:

  "tokens"      => Float,   # current tokens, capped at bucket size
  "refilled_at" => Float    # epoch seconds, last refill

The partition scope this gate enforces against is the policy's `partition_by` (declared in the policy DSL block, not on the gate). The bucket lives on the staged partition row: one row per `policy.partition_for(ctx)` value, one bucket per row, no dilution.
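
As a rough illustration of the state layout described above, the sketch below builds a hypothetical partition row as a plain Hash and reads the throttle state with the same lookup chain that #evaluate uses. The literal values are made up for the example; only the key names come from this page.

```ruby
# Hypothetical partition row (plain Hash); values are illustrative only.
partition = {
  "gate_state" => {
    "throttle" => { "tokens" => 2.5, "refilled_at" => 1_700_000_000.0 }
  }
}

# Same lookup chain as #evaluate: missing keys fall back to an empty Hash.
state       = (partition["gate_state"] || {})["throttle"] || {}
tokens      = state["tokens"]
refilled_at = state["refilled_at"]

# A partition row with no gate state yet yields an empty state Hash,
# so #evaluate starts it with a full bucket.
fresh_partition = {}
fresh_state     = (fresh_partition["gate_state"] || {})["throttle"] || {}
```

Because the state is nested under "throttle", other gates can presumably keep their own entries in the same gate_state column without colliding.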

Instance Attribute Summary

Instance Method Summary

Methods inherited from DispatchPolicy::Gate

#consume

Constructor Details

#initialize(rate:, per:) ⇒ Throttle

Returns a new instance of Throttle.

Raises:

  • (ArgumentError) if `per` does not convert to a positive number of seconds


# File 'lib/dispatch_policy/gates/throttle.rb', line 19

def initialize(rate:, per:)
  super()
  @rate_proc = rate.respond_to?(:call) ? rate : ->(_ctx) { rate }
  @per       = duration_seconds(per)
  raise ArgumentError, "throttle :per must be > 0 (got #{@per})" unless @per.positive?
end
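
The constructor accepts either a fixed rate or a callable that computes the rate from the context. A minimal sketch of that normalization follows; `normalize_rate` is a hypothetical helper written for this example, and the `ctx` shape (a Hash with a `:premium` key) is an assumption, not part of the library.

```ruby
# Hypothetical helper mirroring the constructor's handling of `rate:`.
# A callable is kept as-is; anything else is wrapped in a lambda that ignores ctx.
def normalize_rate(rate)
  rate.respond_to?(:call) ? rate : ->(_ctx) { rate }
end

fixed   = normalize_rate(10)
dynamic = normalize_rate(->(ctx) { ctx[:premium] ? 100 : 10 })

fixed_rate    = fixed.call({})                   # => 10
premium_rate  = dynamic.call({ premium: true })  # => 100
standard_rate = dynamic.call({ premium: false }) # => 10
```

Wrapping the fixed value means the rest of the gate can always call `rate_proc` with a context, whichever form was passed in.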

Instance Attribute Details

#per ⇒ Object (readonly)

Returns the value of attribute per.



# File 'lib/dispatch_policy/gates/throttle.rb', line 17

def per
  @per
end

#rate_proc ⇒ Object (readonly)

Returns the value of attribute rate_proc.



# File 'lib/dispatch_policy/gates/throttle.rb', line 17

def rate_proc
  @rate_proc
end

Instance Method Details

#evaluate(ctx, partition, admit_budget) ⇒ Object

Refills the bucket for the time elapsed since the last refill, then admits up to admit_budget whole tokens. When less than one whole token is available, returns a decision with allowed: 0 and a retry_after hint.



# File 'lib/dispatch_policy/gates/throttle.rb', line 30

def evaluate(ctx, partition, admit_budget)
  capacity = capacity_for(ctx)
  return Decision.deny(reason: "rate=0") if capacity <= 0

  refill_rate = capacity.to_f / @per
  state       = (partition["gate_state"] || {})["throttle"] || {}
  tokens      = (state["tokens"] || capacity).to_f
  refilled_at = (state["refilled_at"] || now).to_f

  elapsed     = [now - refilled_at, 0.0].max
  tokens      = [tokens + (elapsed * refill_rate), capacity.to_f].min

  whole       = tokens.floor
  if whole.zero?
    missing      = 1.0 - tokens
    retry_after  = missing / refill_rate
    patch        = { "tokens" => tokens, "refilled_at" => now }
    return Decision.new(allowed: 0,
                        retry_after: retry_after,
                        gate_state_patch: { "throttle" => patch },
                        reason: "throttle_empty")
  end

  allowed = [whole, admit_budget].min
  patch   = { "tokens" => tokens - allowed, "refilled_at" => now }
  Decision.new(allowed: allowed, gate_state_patch: { "throttle" => patch })
end
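
To make the refill arithmetic above concrete, here is a standalone sketch that reproduces the same steps as a pure function. `bucket_step` and its keyword arguments are hypothetical names for this example; it returns a plain Hash rather than a Decision, and it assumes a positive capacity (the real method denies early when capacity is zero or less).

```ruby
# Standalone sketch of the arithmetic in #evaluate; not the library API.
def bucket_step(tokens:, refilled_at:, now:, capacity:, per:, admit_budget:)
  refill_rate = capacity.to_f / per                  # tokens per second
  elapsed     = [now - refilled_at, 0.0].max         # never negative
  tokens      = [tokens + (elapsed * refill_rate), capacity.to_f].min
  whole       = tokens.floor

  if whole.zero?
    # Less than one token: report how long until a full token exists.
    return { allowed: 0, tokens: tokens, retry_after: (1.0 - tokens) / refill_rate }
  end

  allowed = [whole, admit_budget].min
  { allowed: allowed, tokens: tokens - allowed, retry_after: nil }
end

# 8 tokens per 64 s gives 0.125 tokens/s. After 16 s the bucket gains 2.0 tokens.
step = bucket_step(tokens: 0.5, refilled_at: 0.0, now: 16.0,
                   capacity: 8, per: 64.0, admit_budget: 5)
# step[:allowed] == 2, step[:tokens] == 0.5

# Nearly empty bucket: 0.25 + 2 s * 0.125 = 0.5 tokens, so nothing is admitted.
empty = bucket_step(tokens: 0.25, refilled_at: 0.0, now: 2.0,
                    capacity: 8, per: 64.0, admit_budget: 5)
# empty[:allowed] == 0, empty[:retry_after] == 4.0
```

Note that the fractional remainder (0.5 in the first call) is kept in the bucket rather than discarded, which matches the "tokens" => tokens - allowed patch in the method above.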

#name ⇒ Object



# File 'lib/dispatch_policy/gates/throttle.rb', line 26

def name
  :throttle
end