Class: Pgbus::JobStat

Inherits:
BusRecord
  • Object
show all
Defined in:
app/models/pgbus/job_stat.rb

Class Method Summary collapse

Class Method Details

.avg_duration_by_class(minutes: 60) ⇒ Object

Average duration by job class



72
73
74
75
76
77
# File 'app/models/pgbus/job_stat.rb', line 72

# Average duration (ms) per job class over the trailing window.
#
# Returns a Hash of { job_class => average_duration_ms }, slowest
# class first (ordered by AVG(duration_ms) DESC).
def self.avg_duration_by_class(minutes: 60)
  window_start = minutes.minutes.ago
  scope = since(window_start).group(:job_class)
  scope.order(Arel.sql("AVG(duration_ms) DESC")).average(:duration_ms)
end

.avg_latency_by_queue(minutes: 60) ⇒ Object

Average latency by queue



158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
# File 'app/models/pgbus/job_stat.rb', line 158

# Per-queue enqueue-latency rollup over the trailing window.
#
# Returns an Array of Hashes { queue_name:, avg_ms:, p95_ms:, count: },
# sorted by average latency descending. Returns [] until the latency
# migration has been applied.
def self.avg_latency_by_queue(minutes: 60)
  return [] unless latency_columns?

  rows = since(minutes.minutes.ago)
         .where.not(enqueue_latency_ms: nil)
         .group(:queue_name)
         .order(Arel.sql("AVG(enqueue_latency_ms) DESC"))
         .pluck(
           :queue_name,
           Arel.sql("ROUND(AVG(enqueue_latency_ms))"),
           Arel.sql("ROUND(PERCENTILE_CONT(0.95) WITHIN GROUP (ORDER BY enqueue_latency_ms))"),
           Arel.sql("COUNT(*)")
         )

  rows.map do |name, avg, p95, total|
    { queue_name: name, avg_ms: avg.to_i, p95_ms: p95.to_i, count: total.to_i }
  end
end

.cleanup!(older_than:) ⇒ Object

Cleanup old stats



175
176
177
# File 'app/models/pgbus/job_stat.rb', line 175

# Purge stat rows created before +older_than+.
#
# Uses delete_all, so no callbacks run; returns the number of rows
# removed.
def self.cleanup!(older_than:)
  stale = where("created_at < ?", older_than)
  stale.delete_all
end

.latency_columns? ⇒ Boolean

Memoized — checks if the latency migration has been applied. Same transient-error handling as `table_exists?`: a failed probe is not cached, so a later successful probe can still enable latency recording.

Returns:

  • (Boolean)


53
54
55
56
57
58
59
60
61
# File 'app/models/pgbus/job_stat.rb', line 53

# True when the enqueue_latency_ms column (added by the latency
# migration) is present on the stats table.
#
# Memoization only caches a successful probe: if the check raises, the
# ivar is never assigned, so the next call retries and a later
# successful probe can still enable latency recording.
def self.latency_columns?
  # Cached result of a prior successful probe (may legitimately be
  # false when the column is genuinely absent).
  return @latency_columns if defined?(@latency_columns)
  # Deliberately NOT cached: without the table we cannot inspect
  # columns yet, but the table may appear later.
  return false unless table_exists?

  @latency_columns = column_names.include?("enqueue_latency_ms")
rescue StandardError => e
  # Probe failed — report false for now, but @latency_columns stays
  # undefined so the check is retried on the next call.
  Pgbus.logger.debug { "[Pgbus] Failed to check job stat latency columns: #{e.message}" }
  false
end

.latency_trend(minutes: 60) ⇒ Object

Latency trend: average enqueue latency per minute bucketed



142
143
144
145
146
147
148
149
150
151
152
153
154
155
# File 'app/models/pgbus/job_stat.rb', line 142

# Enqueue-latency trend: average and p95 latency per minute bucket over
# the trailing window.
#
# Returns an Array of Hashes { time:, avg_ms:, p95_ms: }, ordered by
# bucket time ascending. Returns [] until the latency migration exists.
def self.latency_trend(minutes: 60)
  return [] unless latency_columns?

  bucket = "date_trunc('minute', created_at)"
  rows = since(minutes.minutes.ago)
         .where.not(enqueue_latency_ms: nil)
         .group(bucket)
         .order(Arel.sql(bucket))
         .pluck(
           Arel.sql(bucket),
           Arel.sql("ROUND(AVG(enqueue_latency_ms))"),
           Arel.sql("ROUND(PERCENTILE_CONT(0.95) WITHIN GROUP (ORDER BY enqueue_latency_ms))")
         )

  rows.map do |bucket_time, avg, p95|
    { time: bucket_time, avg_ms: avg.to_i, p95_ms: p95.to_i }
  end
end

.record!(job_class:, queue_name:, status:, duration_ms:, enqueue_latency_ms: nil, retry_count: 0) ⇒ Object

Record a job execution stat. Called by the executor after each job.



13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
# File 'app/models/pgbus/job_stat.rb', line 13

# Record a single job-execution stat row. Called by the executor after
# each job; failures are logged at debug level and swallowed so stat
# recording can never take a job down.
#
# @param job_class [String] class name of the executed job
# @param queue_name [String] queue the job ran on
# @param status [String] outcome (e.g. success / failed / dead_lettered)
# @param duration_ms [Integer] execution time in milliseconds
# @param enqueue_latency_ms [Integer, nil] enqueue-to-start latency, if known
# @param retry_count [Integer] attempt count for this execution
def self.record!(job_class:, queue_name:, status:, duration_ms:, enqueue_latency_ms: nil, retry_count: 0)
  return unless table_exists?

  attrs = {
    job_class: job_class,
    queue_name: queue_name,
    status: status,
    duration_ms: duration_ms
  }
  # Probe the migration once and gate both columns together. The
  # original probed latency_columns? twice per call; since a failed
  # probe is not memoized, the two probes could disagree and write one
  # column without the other.
  if latency_columns?
    attrs[:enqueue_latency_ms] = enqueue_latency_ms
    attrs[:retry_count] = retry_count
  end

  create!(attrs)
rescue StandardError => e
  # Best-effort by design: never raise back into the executor.
  Pgbus.logger.debug { "[Pgbus] Failed to record job stat: #{e.message}" }
end

.slowest_classes(limit: 10, minutes: 60) ⇒ Object

Top N slowest job classes by average duration



85
86
87
88
89
90
91
92
# File 'app/models/pgbus/job_stat.rb', line 85

# Top N slowest job classes by average duration over the trailing
# window.
#
# Returns an Array of Hashes { job_class:, count:, avg_ms:, max_ms: },
# slowest first.
def self.slowest_classes(limit: 10, minutes: 60)
  rows = since(minutes.minutes.ago)
         .group(:job_class)
         .order(Arel.sql("AVG(duration_ms) DESC"))
         .limit(limit)
         .pluck(
           :job_class,
           Arel.sql("COUNT(*)"),
           Arel.sql("ROUND(AVG(duration_ms))"),
           Arel.sql("MAX(duration_ms)")
         )

  rows.map do |name, total, avg, max|
    { job_class: name, count: total.to_i, avg_ms: avg.to_i, max_ms: max.to_i }
  end
end

.status_counts(minutes: 60) ⇒ Object

Success/fail/DLQ counts



80
81
82
# File 'app/models/pgbus/job_stat.rb', line 80

# Row counts per status within the trailing window.
#
# Returns a Hash of { status => count }.
def self.status_counts(minutes: 60)
  window = since(minutes.minutes.ago)
  window.group(:status).count
end

.summary(minutes: 60) ⇒ Object

Single-query aggregate summary using conditional counts.



95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
# File 'app/models/pgbus/job_stat.rb', line 95

# Single-query aggregate summary using conditional (FILTER) counts.
#
# Returns a Hash with :total, :success, :failed, :dead_lettered,
# :avg_duration_ms and :max_duration_ms. When the latency migration has
# been applied it also includes :avg_latency_ms, :p50_latency_ms,
# :p95_latency_ms, :p99_latency_ms and :avg_retries.
def self.summary(minutes: 60)
  # Probe once and reuse. The original called latency_columns? twice —
  # once to build the column list and once to parse the row. Because a
  # failed probe is not memoized, the two calls could disagree,
  # selecting the short column list but reading the long indices (or
  # vice versa). A single local keeps SELECT and parsing consistent.
  with_latency = latency_columns?

  cols = [
    "COUNT(*)",
    "COUNT(*) FILTER (WHERE status = 'success')",
    "COUNT(*) FILTER (WHERE status = 'failed')",
    "COUNT(*) FILTER (WHERE status = 'dead_lettered')",
    "ROUND(AVG(duration_ms)::numeric, 1)",
    "MAX(duration_ms)"
  ]
  if with_latency
    cols.push(
      "ROUND(AVG(enqueue_latency_ms) FILTER (WHERE enqueue_latency_ms IS NOT NULL)::numeric, 1)",
      "PERCENTILE_CONT(0.5) WITHIN GROUP (ORDER BY enqueue_latency_ms) " \
      "FILTER (WHERE enqueue_latency_ms IS NOT NULL)",
      "PERCENTILE_CONT(0.95) WITHIN GROUP (ORDER BY enqueue_latency_ms) " \
      "FILTER (WHERE enqueue_latency_ms IS NOT NULL)",
      "PERCENTILE_CONT(0.99) WITHIN GROUP (ORDER BY enqueue_latency_ms) " \
      "FILTER (WHERE enqueue_latency_ms IS NOT NULL)",
      "ROUND(AVG(retry_count) FILTER (WHERE retry_count IS NOT NULL)::numeric, 2)"
    )
  end

  # Pure aggregates always yield exactly one row, so pick returns an
  # Array even when the window contains no stats.
  row = since(minutes.minutes.ago).pick(*cols.map { |c| Arel.sql(c) })

  result = {
    total: row[0].to_i,
    success: row[1].to_i,
    failed: row[2].to_i,
    dead_lettered: row[3].to_i,
    avg_duration_ms: row[4]&.to_f || 0,
    max_duration_ms: row[5].to_i
  }

  if with_latency
    result.merge!(
      avg_latency_ms: row[6]&.to_f || 0,
      p50_latency_ms: row[7]&.to_f || 0,
      p95_latency_ms: row[8]&.to_f || 0,
      p99_latency_ms: row[9]&.to_f || 0,
      avg_retries: row[10]&.to_f || 0
    )
  end

  result
end

.table_exists? ⇒ Boolean

Memoized — intentionally never invalidated at runtime. If the pgbus_job_stats migration runs while the app is already running, a restart is required for stat recording to begin.

We only memoize a successful probe. A transient error (PG hiccup during boot, connection refused during a failover) is treated as “don’t know yet” — the next call retries. Caching false on the first hiccup would permanently disable job stats for the process lifetime, which is a worse failure mode than a few retries. See issue #98 / PR #91 (StreamStat fix).

Returns:

  • (Boolean)


40
41
42
43
44
45
46
47
# File 'app/models/pgbus/job_stat.rb', line 40

def self.table_exists?
  # Memoized forever once a probe completes — even when the answer is
  # false because the migration hasn't run; a restart picks it up.
  return @table_exists if defined?(@table_exists)

  @table_exists = connection.table_exists?(table_name)
rescue StandardError => e
  # Probe raised (transient DB error): return false but do NOT
  # memoize, so the next call retries instead of disabling stats for
  # the whole process lifetime.
  Pgbus.logger.debug { "[Pgbus] Failed to check job stat table: #{e.message}" }
  false
end

.throughput(minutes: 60) ⇒ Object

Throughput: jobs per minute bucketed by minute for the last N minutes



64
65
66
67
68
69
# File 'app/models/pgbus/job_stat.rb', line 64

# Throughput: jobs recorded per minute bucket over the trailing window.
#
# Returns a Hash of { bucket_time => count }, ordered by bucket time.
def self.throughput(minutes: 60)
  bucket = "date_trunc('minute', created_at)"
  window = since(minutes.minutes.ago)
  window.group(bucket).order(Arel.sql(bucket)).count
end