Class: DispatchPolicy::Gates::Throttle
- Inherits: DispatchPolicy::Gate
  (Object → DispatchPolicy::Gate → DispatchPolicy::Gates::Throttle)
- Defined in: lib/dispatch_policy/gates/throttle.rb
Overview
Token bucket throttle gate.
Persists state in partitions.gate_state["throttle"]:
  "tokens" => Float,       # current tokens, capped at bucket size
  "refilled_at" => Float   # epoch seconds, last refill
The partition scope this gate enforces against is the policy's `partition_by` (declared in the policy DSL block, not on the gate). The bucket lives on the staged partition row: one row per `policy.partition_for(ctx)` value, one bucket per row, no dilution.
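The two state fields are enough to refill the bucket lazily at evaluation time. The following is a minimal standalone sketch of that refill step, mirroring the arithmetic in #evaluate. The `refill` helper and its keyword arguments are hypothetical names used only for illustration and are not part of the gem.

```ruby
# Standalone sketch of a lazy token-bucket refill; not the gem's code.
# `refill` is a hypothetical helper name.
def refill(state, rate:, per:, now:)
  capacity    = [rate.to_f, 1.0].max           # bucket holds at least one whole token
  refill_rate = rate.to_f / per                # tokens gained per second
  tokens      = (state["tokens"] || capacity).to_f
  refilled_at = (state["refilled_at"] || now).to_f
  elapsed     = [now - refilled_at, 0.0].max   # never refill for negative elapsed time
  { "tokens" => [tokens + (elapsed * refill_rate), capacity].min,
    "refilled_at" => now }
end

# A partition with no stored state starts with a full bucket.
fresh  = refill({}, rate: 60, per: 60.0, now: 1_000.0)

# An empty bucket gains rate/per tokens for each elapsed second...
half   = refill({ "tokens" => 0.0, "refilled_at" => 1_000.0 },
                rate: 60, per: 60.0, now: 1_030.0)

# ...but never more than its capacity.
capped = refill({ "tokens" => 0.0, "refilled_at" => 1_000.0 },
                rate: 60, per: 60.0, now: 2_000.0)
```

With a rate of 60 per 60 seconds, `fresh` and `capped` both hold 60.0 tokens and `half` holds 30.0.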
Instance Attribute Summary
- #per_proc ⇒ Object (readonly)
  Returns the value of attribute per_proc.
- #rate_proc ⇒ Object (readonly)
  Returns the value of attribute rate_proc.
Instance Method Summary
- #consume(decision, admitted_count) ⇒ Object
  Settles the bucket against the number of jobs actually admitted.
- #evaluate(ctx, partition, admit_budget) ⇒ Object
- #initialize(rate:, per:) ⇒ Throttle (constructor)
  A new instance of Throttle.
- #name ⇒ Object
Constructor Details
#initialize(rate:, per:) ⇒ Throttle
Returns a new instance of Throttle.
# File 'lib/dispatch_policy/gates/throttle.rb', line 19

def initialize(rate:, per:)
  super()
  @rate_proc = rate.respond_to?(:call) ? rate : ->(_ctx) { rate }
  if per.respond_to?(:call)
    # Dynamic window (per-ctx), symmetric with a dynamic rate. Validated
    # per-evaluate since the value isn't known until admission time.
    @per_proc = ->(ctx) { duration_seconds(per.call(ctx)) }
  else
    fixed = duration_seconds(per)
    raise ArgumentError, "throttle :per must be > 0 (got #{fixed})" unless fixed.positive?
    @per_proc = ->(_ctx) { fixed }
  end
end
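The constructor accepts either plain values or callables for both arguments and normalizes each into a proc that takes the context. The sketch below reproduces that normalization outside the class so it can be run on its own. It rests on two assumptions: `duration_seconds` is replaced by a stand-in that only calls `to_f` (the real helper is not shown on this page), and the context is a plain Hash with `:plan` and `:window` keys chosen purely for illustration.

```ruby
# Hypothetical stand-in for the gate's duration_seconds helper,
# which is not shown here; this version only coerces to Float.
def duration_seconds(value)
  value.to_f
end

# Mirrors the constructor's normalization: static values are wrapped
# in a lambda, callables are passed through (rate) or wrapped (per).
def normalize(rate:, per:)
  rate_proc = rate.respond_to?(:call) ? rate : ->(_ctx) { rate }
  per_proc =
    if per.respond_to?(:call)
      # Dynamic window: cannot be validated until a ctx is available.
      ->(ctx) { duration_seconds(per.call(ctx)) }
    else
      fixed = duration_seconds(per)
      raise ArgumentError, "throttle :per must be > 0 (got #{fixed})" unless fixed.positive?
      ->(_ctx) { fixed }
    end
  [rate_proc, per_proc]
end

# Static configuration: the same rate and window for every context.
static_rate, static_per = normalize(rate: 100, per: 60)

# Dynamic configuration: both values depend on the (assumed) ctx Hash.
dynamic_rate, dynamic_per = normalize(
  rate: ->(ctx) { ctx[:plan] == "pro" ? 100 : 10 },
  per:  ->(ctx) { ctx[:window] }
)
```

Note that only a static `per` is checked eagerly; a callable `per` that returns zero would pass construction and has to be caught at evaluation time, as the comment in the constructor says.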
Instance Attribute Details
#per_proc ⇒ Object (readonly)
Returns the value of attribute per_proc.
# File 'lib/dispatch_policy/gates/throttle.rb', line 17

def per_proc
  @per_proc
end
#rate_proc ⇒ Object (readonly)
Returns the value of attribute rate_proc.
# File 'lib/dispatch_policy/gates/throttle.rb', line 17

def rate_proc
  @rate_proc
end
Instance Method Details
#consume(decision, admitted_count) ⇒ Object
Settles the bucket against the number of jobs actually admitted. `evaluate` recorded the post-refill token count in the decision's patch; here we subtract exactly `admitted_count` (≤ allowed), so the bucket is charged for jobs that really left, never for unspent budget. Called by Pipeline.settle after the claim.
# File 'lib/dispatch_policy/gates/throttle.rb', line 88

def consume(decision, admitted_count)
  st = decision.gate_state_patch && decision.gate_state_patch["throttle"]
  return nil unless st

  { "throttle" => { "tokens" => st["tokens"].to_f - admitted_count,
                    "refilled_at" => st["refilled_at"] } }
end
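A small worked example may help show why settling is deferred. The sketch below copies the body of #consume into a free function and feeds it a stand-in decision. `FakeDecision` and `settle_throttle` are hypothetical names introduced here; the gem's real Decision class is not shown on this page.

```ruby
# Hypothetical stand-in for the gem's Decision object: only the two
# fields this example touches.
FakeDecision = Struct.new(:allowed, :gate_state_patch, keyword_init: true)

# Same logic as #consume, written as a free function for illustration.
def settle_throttle(decision, admitted_count)
  st = decision.gate_state_patch && decision.gate_state_patch["throttle"]
  return nil unless st

  { "throttle" => { "tokens" => st["tokens"].to_f - admitted_count,
                    "refilled_at" => st["refilled_at"] } }
end

# evaluate allowed 5 jobs from a bucket holding 5.5 tokens...
decision = FakeDecision.new(
  allowed: 5,
  gate_state_patch: { "throttle" => { "tokens" => 5.5, "refilled_at" => 1_000.0 } }
)

# ...but only 2 staged rows were actually claimed.
settled = settle_throttle(decision, 2)

# A decision without a throttle patch has nothing to settle.
skipped = settle_throttle(FakeDecision.new(allowed: 0, gate_state_patch: nil), 0)
```

Here `settled` leaves 3.5 tokens in the bucket. Had the deduction used `allowed` instead, only 0.5 would remain and three tokens of unspent budget would be lost.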
#evaluate(ctx, partition, admit_budget) ⇒ Object
# File 'lib/dispatch_policy/gates/throttle.rb', line 37

def evaluate(ctx, partition, admit_budget)
  per  = per_for(ctx)
  rate = rate_for(ctx)
  # rate <= 0 (e.g. a paused tenant) backs off for one window instead
  # of denying with a NULL retry_after. A NULL retry_after leaves the
  # partition immediately eligible, so it would be re-claimed and
  # re-evaluated every single tick: a busy-loop that also clobbers any
  # backoff a prior tick had set.
  return Decision.deny(retry_after: per, reason: "rate=0") if rate <= 0

  # The bucket holds at least one whole token; otherwise a sub-unit rate
  # (e.g. rate: 0.5) could never accumulate a full token and would never
  # admit. refill_rate stays at the true `rate` so the long-run pace is
  # exact; the floor only sets the burst ceiling.
  capacity    = [rate, 1.0].max
  refill_rate = rate / per

  state       = (partition["gate_state"] || {})["throttle"] || {}
  tokens      = (state["tokens"] || capacity).to_f
  refilled_at = (state["refilled_at"] || now).to_f

  elapsed = [now - refilled_at, 0.0].max
  tokens  = [tokens + (elapsed * refill_rate), capacity].min

  # The patch records the post-refill bucket WITHOUT deducting yet.
  # The actual deduction is deferred to #consume, which runs once
  # the admission TX knows how many staged rows were really claimed.
  # Deducting `allowed` here over-charges the bucket whenever fewer
  # jobs are admitted than allowed: a later gate capping admit_count,
  # future-scheduled rows skipped by the `scheduled_at <= now()`
  # filter, or rows another tick grabbed under SKIP LOCKED.
  patch = { "tokens" => tokens, "refilled_at" => now }

  whole = tokens.floor
  if whole.zero?
    missing     = 1.0 - tokens
    retry_after = missing / refill_rate
    return Decision.new(allowed: 0, retry_after: retry_after,
                        gate_state_patch: { "throttle" => patch },
                        reason: "throttle_empty")
  end

  allowed = [whole, admit_budget].min
  Decision.new(allowed: allowed, gate_state_patch: { "throttle" => patch })
end
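The last part of #evaluate decides how many jobs to admit, or how long to wait for the next whole token. The sketch below isolates that arithmetic for a bucket that has already been refilled. `decide` is a hypothetical helper, and it returns a plain Hash rather than the gem's Decision object so that the example stays self-contained.

```ruby
# Sketch of the admit/deny arithmetic on an already-refilled bucket.
# `decide` is a hypothetical name; the Hash result stands in for Decision.
def decide(tokens, rate:, per:, admit_budget:)
  refill_rate = rate.to_f / per   # tokens gained per second
  whole = tokens.floor            # only whole tokens can admit a job

  if whole.zero?
    # Time until the bucket reaches exactly one token.
    missing = 1.0 - tokens
    return { allowed: 0, retry_after: missing / refill_rate, reason: "throttle_empty" }
  end

  # Never admit more than the pipeline's remaining budget.
  { allowed: [whole, admit_budget].min }
end

# Sub-unit rate: 0.5 jobs per second, bucket at 0.25 tokens.
# 0.75 tokens are missing, so the wait is 0.75 / 0.5 seconds.
empty = decide(0.25, rate: 0.5, per: 1.0, admit_budget: 10)

# 3.75 tokens yield 3 whole tokens, limited further by the budget.
roomy = decide(3.75, rate: 10, per: 1.0, admit_budget: 10)
tight = decide(3.75, rate: 10, per: 1.0, admit_budget: 2)
```

In this run `empty` denies with a `retry_after` of 1.5 seconds, `roomy` admits 3 jobs, and `tight` admits 2. The fractional 0.75 token is not lost in either admitting case, because the patch stores the unrounded token count.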
#name ⇒ Object
# File 'lib/dispatch_policy/gates/throttle.rb', line 33

def name
  :throttle
end