Class: DispatchPolicy::Gates::AdaptiveConcurrency
- Inherits: DispatchPolicy::Gate
  - Object
  - DispatchPolicy::Gate
  - DispatchPolicy::Gates::AdaptiveConcurrency
- Defined in: lib/dispatch_policy/gates/adaptive_concurrency.rb
Overview
Adaptive variant of :concurrency. The per-partition cap (current_max) shrinks when the adapter queue backs up (recent queue_lag > target) or when performs fail, and grows back when workers drain admissions quickly (queue_lag near zero). The signal is pure queue wait (admitted_at → perform_start), so it reflects “are we admitting too fast?” without being polluted by how long the external work takes.
AIMD loop on a per-partition stats row; the underlying in-flight counter is the same PartitionInflightCount used by :concurrency.
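The loop described above might look roughly like the following sketch. The real update happens inside AdaptiveConcurrencyStats.record_observation!, whose body is not shown on this page, so the growth step of +1, the reading of "near zero" as "EWMA at or below target", the rounding, and the PartitionStats struct are all assumptions made for illustration.

```ruby
# Hypothetical stand-in for one per-partition stats row.
PartitionStats = Struct.new(:current_max, :ewma_lag_ms)

# One AIMD step. Assumed rules (not confirmed by the source):
#   * failed perform     -> multiply the cap by fail_factor
#   * EWMA lag > target  -> multiply the cap by slow_factor
#   * otherwise          -> grow the cap by 1
# The result is floored and never drops below min.
def aimd_step(stats, queue_lag_ms:, succeeded:, target_lag_ms:,
              alpha: 0.5, fail_factor: 0.5, slow_factor: 0.95, min: 1)
  # Exponentially weighted moving average of the observed queue lag.
  ewma = alpha * queue_lag_ms + (1 - alpha) * stats.ewma_lag_ms

  next_max =
    if !succeeded
      stats.current_max * fail_factor
    elsif ewma > target_lag_ms
      stats.current_max * slow_factor
    else
      stats.current_max + 1
    end

  PartitionStats.new([next_max.floor, min].max, ewma)
end

stats = PartitionStats.new(20, 0.0)
stats = aimd_step(stats, queue_lag_ms: 0, succeeded: true, target_lag_ms: 100)
puts stats.current_max # 21: lag under target, additive increase
stats = aimd_step(stats, queue_lag_ms: 1_000, succeeded: true, target_lag_ms: 100)
puts stats.current_max # 19: EWMA lag is 500 ms, multiplicative decrease
stats = aimd_step(stats, queue_lag_ms: 0, succeeded: false, target_lag_ms: 100)
puts stats.current_max # 9: failure halves the cap
```

Keeping the step a pure function of the previous row and one observation makes it easy to reason about; the gate itself persists the row per partition.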
Constant Summary
- DEFAULT_EWMA_ALPHA = 0.5
  alpha is fast enough that a single spike is forgotten in ~3 observations instead of ~15. slow_factor 0.95 halves the per-observation shrink magnitude so the cap no longer overshoots after a burst drains the adapter queue.
- DEFAULT_FAIL_FACTOR = 0.5
- DEFAULT_SLOW_FACTOR = 0.95
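To make the "~3 observations" figure concrete, here is a small sketch of how quickly an EWMA with a given alpha forgets a single spike. The page does not define "forgotten"; the 12.5% residual threshold and both helper names are assumptions chosen for illustration.

```ruby
# Fraction of a single spike still present in an EWMA after n later
# observations: each update keeps (1 - alpha) of the previous average.
def spike_residual(alpha, observations)
  (1.0 - alpha)**observations
end

# Smallest number of later observations after which the spike's weight
# has dropped to `threshold` or below. The threshold is an assumption.
def observations_to_forget(alpha, threshold: 0.125)
  n = 0
  n += 1 while spike_residual(alpha, n) > threshold
  n
end

puts spike_residual(0.5, 3)      # 0.125
puts observations_to_forget(0.5) # 3
```

By the same arithmetic, a slow_factor of 0.95 removes 5% of the cap per overloaded observation.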
Instance Attribute Summary
- #ewma_alpha ⇒ Object (readonly)
  Returns the value of attribute ewma_alpha.
- #fail_factor ⇒ Object (readonly)
  Returns the value of attribute fail_factor.
- #initial_max ⇒ Object (readonly)
  Returns the value of attribute initial_max.
- #min ⇒ Object (readonly)
  Returns the value of attribute min.
- #slow_factor ⇒ Object (readonly)
  Returns the value of attribute slow_factor.
- #target_lag_ms ⇒ Object (readonly)
  Returns the value of attribute target_lag_ms.
Attributes inherited from DispatchPolicy::Gate
Instance Method Summary
- #configure(initial_max:, target_lag_ms: nil, target_latency: nil, min: 1, ewma_alpha: DEFAULT_EWMA_ALPHA, failure_decrease_factor: DEFAULT_FAIL_FACTOR, overload_decrease_factor: DEFAULT_SLOW_FACTOR) ⇒ Object
  target_lag_ms accepts the legacy alias `target_latency` for backwards compatibility.
- #filter(batch, context) ⇒ Object
- #record_observation(partition_key:, queue_lag_ms:, succeeded:) ⇒ Object
  Called by Dispatchable#around_perform for each adaptive gate that touched this job.
- #tracks_inflight? ⇒ Boolean
Methods inherited from DispatchPolicy::Gate
#initialize, #partition_key_for, register, registry
Constructor Details
This class inherits a constructor from DispatchPolicy::Gate
Instance Attribute Details
#ewma_alpha ⇒ Object (readonly)
Returns the value of attribute ewma_alpha.
# File 'lib/dispatch_policy/gates/adaptive_concurrency.rb', line 45
def ewma_alpha
  @ewma_alpha
end
#fail_factor ⇒ Object (readonly)
Returns the value of attribute fail_factor.
# File 'lib/dispatch_policy/gates/adaptive_concurrency.rb', line 45
def fail_factor
  @fail_factor
end
#initial_max ⇒ Object (readonly)
Returns the value of attribute initial_max.
# File 'lib/dispatch_policy/gates/adaptive_concurrency.rb', line 45
def initial_max
  @initial_max
end
#min ⇒ Object (readonly)
Returns the value of attribute min.
# File 'lib/dispatch_policy/gates/adaptive_concurrency.rb', line 45
def min
  @min
end
#slow_factor ⇒ Object (readonly)
Returns the value of attribute slow_factor.
# File 'lib/dispatch_policy/gates/adaptive_concurrency.rb', line 45
def slow_factor
  @slow_factor
end
#target_lag_ms ⇒ Object (readonly)
Returns the value of attribute target_lag_ms.
# File 'lib/dispatch_policy/gates/adaptive_concurrency.rb', line 45
def target_lag_ms
  @target_lag_ms
end
Instance Method Details
#configure(initial_max:, target_lag_ms: nil, target_latency: nil, min: 1, ewma_alpha: DEFAULT_EWMA_ALPHA, failure_decrease_factor: DEFAULT_FAIL_FACTOR, overload_decrease_factor: DEFAULT_SLOW_FACTOR) ⇒ Object
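target_lag_ms accepts the legacy alias `target_latency` for backwards compatibility. When both are given, target_lag_ms takes precedence; if neither is given, an ArgumentError is raised.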
# File 'lib/dispatch_policy/gates/adaptive_concurrency.rb', line 25
def configure(initial_max:, target_lag_ms: nil, target_latency: nil, min: 1,
              ewma_alpha: DEFAULT_EWMA_ALPHA,
              failure_decrease_factor: DEFAULT_FAIL_FACTOR,
              overload_decrease_factor: DEFAULT_SLOW_FACTOR)
  @initial_max = initial_max
  @min = min
  @target_lag_ms = target_lag_ms || target_latency
  @ewma_alpha = ewma_alpha
  @fail_factor = failure_decrease_factor
  @slow_factor = overload_decrease_factor

  raise ArgumentError, "adaptive_concurrency requires target_lag_ms" if @target_lag_ms.nil?
end
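The alias handling can be exercised in isolation. The sketch below uses GateStandIn, a hypothetical class that copies only the target resolution from #configure; it is not part of the library and skips everything the real gate inherits from DispatchPolicy::Gate.

```ruby
# Hypothetical stand-in that mirrors only the target resolution in #configure.
class GateStandIn
  attr_reader :target_lag_ms

  def configure(initial_max:, target_lag_ms: nil, target_latency: nil)
    @initial_max = initial_max
    # The preferred keyword wins; the legacy alias is only a fallback.
    @target_lag_ms = target_lag_ms || target_latency
    raise ArgumentError, "adaptive_concurrency requires target_lag_ms" if @target_lag_ms.nil?
  end
end

preferred = GateStandIn.new
preferred.configure(initial_max: 10, target_lag_ms: 250)

legacy = GateStandIn.new
legacy.configure(initial_max: 10, target_latency: 400)

both = GateStandIn.new
both.configure(initial_max: 10, target_lag_ms: 250, target_latency: 400)

puts preferred.target_lag_ms # 250
puts legacy.target_lag_ms    # 400
puts both.target_lag_ms      # 250

begin
  GateStandIn.new.configure(initial_max: 10)
rescue ArgumentError => e
  puts e.message # adaptive_concurrency requires target_lag_ms
end
```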
#filter(batch, context) ⇒ Object
# File 'lib/dispatch_policy/gates/adaptive_concurrency.rb', line 48
def filter(batch, context)
  by_partition = batch.group_by { |staged| partition_key_for(context.for(staged)) }

  # Seed any missing stats rows so the first admission has something
  # to read. Cheap: one INSERT ... ON CONFLICT DO NOTHING per key.
  by_partition.each_key do |key|
    AdaptiveConcurrencyStats.seed!(
      policy_name: policy.name,
      gate_name: name,
      partition_key: key,
      initial_max: resolve(@initial_max, nil).to_i
    )
  end

  stats = AdaptiveConcurrencyStats.fetch_many(
    policy_name: policy.name,
    gate_name: name,
    partition_keys: by_partition.keys
  )
  in_flight = PartitionInflightCount.fetch_many(
    policy_name: policy.name,
    gate_name: name,
    partition_keys: by_partition.keys
  )

  min_v = resolve(@min, nil).to_i
  admitted = []

  by_partition.each do |partition_key, jobs|
    effective_max = stats.dig(partition_key, :current_max) || resolve(@initial_max, nil).to_i
    effective_max = [ effective_max, min_v ].max
    used = in_flight.fetch(partition_key, 0)

    # Safety valve: if nothing is in-flight for this partition and
    # there's pending, the adapter queue is (or is about to be)
    # empty and workers will idle. Ensure we hand over at least
    # initial_max so the stream never dries up on its own.
    if used.zero? && jobs.any?
      effective_max = [ effective_max, resolve(@initial_max, nil).to_i ].max
    end

    jobs.each do |staged|
      break unless used < effective_max
      admitted << [ staged, partition_key ]
      used += 1
    end
  end

  context.record_partitions(admitted, gate: name)
  admitted.map(&:first)
end
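The per-partition admission rule in #filter can be restated as a pure function, which makes the safety valve easier to see. The admit helper below is a hypothetical sketch with no database access; it takes the values that #filter reads from AdaptiveConcurrencyStats and PartitionInflightCount as plain arguments.

```ruby
# Hypothetical pure-Ruby restatement of the admission loop for one partition.
#   jobs        - pending jobs for the partition, in order
#   in_flight   - jobs already admitted and not yet finished
#   current_max - adaptive cap from the stats row (nil if the row is missing)
def admit(jobs, in_flight:, current_max:, initial_max:, min: 1)
  effective_max = [current_max || initial_max, min].max

  # Safety valve: an idle partition with pending work gets at least initial_max.
  effective_max = [effective_max, initial_max].max if in_flight.zero? && jobs.any?

  free_slots = [effective_max - in_flight, 0].max
  jobs.first(free_slots)
end

p admit(%w[a b c d], in_flight: 3, current_max: 5, initial_max: 10) # ["a", "b"]
p admit(%w[a b c d], in_flight: 0, current_max: 2, initial_max: 3)  # ["a", "b", "c"]
p admit(%w[a], in_flight: 6, current_max: 5, initial_max: 10)       # []
```

In the second call the cap had shrunk to 2, but because nothing is in flight the safety valve lifts it back to initial_max for this pass.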
#record_observation(partition_key:, queue_lag_ms:, succeeded:) ⇒ Object
Called by Dispatchable#around_perform for each adaptive gate that touched this job. Lives on the gate instance because configuration (alpha, target_latency, etc.) is per gate.
# File 'lib/dispatch_policy/gates/adaptive_concurrency.rb', line 104
def record_observation(partition_key:, queue_lag_ms:, succeeded:)
  AdaptiveConcurrencyStats.record_observation!(
    policy_name: policy.name,
    gate_name: name,
    partition_key: partition_key.to_s,
    queue_lag_ms: queue_lag_ms,
    succeeded: succeeded,
    alpha: @ewma_alpha,
    min: resolve(@min, nil).to_i,
    target_lag_ms: resolve(@target_lag_ms, nil).to_f,
    fail_factor: @fail_factor,
    slow_factor: @slow_factor,
    initial_max: resolve(@initial_max, nil).to_i
  )
end
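For context, a caller could derive the arguments to #record_observation along these lines. This is a sketch only: the body of Dispatchable#around_perform is not shown here, so the observe helper, the recorder lambda, the clamp at zero, and the rounding are assumptions. Only the keyword names and the admitted_at → perform_start definition of queue lag come from this page.

```ruby
# Queue lag in milliseconds between admission and the start of perform.
# Clamped at zero in case the two clocks disagree (an assumption).
def queue_lag_ms(admitted_at:, perform_start:)
  [((perform_start - admitted_at) * 1000.0).round, 0].max
end

# Hypothetical wrapper: measure lag before the work starts, run the work,
# then report the outcome whether or not the block raised.
def observe(recorder, partition_key:, admitted_at:, now: Time.now)
  lag = queue_lag_ms(admitted_at: admitted_at, perform_start: now)
  succeeded = true
  begin
    yield
  rescue StandardError
    succeeded = false
    raise
  ensure
    recorder.call(partition_key: partition_key, queue_lag_ms: lag, succeeded: succeeded)
  end
end

seen = []
recorder = ->(**observation) { seen << observation }
t0 = Time.at(1_700_000_000)

observe(recorder, partition_key: "tenant-1", admitted_at: t0, now: t0 + 0.25) { :work }
p seen.first
```

Because the lag is captured before the block runs, the duration of the work itself never enters the signal.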
#tracks_inflight? ⇒ Boolean
# File 'lib/dispatch_policy/gates/adaptive_concurrency.rb', line 41
def tracks_inflight?
  true
end