Module: DispatchPolicy::OperatorHints

Defined in:
lib/dispatch_policy/operator_hints.rb

Overview

Pure-Ruby hint generator. Takes a snapshot of live metrics and returns a list of `Hint(level:, message:)` objects that the dashboard renders.

Each predicate is intentionally conservative: hints fire on crossings the operator can fix from the UI or by toggling a config value, not on noise. Levels:

:info     — observation worth glancing at
:warn     — attention soon
:critical — fix now

Defined Under Namespace

Classes: Hint

Class Method Summary collapse

Class Method Details

.for(metrics) ⇒ Object

`metrics` is a hash of:

tick_max_duration_ms:   int  (config tick_max_duration × 1000)
avg_tick_ms:            int
max_tick_ms:            int
pending_total:          int
admitted_per_minute:    int  (last 1m)
forward_failures:       int  (last 1m)
jobs_admitted:          int  (last 1m, denominator for fail %)
active_partitions:      int
never_checked:          int
in_backoff:             int
total_partitions:       int
adapter_target_jps:     int|nil  (config.adapter_throughput_target)
pending_trend:          Symbol   (:up when the tail third of the pending sparkline exceeds the head third)


32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
# File 'lib/dispatch_policy/operator_hints.rb', line 32

# Build the operator hints for one metrics snapshot.
#
# Each section below is an independent predicate. A Hint is appended only
# when that predicate's threshold is crossed, so an empty array means
# "nothing to flag".
#
# Numeric metrics are coerced with #to_i (guards) and #to_f (arithmetic).
# A value that is nil or absent therefore reads as 0 and simply skips its
# check. A value that arrives as a numeric String is used instead of
# raising TypeError/NoMethodError in the division.
#
# @param metrics [Hash{Symbol=>Object}] snapshot keyed as documented above.
#   :pending_trend is compared against the Symbol :up.
# @return [Array<Hint>] zero or more hints, in the order the checks run.
def for(metrics)
  hints = []
  m     = metrics

  # ---- tick approaching deadline ---------------------------------
  # Compares the *average* tick (not max_tick_ms) against the deadline.
  # Warn at >= 60% of the deadline, critical at >= 85%.
  if m[:tick_max_duration_ms].to_i.positive? && m[:avg_tick_ms].to_i.positive?
    ratio = m[:avg_tick_ms].to_f / m[:tick_max_duration_ms].to_f
    if ratio >= 0.6
      hints << Hint.new(
        level: ratio >= 0.85 ? :critical : :warn,
        message: "Avg tick is #{format('%.0f%%', ratio * 100)} of tick_max_duration. " \
                 "Lower admission_batch_size, set tick_admission_budget, or shard the policy."
      )
    end
  end

  # ---- backlog drain time ----------------------------------------
  # Minutes to drain = pending / admits-per-minute.
  # Info at >= 30 min, warn at >= 120 min.
  if m[:admitted_per_minute].to_i.positive? && m[:pending_total].to_i.positive?
    drain_minutes = m[:pending_total].to_f / m[:admitted_per_minute].to_f
    if drain_minutes >= 30
      level = drain_minutes >= 120 ? :warn : :info
      hints << Hint.new(
        level: level,
        message: "At #{m[:admitted_per_minute]} admits/min, the current backlog of " \
                 "#{m[:pending_total]} pending would take ~#{drain_minutes.round} min " \
                 "to drain. Raise admission_batch_size, raise the gate's rate, or shard."
      )
    end
  end

  # ---- pending growing while admit rate is non-trivial -----------
  # `pending_trend` compares head/tail thirds of the sparkline; a
  # transient spike that already drained still leaves the tail
  # average elevated. Gate on current pending > 0 so a recovered
  # backlog does not raise a warning.
  if m[:pending_trend] == :up &&
     m[:admitted_per_minute].to_i.positive? &&
     m[:pending_total].to_i.positive?
    hints << Hint.new(
      level: :warn,
      message: "Pending is growing while we are admitting. Inflow > outflow — " \
               "either the throttle rate is below the producer rate, or the worker pool can't keep up."
    )
  end

  # ---- forward failure rate --------------------------------------
  # failures / admits over the same window.
  # Warn at >= 1%, critical at >= 5%.
  if m[:jobs_admitted].to_i.positive?
    rate = m[:forward_failures].to_f / m[:jobs_admitted].to_f
    if rate >= 0.01
      hints << Hint.new(
        level: rate >= 0.05 ? :critical : :warn,
        message: "Forward failures at #{format('%.1f%%', rate * 100)} (#{m[:forward_failures]} / " \
                 "#{m[:jobs_admitted]} admits). Inspect logs — adapter is rejecting enqueues."
      )
    end
  end

  # ---- never_checked > 0 -----------------------------------------
  # Fires on any positive count; no cardinality/batch-size comparison is
  # made here (partition_batch_size is not part of the snapshot).
  if m[:never_checked].to_i.positive?
    hints << Hint.new(
      level: :warn,
      message: "#{m[:never_checked]} active partitions have never been checked. " \
               "The tick is not getting through them — increase partition_batch_size or shard."
    )
  end

  # ---- partition cardinality -------------------------------------
  if m[:total_partitions].to_i >= 50_000
    hints << Hint.new(
      level: :info,
      message: "#{m[:total_partitions]} partitions in DB. claim_partitions starts to feel " \
               "this around 50k–100k. Consider lowering partition_inactive_after to GC " \
               "drained ones earlier."
    )
  end

  # ---- adapter ceiling proximity --------------------------------
  # Convert admits/min to jobs/sec and compare with the configured target.
  # Warn at >= 70% of the ceiling, critical at >= 95%.
  target_jps = m[:adapter_target_jps].to_i
  if target_jps.positive? && m[:admitted_per_minute].to_i.positive?
    current_jps = m[:admitted_per_minute].to_f / 60.0
    ratio = current_jps / target_jps
    if ratio >= 0.7
      hints << Hint.new(
        level: ratio >= 0.95 ? :critical : :warn,
        message: "Admitting #{format('%.0f', current_jps)} jobs/sec, " \
                 "#{format('%.0f%%', ratio * 100)} of the configured adapter ceiling " \
                 "(#{target_jps}/sec). Consider an additional shard before the next traffic spike."
      )
    end
  end

  hints
end