Class: DispatchPolicy::Gates::AdaptiveConcurrency

Inherits:
DispatchPolicy::Gate
Defined in:
lib/dispatch_policy/gates/adaptive_concurrency.rb

Overview

Self-tuning concurrency gate. Like :concurrency, but the per-partition cap (`current_max`) adjusts itself at run time. It grows while jobs leave the adapter queue quickly and shrinks when the queue builds up. The additive-increase/multiplicative-decrease (AIMD) state is persisted in `dispatch_policy_adaptive_concurrency_stats`.

The feedback signal is `queue_lag_ms = perform_start - admitted_at`, the time a job spends waiting in the adapter queue after admission. It is a pure saturation signal: slow performs in a downstream service do not penalize admissions as long as workers still drain the queue quickly.
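To make the distinction concrete, this Ruby sketch computes the feedback signal for one job and, separately, its perform duration. The `Timeline` struct and both helpers are hypothetical, written for illustration only; they are not part of the gem. Only the wait before `perform_start` feeds the gate, so a job that spends seconds in a slow downstream service still reports a small lag.

```ruby
# Hypothetical per-job timestamps (not a gem type).
Timeline = Struct.new(:admitted_at, :perform_start, :perform_end, keyword_init: true)

# The gate's feedback signal: time spent queued in the adapter after admission.
def queue_lag_ms(timeline)
  ((timeline.perform_start - timeline.admitted_at) * 1000.0).round
end

# Perform duration: deliberately NOT part of the signal.
def perform_ms(timeline)
  ((timeline.perform_end - timeline.perform_start) * 1000.0).round
end

t0 = Time.at(1_700_000_000)
slow_downstream = Timeline.new(admitted_at: t0, perform_start: t0 + 0.02, perform_end: t0 + 5.0)

puts queue_lag_ms(slow_downstream) # small: a worker picked the job up quickly
puts perform_ms(slow_downstream)   # large, but this does not shrink the cap
```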

Update rule applied after each perform (in InflightTracker.track), where ewma_lag is the exponentially weighted moving average of queue_lag_ms:

succeeded? & ewma_lag <= target_lag_ms → current_max += 1
succeeded? & ewma_lag >  target_lag_ms → current_max *= slow_factor
failed?                                → current_max *= fail_factor

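current_max is always clamped to >= min. There is no hard ceiling; growth self-limits through target_lag_ms, because once saturation pushes ewma_lag above the target, even successful performs shrink the cap.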
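A minimal in-memory Ruby sketch of the update rule above, useful for reasoning about how the cap moves. The gem itself applies the update in SQL (`Repository.adaptive_record!`); `AimdState` and `aimd_update` are hypothetical names. Two details are assumptions not stated on this page: the new sample is folded into the EWMA before the comparison, and `current_max` is kept as a Float rather than rounded.

```ruby
# Hypothetical in-memory model of one partition's AIMD state.
AimdState = Struct.new(:current_max, :ewma_lag_ms)

# Assumption: the EWMA absorbs the new sample before the target comparison.
def aimd_update(state, queue_lag_ms:, succeeded:, alpha:, target_lag_ms:,
                fail_factor:, slow_factor:, min:)
  state.ewma_lag_ms = alpha * queue_lag_ms + (1 - alpha) * state.ewma_lag_ms

  next_max =
    if !succeeded
      state.current_max * fail_factor   # perform raised: multiplicative decrease
    elsif state.ewma_lag_ms <= target_lag_ms
      state.current_max + 1             # queue draining: additive increase
    else
      state.current_max * slow_factor   # queue building up: gentle decrease
    end

  state.current_max = [next_max, min.to_f].max # never below min
  state
end

knobs = { alpha: 0.5, target_lag_ms: 100.0, fail_factor: 0.5, slow_factor: 0.95, min: 1 }
state = AimdState.new(4.0, 0.0)
aimd_update(state, queue_lag_ms: 10, succeeded: true, **knobs)  # current_max -> 5.0
aimd_update(state, queue_lag_ms: 10, succeeded: false, **knobs) # current_max -> 2.5
```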

Constant Summary

DEFAULT_FULL_BACKOFF = 1.0

Seconds; returned as retry_after when a partition has no free slots.

DEFAULT_EWMA_ALPHA = 0.5

Weight of the new sample in the EWMA.

DEFAULT_FAIL_FACTOR = 0.5

Halves current_max when a perform raises.

DEFAULT_SLOW_FACTOR = 0.95

Gentle shrink on overload (5% per overloaded observation).
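DEFAULT_EWMA_ALPHA is the weight given to each new lag sample. Assuming the standard exponentially weighted moving average `ewma = alpha * sample + (1 - alpha) * ewma` (this page names the EWMA but does not spell out its formula), the hypothetical helper below shows how alpha trades responsiveness for smoothing: alpha = 1 tracks only the latest sample, and alpha = 0 never moves off its seed.

```ruby
# Standard EWMA; `ewma_series` is a hypothetical helper, not gem code.
def ewma_series(seed, samples, alpha)
  avg = seed.to_f
  samples.map { |sample| avg = alpha * sample + (1 - alpha) * avg }
end

lag_burst = [100, 100, 100]
p ewma_series(0, lag_burst, 0.5) # => [50.0, 75.0, 87.5]
p ewma_series(0, lag_burst, 1.0) # => [100.0, 100.0, 100.0]
p ewma_series(0, lag_burst, 0.0) # => [0.0, 0.0, 0.0]
```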


Methods inherited from DispatchPolicy::Gate

#consume

Constructor Details

#initialize(initial_max:, target_lag_ms:, min: 1, ewma_alpha: DEFAULT_EWMA_ALPHA, failure_decrease_factor: DEFAULT_FAIL_FACTOR, overload_decrease_factor: DEFAULT_SLOW_FACTOR, full_backoff: DEFAULT_FULL_BACKOFF) ⇒ AdaptiveConcurrency

Returns a new instance of AdaptiveConcurrency.

Raises:

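  • (ArgumentError) if target_lag_ms <= 0, min < 1, initial_max < min, ewma_alpha is outside (0, 1], either decrease factor is outside (0, 1), or full_backoff < 0.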


# File 'lib/dispatch_policy/gates/adaptive_concurrency.rb', line 32

def initialize(initial_max:, target_lag_ms:, min: 1,
               ewma_alpha: DEFAULT_EWMA_ALPHA,
               failure_decrease_factor: DEFAULT_FAIL_FACTOR,
               overload_decrease_factor: DEFAULT_SLOW_FACTOR,
               full_backoff: DEFAULT_FULL_BACKOFF)
  super()
  @initial_max   = Integer(initial_max)
  @target_lag_ms = Float(target_lag_ms)
  @min           = Integer(min)
  @ewma_alpha    = Float(ewma_alpha)
  @fail_factor   = Float(failure_decrease_factor)
  @slow_factor   = Float(overload_decrease_factor)
  @full_backoff  = Float(full_backoff)
  raise ArgumentError, "target_lag_ms must be > 0" unless @target_lag_ms.positive?
  raise ArgumentError, "min must be >= 1"          unless @min >= 1
  raise ArgumentError, "initial_max must be >= min" unless @initial_max >= @min
  # Out-of-range tuning knobs invert the AIMD loop instead of erroring:
  # alpha=0 freezes the EWMA at its seed so the cap grows unbounded;
  # a decrease factor >= 1 turns the multiplicative *decrease* into an
  # increase, a positive-feedback loop under failure/overload.
  unless @ewma_alpha > 0 && @ewma_alpha <= 1
    raise ArgumentError, "ewma_alpha must be in (0, 1]"
  end
  unless @fail_factor > 0 && @fail_factor < 1
    raise ArgumentError, "failure_decrease_factor must be in (0, 1)"
  end
  unless @slow_factor > 0 && @slow_factor < 1
    raise ArgumentError, "overload_decrease_factor must be in (0, 1)"
  end
  raise ArgumentError, "full_backoff must be >= 0" if @full_backoff.negative?
end
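The constructor comment argues that a decrease factor >= 1 turns the decrease into an increase. A short Ruby illustration, using a hypothetical helper that applies only the failure branch of the update rule (clamped to min), makes the positive-feedback loop visible: with 0.5 the cap collapses toward min under sustained failures, while 1.5 makes it grow on every failure.

```ruby
# Hypothetical helper: apply `failures` consecutive failure updates.
def cap_after_failures(start, fail_factor, failures, min: 1)
  failures.times.reduce(start.to_f) { |cap, _| [cap * fail_factor, min.to_f].max }
end

p cap_after_failures(8, 0.5, 5) # => 1.0
p cap_after_failures(8, 1.5, 3) # => 27.0
```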

Instance Attribute Details

#ewma_alpha ⇒ Object (readonly)

Returns the value of attribute ewma_alpha.



# File 'lib/dispatch_policy/gates/adaptive_concurrency.rb', line 29

def ewma_alpha
  @ewma_alpha
end

#fail_factor ⇒ Object (readonly)

Returns the value of attribute fail_factor.



# File 'lib/dispatch_policy/gates/adaptive_concurrency.rb', line 29

def fail_factor
  @fail_factor
end

#full_backoff ⇒ Object (readonly)

Returns the value of attribute full_backoff.



# File 'lib/dispatch_policy/gates/adaptive_concurrency.rb', line 29

def full_backoff
  @full_backoff
end

#initial_max ⇒ Object (readonly)

Returns the value of attribute initial_max.



# File 'lib/dispatch_policy/gates/adaptive_concurrency.rb', line 29

def initial_max
  @initial_max
end

#min ⇒ Object (readonly)

Returns the value of attribute min.



# File 'lib/dispatch_policy/gates/adaptive_concurrency.rb', line 29

def min
  @min
end

#slow_factor ⇒ Object (readonly)

Returns the value of attribute slow_factor.



# File 'lib/dispatch_policy/gates/adaptive_concurrency.rb', line 29

def slow_factor
  @slow_factor
end

#target_lag_ms ⇒ Object (readonly)

Returns the value of attribute target_lag_ms.



# File 'lib/dispatch_policy/gates/adaptive_concurrency.rb', line 29

def target_lag_ms
  @target_lag_ms
end

Instance Method Details

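#evaluate(ctx, partition, admit_budget) ⇒ Object

Admits up to min(current_max - in_flight, admit_budget) jobs for the partition. When no slots are free, returns a Decision with allowed: 0 and retry_after: full_backoff.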



# File 'lib/dispatch_policy/gates/adaptive_concurrency.rb', line 68

def evaluate(ctx, partition, admit_budget)
  policy_name = partition["policy_name"]
  key         = inflight_partition_key(policy_name, ctx)

  # Seed lazily so the very first admission has a row to read
  # (and so record_observation can UPDATE without a check).
  Repository.adaptive_seed!(
    policy_name:   policy_name,
    partition_key: key,
    initial_max:   @initial_max
  )

  cap = Repository.adaptive_current_max(
    policy_name:   policy_name,
    partition_key: key
  ) || @initial_max
  cap = [cap, @min].max

  in_flight = Repository.count_inflight(
    policy_name:   policy_name,
    partition_key: key
  )
  remaining = cap - in_flight

  # Safety valve. AIMD can shrink current_max during a slow burst;
  # if the partition then idles, no observations come in to grow
  # the cap back. When in_flight == 0 we ensure at least
  # initial_max so the partition never fossilizes at min.
  remaining = [remaining, @initial_max].max if in_flight.zero?

  if remaining <= 0
    return Decision.new(allowed: 0,
                        retry_after: @full_backoff,
                        reason: "adaptive_concurrency_full")
  end

  Decision.new(allowed: [remaining, admit_budget].min)
end
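The admission arithmetic in `#evaluate` can be restated as a pure function, which makes the safety valve easier to reason about. This is a hypothetical sketch: `SketchDecision` stands in for the gem's `Decision`, and the stored cap and in-flight count are passed in directly instead of being read through `Repository`.

```ruby
# Stand-in for the gem's Decision (hypothetical).
SketchDecision = Struct.new(:allowed, :retry_after, :reason, keyword_init: true)

def admit(stored_cap:, in_flight:, admit_budget:, min:, initial_max:, full_backoff:)
  cap = [stored_cap || initial_max, min].max # missing row falls back to initial_max
  remaining = cap - in_flight
  # Safety valve: an idle partition always gets at least initial_max slots.
  remaining = [remaining, initial_max].max if in_flight.zero?

  if remaining <= 0
    return SketchDecision.new(allowed: 0, retry_after: full_backoff,
                              reason: "adaptive_concurrency_full")
  end

  SketchDecision.new(allowed: [remaining, admit_budget].min)
end

knobs = { min: 1, initial_max: 5, full_backoff: 1.0 }
admit(stored_cap: 4, in_flight: 4, admit_budget: 10, **knobs).allowed # => 0 (full)
admit(stored_cap: 1, in_flight: 0, admit_budget: 10, **knobs).allowed # => 5 (valve)
admit(stored_cap: 6, in_flight: 2, admit_budget: 3, **knobs).allowed  # => 3 (budget-bound)
```

The valve only fires when nothing is in flight, so a cap that AIMD shrank to min during a burst cannot keep an idle partition throttled below initial_max.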

#inflight_partition_key(policy_name, ctx) ⇒ Object

Returns the same canonical scope as the staged_jobs partition_key. Every gate in a policy uses `policy.partition_for(ctx)`, so the in-flight count and the adaptive stats line up exactly.

Raises:

  • (InvalidPolicy) if policy_name is not registered.



# File 'lib/dispatch_policy/gates/adaptive_concurrency.rb', line 110

def inflight_partition_key(policy_name, ctx)
  policy = DispatchPolicy.registry.fetch(policy_name)
  raise InvalidPolicy, "unknown policy #{policy_name.inspect}" unless policy
  policy.partition_for(ctx)
end

#name ⇒ Object



# File 'lib/dispatch_policy/gates/adaptive_concurrency.rb', line 64

def name
  :adaptive_concurrency
end

#record_observation(policy_name:, partition_key:, queue_lag_ms:, succeeded:) ⇒ Object

Called from InflightTracker.track after each perform completes (success or failure). Seeds the stats row if it does not exist yet, then updates the AIMD state atomically in a single SQL statement (Repository.adaptive_record!).



# File 'lib/dispatch_policy/gates/adaptive_concurrency.rb', line 119

def record_observation(policy_name:, partition_key:, queue_lag_ms:, succeeded:)
  Repository.adaptive_seed!(
    policy_name:   policy_name,
    partition_key: partition_key.to_s,
    initial_max:   @initial_max
  )
  Repository.adaptive_record!(
    policy_name:   policy_name,
    partition_key: partition_key.to_s,
    queue_lag_ms:  queue_lag_ms,
    succeeded:     succeeded,
    alpha:         @ewma_alpha,
    target_lag_ms: @target_lag_ms,
    fail_factor:   @fail_factor,
    slow_factor:   @slow_factor,
    min:           @min
  )
end
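Applying the AIMD update atomically matters because many workers report observations for the same partition concurrently. The Ruby illustration below is hypothetical and in-memory (it is not the gem's SQL); it shows the lost-update problem that a single-statement update avoids. Two workers that each report a fast success should raise the cap by 2, but a read-then-write interleaving loses one increment, while applying each increment to the latest stored value, as an `UPDATE` of the form `SET current_max = current_max + 1` would, keeps both.

```ruby
# Non-atomic read-modify-write with an unlucky (forced) interleaving.
def interleaved_read_then_write(start)
  row = { current_max: start }
  seen_by_a = row[:current_max]     # worker A reads
  seen_by_b = row[:current_max]     # worker B reads the same value
  row[:current_max] = seen_by_a + 1 # A writes
  row[:current_max] = seen_by_b + 1 # B overwrites A's increment
  row[:current_max]
end

# Each increment is applied to the latest stored value under a lock,
# mirroring what a single UPDATE statement guarantees.
def atomic_updates(start, workers: 2)
  row = { current_max: start }
  lock = Mutex.new
  threads = Array.new(workers) do
    Thread.new { lock.synchronize { row[:current_max] += 1 } }
  end
  threads.each(&:join)
  row[:current_max]
end

p interleaved_read_then_write(4) # => 5 (one increment lost)
p atomic_updates(4)              # => 6
```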