Module: DispatchPolicy::OperatorHints

Defined in:
lib/dispatch_policy/operator_hints.rb

Overview

Pure-Ruby hint generator. Takes a snapshot of live metrics and returns a list of Hint objects (each with a level: and a message:) that the dashboard renders.

Each predicate is intentionally conservative: hints fire on crossings the operator can fix from the UI or by toggling a config value, not on noise. Levels:

:info     — observation worth glancing at
:warn     — attention soon
:critical — fix now

Defined Under Namespace

Classes: Hint

Class Method Summary collapse

Class Method Details

.for(metrics) ⇒ Object

`metrics` is a hash of:

tick_max_duration_ms:   int  (config tick_max_duration × 1000)
avg_tick_ms:            int
max_tick_ms:            int
pending_total:          int
admitted_per_minute:    int  (last 1m)
forward_failures:       int  (last 1m)
jobs_admitted:          int  (last 1m, denominator for fail %)
active_partitions:      int
never_checked:          int
in_backoff:             int
total_partitions:       int
adapter_target_jps:     int|nil  (config.adapter_throughput_target)
paused:                 bool (policy-level pause flag)
pending_trend:          symbol (:up when the pending sparkline is rising)


33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
# File 'lib/dispatch_policy/operator_hints.rb', line 33

# Builds the operator hints for one snapshot of live metrics.
#
# Each numeric metric is coerced with +to_i+ exactly once, and that integer
# is what the guard, the arithmetic and the message all use. Previously the
# guard used +to_i+ but the arithmetic reused the raw hash value, so a
# numeric String passed the guard and then raised on division.
#
# @param metrics [Hash{Symbol => Object}] snapshot read through these keys:
#   :paused, :tick_max_duration_ms, :avg_tick_ms, :pending_total,
#   :admitted_per_minute, :pending_trend (compared against :up),
#   :forward_failures, :jobs_admitted, :never_checked, :total_partitions,
#   :adapter_target_jps. Missing or nil numeric keys count as 0.
# @return [Array<Hint>] hints in the order the checks below run; exactly one
#   hint when the policy is paused; empty when no threshold is crossed.
def for(metrics)
  m = metrics

  # ---- policy paused: every later check presumes admission SHOULD be
  # flowing (never_checked, drain time, pending growing), so during a
  # deliberate pause those hints would be false alarms. State the pause
  # and stop.
  if m[:paused]
    return [Hint.new(
      level: :warn,
      message: "Policy is paused — admission is stopped while staging continues " \
               "(pending keeps growing). Resume to drain."
    )]
  end

  hints = []

  # Coerce once; nil becomes 0, which keeps the matching check silent.
  tick_budget_ms   = m[:tick_max_duration_ms].to_i
  avg_tick_ms      = m[:avg_tick_ms].to_i
  pending          = m[:pending_total].to_i
  admits_per_min   = m[:admitted_per_minute].to_i
  failures         = m[:forward_failures].to_i
  admitted         = m[:jobs_admitted].to_i
  never_checked    = m[:never_checked].to_i
  total_partitions = m[:total_partitions].to_i
  target_jps       = m[:adapter_target_jps].to_i

  # ---- tick approaching deadline ---------------------------------
  if tick_budget_ms.positive? && avg_tick_ms.positive?
    tick_ratio = avg_tick_ms.to_f / tick_budget_ms
    if tick_ratio >= 0.6
      hints << Hint.new(
        level: tick_ratio >= 0.85 ? :critical : :warn,
        message: "Avg tick is #{format('%.0f%%', tick_ratio * 100)} of tick_max_duration. " \
                 "Lower admission_batch_size, set tick_admission_budget, or shard the policy."
      )
    end
  end

  # ---- backlog drain time ----------------------------------------
  if admits_per_min.positive? && pending.positive?
    drain_minutes = pending.to_f / admits_per_min
    if drain_minutes >= 30
      hints << Hint.new(
        level: drain_minutes >= 120 ? :warn : :info,
        message: "At #{admits_per_min} admits/min, the current backlog of " \
                 "#{pending} pending would take ~#{drain_minutes.round} min " \
                 "to drain. Raise admission_batch_size, raise the gate's rate, or shard."
      )
    end
  end

  # ---- pending growing while admit rate is non-trivial -----------
  # `pending_trend` compares head/tail thirds of the sparkline; a
  # transient spike that already drained still leaves the tail
  # average elevated. Gate on current pending > 0 so a recovered
  # backlog does not raise a warning.
  if m[:pending_trend] == :up && admits_per_min.positive? && pending.positive?
    hints << Hint.new(
      level: :warn,
      message: "Pending is growing while we are admitting. Inflow > outflow — " \
               "either the throttle rate is below the producer rate, or the worker pool can't keep up."
    )
  end

  # ---- forward failure rate --------------------------------------
  # Skipped entirely when nothing was admitted: there is no denominator.
  if admitted.positive?
    failure_rate = failures.to_f / admitted
    if failure_rate >= 0.01
      hints << Hint.new(
        level: failure_rate >= 0.05 ? :critical : :warn,
        message: "Forward failures at #{format('%.1f%%', failure_rate * 100)} (#{failures} / " \
                 "#{admitted} admits). Inspect logs — adapter is rejecting enqueues."
      )
    end
  end

  # ---- never_checked > 0 -----------------------------------------
  if never_checked.positive?
    hints << Hint.new(
      level: :warn,
      message: "#{never_checked} active partitions have never been checked. " \
               "The tick is not getting through them — increase partition_batch_size or shard."
    )
  end

  # ---- partition cardinality -------------------------------------
  if total_partitions >= 50_000
    hints << Hint.new(
      level: :info,
      message: "#{total_partitions} partitions in DB. claim_partitions starts to feel " \
               "this around 50k–100k. Consider lowering partition_inactive_after to GC " \
               "drained ones earlier."
    )
  end

  # ---- adapter ceiling proximity ---------------------------------
  if target_jps.positive? && admits_per_min.positive?
    current_jps   = admits_per_min / 60.0
    ceiling_ratio = current_jps / target_jps
    if ceiling_ratio >= 0.7
      hints << Hint.new(
        level: ceiling_ratio >= 0.95 ? :critical : :warn,
        message: "Admitting #{format('%.0f', current_jps)} jobs/sec, " \
                 "#{format('%.0f%%', ceiling_ratio * 100)} of the configured adapter ceiling " \
                 "(#{target_jps}/sec). Consider an additional shard before the next traffic spike."
      )
    end
  end

  hints
end