Class: DispatchPolicy::Gates::AdaptiveConcurrency

Inherits:
DispatchPolicy::Gate
Defined in:
lib/dispatch_policy/gates/adaptive_concurrency.rb

Overview

Self-tuning concurrency gate. Like :concurrency, but with a per-partition cap (`current_max`) that grows when the adapter queue is empty and shrinks when it builds up. The AIMD (additive-increase, multiplicative-decrease) loop is persisted in `dispatch_policy_adaptive_concurrency_stats`.

The feedback signal is `queue_lag_ms = perform_start - admitted_at` (the time the job spent waiting in the adapter after admission). This is a pure saturation signal: slow performs in the downstream service don't punish admissions as long as workers still drain the queue quickly.
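A minimal sketch of how the lag sample and its smoothed value could be computed, assuming `Time` timestamps and the conventional EWMA form in which `alpha` is the weight of the new sample. The helper names `queue_lag_ms` and `ewma` are hypothetical, and seeding the average with the first sample is an assumption; the gate's own arithmetic happens in the repository layer.

```ruby
# Hypothetical helpers for illustration; not part of DispatchPolicy.

# Milliseconds a job waited in the adapter between admission and perform start.
def queue_lag_ms(admitted_at, perform_start)
  (perform_start - admitted_at) * 1000.0
end

# Exponentially weighted moving average; alpha is the weight of the new sample.
# Assumption: with no previous value, the first sample seeds the average.
def ewma(previous, sample, alpha: 0.5)
  return sample.to_f if previous.nil?

  alpha * sample + (1.0 - alpha) * previous
end

admitted_at   = Time.at(1_700_000_000)
perform_start = admitted_at + 0.25                 # job waited a quarter of a second
lag      = queue_lag_ms(admitted_at, perform_start) # roughly 250.0 ms
smoothed = ewma(nil, lag)                           # first sample seeds the average
smoothed = ewma(smoothed, 50.0)                     # halfway between 250 and 50
```

Because the sample only covers the wait before `perform_start`, the duration of the perform itself never enters the average.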

Update rule applied after each perform (in InflightTracker.track):

succeeded? & ewma_lag <= target_lag_ms → current_max += 1
succeeded? & ewma_lag >  target_lag_ms → current_max *= slow_factor
failed?                                → current_max *= fail_factor

current_max is always clamped to >= min. It has no explicit upper bound, but growth is self-limiting: once ewma_lag exceeds target_lag_ms, the cap shrinks instead of growing.
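The update rule above can be modeled in plain Ruby as follows. This is only a sketch: `next_current_max` is a hypothetical function, the smoothed lag is passed in already computed, and the cap is kept as a Float because the source does not say whether or how the stored value is rounded.

```ruby
# Hypothetical pure-Ruby model of the update rule; the gate itself applies the
# rule in the repository layer, not through this function.
def next_current_max(current_max, ewma_lag_ms:, succeeded:, target_lag_ms:,
                     fail_factor: 0.5, slow_factor: 0.95, min: 1)
  updated =
    if !succeeded
      current_max * fail_factor   # multiplicative decrease on a failed perform
    elsif ewma_lag_ms <= target_lag_ms
      current_max + 1             # additive increase while lag is on target
    else
      current_max * slow_factor   # gentle multiplicative decrease on overload
    end

  [updated, min].max              # never drop below the floor
end

cap = 10.0
cap = next_current_max(cap, ewma_lag_ms: 20.0,  succeeded: true,  target_lag_ms: 50.0) # 11.0
cap = next_current_max(cap, ewma_lag_ms: 110.0, succeeded: true,  target_lag_ms: 50.0) # about 10.45
cap = next_current_max(cap, ewma_lag_ms: 60.0,  succeeded: false, target_lag_ms: 50.0) # about 5.225
```

With the default factors, a single failure undoes many successful increments, which is what keeps the loop conservative under errors.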

Constant Summary

DEFAULT_FULL_BACKOFF = 1.0   # seconds
DEFAULT_EWMA_ALPHA   = 0.5   # weight of the new sample in the EWMA
DEFAULT_FAIL_FACTOR  = 0.5   # halve on perform raise
DEFAULT_SLOW_FACTOR  = 0.95  # gentle shrink on overload

Methods inherited from DispatchPolicy::Gate

#consume

Constructor Details

#initialize(initial_max:, target_lag_ms:, min: 1, ewma_alpha: DEFAULT_EWMA_ALPHA, failure_decrease_factor: DEFAULT_FAIL_FACTOR, overload_decrease_factor: DEFAULT_SLOW_FACTOR, full_backoff: DEFAULT_FULL_BACKOFF) ⇒ AdaptiveConcurrency

Returns a new instance of AdaptiveConcurrency.

Raises:

  • (ArgumentError)


# File 'lib/dispatch_policy/gates/adaptive_concurrency.rb', line 32

def initialize(initial_max:, target_lag_ms:, min: 1,
               ewma_alpha: DEFAULT_EWMA_ALPHA,
               failure_decrease_factor: DEFAULT_FAIL_FACTOR,
               overload_decrease_factor: DEFAULT_SLOW_FACTOR,
               full_backoff: DEFAULT_FULL_BACKOFF)
  super()
  @initial_max   = Integer(initial_max)
  @target_lag_ms = Float(target_lag_ms)
  @min           = Integer(min)
  @ewma_alpha    = Float(ewma_alpha)
  @fail_factor   = Float(failure_decrease_factor)
  @slow_factor   = Float(overload_decrease_factor)
  @full_backoff  = Float(full_backoff)
  raise ArgumentError, "target_lag_ms must be > 0" unless @target_lag_ms.positive?
  raise ArgumentError, "min must be >= 1"          unless @min >= 1
  raise ArgumentError, "initial_max must be >= min" unless @initial_max >= @min
end

Instance Attribute Details

#ewma_alpha ⇒ Object (readonly)

Returns the value of attribute ewma_alpha.

# File 'lib/dispatch_policy/gates/adaptive_concurrency.rb', line 29

def ewma_alpha
  @ewma_alpha
end

#fail_factor ⇒ Object (readonly)

Returns the value of attribute fail_factor.

# File 'lib/dispatch_policy/gates/adaptive_concurrency.rb', line 29

def fail_factor
  @fail_factor
end

#full_backoff ⇒ Object (readonly)

Returns the value of attribute full_backoff.

# File 'lib/dispatch_policy/gates/adaptive_concurrency.rb', line 29

def full_backoff
  @full_backoff
end

#initial_max ⇒ Object (readonly)

Returns the value of attribute initial_max.

# File 'lib/dispatch_policy/gates/adaptive_concurrency.rb', line 29

def initial_max
  @initial_max
end

#min ⇒ Object (readonly)

Returns the value of attribute min.

# File 'lib/dispatch_policy/gates/adaptive_concurrency.rb', line 29

def min
  @min
end

#slow_factor ⇒ Object (readonly)

Returns the value of attribute slow_factor.

# File 'lib/dispatch_policy/gates/adaptive_concurrency.rb', line 29

def slow_factor
  @slow_factor
end

#target_lag_ms ⇒ Object (readonly)

Returns the value of attribute target_lag_ms.

# File 'lib/dispatch_policy/gates/adaptive_concurrency.rb', line 29

def target_lag_ms
  @target_lag_ms
end

Instance Method Details

#evaluate(ctx, partition, admit_budget) ⇒ Object



# File 'lib/dispatch_policy/gates/adaptive_concurrency.rb', line 54

def evaluate(ctx, partition, admit_budget)
  policy_name = partition["policy_name"]
  key         = inflight_partition_key(policy_name, ctx)

  # Seed lazily so the very first admission has a row to read
  # (and so record_observation can UPDATE without a check).
  Repository.adaptive_seed!(
    policy_name:   policy_name,
    partition_key: key,
    initial_max:   @initial_max
  )

  cap = Repository.adaptive_current_max(
    policy_name:   policy_name,
    partition_key: key
  ) || @initial_max
  cap = [cap, @min].max

  in_flight = Repository.count_inflight(
    policy_name:   policy_name,
    partition_key: key
  )
  remaining = cap - in_flight

  # Safety valve. AIMD can shrink current_max during a slow burst;
  # if the partition then idles, no observations come in to grow
  # the cap back. When in_flight == 0 we ensure at least
  # initial_max so the partition never fossilizes at min.
  remaining = [remaining, @initial_max].max if in_flight.zero?

  if remaining <= 0
    return Decision.new(allowed: 0,
                        retry_after: @full_backoff,
                        reason: "adaptive_concurrency_full")
  end

  Decision.new(allowed: [remaining, admit_budget].min)
end
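The admission arithmetic in #evaluate can be isolated from the repository calls. The sketch below is a hypothetical pure function (`allowed_admissions` is not part of the gem) that takes the values #evaluate reads from storage as plain arguments and returns the number of jobs that would be admitted, with 0 standing in for the "adaptive_concurrency_full" decision.

```ruby
# Hypothetical restatement of the arithmetic in #evaluate, with the stored cap
# and the in-flight count passed in instead of read from Repository.
def allowed_admissions(current_max:, in_flight:, admit_budget:, initial_max:, min: 1)
  cap       = [current_max, min].max   # stored cap never counts as less than min
  remaining = cap - in_flight

  # Safety valve: an idle partition is offered at least initial_max slots,
  # even if an earlier slow burst shrank the stored cap.
  remaining = [remaining, initial_max].max if in_flight.zero?

  return 0 if remaining <= 0           # gate is full; caller backs off

  [remaining, admit_budget].min        # never exceed the caller's budget
end

allowed_admissions(current_max: 5,  in_flight: 3, admit_budget: 10, initial_max: 8) # 2 slots left
allowed_admissions(current_max: 5,  in_flight: 5, admit_budget: 10, initial_max: 8) # full
allowed_admissions(current_max: 1,  in_flight: 0, admit_budget: 10, initial_max: 8) # valve lifts to 8
allowed_admissions(current_max: 20, in_flight: 0, admit_budget: 4,  initial_max: 8) # budget caps at 4
```

The third call shows the safety valve at work: the stored cap is 1, but because nothing is in flight the partition is offered initial_max slots again.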

#inflight_partition_key(policy_name, ctx) ⇒ Object

Same canonical scope as the staged_jobs partition_key: every gate in a policy uses `policy.partition_for(ctx)`, so the inflight count and the adaptive stats line up exactly.

Raises:

  • (InvalidPolicy)



# File 'lib/dispatch_policy/gates/adaptive_concurrency.rb', line 96

def inflight_partition_key(policy_name, ctx)
  policy = DispatchPolicy.registry.fetch(policy_name)
  raise InvalidPolicy, "unknown policy #{policy_name.inspect}" unless policy
  policy.partition_for(ctx)
end

#name ⇒ Object

# File 'lib/dispatch_policy/gates/adaptive_concurrency.rb', line 50

def name
  :adaptive_concurrency
end

#record_observation(policy_name:, partition_key:, queue_lag_ms:, succeeded:) ⇒ Object

Called from InflightTracker.track after each perform completes (success or failure). Updates the AIMD state atomically in one SQL statement.



# File 'lib/dispatch_policy/gates/adaptive_concurrency.rb', line 105

def record_observation(policy_name:, partition_key:, queue_lag_ms:, succeeded:)
  Repository.adaptive_seed!(
    policy_name:   policy_name,
    partition_key: partition_key.to_s,
    initial_max:   @initial_max
  )
  Repository.adaptive_record!(
    policy_name:   policy_name,
    partition_key: partition_key.to_s,
    queue_lag_ms:  queue_lag_ms,
    succeeded:     succeeded,
    alpha:         @ewma_alpha,
    target_lag_ms: @target_lag_ms,
    fail_factor:   @fail_factor,
    slow_factor:   @slow_factor,
    min:           @min
  )
end