Class: DispatchPolicy::Gates::AdaptiveConcurrency
- Inherits: DispatchPolicy::Gate
- Hierarchy: Object < DispatchPolicy::Gate < DispatchPolicy::Gates::AdaptiveConcurrency
- Defined in: lib/dispatch_policy/gates/adaptive_concurrency.rb
Overview
Self-tuning concurrency gate. Like :concurrency but with a per-partition cap (`current_max`) that grows when the adapter queue is empty and shrinks when it builds up. The AIMD loop is persisted in `dispatch_policy_adaptive_concurrency_stats`.
Feedback signal is `queue_lag_ms = perform_start - admitted_at` (time the job spent waiting in the adapter after admission). This is a pure saturation signal: slow performs in the downstream service don’t punish admissions if workers still drain the queue quickly.
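As a rough illustration of the feedback signal described above, the sketch below derives a lag in milliseconds from two timestamps. The helper name `queue_lag_ms` and the sample times are assumptions made for this example; the gem's tracker may compute the value differently.

```ruby
# Hypothetical helper, not part of the documented API: converts the gap
# between admission and perform start into milliseconds.
def queue_lag_ms(admitted_at:, perform_start:)
  ((perform_start - admitted_at) * 1000.0).round(3)
end

admitted_at   = Time.at(1_700_000_000.0)
perform_start = admitted_at + 0.25    # worker picked the job up 250 ms later
perform_end   = perform_start + 12.0  # the perform itself ran for 12 s

lag        = queue_lag_ms(admitted_at: admitted_at, perform_start: perform_start)
perform_ms = (perform_end - perform_start) * 1000.0

puts "queue_lag_ms=#{lag} perform_ms=#{perform_ms}"
```

Only the wait before `perform_start` enters the signal, so the 12-second perform in this example would not by itself push the cap down.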
Update rule applied after each perform (in InflightTracker.track):
succeeded? & ewma_lag <= target_lag_ms → current_max += 1
succeeded? & ewma_lag > target_lag_ms → current_max *= slow_factor
failed? → current_max *= fail_factor
Always clamped to >= min. Never grows without bound — the algorithm self-limits via target_lag_ms.
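The update rule above can be sketched in memory as follows. This is only an illustration: the gem applies the rule in SQL, and the names `AimdState` and `step`, the seeding of the EWMA from the first sample, updating the EWMA on failed performs, and keeping `current_max` as an unrounded number are all assumptions of this sketch.

```ruby
# In-memory sketch of the AIMD update rule. AimdState and step are
# hypothetical names used only for this example.
AimdState = Struct.new(:current_max, :ewma_lag)

def step(state, queue_lag_ms:, succeeded:, target_lag_ms:, min: 1,
         alpha: 0.5, fail_factor: 0.5, slow_factor: 0.95)
  # Assumption: the first observation seeds the EWMA directly.
  ewma =
    if state.ewma_lag.nil?
      queue_lag_ms.to_f
    else
      alpha * queue_lag_ms + (1 - alpha) * state.ewma_lag
    end

  next_max =
    if !succeeded
      state.current_max * fail_factor   # multiplicative decrease on failure
    elsif ewma <= target_lag_ms
      state.current_max + 1             # additive increase while lag is low
    else
      state.current_max * slow_factor   # gentle shrink on overload
    end

  # The cap never drops below min.
  AimdState.new([next_max, min].max, ewma)
end

s0 = AimdState.new(10.0, nil)
s1 = step(s0, queue_lag_ms: 20,  succeeded: true,  target_lag_ms: 100)
s2 = step(s1, queue_lag_ms: 400, succeeded: true,  target_lag_ms: 100)
s3 = step(s2, queue_lag_ms: 50,  succeeded: false, target_lag_ms: 100)

[s1, s2, s3].each { |s| puts "current_max=#{s.current_max} ewma_lag=#{s.ewma_lag}" }
```

With the default factors, one low-lag success grows the cap by one, a success above the target trims it by 5%, and a failure halves it.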
Constant Summary
- DEFAULT_FULL_BACKOFF = 1.0
  seconds
- DEFAULT_EWMA_ALPHA = 0.5
  weight of the new sample in the EWMA
- DEFAULT_FAIL_FACTOR = 0.5
  halve on perform raise
- DEFAULT_SLOW_FACTOR = 0.95
  gentle shrink on overload
Instance Attribute Summary
- #ewma_alpha ⇒ Object (readonly)
  Returns the value of attribute ewma_alpha.
- #fail_factor ⇒ Object (readonly)
  Returns the value of attribute fail_factor.
- #full_backoff ⇒ Object (readonly)
  Returns the value of attribute full_backoff.
- #initial_max ⇒ Object (readonly)
  Returns the value of attribute initial_max.
- #min ⇒ Object (readonly)
  Returns the value of attribute min.
- #slow_factor ⇒ Object (readonly)
  Returns the value of attribute slow_factor.
- #target_lag_ms ⇒ Object (readonly)
  Returns the value of attribute target_lag_ms.
Instance Method Summary
- #evaluate(ctx, partition, admit_budget) ⇒ Object
- #inflight_partition_key(policy_name, ctx) ⇒ Object
  Same canonical scope as the staged_jobs partition_key: every gate in a policy uses `policy.partition_for(ctx)` so the inflight count and the adaptive stats line up exactly.
- #initialize(initial_max:, target_lag_ms:, min: 1, ewma_alpha: DEFAULT_EWMA_ALPHA, failure_decrease_factor: DEFAULT_FAIL_FACTOR, overload_decrease_factor: DEFAULT_SLOW_FACTOR, full_backoff: DEFAULT_FULL_BACKOFF) ⇒ AdaptiveConcurrency (constructor)
  A new instance of AdaptiveConcurrency.
- #name ⇒ Object
- #record_observation(policy_name:, partition_key:, queue_lag_ms:, succeeded:) ⇒ Object
  Called from InflightTracker.track after each perform completes (success or failure).
Methods inherited from DispatchPolicy::Gate
Constructor Details
#initialize(initial_max:, target_lag_ms:, min: 1, ewma_alpha: DEFAULT_EWMA_ALPHA, failure_decrease_factor: DEFAULT_FAIL_FACTOR, overload_decrease_factor: DEFAULT_SLOW_FACTOR, full_backoff: DEFAULT_FULL_BACKOFF) ⇒ AdaptiveConcurrency
Returns a new instance of AdaptiveConcurrency.
    # File 'lib/dispatch_policy/gates/adaptive_concurrency.rb', line 32

    def initialize(initial_max:, target_lag_ms:, min: 1,
                   ewma_alpha: DEFAULT_EWMA_ALPHA,
                   failure_decrease_factor: DEFAULT_FAIL_FACTOR,
                   overload_decrease_factor: DEFAULT_SLOW_FACTOR,
                   full_backoff: DEFAULT_FULL_BACKOFF)
      super()
      @initial_max   = Integer(initial_max)
      @target_lag_ms = Float(target_lag_ms)
      @min           = Integer(min)
      @ewma_alpha    = Float(ewma_alpha)
      @fail_factor   = Float(failure_decrease_factor)
      @slow_factor   = Float(overload_decrease_factor)
      @full_backoff  = Float(full_backoff)
      raise ArgumentError, "target_lag_ms must be > 0" unless @target_lag_ms.positive?
      raise ArgumentError, "min must be >= 1" unless @min >= 1
      raise ArgumentError, "initial_max must be >= min" unless @initial_max >= @min
    end
Instance Attribute Details
#ewma_alpha ⇒ Object (readonly)
Returns the value of attribute ewma_alpha.
    # File 'lib/dispatch_policy/gates/adaptive_concurrency.rb', line 29

    def ewma_alpha
      @ewma_alpha
    end
#fail_factor ⇒ Object (readonly)
Returns the value of attribute fail_factor.
    # File 'lib/dispatch_policy/gates/adaptive_concurrency.rb', line 29

    def fail_factor
      @fail_factor
    end
#full_backoff ⇒ Object (readonly)
Returns the value of attribute full_backoff.
    # File 'lib/dispatch_policy/gates/adaptive_concurrency.rb', line 29

    def full_backoff
      @full_backoff
    end
#initial_max ⇒ Object (readonly)
Returns the value of attribute initial_max.
    # File 'lib/dispatch_policy/gates/adaptive_concurrency.rb', line 29

    def initial_max
      @initial_max
    end
#min ⇒ Object (readonly)
Returns the value of attribute min.
    # File 'lib/dispatch_policy/gates/adaptive_concurrency.rb', line 29

    def min
      @min
    end
#slow_factor ⇒ Object (readonly)
Returns the value of attribute slow_factor.
    # File 'lib/dispatch_policy/gates/adaptive_concurrency.rb', line 29

    def slow_factor
      @slow_factor
    end
#target_lag_ms ⇒ Object (readonly)
Returns the value of attribute target_lag_ms.
    # File 'lib/dispatch_policy/gates/adaptive_concurrency.rb', line 29

    def target_lag_ms
      @target_lag_ms
    end
Instance Method Details
#evaluate(ctx, partition, admit_budget) ⇒ Object
    # File 'lib/dispatch_policy/gates/adaptive_concurrency.rb', line 54

    def evaluate(ctx, partition, admit_budget)
      policy_name = partition["policy_name"]
      key         = inflight_partition_key(policy_name, ctx)

      # Seed lazily so the very first admission has a row to read
      # (and so record_observation can UPDATE without a check).
      Repository.adaptive_seed!(
        policy_name:   policy_name,
        partition_key: key,
        initial_max:   @initial_max
      )

      cap = Repository.adaptive_current_max(
        policy_name:   policy_name,
        partition_key: key
      ) || @initial_max
      cap = [cap, @min].max

      in_flight = Repository.count_inflight(
        policy_name:   policy_name,
        partition_key: key
      )
      remaining = cap - in_flight

      # Safety valve. AIMD can shrink current_max during a slow burst;
      # if the partition then idles, no observations come in to grow
      # the cap back. When in_flight == 0 we ensure at least
      # initial_max so the partition never fossilizes at min.
      remaining = [remaining, @initial_max].max if in_flight.zero?

      if remaining <= 0
        return Decision.new(allowed: 0, retry_after: @full_backoff, reason: "adaptive_concurrency_full")
      end

      Decision.new(allowed: [remaining, admit_budget].min)
    end
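To make the admission arithmetic in #evaluate easier to follow, here is a minimal pure-function sketch. `allowed_for` is a hypothetical name, the repository lookups are replaced by plain arguments, and the return value is a bare integer rather than a Decision.

```ruby
# Pure-function sketch of the arithmetic in #evaluate (hypothetical helper).
# cap and in_flight stand in for the values read from the repository.
def allowed_for(cap:, in_flight:, admit_budget:, initial_max:, min: 1)
  cap = [cap, min].max
  remaining = cap - in_flight

  # Safety valve: an idle partition is offered at least initial_max slots.
  remaining = [remaining, initial_max].max if in_flight.zero?

  return 0 if remaining <= 0

  [remaining, admit_budget].min
end

partly_busy = allowed_for(cap: 8, in_flight: 5, admit_budget: 10, initial_max: 4)
full        = allowed_for(cap: 8, in_flight: 8, admit_budget: 10, initial_max: 4)
idle_shrunk = allowed_for(cap: 1, in_flight: 0, admit_budget: 10, initial_max: 4)
budget_cap  = allowed_for(cap: 8, in_flight: 0, admit_budget: 2,  initial_max: 4)

puts [partly_busy, full, idle_shrunk, budget_cap].inspect
```

The third call shows the safety valve: the cap has shrunk to 1, but because nothing is in flight the partition is still offered `initial_max` slots.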
#inflight_partition_key(policy_name, ctx) ⇒ Object
Same canonical scope as the staged_jobs partition_key: every gate in a policy uses `policy.partition_for(ctx)` so the inflight count and the adaptive stats line up exactly.
    # File 'lib/dispatch_policy/gates/adaptive_concurrency.rb', line 96

    def inflight_partition_key(policy_name, ctx)
      policy = DispatchPolicy.registry.fetch(policy_name)
      raise InvalidPolicy, "unknown policy #{policy_name.inspect}" unless policy
      policy.partition_for(ctx)
    end
#name ⇒ Object
    # File 'lib/dispatch_policy/gates/adaptive_concurrency.rb', line 50

    def name
      :adaptive_concurrency
    end
#record_observation(policy_name:, partition_key:, queue_lag_ms:, succeeded:) ⇒ Object
Called from InflightTracker.track after each perform completes (success or failure). Updates the AIMD state atomically in one SQL statement.
    # File 'lib/dispatch_policy/gates/adaptive_concurrency.rb', line 105

    def record_observation(policy_name:, partition_key:, queue_lag_ms:, succeeded:)
      Repository.adaptive_seed!(
        policy_name:   policy_name,
        partition_key: partition_key.to_s,
        initial_max:   @initial_max
      )
      Repository.adaptive_record!(
        policy_name:   policy_name,
        partition_key: partition_key.to_s,
        queue_lag_ms:  queue_lag_ms,
        succeeded:     succeeded,
        alpha:         @ewma_alpha,
        target_lag_ms: @target_lag_ms,
        fail_factor:   @fail_factor,
        slow_factor:   @slow_factor,
        min:           @min
      )
    end