Class: DispatchPolicy::AdaptiveConcurrencyStats
- Inherits: ApplicationRecord
  - Object
  - ActiveRecord::Base
  - ApplicationRecord
  - DispatchPolicy::AdaptiveConcurrencyStats
- Defined in: app/models/dispatch_policy/adaptive_concurrency_stats.rb
Class Method Summary
- .current_max_for(policy_name:, partition_key:) ⇒ Object
  Quick lookup used by Dispatchable to denormalize current_max into the generic partition observation row.
- .fetch_many(policy_name:, gate_name:, partition_keys:) ⇒ Object
- .record_observation!(policy_name:, gate_name:, partition_key:, queue_lag_ms:, succeeded:, alpha:, min:, target_lag_ms:, fail_factor:, slow_factor:, initial_max:) ⇒ Object
  Single-statement EWMA + AIMD update so concurrent performs can’t race on read-modify-write.
- .seed!(policy_name:, gate_name:, partition_key:, initial_max:) ⇒ Object
  Seed a stats row if one doesn’t exist yet.
Class Method Details
.current_max_for(policy_name:, partition_key:) ⇒ Object
Quick lookup used by Dispatchable to denormalize current_max into the generic partition observation row.
    # File 'app/models/dispatch_policy/adaptive_concurrency_stats.rb', line 84

    def self.current_max_for(policy_name:, partition_key:)
      where(policy_name: policy_name, partition_key: partition_key.to_s)
        .limit(1).pick(:current_max)
    end
.fetch_many(policy_name:, gate_name:, partition_keys:) ⇒ Object
    # File 'app/models/dispatch_policy/adaptive_concurrency_stats.rb', line 25

    def self.fetch_many(policy_name:, gate_name:, partition_keys:)
      return {} if partition_keys.empty?
      where(policy_name: policy_name, gate_name: gate_name.to_s, partition_key: partition_keys)
        .pluck(:partition_key, :current_max, :ewma_latency_ms)
        .each_with_object({}) { |(k, c, l), h| h[k] = { current_max: c, ewma_latency_ms: l } }
    end
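The hash returned by fetch_many is keyed by partition_key, and partitions without a stats row are simply absent. The sketch below reproduces that shape from plain arrays, with no database involved. The helper names `index_stats` and `limit_for`, the sample rows, and the fallback value are all assumptions made for illustration; they are not part of DispatchPolicy.

```ruby
# Builds the same { partition_key => { current_max:, ewma_latency_ms: } }
# shape that fetch_many produces from its pluck result.
# `index_stats` is a hypothetical helper, not a DispatchPolicy method.
def index_stats(rows)
  rows.each_with_object({}) do |(key, current_max, ewma), result|
    result[key] = { current_max: current_max, ewma_latency_ms: ewma }
  end
end

# Partitions with no stats row are missing from the hash, so a caller
# needs some fallback. `limit_for` and `fallback:` are hypothetical.
def limit_for(stats, key, fallback:)
  stats.dig(key, :current_max) || fallback
end

# Made-up rows in pluck order: [partition_key, current_max, ewma_latency_ms].
sample_rows = [
  ["tenant-1", 8, 42.5],
  ["tenant-2", 3, 310.0]
]

stats = index_stats(sample_rows)
limit_for(stats, "tenant-1", fallback: 5) # known partition: uses its current_max
limit_for(stats, "tenant-9", fallback: 5) # unknown partition: uses the fallback
```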
.record_observation!(policy_name:, gate_name:, partition_key:, queue_lag_ms:, succeeded:, alpha:, min:, target_lag_ms:, fail_factor:, slow_factor:, initial_max:) ⇒ Object
Single-statement EWMA + AIMD update so concurrent performs can’t race on read-modify-write. Seed first (INSERT ON CONFLICT DO NOTHING), then apply the adjustment.
    # File 'app/models/dispatch_policy/adaptive_concurrency_stats.rb', line 35

    def self.record_observation!(
      policy_name:, gate_name:, partition_key:, queue_lag_ms:, succeeded:,
      alpha:, min:, target_lag_ms:, fail_factor:, slow_factor:, initial_max:
    )
      seed!(
        policy_name: policy_name,
        gate_name: gate_name,
        partition_key: partition_key,
        initial_max: initial_max
      )

      # Feedback signal is queue_lag (admitted_at → perform_start). When
      # the adapter queue is empty, lag ≈ 0 → +1 grow. When the queue
      # backs up, lag rises past target → multiplicative shrink. Failures
      # shrink harder. Only `min` is enforced so a partition can't lock
      # out entirely.
      sql = <<~SQL.squish
        UPDATE #{quoted_table_name}
        SET
          ewma_latency_ms = ewma_latency_ms * (1 - ?) + ? * ?,
          sample_count = sample_count + 1,
          current_max = GREATEST(?, CASE
            WHEN ? = FALSE THEN FLOOR(current_max * ?)::int
            WHEN (ewma_latency_ms * (1 - ?) + ? * ?) > ? THEN FLOOR(current_max * ?)::int
            ELSE current_max + 1
          END),
          last_observed_at = ?,
          updated_at = ?
        WHERE policy_name = ? AND gate_name = ? AND partition_key = ?
      SQL

      now = Time.current
      connection.exec_update(
        sanitize_sql_array([
          sql,
          alpha, alpha, queue_lag_ms,
          min.to_i,
          succeeded, fail_factor,
          alpha, alpha, queue_lag_ms, target_lag_ms, slow_factor,
          now, now,
          policy_name, gate_name.to_s, partition_key.to_s
        ])
      )
    end
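The arithmetic inside the UPDATE can be easier to follow outside SQL. The following is a minimal pure-Ruby model of the same EWMA and AIMD rules, written as a sketch rather than as the library's code path: `next_stats` is a hypothetical helper, and the parameter values in the usage lines are made up. As in the SQL, the CASE branches read the row's previous current_max and recompute the new EWMA from the previous ewma_latency_ms.

```ruby
# Pure-Ruby model of the UPDATE in record_observation!; no database involved.
# `next_stats` is a hypothetical name, not part of DispatchPolicy.
def next_stats(current_max:, ewma_latency_ms:, queue_lag_ms:, succeeded:,
               alpha:, min:, target_lag_ms:, fail_factor:, slow_factor:)
  # EWMA: blend the previous average with the new queue-lag sample.
  new_ewma = ewma_latency_ms * (1 - alpha) + alpha * queue_lag_ms

  candidate =
    if !succeeded
      (current_max * fail_factor).floor # failure: multiplicative decrease
    elsif new_ewma > target_lag_ms
      (current_max * slow_factor).floor # lag above target: multiplicative decrease
    else
      current_max + 1                   # healthy: additive increase
    end

  # GREATEST(min, ...): never drop below the configured floor.
  { current_max: [min, candidate].max, ewma_latency_ms: new_ewma }
end

# Illustrative settings only; real values come from the policy configuration.
settings = { alpha: 0.5, min: 1, target_lag_ms: 150, fail_factor: 0.25, slow_factor: 0.5 }

# Empty queue, successful job: the limit grows by one.
next_stats(current_max: 10, ewma_latency_ms: 100.0, queue_lag_ms: 0, succeeded: true, **settings)
# Lag pushes the average past the target: the limit is halved.
next_stats(current_max: 10, ewma_latency_ms: 100.0, queue_lag_ms: 300, succeeded: true, **settings)
```

Doing this computation in a single UPDATE means the database applies it to the row's current values atomically, which is what lets concurrent performs skip explicit locking.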
.seed!(policy_name:, gate_name:, partition_key:, initial_max:) ⇒ Object
Seed a stats row if one doesn’t exist yet. Mirrors ThrottleBucket.lock.
    # File 'app/models/dispatch_policy/adaptive_concurrency_stats.rb', line 8

    def self.seed!(policy_name:, gate_name:, partition_key:, initial_max:)
      now = Time.current
      sql = <<~SQL.squish
        INSERT INTO #{quoted_table_name}
          (policy_name, gate_name, partition_key, current_max,
           ewma_latency_ms, sample_count, created_at, updated_at)
        VALUES (?, ?, ?, ?, 0, 0, ?, ?)
        ON CONFLICT (policy_name, gate_name, partition_key) DO NOTHING
      SQL
      connection.exec_update(
        sanitize_sql_array([
          sql,
          policy_name, gate_name.to_s, partition_key.to_s,
          initial_max.to_i, now, now
        ])
      )
    end
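Because the insert uses ON CONFLICT ... DO NOTHING, seeding an existing key leaves the stored row untouched, which is why record_observation! can call seed! on every observation without resetting an adapted current_max. The sketch below models only that behaviour with an in-memory hash. `InMemoryStats`, its `row` reader, and the 1/0 return values are assumptions for illustration and make no claim about what the real method returns.

```ruby
# In-memory stand-in for the stats table, keyed the same way as the
# ON CONFLICT target: (policy_name, gate_name, partition_key).
# `InMemoryStats` is hypothetical and not part of DispatchPolicy.
class InMemoryStats
  def initialize
    @rows = {}
  end

  # Inserts a row only when the key is new. Returns 1 when a row was
  # inserted and 0 when the key already existed (a sketch convention).
  def seed!(policy_name:, gate_name:, partition_key:, initial_max:)
    key = [policy_name, gate_name.to_s, partition_key.to_s]
    return 0 if @rows.key?(key) # conflict: do nothing

    @rows[key] = { current_max: initial_max.to_i, ewma_latency_ms: 0, sample_count: 0 }
    1
  end

  # Reads a row using the same string coercion as seed!.
  def row(policy_name, gate_name, partition_key)
    @rows[[policy_name, gate_name.to_s, partition_key.to_s]]
  end
end

store = InMemoryStats.new
store.seed!(policy_name: "emails", gate_name: :adaptive, partition_key: 42, initial_max: 10)
store.row("emails", :adaptive, 42)[:current_max] = 4 # pretend AIMD shrank the limit
store.seed!(policy_name: "emails", gate_name: :adaptive, partition_key: 42, initial_max: 10)
store.row("emails", :adaptive, 42)[:current_max]     # still 4: the second seed was a no-op
```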