Class: DispatchPolicy::Gates::Throttle
- Inherits:
-
DispatchPolicy::Gate
- Object
- DispatchPolicy::Gate
- DispatchPolicy::Gates::Throttle
- Defined in:
- lib/dispatch_policy/gates/throttle.rb
Overview
Token bucket throttle gate.
Persists state in partitions.gate_state =
"tokens" => Float, # current tokens, capped at bucket size
"refilled_at" => Float # epoch seconds, last refill
The partition scope this gate enforces against is the policy’s ‘partition_by` (declared in the policy DSL block, not on the gate). The bucket lives on the staged partition row — one row per `policy.partition_for(ctx)` value, one bucket per row, no dilution.
Instance Attribute Summary collapse
-
#per ⇒ Object
readonly
Returns the value of attribute per.
-
#rate_proc ⇒ Object
readonly
Returns the value of attribute rate_proc.
Instance Method Summary collapse
- #evaluate(ctx, partition, admit_budget) ⇒ Object
-
#initialize(rate:, per:) ⇒ Throttle
constructor
A new instance of Throttle.
- #name ⇒ Object
Methods inherited from DispatchPolicy::Gate
Constructor Details
#initialize(rate:, per:) ⇒ Throttle
Returns a new instance of Throttle.
19 20 21 22 23 24 |
# File 'lib/dispatch_policy/gates/throttle.rb', line 19 def initialize(rate:, per:) super() @rate_proc = rate.respond_to?(:call) ? rate : ->(_ctx) { rate } @per = duration_seconds(per) raise ArgumentError, "throttle :per must be > 0 (got #{@per})" unless @per.positive? end |
Instance Attribute Details
#per ⇒ Object (readonly)
Returns the value of attribute per.
17 18 19 |
# File 'lib/dispatch_policy/gates/throttle.rb', line 17 def per @per end |
#rate_proc ⇒ Object (readonly)
Returns the value of attribute rate_proc.
17 18 19 |
# File 'lib/dispatch_policy/gates/throttle.rb', line 17 def rate_proc @rate_proc end |
Instance Method Details
#evaluate(ctx, partition, admit_budget) ⇒ Object
30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 |
# File 'lib/dispatch_policy/gates/throttle.rb', line 30 def evaluate(ctx, partition, admit_budget) capacity = capacity_for(ctx) return Decision.deny(reason: "rate=0") if capacity <= 0 refill_rate = capacity.to_f / @per state = (partition["gate_state"] || {})["throttle"] || {} tokens = (state["tokens"] || capacity).to_f refilled_at = (state["refilled_at"] || now).to_f elapsed = [now - refilled_at, 0.0].max tokens = [tokens + (elapsed * refill_rate), capacity.to_f].min whole = tokens.floor if whole.zero? missing = 1.0 - tokens retry_after = missing / refill_rate patch = { "tokens" => tokens, "refilled_at" => now } return Decision.new(allowed: 0, retry_after: retry_after, gate_state_patch: { "throttle" => patch }, reason: "throttle_empty") end allowed = [whole, admit_budget].min patch = { "tokens" => tokens - allowed, "refilled_at" => now } Decision.new(allowed: allowed, gate_state_patch: { "throttle" => patch }) end |
#name ⇒ Object
26 27 28 |
# File 'lib/dispatch_policy/gates/throttle.rb', line 26 def name :throttle end |