Class: DispatchPolicy::AdaptiveConcurrencyStats

Inherits:
ApplicationRecord
  • Object
show all
Defined in:
app/models/dispatch_policy/adaptive_concurrency_stats.rb

Class Method Summary collapse

Class Method Details

.current_max_for(policy_name:, partition_key:) ⇒ Object

Quick lookup used by Dispatchable to denormalize current_max into the generic partition observation row.



84
85
86
87
# File 'app/models/dispatch_policy/adaptive_concurrency_stats.rb', line 84

# Returns the stored +current_max+ for one partition of a policy, or +nil+
# when no stats row matches.
#
# Stats rows are unique per (policy_name, gate_name, partition_key), so a
# policy with more than one adaptive gate can hold several rows for the same
# partition. Pass +gate_name+ to pin the lookup to a single gate; when it is
# omitted no ORDER BY is applied and the database decides which matching row
# is returned.
#
# @param policy_name policy identifier matched against the policy_name column
# @param partition_key [#to_s] partition identifier; stringified before matching
# @param gate_name [#to_s, nil] optional gate to restrict the lookup to
# @return the +current_max+ column value, or nil if no row matches
def self.current_max_for(policy_name:, partition_key:, gate_name: nil)
  scope = where(policy_name: policy_name, partition_key: partition_key.to_s)
  scope = scope.where(gate_name: gate_name.to_s) unless gate_name.nil?
  # `pick` applies LIMIT 1 itself, so an explicit `limit(1)` is redundant.
  scope.pick(:current_max)
end

.fetch_many(policy_name:, gate_name:, partition_keys:) ⇒ Object



25
26
27
28
29
30
# File 'app/models/dispatch_policy/adaptive_concurrency_stats.rb', line 25

# Bulk lookup of adaptive stats for several partitions of one policy gate.
#
# @param policy_name policy identifier matched against the policy_name column
# @param gate_name [#to_s] gate identifier; stringified before matching
# @param partition_keys [#empty?] partition keys to look up
# @return [Hash] partition_key => { current_max:, ewma_latency_ms: }; an
#   empty hash (without touching the database) when +partition_keys+ is empty
def self.fetch_many(policy_name:, gate_name:, partition_keys:)
  return {} if partition_keys.empty?

  rows = where(
    policy_name:   policy_name,
    gate_name:     gate_name.to_s,
    partition_key: partition_keys
  ).pluck(:partition_key, :current_max, :ewma_latency_ms)

  # Each row is [partition_key, current_max, ewma_latency_ms].
  rows.to_h do |key, current_max, ewma_latency_ms|
    [key, { current_max: current_max, ewma_latency_ms: ewma_latency_ms }]
  end
end

.record_observation!(policy_name:, gate_name:, partition_key:, queue_lag_ms:, succeeded:, alpha:, min:, target_lag_ms:, fail_factor:, slow_factor:, initial_max:) ⇒ Object

Applies the EWMA + AIMD adjustment in a single UPDATE statement so concurrent performs can’t race on read-modify-write. The stats row is seeded first (INSERT … ON CONFLICT DO NOTHING), then the adjustment is applied.



35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
# File 'app/models/dispatch_policy/adaptive_concurrency_stats.rb', line 35

# Records one perform outcome for a partition and adjusts its concurrency cap.
#
# The row is seeded first (a no-op when it already exists); a single UPDATE
# then recomputes the latency EWMA and the new +current_max+ inside the
# database, so no value is read into Ruby and written back.
#
# @param policy_name   policy identifier matched against the policy_name column
# @param gate_name     [#to_s] gate identifier; stringified for the WHERE clause
# @param partition_key [#to_s] partition identifier; stringified for the WHERE clause
# @param queue_lag_ms  latest lag sample folded into the EWMA
# @param succeeded     when FALSE the failure branch shrinks current_max
# @param alpha         EWMA weight given to the new sample
# @param min           [#to_i] lower bound enforced on current_max
# @param target_lag_ms EWMA threshold above which current_max shrinks
# @param fail_factor   multiplier applied to current_max on failure
# @param slow_factor   multiplier applied to current_max when the EWMA exceeds the target
# @param initial_max   current_max used if the row has to be seeded
# @return whatever +connection.exec_update+ returns for the UPDATE
def self.record_observation!(
  policy_name:, gate_name:, partition_key:,
  queue_lag_ms:, succeeded:,
  alpha:, min:, target_lag_ms:,
  fail_factor:, slow_factor:, initial_max:
)
  seed!(policy_name: policy_name, gate_name: gate_name,
        partition_key: partition_key, initial_max: initial_max)

  # The feedback signal is queue lag (admitted_at → perform_start). With an
  # empty adapter queue the lag stays near zero and the cap grows by one;
  # once the smoothed lag exceeds the target the cap shrinks
  # multiplicatively, and a failure shrinks it by its own factor. Only the
  # `min` floor is enforced, so a partition can never be locked out entirely.
  statement = <<~SQL.squish
    UPDATE #{quoted_table_name}
    SET
      ewma_latency_ms = ewma_latency_ms * (1 - ?) + ? * ?,
      sample_count    = sample_count + 1,
      current_max = GREATEST(?, CASE
        WHEN ? = FALSE                                THEN FLOOR(current_max * ?)::int
        WHEN (ewma_latency_ms * (1 - ?) + ? * ?) > ?  THEN FLOOR(current_max * ?)::int
        ELSE current_max + 1
      END),
      last_observed_at = ?,
      updated_at       = ?
    WHERE policy_name = ? AND gate_name = ? AND partition_key = ?
  SQL

  observed_at = Time.current

  # The EWMA expression appears twice in the statement (once in SET, once in
  # the CASE test), so its three binds are repeated. Values below are listed
  # strictly in placeholder order.
  ewma_binds = [alpha, alpha, queue_lag_ms]
  binds = [
    *ewma_binds,                               # SET ewma_latency_ms
    min.to_i,                                  # GREATEST floor
    succeeded, fail_factor,                    # failure branch
    *ewma_binds, target_lag_ms, slow_factor,   # over-target branch
    observed_at, observed_at,                  # last_observed_at, updated_at
    policy_name, gate_name.to_s, partition_key.to_s
  ]

  connection.exec_update(sanitize_sql_array([statement, *binds]))
end

.seed!(policy_name:, gate_name:, partition_key:, initial_max:) ⇒ Object

Seed a stats row if one doesn’t exist yet. Mirrors ThrottleBucket.lock.



8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
# File 'app/models/dispatch_policy/adaptive_concurrency_stats.rb', line 8

# Inserts a stats row for the (policy, gate, partition) triple unless one
# already exists; an existing row is left untouched (ON CONFLICT DO NOTHING).
# New rows start with a zero EWMA and a zero sample count.
#
# @param policy_name   policy identifier stored in the policy_name column
# @param gate_name     [#to_s] gate identifier; stringified before insert
# @param partition_key [#to_s] partition identifier; stringified before insert
# @param initial_max   [#to_i] starting value for current_max
# @return whatever +connection.exec_update+ returns for the INSERT
def self.seed!(policy_name:, gate_name:, partition_key:, initial_max:)
  seeded_at = Time.current

  insert_sql = <<~SQL.squish
    INSERT INTO #{quoted_table_name}
      (policy_name, gate_name, partition_key, current_max,
       ewma_latency_ms, sample_count, created_at, updated_at)
    VALUES (?, ?, ?, ?, 0, 0, ?, ?)
    ON CONFLICT (policy_name, gate_name, partition_key) DO NOTHING
  SQL

  # Bind values in placeholder order; created_at and updated_at share one timestamp.
  row_values = [
    policy_name,
    gate_name.to_s,
    partition_key.to_s,
    initial_max.to_i,
    seeded_at,
    seeded_at
  ]

  connection.exec_update(sanitize_sql_array([insert_sql, *row_values]))
end