Module: DispatchPolicy::Repository
Defined in: lib/dispatch_policy/repository.rb
Overview
SQL access layer for staged_jobs / partitions / inflight_jobs.
Hot paths use raw SQL via ActiveRecord::Base.connection so we get `FOR UPDATE SKIP LOCKED`, multi-row UPSERTs, and DELETE … RETURNING without ActiveRecord overhead. Read paths in the engine UI use the AR models in app/models/dispatch_policy/*.
Constant Summary
- STAGED_TABLE = "dispatch_policy_staged_jobs"
- PARTITIONS_TABLE = "dispatch_policy_partitions"
- INFLIGHT_TABLE = "dispatch_policy_inflight_jobs"
- SAMPLES_TABLE = "dispatch_policy_tick_samples"
- ADAPTIVE_TABLE = "dispatch_policy_adaptive_concurrency_stats"
Class Method Summary
- .adaptive_current_max(policy_name:, partition_key:) ⇒ Object
  Fetch the AIMD-tuned cap for a partition.
- .adaptive_record!(policy_name:, partition_key:, queue_lag_ms:, succeeded:, alpha:, target_lag_ms:, fail_factor:, slow_factor:, min:) ⇒ Object
  Single-statement EWMA + AIMD update.
- .adaptive_seed!(policy_name:, partition_key:, initial_max:) ⇒ Object
  Insert a fresh stats row for the given partition if none exists.
- .bulk_record_partition_denies!(entries) ⇒ Object
  Bulk-update many partitions whose pipeline this tick decided to deny.
- .claim_partitions(policy_name:, limit:, shard: nil) ⇒ Object
  Lock and return up to `limit` partitions ready to be evaluated by the tick.
- .claim_staged_jobs!(policy_name:, partition_key:, limit:, retry_after:, gate_state_patch: nil, half_life_seconds: nil) ⇒ Object
  Atomically claim up to `limit` staged rows for a partition (DELETE … RETURNING) and update the partition's counters / gate_state / next_eligible_at in the same transaction.
- .connection ⇒ Object
- .count_inflight(policy_name:, partition_key:) ⇒ Object
- .delete_inflight!(active_job_id:) ⇒ Object
- .denied_reasons_summary(policy_name: nil, since:) ⇒ Object
  Aggregate the denied_reasons jsonb across samples in the window; returns { "throttle" => 12, "concurrency_full" => 3, … }.
- .heartbeat_inflight!(active_job_id:) ⇒ Object
- .insert_inflight!(rows) ⇒ Object
  Bulk-insert inflight rows, skipping any active_job_id that already exists.
- .next_eligible_clause(retry_after) ⇒ Object
- .normalize_partition(row) ⇒ Object
  Convert a partitions row into a String-keyed Hash and parse its jsonb columns.
- .normalize_staged(row) ⇒ Object
- .parse_jsonb(value) ⇒ Object
- .partition_counts_by_policy ⇒ Object
  Per-policy partition counts in one grouped query, keyed by policy_name → { pending, partitions, paused }.
- .partition_round_trip_stats(policy_name: nil) ⇒ Object
  Round-trip statistics across active partitions: how stale is the most-stale partition the tick has yet to revisit? P50/P95/oldest ages help decide whether partition_batch_size needs to grow or ticks need sharding.
- .partition_round_trip_stats_by_policy ⇒ Object
  Per-policy round-trip stats in one grouped query, keyed by policy_name.
- .record_partition_admit!(policy_name:, partition_key:, admitted:, gate_state_patch:, retry_after:, half_life_seconds: nil) ⇒ Object
  Per-partition admit-state UPDATE.
- .record_tick_sample!(policy_name:, duration_ms:, partitions_seen:, partitions_admitted:, partitions_denied:, jobs_admitted:, forward_failures:, pending_total:, inflight_total:, denied_reasons:) ⇒ Object
  Records one row per Tick.run with admission and timing aggregates so the operator UI can display rates over time without sampling on the read path.
- .sample_filter(policy_name, since) ⇒ Object
- .stage!(policy_name:, partition_key:, queue_name:, job_class:, job_data:, context:, shard: Policy::DEFAULT_SHARD, scheduled_at: nil, priority: 0) ⇒ Object
  Insert one staged_job row + UPSERT its partition.
- .stage_many!(rows) ⇒ Object
  Bulk version for perform_all_later.
- .sweep_inactive_partitions!(cutoff_seconds:) ⇒ Object
  Delete active partitions with no pending jobs that have been idle longer than `cutoff_seconds`.
- .sweep_old_tick_samples!(cutoff_seconds:) ⇒ Object
  Delete tick samples older than `cutoff_seconds`.
- .sweep_stale_inflight!(cutoff_seconds:, queued_cutoff_seconds: nil) ⇒ Object
  Reap inflight rows whose owner is gone.
- .tick_samples_buckets(policy_name: nil, since:, bucket_seconds: 60) ⇒ Object
  Returns time-bucketed series for sparklines.
- .tick_summaries_by_policy(since:) ⇒ Object
  One grouped query returning per-policy tick aggregates, keyed by policy_name.
- .tick_summary(policy_name: nil, since:) ⇒ Object
  Aggregate counters since `since` (a Time).
- .top_denied_reason_by_policy(since:) ⇒ Object
  The single most-denied reason per policy in one query, keyed by policy_name → [reason, count].
- .trend_direction(values, threshold_ratio: 0.10) ⇒ Object
  Direction of a numeric series.
- .upsert_partition!(policy_name:, partition_key:, queue_name:, context:, delta_pending:, shard: Policy::DEFAULT_SHARD) ⇒ Object
- .with_connection ⇒ Object
  Wraps `block` in `connected_to(role: …)` when DispatchPolicy.config.database_role is set.
Class Method Details
.adaptive_current_max(policy_name:, partition_key:) ⇒ Object
Fetch the AIMD-tuned cap for a partition. Returns nil when the row doesn’t exist yet — caller should fall back to initial_max.
# File 'lib/dispatch_policy/repository.rb', line 761

def adaptive_current_max(policy_name:, partition_key:)
  result = connection.exec_query(
    "SELECT current_max FROM #{ADAPTIVE_TABLE} WHERE policy_name = $1 AND partition_key = $2 LIMIT 1",
    "adaptive_current_max",
    [policy_name, partition_key]
  )
  row = result.first
  row && row["current_max"].to_i
end
.adaptive_record!(policy_name:, partition_key:, queue_lag_ms:, succeeded:, alpha:, target_lag_ms:, fail_factor:, slow_factor:, min:) ⇒ Object
Single-statement EWMA + AIMD update. Concurrent workers can call this in any order without read-modify-write races: every clause reads the row’s current value at the start of the UPDATE.
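  ewma_latency_ms_new = ewma_latency_ms * (1 - α) + α * queue_lag_ms
  current_max_new     = GREATEST(min,
                          FAILED?     FLOOR(current_max * fail_factor)
                          OVERLOADED? FLOOR(current_max * slow_factor)
                          else        current_max + 1)
where FAILED means `succeeded` is false and OVERLOADED means ewma_latency_ms_new > target_lag_ms; FAILED is checked first.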
# File 'lib/dispatch_policy/repository.rb', line 780

def adaptive_record!(policy_name:, partition_key:, queue_lag_ms:, succeeded:, alpha:,
                     target_lag_ms:, fail_factor:, slow_factor:, min:)
  connection.exec_query(
    <<~SQL.squish,
      UPDATE #{ADAPTIVE_TABLE}
      SET ewma_latency_ms = ewma_latency_ms * (1 - $3::double precision)
                            + $3::double precision * $4::double precision,
          sample_count = sample_count + 1,
          current_max = GREATEST($5::int, CASE
            WHEN $6::boolean = FALSE
              THEN FLOOR(current_max * $7::double precision)::int
            WHEN (ewma_latency_ms * (1 - $3::double precision)
                  + $3::double precision * $4::double precision) > $8::double precision
              THEN FLOOR(current_max * $9::double precision)::int
            ELSE current_max + 1
          END),
          last_observed_at = now(),
          updated_at = now()
      WHERE policy_name = $1 AND partition_key = $2
    SQL
    "adaptive_record",
    [policy_name, partition_key, alpha.to_f, queue_lag_ms.to_f, min.to_i,
     succeeded ? true : false, fail_factor.to_f, target_lag_ms.to_f, slow_factor.to_f]
  )
end
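To make the CASE ordering concrete, here is a minimal pure-Ruby sketch of the same update applied to one row. It assumes a Hash stands in for the adaptive_concurrency_stats row, and `aimd_step` is a hypothetical helper, not part of the gem. As in the SQL, the new EWMA is computed from the pre-update value, a failure takes precedence over the latency check, and the result is floored at `min`.

```ruby
# Hypothetical helper mirroring the SQL in adaptive_record! for one row.
# `row` is a Hash standing in for the stats row; its values are pre-update.
def aimd_step(row, queue_lag_ms:, succeeded:, alpha:, target_lag_ms:,
              fail_factor:, slow_factor:, min:)
  old_ewma = row[:ewma_latency_ms].to_f
  old_max  = row[:current_max]

  # EWMA: new = old * (1 - alpha) + alpha * sample
  new_ewma = old_ewma * (1 - alpha) + alpha * queue_lag_ms

  candidate =
    if !succeeded
      (old_max * fail_factor).floor   # multiplicative decrease on failure
    elsif new_ewma > target_lag_ms
      (old_max * slow_factor).floor   # multiplicative decrease when overloaded
    else
      old_max + 1                     # additive increase
    end

  row.merge(
    ewma_latency_ms: new_ewma,
    sample_count: row[:sample_count] + 1,
    current_max: [min, candidate].max # GREATEST(min, ...)
  )
end

row  = { current_max: 10, ewma_latency_ms: 0.0, sample_count: 0 }
opts = { alpha: 0.5, target_lag_ms: 100.0, fail_factor: 0.5, slow_factor: 0.8, min: 1 }

row = aimd_step(row, queue_lag_ms: 50.0, succeeded: true, **opts)
# ewma 25.0 <= 100, so current_max grows to 11
row = aimd_step(row, queue_lag_ms: 400.0, succeeded: true, **opts)
# ewma 212.5 > 100, so current_max = floor(11 * 0.8) = 8
row = aimd_step(row, queue_lag_ms: 10.0, succeeded: false, **opts)
# failure: current_max = floor(8 * 0.5) = 4
```

Because every SET expression in a PostgreSQL UPDATE sees the row's old values, the SQL repeats the EWMA expression inside the CASE; the Ruby version makes that explicit with `new_ewma`.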
.adaptive_seed!(policy_name:, partition_key:, initial_max:) ⇒ Object
Insert a fresh stats row for the given partition if none exists. Idempotent: runs as `INSERT … ON CONFLICT DO NOTHING`. Cheap to call on every admission, so the gate's evaluate path can read current_max safely without first checking that the row exists.
# File 'lib/dispatch_policy/repository.rb', line 745

def adaptive_seed!(policy_name:, partition_key:, initial_max:)
  connection.exec_query(
    <<~SQL.squish,
      INSERT INTO #{ADAPTIVE_TABLE}
        (policy_name, partition_key, current_max, ewma_latency_ms, sample_count, created_at, updated_at)
      VALUES ($1, $2, $3, 0, 0, now(), now())
      ON CONFLICT (policy_name, partition_key) DO NOTHING
    SQL
    "adaptive_seed",
    [policy_name, partition_key, initial_max.to_i]
  )
end
.bulk_record_partition_denies!(entries) ⇒ Object
Bulk-update many partitions whose pipeline this tick decided to deny. One UPDATE…FROM(VALUES…) replaces one UPDATE per partition, which cuts a tick with `partition_batch_size = 50` from ~50 round-trips on the deny path to one. The deny path doesn't touch pending_count or total_admitted (admitted = 0 makes them no-ops in the per-row UPDATE), so only gate_state and next_eligible_at are written here.
Each entry: { policy_name:, partition_key:, gate_state_patch:, retry_after: }. Rows are independent: the join via FROM(VALUES…) makes the bulk statement equivalent to N sequential UPDATEs in correctness terms, and the row-level locks held by `claim_partitions` (FOR UPDATE SKIP LOCKED, last_checked_at bumped) keep concurrent ticks away from the same partitions while we batch.
# File 'lib/dispatch_policy/repository.rb', line 313

def bulk_record_partition_denies!(entries)
  return if entries.empty?

  values_sql = []
  params = []
  entries.each_with_index do |e, idx|
    base = idx * 4
    values_sql << "($#{base + 1}::text, $#{base + 2}::text, $#{base + 3}::jsonb, $#{base + 4}::numeric)"
    params.push(
      e[:policy_name],
      e[:partition_key],
      JSON.dump(e[:gate_state_patch] || {}),
      e[:retry_after].nil? ? nil : e[:retry_after].to_f.round(3)
    )
  end

  connection.exec_query(
    <<~SQL.squish,
      UPDATE #{PARTITIONS_TABLE} p
      SET gate_state = p.gate_state || v.gate_state_patch,
          next_eligible_at = CASE
            WHEN v.retry_after_secs IS NULL THEN NULL
            ELSE now() + (v.retry_after_secs || ' seconds')::interval
          END,
          updated_at = now()
      FROM (VALUES #{values_sql.join(", ")})
        AS v(policy_name, partition_key, gate_state_patch, retry_after_secs)
      WHERE p.policy_name = v.policy_name
        AND p.partition_key = v.partition_key
    SQL
    "bulk_record_partition_denies",
    params
  )
end
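The placeholder arithmetic is the part most easily broken when extending this pattern. Below is a minimal sketch, assuming only the `json` stdlib, that pulls the `values_sql`/`params` construction out into a standalone function (`build_deny_values` is a hypothetical name) so the numbering can be checked without a database.

```ruby
require "json"

# Hypothetical standalone version of the VALUES/params builder used by
# bulk_record_partition_denies!: 4 placeholders per entry, numbered
# consecutively so entry i uses $(4i+1) .. $(4i+4).
def build_deny_values(entries)
  values_sql = []
  params = []
  entries.each_with_index do |e, idx|
    base = idx * 4
    values_sql << "($#{base + 1}::text, $#{base + 2}::text, $#{base + 3}::jsonb, $#{base + 4}::numeric)"
    params.push(
      e[:policy_name],
      e[:partition_key],
      JSON.dump(e[:gate_state_patch] || {}),
      e[:retry_after].nil? ? nil : e[:retry_after].to_f.round(3)
    )
  end
  [values_sql.join(", "), params]
end

sql, params = build_deny_values([
  { policy_name: "p", partition_key: "a", gate_state_patch: { "tokens" => 0 }, retry_after: 1.23456 },
  { policy_name: "p", partition_key: "b", gate_state_patch: nil, retry_after: nil }
])
puts sql
# ($1::text, $2::text, $3::jsonb, $4::numeric), ($5::text, $6::text, $7::jsonb, $8::numeric)
p params
# ["p", "a", "{\"tokens\":0}", 1.235, "p", "b", "{}", nil]
```

A nil `retry_after` travels as a SQL NULL, which the CASE in the UPDATE turns into a NULL next_eligible_at.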
.claim_partitions(policy_name:, limit:, shard: nil) ⇒ Object
Lock and return up to `limit` partitions ready to be evaluated by the tick. Each row's last_checked_at is bumped to now() so the next tick fairly picks others. Locked rows are released when the transaction commits.
When `shard` is non-nil, only partitions on that shard are claimed; this lets several tick processes work on the same policy in parallel, one per shard.
# File 'lib/dispatch_policy/repository.rb', line 156

def claim_partitions(policy_name:, limit:, shard: nil)
  params = [policy_name]
  shard_sql = ""
  if shard
    params << shard
    shard_sql = " AND shard = $#{params.size}"
  end
  params << limit

  sql = <<~SQL.squish
    WITH candidates AS (
      SELECT id FROM #{PARTITIONS_TABLE}
      WHERE policy_name = $1
        AND status = 'active'
        AND pending_count > 0
        AND (next_eligible_at IS NULL OR next_eligible_at <= now())
        #{shard_sql}
      ORDER BY last_checked_at NULLS FIRST, id
      LIMIT $#{params.size}
      FOR UPDATE SKIP LOCKED
    )
    UPDATE #{PARTITIONS_TABLE} p
    SET last_checked_at = now()
    FROM candidates
    WHERE p.id = candidates.id
    RETURNING p.*
  SQL

  result = connection.exec_query(sql, "claim_partitions", params)
  result.to_a.map { |row| normalize_partition(row) }
end
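Why `ORDER BY last_checked_at NULLS FIRST, id` plus the `last_checked_at = now()` bump yields round-robin coverage can be seen in a small in-memory simulation. This is a sketch under simplifying assumptions: a single tick process, no locking, every partition stays eligible, and an integer clock stands in for now(). `claim` is a hypothetical helper, not part of the gem.

```ruby
Partition = Struct.new(:id, :last_checked_at)

# Hypothetical in-memory stand-in for claim_partitions: pick `limit`
# partitions ordered by last_checked_at NULLS FIRST, then id, and stamp
# them with the current clock value.
def claim(partitions, limit:, now:)
  ordered = partitions.sort_by do |p|
    # NULLS FIRST: never-checked partitions sort ahead of everything else
    [p.last_checked_at.nil? ? 0 : 1, p.last_checked_at || 0, p.id]
  end
  picked = ordered.first(limit)
  picked.each { |p| p.last_checked_at = now }
  picked.map(&:id)
end

parts = (1..5).map { |i| Partition.new(i, nil) }
ticks = (1..4).map { |t| claim(parts, limit: 2, now: t) }
p ticks
# [[1, 2], [3, 4], [5, 1], [2, 3]]
```

Never-checked partitions are served first, after which the least recently checked ones come back around, so no partition waits more than roughly `partitions / limit` ticks in this idealized model.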
.claim_staged_jobs!(policy_name:, partition_key:, limit:, retry_after:, gate_state_patch: nil, half_life_seconds: nil) ⇒ Object
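Atomically claim up to `limit` staged rows for a partition (DELETE … RETURNING) and update the partition's counters / gate_state / next_eligible_at in the same transaction. The method does not open a transaction itself; the caller is expected to wrap it in one (the per-partition admission TX).
`limit` MUST be positive: the deny path (no rows to admit) goes through `bulk_record_partition_denies!` instead, which collapses many partitions into a single UPDATE…FROM(VALUES…) at the end of the tick. If a block is given, it receives the number of rows actually claimed and returns the gate_state patch to persist; otherwise `gate_state_patch:` is used.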
# File 'lib/dispatch_policy/repository.rb', line 195

def claim_staged_jobs!(policy_name:, partition_key:, limit:, retry_after:, gate_state_patch: nil, half_life_seconds: nil)
  raise ArgumentError, "claim_staged_jobs! requires limit > 0" unless limit.positive?

  sql_select = <<~SQL.squish
    WITH claimed AS (
      SELECT id FROM #{STAGED_TABLE}
      WHERE policy_name = $1
        AND partition_key = $2
        AND (scheduled_at IS NULL OR scheduled_at <= now())
      ORDER BY priority DESC, scheduled_at NULLS FIRST, id
      LIMIT $3
      FOR UPDATE SKIP LOCKED
    )
    DELETE FROM #{STAGED_TABLE} s
    USING claimed
    WHERE s.id = claimed.id
    RETURNING s.*
  SQL
  rows = connection.exec_query(sql_select, "claim_staged_jobs", [policy_name, partition_key, limit]).to_a

  # The gate_state patch may depend on how many rows we actually
  # claimed (e.g. the throttle charges its bucket for jobs admitted,
  # not for the optimistic `allowed`). When the caller passes a block
  # it receives that real count and returns the patch to persist;
  # gate-less callers pass a fixed `gate_state_patch:` instead.
  patch = block_given? ? yield(rows.size) : (gate_state_patch || {})

  record_partition_admit!(
    policy_name: policy_name,
    partition_key: partition_key,
    admitted: rows.size,
    gate_state_patch: patch,
    retry_after: retry_after,
    half_life_seconds: half_life_seconds
  )

  rows.map { |r| normalize_staged(r) }
end
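The claim order and the block protocol can be sketched without a database. Assumptions: staged rows are plain Hashes, `now` is an Integer clock, and `claim_sketch` plus the throttle-style patch in the usage are hypothetical. The point is that the block sees the number of rows actually claimed, which can be smaller than `limit`.

```ruby
# Hypothetical in-memory analogue of claim_staged_jobs!: drop rows scheduled
# in the future, order by priority DESC, scheduled_at NULLS FIRST, id, take
# `limit`, then ask the block for the gate_state patch using the real count.
def claim_sketch(rows, limit:, now:, gate_state_patch: nil)
  raise ArgumentError, "limit must be > 0" unless limit.positive?

  ready = rows.select { |r| r[:scheduled_at].nil? || r[:scheduled_at] <= now }
  ordered = ready.sort_by do |r|
    [-r[:priority], r[:scheduled_at].nil? ? 0 : 1, r[:scheduled_at] || 0, r[:id]]
  end
  claimed = ordered.first(limit)
  patch = block_given? ? yield(claimed.size) : (gate_state_patch || {})
  [claimed.map { |r| r[:id] }, patch]
end

staged = [
  { id: 1, priority: 0, scheduled_at: nil },
  { id: 2, priority: 5, scheduled_at: 10 },
  { id: 3, priority: 5, scheduled_at: nil },
  { id: 4, priority: 0, scheduled_at: 99 } # not yet due at now = 50
]

ids, patch = claim_sketch(staged, limit: 5, now: 50) do |admitted|
  # e.g. a throttle charging its bucket only for jobs really admitted
  { "tokens" => 10 - admitted }
end
p ids              # => [3, 2, 1]
p patch["tokens"]  # => 7
```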
.connection ⇒ Object
# File 'lib/dispatch_policy/repository.rb', line 21

def connection
  ActiveRecord::Base.connection
end
.count_inflight(policy_name:, partition_key:) ⇒ Object
# File 'lib/dispatch_policy/repository.rb', line 397

def count_inflight(policy_name:, partition_key:)
  result = connection.exec_query(
    "SELECT count(*)::int AS n FROM #{INFLIGHT_TABLE} WHERE policy_name = $1 AND partition_key = $2",
    "count_inflight",
    [policy_name, partition_key]
  )
  Integer(result.rows.first.first)
end
.delete_inflight!(active_job_id:) ⇒ Object
# File 'lib/dispatch_policy/repository.rb', line 381

def delete_inflight!(active_job_id:)
  connection.exec_query(
    "DELETE FROM #{INFLIGHT_TABLE} WHERE active_job_id = $1",
    "delete_inflight",
    [active_job_id]
  )
end
.denied_reasons_summary(policy_name: nil, since:) ⇒ Object
Aggregate the denied_reasons jsonb across samples in the window; returns { "throttle" => 12, "concurrency_full" => 3, … }.
# File 'lib/dispatch_policy/repository.rb', line 537

def denied_reasons_summary(policy_name: nil, since:)
  where_sql, params = sample_filter(policy_name, since)
  result = connection.exec_query(
    <<~SQL.squish,
      SELECT key, SUM(value::int)::int AS total
      FROM #{SAMPLES_TABLE},
           LATERAL jsonb_each_text(denied_reasons)
      #{where_sql}
      GROUP BY key
      ORDER BY total DESC
    SQL
    "denied_reasons_summary",
    params
  )
  result.to_a.each_with_object({}) { |r, h| h[r["key"]] = r["total"].to_i }
end
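The query unnests each sample's denied_reasons object with jsonb_each_text and sums the values per key. A minimal Ruby equivalent over already-parsed samples (the `samples` array and `summarize_denies` name are hypothetical) shows the shape of the result, including the descending order that ORDER BY total DESC produces:

```ruby
# Hypothetical in-memory equivalent of denied_reasons_summary: each sample's
# denied_reasons Hash is unnested into (key, value) pairs, summed per key,
# and ordered by total descending.
def summarize_denies(samples)
  totals = Hash.new(0)
  samples.each do |sample|
    (sample[:denied_reasons] || {}).each { |reason, count| totals[reason] += count.to_i }
  end
  totals.sort_by { |_reason, total| -total }.to_h
end

samples = [
  { denied_reasons: { "throttle" => 5, "concurrency_full" => 1 } },
  { denied_reasons: { "throttle" => 7 } },
  { denied_reasons: { "concurrency_full" => "2" } } # jsonb_each_text yields text values
]
summary = summarize_denies(samples)
p summary.keys # => ["throttle", "concurrency_full"]
```

The `count.to_i` mirrors the `value::int` cast, which is needed because jsonb_each_text returns every value as text.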
.heartbeat_inflight!(active_job_id:) ⇒ Object
# File 'lib/dispatch_policy/repository.rb', line 389

def heartbeat_inflight!(active_job_id:)
  connection.exec_query(
    "UPDATE #{INFLIGHT_TABLE} SET heartbeat_at = now() WHERE active_job_id = $1",
    "heartbeat_inflight",
    [active_job_id]
  )
end
.insert_inflight!(rows) ⇒ Object
Bulk-insert one inflight row per admitted job in a single statement; rows whose active_job_id already exists are skipped (ON CONFLICT DO NOTHING).
# File 'lib/dispatch_policy/repository.rb', line 349

def insert_inflight!(rows)
  return if rows.empty?

  values_sql = []
  params = []
  rows.each_with_index do |row, idx|
    base = idx * 3
    values_sql << "($#{base + 1}, $#{base + 2}, $#{base + 3}, now(), now())"
    params.push(row[:policy_name], row[:partition_key], row[:active_job_id])
  end

  # ON CONFLICT (active_job_id) DO NOTHING covers two paths that
  # the around_perform tracker exercises on its own:
  #   1) the around_perform inflight insert runs even when the row
  #      was already pre-inserted by Tick (concurrency-gated policies);
  #   2) a stale row that survived a crash gets re-inserted by the
  #      around_perform without colliding while the sweeper is still
  #      catching up.
  # Admission proper can no longer collide here: Tick regenerates
  # active_job_id before this insert, so each admission contributes a
  # fresh UUID.
  connection.exec_query(
    <<~SQL.squish,
      INSERT INTO #{INFLIGHT_TABLE}
        (policy_name, partition_key, active_job_id, admitted_at, heartbeat_at)
      VALUES #{values_sql.join(", ")}
      ON CONFLICT (active_job_id) DO NOTHING
    SQL
    "insert_inflight",
    params
  )
end
.next_eligible_clause(retry_after) ⇒ Object
# File 'lib/dispatch_policy/repository.rb', line 878

def next_eligible_clause(retry_after)
  if retry_after.nil?
    ["NULL", []]
  else
    # 5th param ($5): caller appends these params to those of the parent UPDATE
    ["now() + ($5 || ' seconds')::interval", [retry_after.to_f.round(3)]]
  end
end
.normalize_partition(row) ⇒ Object
Convert a partitions row into a Hash with String keys and parse its jsonb columns (context, gate_state).
# File 'lib/dispatch_policy/repository.rb', line 839

def normalize_partition(row)
  out = {}
  row.each { |k, v| out[k.to_s] = v }
  out["context"] = parse_jsonb(out["context"])
  out["gate_state"] = parse_jsonb(out["gate_state"])
  out
end
.normalize_staged(row) ⇒ Object
# File 'lib/dispatch_policy/repository.rb', line 847

def normalize_staged(row)
  out = {}
  row.each { |k, v| out[k.to_s] = v }
  out["job_data"] = parse_jsonb(out["job_data"])
  out["context"] = parse_jsonb(out["context"])
  out
end
.parse_jsonb(value) ⇒ Object
# File 'lib/dispatch_policy/repository.rb', line 855

def parse_jsonb(value)
  case value
  when Hash, Array then value
  when nil, "" then {}
  else
    begin
      JSON.parse(value)
    rescue JSON::ParserError
      {}
    end
  end
end
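parse_jsonb accepts either an already-decoded Hash/Array (returned as-is) or a JSON string, and falls back to an empty Hash on nil, blank, or malformed input. A standalone copy with a few calls illustrates that contract; the copy exists only for illustration, since in the gem the method lives on the Repository module.

```ruby
require "json"

# Standalone copy of Repository.parse_jsonb, for illustration only.
def parse_jsonb(value)
  case value
  when Hash, Array then value
  when nil, "" then {}
  else
    begin
      JSON.parse(value)
    rescue JSON::ParserError
      {}
    end
  end
end

p parse_jsonb('{"limit": 5}')["limit"] # => 5
p parse_jsonb(nil)                      # => {}
p parse_jsonb("not json")               # => {}
```

Swallowing parse errors keeps one corrupt context or gate_state value from failing a whole tick, at the cost of silently treating it as empty.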
.partition_counts_by_policy ⇒ Object
Per-policy partition counts in one grouped query, keyed by policy_name → { pending, partitions, paused }. Replaces calling Partition.for_policy(name).sum/.count/.paused.count once per policy on the policies index (3N queries → 1).
# File 'lib/dispatch_policy/repository.rb', line 687

def partition_counts_by_policy
  result = connection.exec_query(
    <<~SQL.squish,
      SELECT policy_name,
             COALESCE(SUM(pending_count), 0)::int AS pending,
             COUNT(*)::int AS partitions,
             COUNT(*) FILTER (WHERE status = 'paused')::int AS paused
      FROM #{PARTITIONS_TABLE}
      GROUP BY policy_name
    SQL
    "partition_counts_by_policy",
    []
  )

  result.to_a.each_with_object({}) do |r, h|
    h[r["policy_name"]] = {
      pending: r["pending"].to_i,
      partitions: r["partitions"].to_i,
      paused: r["paused"].to_i
    }
  end
end
.partition_round_trip_stats(policy_name: nil) ⇒ Object
Round-trip statistics across active partitions: how stale is the most-stale partition the tick has yet to revisit? P50/P95/oldest ages help decide whether partition_batch_size needs to grow or ticks need sharding.
# File 'lib/dispatch_policy/repository.rb', line 643

def partition_round_trip_stats(policy_name: nil)
  filter_sql = "WHERE p.status = 'active' AND p.pending_count > 0"
  params = []
  if policy_name
    filter_sql += " AND p.policy_name = $1"
    params << policy_name
  end

  # For ages (now - last_checked_at) the percentile direction inverts:
  # the 95th percentile of *age* corresponds to the 5th percentile of the
  # *timestamp* (the oldest 5% of last_checked_at values), up to one rank
  # of PERCENTILE_DISC's discrete rounding. Ordering by now() - last_checked_at
  # directly would also work, since WITHIN GROUP (ORDER BY ...) accepts an
  # expression; ordering by the raw column and inverting the percentile
  # argument is the form used here.
  result = connection.exec_query(
    <<~SQL.squish,
      SELECT
        COUNT(*)::int AS active_partitions,
        COUNT(*) FILTER (WHERE p.last_checked_at IS NULL)::int AS never_checked,
        COUNT(*) FILTER (WHERE p.next_eligible_at IS NOT NULL AND p.next_eligible_at > now())::int AS in_backoff,
        EXTRACT(EPOCH FROM (now() - MIN(p.last_checked_at)))::float AS oldest_age_seconds,
        EXTRACT(EPOCH FROM (now() - PERCENTILE_DISC(0.5) WITHIN GROUP (ORDER BY p.last_checked_at)))::float AS p50_age_seconds,
        EXTRACT(EPOCH FROM (now() - PERCENTILE_DISC(0.05) WITHIN GROUP (ORDER BY p.last_checked_at)))::float AS p95_age_seconds
      FROM #{PARTITIONS_TABLE} p
      #{filter_sql}
    SQL
    "partition_round_trip_stats",
    params
  )

  row = result.first || {}
  {
    active_partitions: row["active_partitions"].to_i,
    never_checked: row["never_checked"].to_i,
    in_backoff: row["in_backoff"].to_i,
    oldest_age_seconds: row["oldest_age_seconds"]&.to_f,
    p50_age_seconds: row["p50_age_seconds"]&.to_f,
    p95_age_seconds: row["p95_age_seconds"]&.to_f
  }
end
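The percentile inversion can be checked numerically. The sketch below implements PERCENTILE_DISC's rule (return the first value, in sort order, whose position reaches the requested fraction) in plain Ruby and compares the 5th percentile of timestamps with the 95th percentile of ages. `percentile_disc` is a hypothetical helper, not a gem API, and the ages 1..100 are made-up inputs. With these inputs the two results land one rank apart in the stale tail, so the inverted form is a close stand-in for the age p95 rather than an exact identity.

```ruby
# Hypothetical helper following PERCENTILE_DISC semantics: sort ascending
# and return the value at 1-based position ceil(fraction * n), at least 1.
def percentile_disc(values, fraction)
  sorted = values.sort
  rank = [(fraction * sorted.size).ceil, 1].max
  sorted[rank - 1]
end

now = 1_000
ages = (1..100).to_a                   # seconds since last_checked_at
timestamps = ages.map { |a| now - a }  # last_checked_at values

# What the query computes: now - PERCENTILE_DISC(0.05) over timestamps
inverted = now - percentile_disc(timestamps, 5r / 100)
# What "p95 of age" would compute directly
direct = percentile_disc(ages, 95r / 100)

p [inverted, direct] # => [96, 95]
```

Rational fractions (`5r / 100`) keep `ceil` exact; with Floats, a product such as 0.07 * 100 can land just above an integer and shift the rank by one.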
.partition_round_trip_stats_by_policy ⇒ Object
Per-policy round-trip stats in one grouped query, keyed by policy_name. Only the fields the dashboard overview renders (in_backoff, oldest/p95 age); use partition_round_trip_stats for the full single-policy breakdown. Replaces N per-policy calls on the dashboard. Same percentile-inversion note as partition_round_trip_stats.
# File 'lib/dispatch_policy/repository.rb', line 715

def partition_round_trip_stats_by_policy
  result = connection.exec_query(
    <<~SQL.squish,
      SELECT
        p.policy_name,
        COUNT(*) FILTER (WHERE p.next_eligible_at IS NOT NULL AND p.next_eligible_at > now())::int AS in_backoff,
        EXTRACT(EPOCH FROM (now() - MIN(p.last_checked_at)))::float AS oldest_age_seconds,
        EXTRACT(EPOCH FROM (now() - PERCENTILE_DISC(0.05) WITHIN GROUP (ORDER BY p.last_checked_at)))::float AS p95_age_seconds
      FROM #{PARTITIONS_TABLE} p
      WHERE p.status = 'active' AND p.pending_count > 0
      GROUP BY p.policy_name
    SQL
    "partition_round_trip_stats_by_policy",
    []
  )

  result.to_a.each_with_object({}) do |r, h|
    h[r["policy_name"]] = {
      in_backoff: r["in_backoff"].to_i,
      oldest_age_seconds: r["oldest_age_seconds"]&.to_f,
      p95_age_seconds: r["p95_age_seconds"]&.to_f
    }
  end
end
.record_partition_admit!(policy_name:, partition_key:, admitted:, gate_state_patch:, retry_after:, half_life_seconds: nil) ⇒ Object
Per-partition admit-state UPDATE. Runs inside the per-partition admission TX alongside the DELETE, so pending_count / total_admitted / gate_state changes commit atomically with the claim and the adapter handoff. For the deny case use `bulk_record_partition_denies!`.
When `half_life_seconds` is non-nil, the row's EWMA decayed_admits counter is also refreshed in the same UPDATE: the previous value decays exponentially based on the wall time elapsed since the last update, then `admitted` is added on top. This keeps fairness state atomic with the admit (no separate write, no race) and leaves the partitions row's lock undisturbed.
# File 'lib/dispatch_policy/repository.rb', line 245

def record_partition_admit!(policy_name:, partition_key:, admitted:, gate_state_patch:, retry_after:, half_life_seconds: nil)
  next_eligible_sql, next_eligible_params = next_eligible_clause(retry_after)
  gate_state_json = JSON.dump(gate_state_patch || {})
  params = [policy_name, partition_key, admitted, gate_state_json, *next_eligible_params]

  if half_life_seconds && half_life_seconds.to_f.positive?
    # decay constant τ such that exp(-Δt/τ) halves every half_life:
    # τ = half_life / ln(2). NULLIF guards a degenerate τ=0.
    #
    # The GREATEST(..., -700) clamp keeps `exp()` from raising
    # `value out of range: underflow` when a partition has been
    # idle for many half-lives. Postgres throws around
    # `exp(-746)` on double precision; -700 still yields a finite
    # ~9.86e-305, which is effectively zero for the EWMA. Without
    # the clamp, a partition idle long enough for Δt/τ to exceed
    # ~746 breaks every subsequent admission UPDATE on it: Tick
    # rolls back the whole TX, the staged rows return, and the
    # partition never drains.
    decay_idx = params.size + 1
    admitted_idx_for_ewma = 3
    decay_tau = half_life_seconds.to_f / Math.log(2)
    params << decay_tau
    decay_sql = <<~SQL.squish
      decayed_admits = decayed_admits * exp(GREATEST(
        - COALESCE(EXTRACT(EPOCH FROM (now() - decayed_admits_at)), 0)
          / NULLIF($#{decay_idx}::double precision, 0),
        -700
      )) + $#{admitted_idx_for_ewma},
      decayed_admits_at = now(),
    SQL
  else
    decay_sql = ""
  end

  connection.exec_query(
    <<~SQL.squish,
      UPDATE #{PARTITIONS_TABLE}
      SET pending_count = GREATEST(pending_count - $3, 0),
          total_admitted = total_admitted + $3,
          last_admit_at = CASE WHEN $3 > 0 THEN now() ELSE last_admit_at END,
          gate_state = gate_state || $4::jsonb,
          next_eligible_at = #{next_eligible_sql},
          #{decay_sql}
          updated_at = now()
      WHERE policy_name = $1 AND partition_key = $2
    SQL
    "record_partition_admit",
    params
  )
end
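The decayed_admits update can be sketched in Ruby to check the half-life arithmetic and the -700 clamp. Assumptions: the elapsed time is passed in directly as seconds (the SQL derives it from now() - decayed_admits_at and treats a NULL timestamp as 0 elapsed), and `decay_admits` is a hypothetical name.

```ruby
# Hypothetical mirror of the decayed_admits expression in
# record_partition_admit!:
#   decayed * exp(max(-elapsed / tau, -700)) + admitted
# with tau = half_life / ln(2), so the old weight halves every half_life seconds.
def decay_admits(decayed, elapsed_seconds:, admitted:, half_life_seconds:)
  tau = half_life_seconds.to_f / Math.log(2)
  # Mirrors the SQL clamp. Postgres raises on exp() underflow;
  # Ruby's Math.exp would just return 0.0, so here it is only for parity.
  exponent = [-elapsed_seconds.to_f / tau, -700.0].max
  decayed * Math.exp(exponent) + admitted
end

# One half-life later, 10 prior admits count as 5, plus 2 new ones.
p decay_admits(10.0, elapsed_seconds: 60, admitted: 2, half_life_seconds: 60).round(6) # => 7.0

# A partition idle for a year: the history contributes effectively nothing.
p decay_admits(10.0, elapsed_seconds: 365 * 86_400, admitted: 1, half_life_seconds: 60).round(6) # => 1.0
```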
.record_tick_sample!(policy_name:, duration_ms:, partitions_seen:, partitions_admitted:, partitions_denied:, jobs_admitted:, forward_failures:, pending_total:, inflight_total:, denied_reasons:) ⇒ Object
Records one row per Tick.run with admission and timing aggregates so the operator UI can display rates over time without sampling on the read path.
# File 'lib/dispatch_policy/repository.rb', line 450

def record_tick_sample!(policy_name:, duration_ms:, partitions_seen:, partitions_admitted:, partitions_denied:, jobs_admitted:, forward_failures:, pending_total:, inflight_total:, denied_reasons:)
  connection.exec_query(
    <<~SQL.squish,
      INSERT INTO #{SAMPLES_TABLE}
        (policy_name, sampled_at, duration_ms, partitions_seen, partitions_admitted,
         partitions_denied, jobs_admitted, forward_failures, pending_total,
         inflight_total, denied_reasons)
      VALUES ($1, now(), $2, $3, $4, $5, $6, $7, $8, $9, $10::jsonb)
    SQL
    "record_tick_sample",
    [policy_name, duration_ms.to_i, partitions_seen.to_i, partitions_admitted.to_i,
     partitions_denied.to_i, jobs_admitted.to_i, forward_failures.to_i,
     pending_total.to_i, inflight_total.to_i, JSON.dump(denied_reasons || {})]
  )
end
.sample_filter(policy_name, since) ⇒ Object
# File 'lib/dispatch_policy/repository.rb', line 868

def sample_filter(policy_name, since)
  params = [since]
  if policy_name
    params << policy_name
    ["WHERE sampled_at >= $1 AND policy_name = $2", params]
  else
    ["WHERE sampled_at >= $1", params]
  end
end
.stage!(policy_name:, partition_key:, queue_name:, job_class:, job_data:, context:, shard: Policy::DEFAULT_SHARD, scheduled_at: nil, priority: 0) ⇒ Object
Insert one staged_job row + UPSERT its partition. The partition's `context` is refreshed on every call so admission-time gates always see the latest dynamic config.
# File 'lib/dispatch_policy/repository.rb', line 54

def stage!(policy_name:, partition_key:, queue_name:, job_class:, job_data:, context:,
           shard: Policy::DEFAULT_SHARD, scheduled_at: nil, priority: 0)
  connection.transaction(requires_new: true) do
    connection.exec_query(
      <<~SQL.squish,
        INSERT INTO #{STAGED_TABLE}
          (policy_name, partition_key, queue_name, job_class, job_data, context,
           scheduled_at, priority, enqueued_at)
        VALUES ($1, $2, $3, $4, $5::jsonb, $6::jsonb, $7, $8, now())
      SQL
      "stage_job",
      [policy_name, partition_key, queue_name, job_class, JSON.dump(job_data),
       JSON.dump(context), scheduled_at, priority]
    )

    upsert_partition!(
      policy_name: policy_name,
      partition_key: partition_key,
      queue_name: queue_name,
      shard: shard,
      context: context,
      delta_pending: 1
    )
  end
  true
end
.stage_many!(rows) ⇒ Object
Bulk version for perform_all_later. Receives an array of hashes with the same keys as #stage!. Performs one INSERT for staged_jobs and one UPSERT per (policy_name, partition_key) group.
# File 'lib/dispatch_policy/repository.rb', line 81

def stage_many!(rows)
  return 0 if rows.empty?

  connection.transaction(requires_new: true) do
    values_sql = []
    params = []
    rows.each_with_index do |row, idx|
      base = idx * 8
      values_sql << "($#{base + 1}, $#{base + 2}, $#{base + 3}, $#{base + 4}, $#{base + 5}::jsonb, $#{base + 6}::jsonb, $#{base + 7}, $#{base + 8})"
      params.push(
        row[:policy_name],
        row[:partition_key],
        row[:queue_name],
        row[:job_class],
        JSON.dump(row[:job_data]),
        JSON.dump(row[:context] || {}),
        row[:scheduled_at],
        row[:priority] || 0
      )
    end

    connection.exec_query(
      <<~SQL.squish,
        INSERT INTO #{STAGED_TABLE}
          (policy_name, partition_key, queue_name, job_class, job_data, context, scheduled_at, priority)
        VALUES #{values_sql.join(", ")}
      SQL
      "stage_many",
      params
    )

    rows.group_by { |r| [r[:policy_name], r[:partition_key]] }.each do |(policy_name, partition_key), group|
      upsert_partition!(
        policy_name: policy_name,
        partition_key: partition_key,
        queue_name: group.first[:queue_name],
        shard: group.first[:shard] || Policy::DEFAULT_SHARD,
        context: group.last[:context] || {},
        delta_pending: group.size
      )
    end
  end

  rows.size
end
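The per-partition UPSERT step groups rows by (policy_name, partition_key), takes queue_name and shard from the first row of each group and context from the last. A pure-Ruby sketch of just that planning step follows; `partition_upserts` is hypothetical, and `DEFAULT_SHARD` is a placeholder value because Policy::DEFAULT_SHARD's real value is not shown on this page.

```ruby
DEFAULT_SHARD = "default" # placeholder; the real Policy::DEFAULT_SHARD may differ

# Hypothetical planner mirroring stage_many!'s grouping: one UPSERT per
# (policy_name, partition_key), with delta_pending = rows in the group.
def partition_upserts(rows)
  rows.group_by { |r| [r[:policy_name], r[:partition_key]] }.map do |(policy_name, partition_key), group|
    {
      policy_name: policy_name,
      partition_key: partition_key,
      queue_name: group.first[:queue_name],
      shard: group.first[:shard] || DEFAULT_SHARD,
      context: group.last[:context] || {}, # latest context wins
      delta_pending: group.size
    }
  end
end

rows = [
  { policy_name: "emails", partition_key: "acct:1", queue_name: "mail", context: { "v" => 1 } },
  { policy_name: "emails", partition_key: "acct:2", queue_name: "mail", context: nil },
  { policy_name: "emails", partition_key: "acct:1", queue_name: "mail", context: { "v" => 2 } }
]
plan = partition_upserts(rows)
p plan.map { |u| [u[:partition_key], u[:delta_pending]] } # => [["acct:1", 2], ["acct:2", 1]]
```

Three staged rows therefore cost one multi-row INSERT plus two partition UPSERTs rather than three of each.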
.sweep_inactive_partitions!(cutoff_seconds:) ⇒ Object
Delete active partitions that have no pending jobs and have been idle for longer than `cutoff_seconds`, measured from last_admit_at, or from created_at for partitions that were never admitted.
# File 'lib/dispatch_policy/repository.rb', line 820

def sweep_inactive_partitions!(cutoff_seconds:)
  connection.exec_query(
    <<~SQL.squish,
      DELETE FROM #{PARTITIONS_TABLE}
      WHERE pending_count = 0
        AND status = 'active'
        AND (
          (last_admit_at IS NOT NULL AND last_admit_at < now() - ($1 || ' seconds')::interval)
          OR
          (last_admit_at IS NULL AND created_at < now() - ($1 || ' seconds')::interval)
        )
    SQL
    "sweep_inactive_partitions",
    [cutoff_seconds.to_i]
  )
end
.sweep_old_tick_samples!(cutoff_seconds:) ⇒ Object
Delete tick samples older than `cutoff_seconds`.
# File 'lib/dispatch_policy/repository.rb', line 810

def sweep_old_tick_samples!(cutoff_seconds:)
  connection.exec_query(
    "DELETE FROM #{SAMPLES_TABLE} WHERE sampled_at < now() - ($1 || ' seconds')::interval",
    "sweep_old_tick_samples",
    [cutoff_seconds.to_i]
  )
end
.sweep_stale_inflight!(cutoff_seconds:, queued_cutoff_seconds: nil) ⇒ Object
Reap inflight rows whose owner is gone. Two tiers, distinguished by whether the row was ever heartbeated past its admission:
heartbeat_at > admitted_at → the worker started performing and the
heartbeat thread advanced heartbeat_at at least once. If it then
went silent for `cutoff_seconds`, the worker died mid-run: reap.
heartbeat_at <= admitted_at → never heartbeated past admission. The
row was pre-inserted by the Tick and the job is still waiting in
the adapter's queue (or only just started — the first heartbeat
fires after inflight_heartbeat_interval). Reaping these at the
short cutoff would under-count the concurrency gate and over-admit
whenever queue latency exceeds it. Only reap once they're older
than the far more generous `queued_cutoff_seconds`, by which point
the admission is presumed lost.
The Tick pre-insert writes admitted_at and heartbeat_at from the same now() (a single statement), so a never-started row has them exactly equal; one heartbeat makes heartbeat_at strictly greater.
# File 'lib/dispatch_policy/repository.rb', line 425

def sweep_stale_inflight!(cutoff_seconds:, queued_cutoff_seconds: nil)
  queued_cutoff_seconds ||= cutoff_seconds
  connection.exec_query(
    <<~SQL.squish,
      DELETE FROM #{INFLIGHT_TABLE}
      WHERE (heartbeat_at > admitted_at
             AND heartbeat_at < now() - ($1 || ' seconds')::interval)
         OR (heartbeat_at <= admitted_at
             AND admitted_at < now() - ($2 || ' seconds')::interval)
    SQL
    "sweep_stale_inflight",
    [cutoff_seconds.to_i, queued_cutoff_seconds.to_i]
  )
end
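The two-tier predicate is easy to misread in SQL, so here it is as a Ruby sketch over a single row, assuming times are plain Integer seconds; `stale_inflight?` is a hypothetical name. The defaulting of `queued_cutoff_seconds` to `cutoff_seconds` matches the method above.

```ruby
# Hypothetical per-row version of the sweep_stale_inflight! WHERE clause.
def stale_inflight?(admitted_at:, heartbeat_at:, now:, cutoff_seconds:, queued_cutoff_seconds: nil)
  queued_cutoff_seconds ||= cutoff_seconds
  if heartbeat_at > admitted_at
    # Tier 1: the job started and heartbeated, then went quiet.
    heartbeat_at < now - cutoff_seconds
  else
    # Tier 2: never heartbeated past admission (likely still queued).
    admitted_at < now - queued_cutoff_seconds
  end
end

now = 10_000
# Running job, last heartbeat 120 s ago, cutoff 60 s: reaped.
p stale_inflight?(admitted_at: 9_000, heartbeat_at: 9_880, now: now, cutoff_seconds: 60) # => true
# Queued for 10 minutes under a 1-hour queued cutoff: kept.
p stale_inflight?(admitted_at: 9_400, heartbeat_at: 9_400, now: now,
                  cutoff_seconds: 60, queued_cutoff_seconds: 3_600) # => false
```

Without an explicit `queued_cutoff_seconds`, the queued row in the second call would be reaped after 60 seconds, which is exactly the under-counting the two tiers are meant to avoid.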
.tick_samples_buckets(policy_name: nil, since:, bucket_seconds: 60) ⇒ Object
Returns time-bucketed series for sparklines. `bucket_seconds` is the bucket width. Each row: { bucket_at:, jobs_admitted:, forward_failures:, pending_total:, ticks: }.
`pending_total` is the AVERAGE pending observed across the ticks in that bucket. Using AVG (not MAX/last) gives a smoother trend that's resilient to a single outlier sample dragging the bucket up.
585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611 612 613 614 615 |
# File 'lib/dispatch_policy/repository.rb', line 585
def tick_samples_buckets(policy_name: nil, since:, bucket_seconds: 60)
  where_sql, params = sample_filter(policy_name, since)
  bucket_param_idx = params.size + 1
  params << bucket_seconds.to_i

  # `date_bin` requires Postgres 14+. We compute the bucket via floor on
  # the epoch instead so the gem also runs on Postgres 12/13.
  result = connection.exec_query(
    <<~SQL.squish,
      SELECT
        to_timestamp(floor(extract(epoch from sampled_at) / $#{bucket_param_idx})::bigint * $#{bucket_param_idx}) AS bucket_at,
        COALESCE(SUM(jobs_admitted), 0)::int    AS jobs_admitted,
        COALESCE(SUM(forward_failures), 0)::int AS forward_failures,
        COALESCE(AVG(pending_total), 0)::int    AS pending_total,
        COUNT(*)::int                           AS ticks
      FROM #{SAMPLES_TABLE}
      #{where_sql}
      GROUP BY bucket_at
      ORDER BY bucket_at ASC
    SQL
    "tick_samples_buckets",
    params
  )

  result.to_a.map do |r|
    {
      bucket_at: r["bucket_at"],
      jobs_admitted: r["jobs_admitted"].to_i,
      forward_failures: r["forward_failures"].to_i,
      pending_total: r["pending_total"].to_i,
      ticks: r["ticks"].to_i
    }
  end
end
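The floor-on-epoch bucketing is plain integer arithmetic: a sample lands in the bucket that starts at floor(epoch / w) * w. The sketch below reproduces the query's grouping in memory so the arithmetic and the AVG rounding can be checked by hand. `TickSample`, `bucket_start`, and `buckets` are hypothetical names, and the input is assumed to be already filtered by policy and `since`.

```ruby
# Hypothetical in-memory stand-in for dispatch_policy_tick_samples rows.
TickSample = Struct.new(:sampled_at, :jobs_admitted, :forward_failures,
                        :pending_total, keyword_init: true)

# Same arithmetic as to_timestamp(floor(epoch / w)::bigint * w).
def bucket_start(time, bucket_seconds)
  Time.at((time.to_r / bucket_seconds).floor * bucket_seconds)
end

# Mirrors the SELECT ... GROUP BY bucket_at ORDER BY bucket_at ASC.
def buckets(samples, bucket_seconds: 60)
  samples
    .group_by { |s| bucket_start(s.sampled_at, bucket_seconds) }
    .sort_by { |bucket_at, _rows| bucket_at }
    .map do |bucket_at, rows|
      {
        bucket_at: bucket_at,
        jobs_admitted: rows.sum(&:jobs_admitted),
        forward_failures: rows.sum(&:forward_failures),
        # AVG(...)::int: Postgres rounds numeric to int half away from zero,
        # and so does Rational#round.
        pending_total: Rational(rows.sum(&:pending_total), rows.size).round,
        ticks: rows.size
      }
    end
end

samples = [
  TickSample.new(sampled_at: Time.at(125), jobs_admitted: 2, forward_failures: 0, pending_total: 10),
  TickSample.new(sampled_at: Time.at(170), jobs_admitted: 3, forward_failures: 1, pending_total: 40),
  TickSample.new(sampled_at: Time.at(185), jobs_admitted: 1, forward_failures: 0, pending_total: 5)
]
result = buckets(samples)
# result[0]: bucket_at Time.at(120), jobs_admitted 5, forward_failures 1, pending_total 25, ticks 2
# result[1]: bucket_at Time.at(180), jobs_admitted 1, forward_failures 0, pending_total 5, ticks 1
```

With a 60-second width, epochs 125 and 170 both floor to 120, while 185 starts a new bucket at 180.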
.tick_summaries_by_policy(since:) ⇒ Object
Runs one grouped query returning per-policy tick aggregates, keyed by policy_name. This replaces calling tick_summary once per policy on the dashboard (N queries → 1) and returns only the fields the overview renders:
{ "policy_a" => { jobs_admitted:, forward_failures:, ticks:, avg_duration_ms: }, ... }
# File 'lib/dispatch_policy/repository.rb', line 509
def tick_summaries_by_policy(since:)
  result = connection.exec_query(
    <<~SQL.squish,
      SELECT policy_name,
             COALESCE(SUM(jobs_admitted), 0)::int    AS jobs_admitted,
             COALESCE(SUM(forward_failures), 0)::int AS forward_failures,
             COUNT(*)::int                           AS ticks,
             COALESCE(AVG(duration_ms), 0)::int      AS avg_duration_ms
      FROM #{SAMPLES_TABLE}
      WHERE sampled_at >= $1
      GROUP BY policy_name
    SQL
    "tick_summaries_by_policy",
    [since]
  )

  result.to_a.each_with_object({}) do |r, h|
    h[r["policy_name"]] = {
      jobs_admitted: r["jobs_admitted"].to_i,
      forward_failures: r["forward_failures"].to_i,
      ticks: r["ticks"].to_i,
      avg_duration_ms: r["avg_duration_ms"].to_i
    }
  end
end
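Because the query groups by `policy_name`, a policy with no samples in the window produces no row and is simply absent from the returned Hash. A dashboard iterating over its known policies therefore needs a zero-valued fallback. A minimal sketch, with a literal Hash standing in for a real `tick_summaries_by_policy` result; `EMPTY_TICK_SUMMARY` and `summary_for` are hypothetical names.

```ruby
# Zero-valued fallback with the same keys tick_summaries_by_policy returns.
EMPTY_TICK_SUMMARY = {
  jobs_admitted: 0, forward_failures: 0, ticks: 0, avg_duration_ms: 0
}.freeze

# Hypothetical helper: one policy's aggregates, defaulting to zeros when
# the policy produced no tick samples in the window.
def summary_for(summaries, policy_name)
  summaries.fetch(policy_name, EMPTY_TICK_SUMMARY)
end

# Stand-in for a single call to tick_summaries_by_policy(since: ...).
summaries = {
  "policy_a" => { jobs_admitted: 42, forward_failures: 1, ticks: 60, avg_duration_ms: 8 }
}

%w[policy_a policy_b].each do |name|
  s = summary_for(summaries, name)
  puts "#{name}: #{s[:jobs_admitted]} admitted over #{s[:ticks]} ticks"
end
# policy_a: 42 admitted over 60 ticks
# policy_b: 0 admitted over 0 ticks
```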
.tick_summary(policy_name: nil, since:) ⇒ Object
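Aggregates tick counters since `since` (a Time). If `policy_name` is nil, aggregates across all policies. Returns a Hash with the keys jobs_admitted, partitions_seen, partitions_admitted, partitions_denied, forward_failures, ticks, avg_duration_ms, max_duration_ms, and last_sampled_at.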
# File 'lib/dispatch_policy/repository.rb', line 470
def tick_summary(policy_name: nil, since:)
  where_sql, params = sample_filter(policy_name, since)
  result = connection.exec_query(
    <<~SQL.squish,
      SELECT
        COALESCE(SUM(jobs_admitted), 0)::int       AS jobs_admitted,
        COALESCE(SUM(partitions_seen), 0)::int     AS partitions_seen,
        COALESCE(SUM(partitions_admitted), 0)::int AS partitions_admitted,
        COALESCE(SUM(partitions_denied), 0)::int   AS partitions_denied,
        COALESCE(SUM(forward_failures), 0)::int    AS forward_failures,
        COUNT(*)::int                              AS ticks,
        COALESCE(AVG(duration_ms), 0)::int         AS avg_duration_ms,
        COALESCE(MAX(duration_ms), 0)::int         AS max_duration_ms,
        MAX(sampled_at)                            AS last_sampled_at
      FROM #{SAMPLES_TABLE}
      #{where_sql}
    SQL
    "tick_summary",
    params
  )
  row = result.first || {}
  {
    jobs_admitted: row["jobs_admitted"].to_i,
    partitions_seen: row["partitions_seen"].to_i,
    partitions_admitted: row["partitions_admitted"].to_i,
    partitions_denied: row["partitions_denied"].to_i,
    forward_failures: row["forward_failures"].to_i,
    ticks: row["ticks"].to_i,
    avg_duration_ms: row["avg_duration_ms"].to_i,
    max_duration_ms: row["max_duration_ms"].to_i,
    last_sampled_at: row["last_sampled_at"]
  }
end
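Because the query has no `GROUP BY`, it always returns exactly one row; over a window with no samples the counters come back as 0 and `last_sampled_at` as nil. A caller deriving ratios or staleness from the Hash should guard both cases. A minimal sketch, assuming hypothetical helpers `partition_admit_ratio` and `tick_stalled?` and a literal Hash standing in for an empty-window result:

```ruby
# Hypothetical helper: share of evaluated partitions that were admitted.
# Guards the empty-window case, where partitions_seen is 0.
def partition_admit_ratio(summary)
  seen = summary.fetch(:partitions_seen)
  return 0.0 if seen.zero?

  summary.fetch(:partitions_admitted).fdiv(seen)
end

# Hypothetical helper: true when no tick has been sampled recently
# (or at all, in which case last_sampled_at is nil).
def tick_stalled?(summary, now:, max_age_seconds:)
  last = summary[:last_sampled_at]
  last.nil? || now - last > max_age_seconds
end

# The shape tick_summary returns for a window with no samples.
empty = {
  jobs_admitted: 0, partitions_seen: 0, partitions_admitted: 0,
  partitions_denied: 0, forward_failures: 0, ticks: 0,
  avg_duration_ms: 0, max_duration_ms: 0, last_sampled_at: nil
}

partition_admit_ratio(empty)                             # => 0.0
tick_stalled?(empty, now: Time.now, max_age_seconds: 30) # => true
```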
.top_denied_reason_by_policy(since:) ⇒ Object
Returns the single most-denied reason per policy, computed in one query, as a Hash of policy_name → [reason, count]. This replaces calling denied_reasons_summary per policy on the dashboard just to read its top entry. Policies with no denials in the window are absent from the Hash.
# File 'lib/dispatch_policy/repository.rb', line 557
def top_denied_reason_by_policy(since:)
  result = connection.exec_query(
    <<~SQL.squish,
      SELECT DISTINCT ON (policy_name) policy_name, key, total
      FROM (
        SELECT policy_name, key, SUM(value::int)::int AS total
        FROM #{SAMPLES_TABLE}, LATERAL jsonb_each_text(denied_reasons)
        WHERE sampled_at >= $1
        GROUP BY policy_name, key
      ) t
      ORDER BY policy_name, total DESC
    SQL
    "top_denied_reason_by_policy",
    [since]
  )

  result.to_a.each_with_object({}) do |r, h|
    h[r["policy_name"]] = [r["key"], r["total"].to_i]
  end
end
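The inner query sums each reason's counts per policy (`jsonb_each_text` yields the counts as text, hence `value::int`), then `DISTINCT ON (policy_name)` keeps the row with the highest total per policy. The same two steps in plain Ruby, as a hypothetical in-memory illustration (`DeniedSample` and this Ruby `top_denied_reason_by_policy` are not the gem's code). One difference: `max_by` keeps the first of tied reasons, whereas the SQL's choice among ties is unspecified because its `ORDER BY` has no tiebreaker.

```ruby
# Hypothetical stand-in for one dispatch_policy_tick_samples row.
DeniedSample = Struct.new(:policy_name, :denied_reasons, keyword_init: true)

def top_denied_reason_by_policy(samples)
  # Step 1: SUM(value::int) ... GROUP BY policy_name, key
  totals = Hash.new { |h, policy| h[policy] = Hash.new(0) }
  samples.each do |s|
    (s.denied_reasons || {}).each do |reason, count|
      totals[s.policy_name][reason] += count.to_i
    end
  end

  # Step 2: DISTINCT ON (policy_name) ... ORDER BY policy_name, total DESC
  totals.transform_values { |by_reason| by_reason.max_by { |_reason, total| total } }
end

samples = [
  DeniedSample.new(policy_name: "a", denied_reasons: { "throttle" => "12", "concurrency_full" => "3" }),
  DeniedSample.new(policy_name: "a", denied_reasons: { "concurrency_full" => "4" }),
  DeniedSample.new(policy_name: "b", denied_reasons: {})
]
top_denied_reason_by_policy(samples) # => { "a" => ["throttle", 12] }
```

Policy "b" has an empty `denied_reasons`, so, as with `LATERAL jsonb_each_text` in the query, it contributes no rows and does not appear in the result.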
.trend_direction(values, threshold_ratio: 0.10) ⇒ Object
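Returns :up, :down, or :flat for a numeric series by comparing the average of its first third with the average of its last third. Averaging a third of the points at each end, instead of comparing single endpoints, keeps one noisy value from flipping the result. Series with fewer than 3 values are :flat; otherwise the change is measured relative to the head average (floored at 1.0) and must reach `threshold_ratio` in either direction to count.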
# File 'lib/dispatch_policy/repository.rb', line 619
def self.trend_direction(values, threshold_ratio: 0.10)
  return :flat if values.size < 3

  n = values.size
  head = values.first(n / 3)
  tail = values.last(n / 3)
  head_avg = head.sum.to_f / head.size
  tail_avg = tail.sum.to_f / tail.size
  return :flat if head_avg.zero? && tail_avg.zero?

  delta_ratio = (tail_avg - head_avg) / [head_avg, 1.0].max
  if delta_ratio >= threshold_ratio
    :up
  elsif delta_ratio <= -threshold_ratio
    :down
  else
    :flat
  end
end
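Written out, with $v_1, \dots, v_n$ the series and $\theta$ the `threshold_ratio`, the method computes the following whenever $n \ge 3$ and the two averages are not both zero (it returns :flat early otherwise). The worked numbers are an illustrative series, not data from the gem.

```latex
k = \left\lfloor n/3 \right\rfloor, \qquad
\bar{h} = \frac{1}{k}\sum_{i=1}^{k} v_i, \qquad
\bar{t} = \frac{1}{k}\sum_{i=n-k+1}^{n} v_i, \qquad
\Delta = \frac{\bar{t} - \bar{h}}{\max(\bar{h},\, 1)}

\text{direction} =
\begin{cases}
  \text{up}   & \Delta \ge \theta \\
  \text{down} & \Delta \le -\theta \\
  \text{flat} & \text{otherwise}
\end{cases}

% Worked example: v = (10, 10, 10, 12, 12, 12), \theta = 0.10
n = 6,\; k = 2,\; \bar{h} = \tfrac{10 + 10}{2} = 10,\; \bar{t} = \tfrac{12 + 12}{2} = 12,\;
\Delta = \tfrac{12 - 10}{\max(10,\, 1)} = 0.2 \ge 0.10 \;\Rightarrow\; \text{up}
```

The $\max(\bar{h}, 1)$ floor avoids dividing by zero and turns $\Delta$ into an absolute rather than relative change when the head average is below 1: for $\bar{h} = 0.2$ and $\bar{t} = 0.5$, $\Delta = 0.3$, so the series reads as up.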
.upsert_partition!(policy_name:, partition_key:, queue_name:, context:, delta_pending:, shard: Policy::DEFAULT_SHARD) ⇒ Object
# File 'lib/dispatch_policy/repository.rb', line 125
def upsert_partition!(policy_name:, partition_key:, queue_name:, context:, delta_pending:, shard: Policy::DEFAULT_SHARD)
  connection.exec_query(
    <<~SQL.squish,
      INSERT INTO #{PARTITIONS_TABLE}
        (policy_name, partition_key, queue_name, shard, context, context_updated_at,
         pending_count, last_enqueued_at, status, gate_state, created_at, updated_at)
      VALUES ($1, $2, $3, $4, $5::jsonb, now(), $6, now(), 'active', '{}'::jsonb, now(), now())
      ON CONFLICT (policy_name, partition_key) DO UPDATE SET
        context            = EXCLUDED.context,
        context_updated_at = EXCLUDED.context_updated_at,
        queue_name         = COALESCE(EXCLUDED.queue_name, #{PARTITIONS_TABLE}.queue_name),
        shard              = #{PARTITIONS_TABLE}.shard,
        pending_count      = #{PARTITIONS_TABLE}.pending_count + EXCLUDED.pending_count,
        last_enqueued_at   = EXCLUDED.last_enqueued_at,
        updated_at         = now()
    SQL
    "upsert_partition",
    [policy_name, partition_key, queue_name, shard, JSON.dump(context), delta_pending]
  )
end
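On conflict, the statement does not simply overwrite the row: `context` is replaced, a NULL `queue_name` keeps the stored one, `shard` stays whatever the first insert wrote, and `pending_count` is incremented by `delta_pending` rather than set. An in-memory sketch of those merge rules; the `PartitionRow` struct and `upsert_partition` helper are hypothetical, and timestamps, status, and gate_state are omitted.

```ruby
# Hypothetical in-memory model of a dispatch_policy_partitions row
# (timestamps, status and gate_state omitted).
PartitionRow = Struct.new(:queue_name, :shard, :context, :pending_count, keyword_init: true)

# Mirrors INSERT ... ON CONFLICT (policy_name, partition_key) DO UPDATE.
def upsert_partition(rows, policy_name:, partition_key:, queue_name:, shard:, context:, delta_pending:)
  key = [policy_name, partition_key]
  row = rows[key]

  if row.nil?
    # No conflict: insert, with pending_count starting at delta_pending.
    rows[key] = PartitionRow.new(queue_name: queue_name, shard: shard,
                                 context: context, pending_count: delta_pending)
  else
    row.context = context                              # context = EXCLUDED.context
    row.queue_name = queue_name unless queue_name.nil? # COALESCE(EXCLUDED.queue_name, current)
    # shard is left untouched: the first insert pins it.
    row.pending_count += delta_pending                 # current + EXCLUDED.pending_count
  end
  rows[key]
end

rows = {}
upsert_partition(rows, policy_name: "p", partition_key: "acct:1", queue_name: "default",
                 shard: "s0", context: { "plan" => "free" }, delta_pending: 2)
row = upsert_partition(rows, policy_name: "p", partition_key: "acct:1", queue_name: nil,
                       shard: "s1", context: { "plan" => "pro" }, delta_pending: 3)
# row.pending_count == 5, row.queue_name == "default",
# row.shard == "s0", row.context == { "plan" => "pro" }
```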
.with_connection ⇒ Object
Wraps the block in `connected_to(role: …)` when DispatchPolicy.config.database_role is set and ActiveRecord::Base responds to connected_to; otherwise it simply yields. Tick uses it to open the admission TX against the same DB role that good_job / solid_queue uses. This is critical for multi-DB Rails setups (e.g. solid_queue on a separate `:queue` DB), where atomicity only holds when the staging TX and the adapter INSERT share a connection.
# File 'lib/dispatch_policy/repository.rb', line 31
def with_connection
  role = DispatchPolicy.config.database_role
  if role && ActiveRecord::Base.respond_to?(:connected_to)
    ActiveRecord::Base.connected_to(role: role) { yield }
  else
    yield
  end
end
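The wrapping pattern can be exercised without Rails. In the toy below, `FakeBase` is a hypothetical stand-in for `ActiveRecord::Base` that records the active role in a thread-local and restores it when the block exits, and `with_role` mirrors the conditional in `with_connection`, taking the role as an argument instead of reading DispatchPolicy.config.database_role. It only illustrates the scoping; it does not model connections or transactions.

```ruby
# Hypothetical stand-in for ActiveRecord::Base: connected_to scopes a role
# to the block and restores the previous role afterwards, even on error.
class FakeBase
  def self.current_role
    Thread.current[:fake_role] || :default
  end

  def self.connected_to(role:)
    previous = Thread.current[:fake_role]
    Thread.current[:fake_role] = role
    yield
  ensure
    Thread.current[:fake_role] = previous
  end
end

# Same shape as Repository.with_connection: wrap only when a role is
# configured and the base class supports connected_to; otherwise yield.
def with_role(role, base: FakeBase)
  if role && base.respond_to?(:connected_to)
    base.connected_to(role: role) { yield }
  else
    yield
  end
end

with_role(:queue) { FakeBase.current_role } # => :queue
with_role(nil) { FakeBase.current_role }    # => :default
FakeBase.current_role                       # => :default (restored)
```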