Module: DispatchPolicy::OperatorHints
- Defined in:
- lib/dispatch_policy/operator_hints.rb
Overview
Pure-Ruby hint generator. Takes a snapshot of live metrics and returns a list of message: that the dashboard renders.
Each predicate is intentionally conservative: hints fire on crossings the operator can fix from the UI or by toggling a config value, not on noise. Levels:
:info — observation worth glancing at
:warn — attention soon
:critical — fix now
Defined Under Namespace
Classes: Hint
Class Method Summary collapse
-
.for(metrics) ⇒ Object
‘metrics` is a hash of: tick_max_duration_ms: int (config tick_max_duration × 1000) avg_tick_ms: int max_tick_ms: int pending_total: int admitted_per_minute: int (last 1m) forward_failures: int (last 1m) jobs_admitted: int (last 1m, denominator for fail %) active_partitions: int never_checked: int in_backoff: int total_partitions: int adapter_target_jps: int|nil (config.adapter_throughput_target).
Class Method Details
.for(metrics) ⇒ Object
‘metrics` is a hash of:
tick_max_duration_ms: int (config tick_max_duration × 1000)
avg_tick_ms: int
max_tick_ms: int
pending_total: int
admitted_per_minute: int (last 1m)
forward_failures: int (last 1m)
jobs_admitted: int (last 1m, denominator for fail %)
active_partitions: int
never_checked: int
in_backoff: int
total_partitions: int
adapter_target_jps: int|nil (config.adapter_throughput_target)
32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 |
# File 'lib/dispatch_policy/operator_hints.rb', line 32 def for(metrics) hints = [] m = metrics # ---- tick approaching deadline --------------------------------- if m[:tick_max_duration_ms].to_i.positive? && m[:avg_tick_ms].to_i.positive? ratio = m[:avg_tick_ms].to_f / m[:tick_max_duration_ms] if ratio >= 0.6 hints << Hint.new( level: ratio >= 0.85 ? :critical : :warn, message: "Avg tick is #{format('%.0f%%', ratio * 100)} of tick_max_duration. " \ "Lower admission_batch_size, set tick_admission_budget, or shard the policy." ) end end # ---- backlog drain time ---------------------------------------- if m[:admitted_per_minute].to_i.positive? && m[:pending_total].to_i.positive? drain_minutes = m[:pending_total].to_f / m[:admitted_per_minute] if drain_minutes >= 30 level = drain_minutes >= 120 ? :warn : :info hints << Hint.new( level: level, message: "At #{m[:admitted_per_minute]} admits/min, the current backlog of " \ "#{m[:pending_total]} pending would take ~#{drain_minutes.round} min " \ "to drain. Raise admission_batch_size, raise the gate's rate, or shard." ) end end # ---- pending growing while admit rate is non-trivial ----------- # `pending_trend` compares head/tail thirds of the sparkline; a # transient spike that already drained still leaves the tail # average elevated. Gate on current pending > 0 so a recovered # backlog does not raise a warning. if m[:pending_trend] == :up && m[:admitted_per_minute].to_i.positive? && m[:pending_total].to_i.positive? hints << Hint.new( level: :warn, message: "Pending is growing while we are admitting. Inflow > outflow — " \ "either the throttle rate is below the producer rate, or the worker pool can't keep up." ) end # ---- forward failure rate -------------------------------------- if m[:jobs_admitted].to_i.positive? rate = m[:forward_failures].to_f / m[:jobs_admitted] if rate >= 0.01 hints << Hint.new( level: rate >= 0.05 ? :critical : :warn, message: "Forward failures at #{format('%.1f%%', rate * 100)} (#{m[:forward_failures]} / " \ "#{m[:jobs_admitted]} admits). Inspect logs — adapter is rejecting enqueues." ) end end # ---- never_checked > 0 with cardinality > batch ---------------- if m[:never_checked].to_i.positive? hints << Hint.new( level: :warn, message: "#{m[:never_checked]} active partitions have never been checked. " \ "The tick is not getting through them — increase partition_batch_size or shard." ) end # ---- partition cardinality ------------------------------------- if m[:total_partitions].to_i >= 50_000 hints << Hint.new( level: :info, message: "#{m[:total_partitions]} partitions in DB. claim_partitions starts to feel " \ "this around 50k–100k. Consider lowering partition_inactive_after to GC " \ "drained ones earlier." ) end # ---- adapter ceiling proximity -------------------------------- target_jps = m[:adapter_target_jps].to_i if target_jps.positive? && m[:admitted_per_minute].to_i.positive? current_jps = m[:admitted_per_minute] / 60.0 ratio = current_jps / target_jps if ratio >= 0.7 hints << Hint.new( level: ratio >= 0.95 ? :critical : :warn, message: "Admitting #{format('%.0f', current_jps)} jobs/sec, " \ "#{format('%.0f%%', ratio * 100)} of the configured adapter ceiling " \ "(#{target_jps}/sec). Consider an additional shard before the next traffic spike." ) end end hints end |