Class: DispatchPolicy::PartitionObservation
- Inherits: ApplicationRecord
- Inheritance chain: Object > ActiveRecord::Base > ApplicationRecord > DispatchPolicy::PartitionObservation
- Defined in: app/models/dispatch_policy/partition_observation.rb
Overview
Minute-bucketed observability per (policy, partition). Any gate with partition_by gets an observation row here — adaptive, throttle, concurrency, whatever — so the admin chart shows queue lag / throughput for all partitioned policies, not just the adaptive ones.
One row per (policy, partition, minute). Within each row:
- total_lag_ms accumulates the sum of queue_lag_ms observations in that minute.
- total_duration_ms accumulates perform durations (used by :time_budget and :fair_time_share).
- observation_count increments by one per observation.
- max_lag_ms / max_duration_ms track the worst spikes.
Averages are derived on read as total / count.
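The "derived on read" rule can be sketched in plain Ruby. This is only an illustration: `Row` and `averages` are hypothetical names, not part of the model, and the guard against an empty row is an assumption (a row written by observe! always has a count of at least 1).

```ruby
# Hypothetical stand-in for one observation row; only the columns needed here.
Row = Struct.new(:total_lag_ms, :total_duration_ms, :observation_count, keyword_init: true)

# Derive per-minute averages as total / count.
# Returns nil for a row with no observations to avoid dividing by zero.
def averages(row)
  return nil if row.observation_count.zero?

  {
    avg_lag_ms: row.total_lag_ms.to_f / row.observation_count,
    avg_duration_ms: row.total_duration_ms.to_f / row.observation_count
  }
end

row = Row.new(total_lag_ms: 900, total_duration_ms: 300, observation_count: 3)
result = averages(row)
# result[:avg_lag_ms] is 300.0 and result[:avg_duration_ms] is 100.0
```

Storing totals and a count rather than a running average keeps each write a simple addition, which is what lets concurrent observations in the same minute merge safely.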
Constant Summary
- OBSERVATION_TTL = 2 * 60 * 60 (2 hours, in seconds)
Class Method Summary
- .consumed_ms_by_partition(policy_name:, partition_keys:, window:) ⇒ Object
  Sum of perform durations per partition over the last `window` seconds.
- .observe!(policy_name:, partition_key:, queue_lag_ms:, duration_ms: 0, current_max: nil) ⇒ Object
- .prune! ⇒ Object
Class Method Details
.consumed_ms_by_partition(policy_name:, partition_keys:, window:) ⇒ Object
Sum of perform durations per partition over the last `window` seconds. Used by :fair_time_share to bias admission ordering toward partitions that have consumed less compute time recently.
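One way a caller might use the returned hash to favor lightly loaded partitions is sketched below. `order_by_least_consumed` is a hypothetical helper, not the :fair_time_share gate's actual logic, which is not shown on this page. The sketch assumes partitions missing from the result have consumed 0 ms.

```ruby
# Hypothetical helper: order partition keys so the least-consumed come first.
# `consumed` has the shape returned by consumed_ms_by_partition:
#   { "key" => { consumed_ms: Integer, count: Integer } }
# Partitions with no rows in the window are absent from the hash and are
# treated here as having consumed 0 ms. The index keeps the sort stable.
def order_by_least_consumed(partition_keys, consumed)
  partition_keys.map(&:to_s).sort_by.with_index do |key, index|
    [consumed.fetch(key, {}).fetch(:consumed_ms, 0), index]
  end
end

consumed = {
  "a" => { consumed_ms: 500, count: 5 },
  "b" => { consumed_ms: 120, count: 2 }
}
ordered = order_by_least_consumed(%w[a b c], consumed)
# ordered is ["c", "b", "a"]
```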
# File 'app/models/dispatch_policy/partition_observation.rb', line 53

def self.consumed_ms_by_partition(policy_name:, partition_keys:, window:)
  return {} if partition_keys.empty?

  # minute_bucket is floored on insert (date_trunc('minute', now)).
  # An observation written T seconds ago lives in a bucket up to 60s
  # earlier than T. Add a one-bucket pad to the lower bound so the
  # most recent bucket is always inside the window. Without it, the
  # previous-minute bucket is silently excluded as soon as the wall
  # clock crosses a minute boundary.
  since = Time.current - window - 60

  rows = where(policy_name: policy_name, partition_key: partition_keys.map(&:to_s))
    .where("minute_bucket >= ?", since)
    .group(:partition_key)
    .pluck(Arel.sql("partition_key, SUM(total_duration_ms), SUM(observation_count)"))

  rows.each_with_object({}) do |(key, total, count), acc|
    acc[key] = { consumed_ms: total.to_i, count: count.to_i }
  end
end
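The reason for the 60-second pad in the source comment can be checked with a small standalone example. The `minute_bucket` helper below is a plain-Ruby approximation of `date_trunc('minute', ...)`, and the timestamps are made up for illustration.

```ruby
# Floor a Time to the start of its minute (approximates date_trunc('minute', t)).
def minute_bucket(time)
  Time.at(time.to_i - time.to_i % 60).utc
end

written_at = Time.utc(2024, 1, 1, 12, 0, 59) # observation recorded here
now        = Time.utc(2024, 1, 1, 12, 1, 5)  # query runs 6 seconds later
window     = 30                              # seconds

bucket         = minute_bucket(written_at)   # 12:00:00
unpadded_since = now - window                # 12:00:35
padded_since   = now - window - 60           # 11:59:35

included_without_pad = bucket >= unpadded_since # false: a 6-second-old observation is missed
included_with_pad    = bucket >= padded_since   # true
```

The trade-off is that the padded bound can also admit observations up to `window + 60` seconds old, so the sum is a slight overestimate rather than an underestimate.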
.observe!(policy_name:, partition_key:, queue_lag_ms:, duration_ms: 0, current_max: nil) ⇒ Object
# File 'app/models/dispatch_policy/partition_observation.rb', line 19

def self.observe!(policy_name:, partition_key:, queue_lag_ms:, duration_ms: 0, current_max: nil)
  return if partition_key.nil? || partition_key.to_s.empty?

  now = Time.current
  lag = queue_lag_ms.to_i
  dur = duration_ms.to_i

  sql = <<~SQL.squish
    INSERT INTO #{quoted_table_name}
      (policy_name, partition_key, minute_bucket,
       total_lag_ms, total_duration_ms, observation_count,
       max_lag_ms, max_duration_ms, current_max,
       created_at, updated_at)
    VALUES (?, ?, date_trunc('minute', ?::timestamp), ?, ?, 1, ?, ?, ?, ?, ?)
    ON CONFLICT (policy_name, partition_key, minute_bucket)
    DO UPDATE SET
      total_lag_ms      = #{quoted_table_name}.total_lag_ms + EXCLUDED.total_lag_ms,
      total_duration_ms = #{quoted_table_name}.total_duration_ms + EXCLUDED.total_duration_ms,
      observation_count = #{quoted_table_name}.observation_count + 1,
      max_lag_ms        = GREATEST(#{quoted_table_name}.max_lag_ms, EXCLUDED.max_lag_ms),
      max_duration_ms   = GREATEST(#{quoted_table_name}.max_duration_ms, EXCLUDED.max_duration_ms),
      current_max       = COALESCE(EXCLUDED.current_max, #{quoted_table_name}.current_max),
      updated_at        = EXCLUDED.updated_at
  SQL

  connection.exec_update(
    sanitize_sql_array([
      sql,
      policy_name, partition_key.to_s, now,
      lag, dur, lag, dur, current_max, now, now
    ])
  )
end
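The merge rules of the upsert above can be modelled in memory to make them easier to follow. This is a sketch only: `merge_observation` and the hash-backed `store` are hypothetical and stand in for the table and its unique index on (policy_name, partition_key, minute_bucket); the real method does this in a single SQL statement.

```ruby
# In-memory model of the ON CONFLICT merge. `store` maps
# [policy_name, partition_key, minute_bucket] to a row hash.
def merge_observation(store, policy_name:, partition_key:, at:, queue_lag_ms:, duration_ms: 0, current_max: nil)
  return store if partition_key.nil? || partition_key.to_s.empty?

  bucket = Time.at(at.to_i - at.to_i % 60).utc # floor to the minute
  lag = queue_lag_ms.to_i
  dur = duration_ms.to_i

  row = store[[policy_name, partition_key.to_s, bucket]] ||= {
    total_lag_ms: 0, total_duration_ms: 0, observation_count: 0,
    max_lag_ms: nil, max_duration_ms: nil, current_max: nil
  }

  row[:total_lag_ms] += lag                                  # totals add up
  row[:total_duration_ms] += dur
  row[:observation_count] += 1                               # one per call
  row[:max_lag_ms] = [row[:max_lag_ms], lag].compact.max     # like GREATEST
  row[:max_duration_ms] = [row[:max_duration_ms], dur].compact.max
  row[:current_max] = current_max unless current_max.nil?    # like COALESCE(new, old)
  store
end

store = {}
merge_observation(store, policy_name: "p", partition_key: "t1",
                  at: Time.utc(2024, 1, 1, 12, 0, 10), queue_lag_ms: 100, duration_ms: 40, current_max: 8)
merge_observation(store, policy_name: "p", partition_key: "t1",
                  at: Time.utc(2024, 1, 1, 12, 0, 50), queue_lag_ms: 300, duration_ms: 10)
merge_observation(store, policy_name: "p", partition_key: "t1",
                  at: Time.utc(2024, 1, 1, 12, 1, 5), queue_lag_ms: 20)

first_minute = store[["p", "t1", Time.utc(2024, 1, 1, 12, 0, 0)]]
# first_minute holds totals 400 / 50, count 2, maxima 300 / 40, current_max 8
```

Note that a nil `current_max` leaves the previously stored value in place, so callers that do not track a limit never erase one written by a caller that does.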
.prune! ⇒ Object
# File 'app/models/dispatch_policy/partition_observation.rb', line 72

def self.prune!
  where("minute_bucket < ?", Time.current - OBSERVATION_TTL).delete_all
end
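The cutoff comparison in prune! is strict, which the following standalone check illustrates. `prunable?` is a hypothetical helper and the timestamps are invented; the constant is redefined locally only so the snippet runs on its own.

```ruby
OBSERVATION_TTL = 2 * 60 * 60 # seconds, matching the constant summary

# True when a bucket is strictly older than the TTL cutoff.
def prunable?(minute_bucket, now)
  minute_bucket < now - OBSERVATION_TTL
end

now = Time.utc(2024, 1, 1, 12, 0, 0)
at_cutoff   = prunable?(Time.utc(2024, 1, 1, 10, 0, 0), now) # false: exactly 2 hours old is kept
past_cutoff = prunable?(Time.utc(2024, 1, 1, 9, 59, 0), now) # true
```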