Class: Pgbus::StreamStat

Inherits:
BusRecord
  • Object
show all
Defined in:
app/models/pgbus/stream_stat.rb

Overview

Records one row per stream event (broadcast / connect / disconnect) when ‘config.streams_stats_enabled` is true. Mirrors JobStat in shape and aggregation API so the Insights controller can query both with the same patterns.

Writes are fire-and-forget: any error is swallowed so a stat recording failure cannot affect the dispatcher’s hot path.

Constant Summary collapse

EVENT_TYPES =
%w[broadcast connect disconnect].freeze

Class Method Summary collapse

Class Method Details

.cleanup!(older_than:) ⇒ Object

Cleanup old stats. Called from Pgbus::Process::Dispatcher on the same cadence as JobStat.cleanup! using the shared stats_retention.



119
120
121
# File 'app/models/pgbus/stream_stat.rb', line 119

def self.cleanup!(older_than:)
  where("created_at < ?", older_than).delete_all
end

.record!(stream_name:, event_type:, duration_ms: 0, fanout: nil) ⇒ Object

Records a stream event. Called from the Dispatcher when the ‘streams_stats_enabled` flag is set. Errors are swallowed so a missing table or a connection blip cannot kill the dispatcher.



29
30
31
32
33
34
35
36
37
38
39
40
# File 'app/models/pgbus/stream_stat.rb', line 29

def self.record!(stream_name:, event_type:, duration_ms: 0, fanout: nil)
  return unless table_exists?

  create!(
    stream_name: stream_name,
    event_type: event_type,
    duration_ms: duration_ms.to_i,
    fanout: fanout
  )
rescue StandardError => e
  Pgbus.logger.debug { "[Pgbus] Failed to record stream stat: #{e.message}" }
end

.summary(minutes: 60) ⇒ Object

Single-query aggregate summary over the given window. Returns totals by event type, average fanout for broadcasts, and an “active” estimate (connects − disconnects in window).



64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
# File 'app/models/pgbus/stream_stat.rb', line 64

def self.summary(minutes: 60)
  row = since(minutes.minutes.ago).pick(
    Arel.sql("COUNT(*) FILTER (WHERE event_type = 'broadcast')"),
    Arel.sql("COUNT(*) FILTER (WHERE event_type = 'connect')"),
    Arel.sql("COUNT(*) FILTER (WHERE event_type = 'disconnect')"),
    Arel.sql("ROUND(AVG(fanout) FILTER (WHERE event_type = 'broadcast')::numeric, 1)"),
    Arel.sql("ROUND(AVG(duration_ms) FILTER (WHERE event_type = 'broadcast')::numeric, 1)"),
    Arel.sql("ROUND(AVG(duration_ms) FILTER (WHERE event_type = 'connect')::numeric, 1)")
  )

  {
    broadcasts: row[0].to_i,
    connects: row[1].to_i,
    disconnects: row[2].to_i,
    active_estimate: [row[1].to_i - row[2].to_i, 0].max,
    avg_fanout: row[3]&.to_f || 0,
    avg_broadcast_ms: row[4]&.to_f || 0,
    avg_connect_ms: row[5]&.to_f || 0
  }
end

.table_exists?Boolean

Memoized — intentionally never invalidated at runtime. If the pgbus_stream_stats migration runs while the app is already running, a restart is required for stat recording to begin.

We only memoize a successful probe. A transient error (PG hiccup during boot, connection refused during a failover) is treated as “don’t know yet” — the next call retries. Caching false on the first hiccup would permanently disable stream stats for the process lifetime, which is a worse failure mode than a few retries.

Returns:

  • (Boolean)


52
53
54
55
56
57
58
59
# File 'app/models/pgbus/stream_stat.rb', line 52

def self.table_exists?
  return @table_exists if defined?(@table_exists)

  @table_exists = connection.table_exists?(table_name)
rescue StandardError => e
  Pgbus.logger.debug { "[Pgbus] Failed to check stream stat table: #{e.message}" }
  false
end

.throughput(minutes: 60) ⇒ Object

Throughput: broadcast events per minute bucketed by minute.



109
110
111
112
113
114
115
# File 'app/models/pgbus/stream_stat.rb', line 109

def self.throughput(minutes: 60)
  broadcast_events
    .since(minutes.minutes.ago)
    .group("date_trunc('minute', created_at)")
    .order(Arel.sql("date_trunc('minute', created_at)"))
    .count
end

.top_streams(limit: 10, minutes: 60) ⇒ Object

Top N streams by broadcast count in the window, with avg fanout.



86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
# File 'app/models/pgbus/stream_stat.rb', line 86

def self.top_streams(limit: 10, minutes: 60)
  broadcast_events
    .since(minutes.minutes.ago)
    .group(:stream_name)
    .order(Arel.sql("COUNT(*) DESC"))
    .limit(limit)
    .pluck(
      :stream_name,
      Arel.sql("COUNT(*)"),
      Arel.sql("ROUND(AVG(fanout)::numeric, 1)"),
      Arel.sql("ROUND(AVG(duration_ms)::numeric, 1)")
    )
    .map do |name, count, avg_fanout, avg_ms|
      {
        stream_name: name,
        count: count.to_i,
        avg_fanout: avg_fanout&.to_f || 0,
        avg_ms: avg_ms&.to_f || 0
      }
    end
end