Class: DispatchPolicy::Gates::Throttle
- Inherits: DispatchPolicy::Gate
- Ancestors: Object > DispatchPolicy::Gate > DispatchPolicy::Gates::Throttle
- Defined in: lib/dispatch_policy/gates/throttle.rb
Overview
Token bucket throttle gate.
Persists state in partitions.gate_state under the "throttle" key:
    "tokens" => Float,       # current tokens, capped at bucket size
    "refilled_at" => Float   # epoch seconds, last refill
The partition scope this gate enforces against is the policy's `partition_by` (declared in the policy DSL block, not on the gate). The bucket lives on the staged partition row: there is one row per `policy.partition_for(ctx)` value and one bucket per row, so partitions never dilute each other's budget.
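As a rough illustration of "one bucket per row", the sketch below uses a plain in-memory Hash in place of the staged partition rows. The names `rows` and `charge` and the partition keys are hypothetical and not part of DispatchPolicy; only the `gate_state` / `"throttle"` / `"tokens"` / `"refilled_at"` layout comes from the description above.

```ruby
# Hypothetical stand-in for staged partition rows: one row per partition key,
# each row carrying its own "throttle" bucket inside gate_state.
rows = Hash.new do |hash, key|
  hash[key] = { "gate_state" => { "throttle" => { "tokens" => 5.0, "refilled_at" => 0.0 } } }
end

# Hypothetical helper: spend `count` tokens from a single partition's bucket.
def charge(rows, partition_key, count)
  bucket = rows[partition_key]["gate_state"]["throttle"]
  bucket["tokens"] -= count
  bucket["tokens"]
end

charge(rows, "tenant-a", 3)

tenant_a = rows["tenant-a"]["gate_state"]["throttle"]["tokens"]
tenant_b = rows["tenant-b"]["gate_state"]["throttle"]["tokens"]
puts "tenant-a: #{tenant_a}, tenant-b: #{tenant_b}"
```

Spending from one partition's bucket leaves every other partition's bucket untouched, which is the "no dilution" property.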
Instance Attribute Summary
- #per ⇒ Object (readonly): Returns the value of attribute per.
- #rate_proc ⇒ Object (readonly): Returns the value of attribute rate_proc.
Instance Method Summary
- #consume(decision, admitted_count) ⇒ Object: Settles the bucket against the number of jobs actually admitted.
- #evaluate(ctx, partition, admit_budget) ⇒ Object
- #initialize(rate:, per:) ⇒ Throttle (constructor): A new instance of Throttle.
- #name ⇒ Object
Constructor Details
#initialize(rate:, per:) ⇒ Throttle
Returns a new instance of Throttle.
    # File 'lib/dispatch_policy/gates/throttle.rb', line 19
    def initialize(rate:, per:)
      super()
      @rate_proc = rate.respond_to?(:call) ? rate : ->(_ctx) { rate }
      @per = duration_seconds(per)
      raise ArgumentError, "throttle :per must be > 0 (got #{@per})" unless @per.positive?
    end
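The constructor accepts `rate:` either as a constant or as a callable and normalizes both to a proc. A minimal sketch of that normalization, lifted out of the class: `normalize_rate` is a hypothetical helper name, and the Hash-shaped context with a `:plan` key is an assumption made purely for illustration.

```ruby
# Same normalization as the constructor: a callable is kept as-is,
# a constant is wrapped in a lambda that ignores the context.
def normalize_rate(rate)
  rate.respond_to?(:call) ? rate : ->(_ctx) { rate }
end

fixed   = normalize_rate(10)
dynamic = normalize_rate(->(ctx) { ctx[:plan] == "pro" ? 100 : 10 })

# The context shape (a Hash with :plan) is assumed for this example only.
fixed_result = fixed.call({ plan: "pro" })
pro_result   = dynamic.call({ plan: "pro" })
free_result  = dynamic.call({ plan: "free" })

puts "fixed=#{fixed_result} pro=#{pro_result} free=#{free_result}"
```

After normalization the caller invokes the rate the same way in both cases, whether it was given as a number or as a callable.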
Instance Attribute Details
#per ⇒ Object (readonly)
Returns the value of attribute per.
    # File 'lib/dispatch_policy/gates/throttle.rb', line 17
    def per
      @per
    end
#rate_proc ⇒ Object (readonly)
Returns the value of attribute rate_proc.
    # File 'lib/dispatch_policy/gates/throttle.rb', line 17
    def rate_proc
      @rate_proc
    end
Instance Method Details
#consume(decision, admitted_count) ⇒ Object
Settles the bucket against the number of jobs actually admitted. `evaluate` recorded the post-refill token count in the decision's patch; here we subtract exactly `admitted_count` (≤ allowed), so the bucket is charged for jobs that really left, never for unspent budget. Called by Pipeline.settle after the claim.
    # File 'lib/dispatch_policy/gates/throttle.rb', line 70
    def consume(decision, admitted_count)
      st = decision.gate_state_patch && decision.gate_state_patch["throttle"]
      return nil unless st

      { "throttle" => { "tokens" => st["tokens"].to_f - admitted_count,
                        "refilled_at" => st["refilled_at"] } }
    end
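To make the settlement arithmetic concrete, here is a small sketch that reproduces the body of `#consume` as a free function. `settle_throttle` and the Struct used as a decision are hypothetical stand-ins; the real `Decision` class has more fields than the single one modelled here.

```ruby
# Hypothetical stand-in for the library's Decision; only the field #consume reads.
fake_decision = Struct.new(:gate_state_patch)

# Same arithmetic as #consume, lifted into a free function for illustration.
def settle_throttle(decision, admitted_count)
  st = decision.gate_state_patch && decision.gate_state_patch["throttle"]
  return nil unless st

  { "throttle" => { "tokens" => st["tokens"].to_f - admitted_count,
                    "refilled_at" => st["refilled_at"] } }
end

# evaluate recorded 7.5 post-refill tokens; only 3 jobs were actually claimed.
decision = fake_decision.new({ "throttle" => { "tokens" => 7.5, "refilled_at" => 1_700_000_000.0 } })
settled  = settle_throttle(decision, 3)

# A decision without a throttle patch yields nothing to settle.
no_patch = settle_throttle(fake_decision.new(nil), 3)

puts settled.inspect
puts no_patch.inspect
```

The bucket ends at 4.5 tokens regardless of how many jobs `evaluate` allowed, because only the admitted count is subtracted.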
#evaluate(ctx, partition, admit_budget) ⇒ Object
    # File 'lib/dispatch_policy/gates/throttle.rb', line 30
    def evaluate(ctx, partition, admit_budget)
      capacity = capacity_for(ctx)
      return Decision.deny(reason: "rate=0") if capacity <= 0

      refill_rate = capacity.to_f / @per
      state       = (partition["gate_state"] || {})["throttle"] || {}
      tokens      = (state["tokens"] || capacity).to_f
      refilled_at = (state["refilled_at"] || now).to_f

      elapsed = [now - refilled_at, 0.0].max
      tokens  = [tokens + (elapsed * refill_rate), capacity.to_f].min

      # The patch records the post-refill bucket WITHOUT deducting yet.
      # The actual deduction is deferred to #consume, which runs once
      # the admission TX knows how many staged rows were really claimed.
      # Deducting `allowed` here over-charges the bucket whenever fewer
      # jobs are admitted than allowed: a later gate capping admit_count,
      # future-scheduled rows skipped by the `scheduled_at <= now()`
      # filter, or rows another tick grabbed under SKIP LOCKED.
      patch = { "tokens" => tokens, "refilled_at" => now }

      whole = tokens.floor
      if whole.zero?
        missing = 1.0 - tokens
        retry_after = missing / refill_rate
        return Decision.new(allowed: 0, retry_after: retry_after,
                            gate_state_patch: { "throttle" => patch },
                            reason: "throttle_empty")
      end

      allowed = [whole, admit_budget].min
      Decision.new(allowed: allowed, gate_state_patch: { "throttle" => patch })
    end
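The refill and retry arithmetic in `#evaluate` can be followed with concrete numbers. The sketch below repeats the same formulas in a standalone function, with the clock passed in explicitly and a plain Hash returned instead of a `Decision`. `refill_and_decide` and its keyword names are hypothetical, chosen only for this example.

```ruby
# Refill step and empty-bucket retry hint, mirroring the formulas in #evaluate.
def refill_and_decide(tokens:, refilled_at:, now:, capacity:, per:, admit_budget:)
  refill_rate = capacity.to_f / per                    # tokens per second
  elapsed     = [now - refilled_at, 0.0].max           # never refill backwards
  tokens      = [tokens + (elapsed * refill_rate), capacity.to_f].min
  whole       = tokens.floor

  if whole.zero?
    # Less than one whole token: report how long until the next one exists.
    { allowed: 0, tokens: tokens, retry_after: (1.0 - tokens) / refill_rate }
  else
    { allowed: [whole, admit_budget].min, tokens: tokens, retry_after: nil }
  end
end

# 10 tokens per 10 s (1 token/s). The bucket held 0.25 tokens half a second ago:
# 0.25 + 0.5 * 1.0 = 0.75 tokens, so nothing is allowed and the retry is 0.25 s away.
empty = refill_and_decide(tokens: 0.25, refilled_at: 100.0, now: 100.5,
                          capacity: 10, per: 10.0, admit_budget: 5)

# The bucket held 2 tokens five seconds ago: 2 + 5 * 1.0 = 7 tokens,
# and the admit budget caps the result at 5.
ready = refill_and_decide(tokens: 2.0, refilled_at: 100.0, now: 105.0,
                          capacity: 10, per: 10.0, admit_budget: 5)

puts empty.inspect
puts ready.inspect
```

In both cases the returned token count is the post-refill value with nothing deducted, matching the patch that `#evaluate` hands to `#consume`.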
#name ⇒ Object
    # File 'lib/dispatch_policy/gates/throttle.rb', line 26
    def name
      :throttle
    end