Class: DispatchPolicy::Gates::AdaptiveConcurrency

Inherits:
DispatchPolicy::Gate
Defined in:
lib/dispatch_policy/gates/adaptive_concurrency.rb

Overview

Adaptive variant of :concurrency. The per-partition cap (current_max) shrinks when the adapter queue backs up (recent queue_lag > target) or when performs fail, and grows back when workers drain admissions quickly (queue_lag near zero). The signal is pure queue wait, measured from admitted_at to perform_start, so it reflects “are we admitting too fast?” without being polluted by how long the external work takes.

The gate runs an AIMD (additive increase, multiplicative decrease) loop on a per-partition stats row; the underlying in-flight counter is the same PartitionInflightCount used by :concurrency.
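The actual update happens inside AdaptiveConcurrencyStats.record_observation!, which is not shown on this page. As a rough illustration only, one AIMD step could look like the sketch below. The Stats struct, the aimd_step helper, the order of the checks, and the +1 additive step are all assumptions made for the example, not the library's implementation.

```ruby
# Hypothetical stand-in for one per-partition stats row.
Stats = Struct.new(:current_max, :ewma_lag_ms)

# One AIMD step (assumed shape, not the library's code).
def aimd_step(stats, queue_lag_ms:, succeeded:, target_lag_ms:, min:,
              alpha: 0.5, fail_factor: 0.5, slow_factor: 0.95)
  # Smooth the raw queue wait so a single spike does not dominate.
  stats.ewma_lag_ms = alpha * queue_lag_ms + (1 - alpha) * stats.ewma_lag_ms

  next_max =
    if !succeeded
      stats.current_max * fail_factor   # multiplicative decrease on failure
    elsif stats.ewma_lag_ms > target_lag_ms
      stats.current_max * slow_factor   # gentler decrease when the queue backs up
    else
      stats.current_max + 1             # additive increase while lag stays low
    end

  # Never drop below the configured floor.
  stats.current_max = [next_max.floor, min].max
  stats
end

row = Stats.new(10, 0.0)
aimd_step(row, queue_lag_ms: 0, succeeded: true, target_lag_ms: 100, min: 1)
puts row.current_max
```

In this sketch a fast, successful observation raises the cap by one, while a failure halves it, so the cap climbs slowly and backs off quickly.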

Constant Summary

DEFAULT_EWMA_ALPHA =

An alpha of 0.5 is fast enough that a single spike is forgotten in ~3 observations instead of ~15. A slow_factor of 0.95 halves the per-observation shrink magnitude, so the cap no longer overshoots after a burst drains the adapter queue.

0.5
DEFAULT_FAIL_FACTOR =
0.5
DEFAULT_SLOW_FACTOR =
0.95
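To make the "forgotten in ~3 observations" claim concrete, the sketch below applies a standard exponentially weighted moving average with alpha = 0.5 to one spike followed by idle samples. The ewma helper is hypothetical and assumes the conventional EWMA formula; the library's smoothing code is not shown on this page.

```ruby
# Conventional EWMA update (assumed formula): new = alpha * sample + (1 - alpha) * previous
def ewma(previous, sample, alpha)
  alpha * sample + (1 - alpha) * previous
end

alpha = 0.5
lag = ewma(0.0, 1000.0, alpha)                        # a single 1000 ms spike
trace = 3.times.map { lag = ewma(lag, 0.0, alpha) }   # three quiet observations
p trace # => [250.0, 125.0, 62.5]
```

After three quiet observations the smoothed lag is down to one sixteenth of the original spike.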

Instance Attribute Summary

Attributes inherited from DispatchPolicy::Gate

#name, #partition_by, #policy

Instance Method Summary

Methods inherited from DispatchPolicy::Gate

#initialize, #partition_key_for, register, registry

Constructor Details

This class inherits a constructor from DispatchPolicy::Gate

Instance Attribute Details

#ewma_alpha ⇒ Object (readonly)

Returns the value of attribute ewma_alpha.

# File 'lib/dispatch_policy/gates/adaptive_concurrency.rb', line 45

def ewma_alpha
  @ewma_alpha
end

#fail_factor ⇒ Object (readonly)

Returns the value of attribute fail_factor.

# File 'lib/dispatch_policy/gates/adaptive_concurrency.rb', line 45

def fail_factor
  @fail_factor
end

#initial_max ⇒ Object (readonly)

Returns the value of attribute initial_max.

# File 'lib/dispatch_policy/gates/adaptive_concurrency.rb', line 45

def initial_max
  @initial_max
end

#min ⇒ Object (readonly)

Returns the value of attribute min.

# File 'lib/dispatch_policy/gates/adaptive_concurrency.rb', line 45

def min
  @min
end

#slow_factor ⇒ Object (readonly)

Returns the value of attribute slow_factor.

# File 'lib/dispatch_policy/gates/adaptive_concurrency.rb', line 45

def slow_factor
  @slow_factor
end

#target_lag_ms ⇒ Object (readonly)

Returns the value of attribute target_lag_ms.

# File 'lib/dispatch_policy/gates/adaptive_concurrency.rb', line 45

def target_lag_ms
  @target_lag_ms
end

Instance Method Details

#configure(initial_max:, target_lag_ms: nil, target_latency: nil, min: 1, ewma_alpha: DEFAULT_EWMA_ALPHA, failure_decrease_factor: DEFAULT_FAIL_FACTOR, overload_decrease_factor: DEFAULT_SLOW_FACTOR) ⇒ Object

The legacy keyword target_latency is accepted as an alias for target_lag_ms for backwards compatibility; target_lag_ms takes precedence when both are given.

Raises:

  • (ArgumentError) if neither target_lag_ms nor target_latency is given

# File 'lib/dispatch_policy/gates/adaptive_concurrency.rb', line 25

def configure(initial_max:,
              target_lag_ms: nil,
              target_latency: nil,
              min: 1,
              ewma_alpha: DEFAULT_EWMA_ALPHA,
              failure_decrease_factor: DEFAULT_FAIL_FACTOR,
              overload_decrease_factor: DEFAULT_SLOW_FACTOR)
  @initial_max    = initial_max
  @min            = min
  @target_lag_ms  = target_lag_ms || target_latency
  @ewma_alpha     = ewma_alpha
  @fail_factor    = failure_decrease_factor
  @slow_factor    = overload_decrease_factor
  raise ArgumentError, "adaptive_concurrency requires target_lag_ms" if @target_lag_ms.nil?
end
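The alias handling can be tried in isolation. The sketch below copies only the keyword resolution from configure into a hypothetical stand-in class (LagTargetSketch is not part of the library). It avoids guessing at how a real gate is constructed, since the DispatchPolicy::Gate constructor is not shown on this page.

```ruby
# Hypothetical stand-in that mirrors only the keyword handling of #configure.
class LagTargetSketch
  attr_reader :target_lag_ms

  def configure(initial_max:, target_lag_ms: nil, target_latency: nil)
    @initial_max   = initial_max
    # The new keyword wins; the legacy alias is used only as a fallback.
    @target_lag_ms = target_lag_ms || target_latency
    raise ArgumentError, "adaptive_concurrency requires target_lag_ms" if @target_lag_ms.nil?
  end
end

legacy = LagTargetSketch.new
legacy.configure(initial_max: 10, target_latency: 200)
p legacy.target_lag_ms # => 200

both = LagTargetSketch.new
both.configure(initial_max: 10, target_lag_ms: 150, target_latency: 200)
p both.target_lag_ms # => 150

begin
  LagTargetSketch.new.configure(initial_max: 10)
rescue ArgumentError => e
  puts e.message # prints the "requires target_lag_ms" message
end
```

Because the check runs after the assignments, a failed call leaves the other instance variables set; in this sketch that only matters if the same object is reused after rescuing the error.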

#filter(batch, context) ⇒ Object



# File 'lib/dispatch_policy/gates/adaptive_concurrency.rb', line 48

def filter(batch, context)
  by_partition = batch.group_by { |staged| partition_key_for(context.for(staged)) }

  # Seed any missing stats rows so the first admission has something
  # to read. Cheap: one INSERT ... ON CONFLICT DO NOTHING per key.
  by_partition.each_key do |key|
    AdaptiveConcurrencyStats.seed!(
      policy_name:   policy.name,
      gate_name:     name,
      partition_key: key,
      initial_max:   resolve(@initial_max, nil).to_i
    )
  end

  stats = AdaptiveConcurrencyStats.fetch_many(
    policy_name:    policy.name,
    gate_name:      name,
    partition_keys: by_partition.keys
  )

  in_flight = PartitionInflightCount.fetch_many(
    policy_name:    policy.name,
    gate_name:      name,
    partition_keys: by_partition.keys
  )

  min_v = resolve(@min, nil).to_i

  admitted = []
  by_partition.each do |partition_key, jobs|
    effective_max = stats.dig(partition_key, :current_max) || resolve(@initial_max, nil).to_i
    effective_max = [ effective_max, min_v ].max
    used = in_flight.fetch(partition_key, 0)

    # Safety valve: if nothing is in-flight for this partition and
    # there's pending, the adapter queue is (or is about to be)
    # empty and workers will idle. Ensure we hand over at least
    # initial_max so the stream never dries up on its own.
    if used.zero? && jobs.any?
      effective_max = [ effective_max, resolve(@initial_max, nil).to_i ].max
    end

    jobs.each do |staged|
      break unless used < effective_max
      admitted << [ staged, partition_key ]
      used += 1
    end
  end

  context.record_partitions(admitted, gate: name)
  admitted.map(&:first)
end
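The admission arithmetic in filter can be followed without a database by swapping the stats and in-flight lookups for plain hashes. The admit helper below is a hypothetical reduction written for illustration. It uses jobs.first(free) instead of the break-based loop, which admits the same jobs, and it takes already-resolved integers where the real method calls resolve.

```ruby
# Hypothetical pure-Ruby reduction of the per-partition admission loop.
# current_max and in_flight are plain hashes standing in for the two fetch_many calls.
def admit(by_partition, current_max:, in_flight:, initial_max:, min:)
  admitted = []
  by_partition.each do |key, jobs|
    # Fall back to initial_max when no stats row exists, then apply the floor.
    effective_max = [current_max.fetch(key, initial_max), min].max
    used = in_flight.fetch(key, 0)

    # Safety valve: an idle partition with pending jobs gets at least initial_max.
    effective_max = [effective_max, initial_max].max if used.zero? && jobs.any?

    free = [effective_max - used, 0].max
    admitted.concat(jobs.first(free))
  end
  admitted
end

pending = { "a" => %w[a1 a2 a3], "b" => %w[b1 b2] }
p admit(pending,
        current_max: { "a" => 1, "b" => 2 },
        in_flight:   { "b" => 1 },
        initial_max: 2,
        min:         1)
# => ["a1", "a2", "b1"]
```

Partition "a" has shrunk to a cap of 1 but is idle, so the safety valve lifts it back to initial_max and two jobs go through; partition "b" already has one job in flight against a cap of 2, so only one more is admitted.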

#record_observation(partition_key:, queue_lag_ms:, succeeded:) ⇒ Object

Called by Dispatchable#around_perform for each adaptive gate that touched this job. Lives on the gate instance because configuration (ewma_alpha, target_lag_ms, etc.) is per gate.



# File 'lib/dispatch_policy/gates/adaptive_concurrency.rb', line 104

def record_observation(partition_key:, queue_lag_ms:, succeeded:)
  AdaptiveConcurrencyStats.record_observation!(
    policy_name:       policy.name,
    gate_name:         name,
    partition_key:     partition_key.to_s,
    queue_lag_ms:      queue_lag_ms,
    succeeded:         succeeded,
    alpha:             @ewma_alpha,
    min:               resolve(@min, nil).to_i,
    target_lag_ms:     resolve(@target_lag_ms, nil).to_f,
    fail_factor:       @fail_factor,
    slow_factor:       @slow_factor,
    initial_max:       resolve(@initial_max, nil).to_i
  )
end
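The queue_lag_ms argument is the queue wait described in the overview (admitted_at to perform_start). How the caller derives it is not shown here, so the helper below is only a guess at the arithmetic; the name queue_lag_ms_between and the rounding to whole milliseconds are assumptions.

```ruby
# Hypothetical helper: milliseconds a job waited between admission and perform start.
def queue_lag_ms_between(admitted_at, perform_start)
  # Time - Time yields seconds as a Float; convert to milliseconds.
  ((perform_start - admitted_at) * 1000.0).round
end

admitted_at   = Time.at(1_700_000_000)
perform_start = admitted_at + 0.25
p queue_lag_ms_between(admitted_at, perform_start) # => 250
```

A value like this would then be passed as queue_lag_ms:, together with partition_key: and succeeded:, to record_observation.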

#tracks_inflight? ⇒ Boolean

Returns:

  • (Boolean)

# File 'lib/dispatch_policy/gates/adaptive_concurrency.rb', line 41

def tracks_inflight?
  true
end