Class: Pcrd::Monitor::Lag

Inherits:
Object
Defined in:
lib/pcrd/monitor/lag.rb

Overview

Queries the source database for replication lag on a named slot.

Lag is reported in bytes (WAL bytes the slot has not yet confirmed). A rolling window of recent readings is maintained so callers can compute rate-of-change and estimated time to zero.
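
A minimal sketch of how such a rolling window might be maintained. The bytes and taken_at fields match the accessors used elsewhere on this page, and the window size of 10 matches WINDOW_SIZE, but the RollingWindow class and the body of record are assumptions: the private record method is not shown here.

```ruby
# Hypothetical stand-ins; the real Reading class and record method
# are not shown on this page.
Reading = Struct.new(:bytes, :taken_at)
WINDOW_SIZE = 10

class RollingWindow
  attr_reader :history

  def initialize
    @history = []
  end

  # Append a reading and drop the oldest ones once the window is full.
  def record(bytes, taken_at = Process.clock_gettime(Process::CLOCK_MONOTONIC))
    @history << Reading.new(bytes, taken_at)
    @history.shift while @history.size > WINDOW_SIZE
    @history.last
  end
end

window = RollingWindow.new
12.times { |i| window.record(1_000 - i * 50, i.to_f) }

window.history.size        # => 10
window.history.first.bytes # => 900 (the two oldest readings were dropped)
```

Capping the window keeps memory use constant and makes the trend reflect recent behaviour rather than the whole lifetime of the monitor.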

Defined Under Namespace

Classes: Reading

Constant Summary

WINDOW_SIZE = 10

Number of readings to keep for trend analysis.


Constructor Details

#initialize(source_pool:, slot_name:) ⇒ Lag

Returns a new instance of Lag.



# File 'lib/pcrd/monitor/lag.rb', line 15

def initialize(source_pool:, slot_name:)
  @pool      = source_pool
  @slot_name = slot_name
  @history   = []
end

Instance Method Details

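#confirmed_lsn ⇒ Object

Returns the slot's confirmed_flush_lsn as an "X/Y" string (two hexadecimal numbers). Returns nil if the slot is not found or the query raises Connection::Error.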



# File 'lib/pcrd/monitor/lag.rb', line 39

def confirmed_lsn
  result = @pool.exec(<<~SQL, [@slot_name])
    SELECT confirmed_flush_lsn FROM pg_replication_slots WHERE slot_name = $1
  SQL
  result.ntuples > 0 ? result[0]["confirmed_flush_lsn"] : nil
rescue Connection::Error
  nil
end
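
The "X/Y" string can be turned into an absolute byte position when a caller wants to compare two LSNs on the client side. The sketch below assumes the standard PostgreSQL pg_lsn text format, where X is the high 32 bits and Y the low 32 bits, both in hexadecimal. The helper name lsn_to_int is hypothetical and not part of this class.

```ruby
# Hypothetical helper, not part of Pcrd::Monitor::Lag.
# Converts an LSN string such as "16/B374D848" into an integer byte position.
def lsn_to_int(lsn)
  high, low = lsn.split("/", 2)
  (Integer(high, 16) << 32) | Integer(low, 16)
end

lsn_to_int("0/0") # => 0
lsn_to_int("1/0") # => 4294967296

# Difference between two positions, in bytes.
lsn_to_int("16/B374D848") - lsn_to_int("16/B374D000") # => 2120
```

Subtracting two converted values gives the same kind of byte distance that pg_wal_lsn_diff computes on the server.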

#eta_seconds ⇒ Object

Estimated seconds until lag reaches zero at the current trend. Returns nil if the lag is not shrinking (trend is zero, positive, or unknown) or if no reading has been recorded.



# File 'lib/pcrd/monitor/lag.rb', line 63

def eta_seconds
  trend = trend_bytes_per_sec
  return nil unless trend&.negative?

  current = @history.last&.bytes
  return nil unless current

  # Negate the trend before rounding so a fractional ETA rounds up, not down.
  (current / -trend.to_f).ceil
end
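
A worked example of the ETA arithmetic, written as a standalone function so it can be run without a database. The name eta_for is hypothetical, and this sketch rounds up to a whole second. The ETA is the current lag divided by the rate at which the lag is shrinking.

```ruby
# Hypothetical standalone version of the ETA calculation.
# current_bytes: latest lag reading; trend_bytes_per_sec: rate of change of lag.
def eta_for(current_bytes, trend_bytes_per_sec)
  # Only a negative trend (shrinking lag) ever reaches zero.
  return nil unless trend_bytes_per_sec&.negative?

  (current_bytes / -trend_bytes_per_sec.to_f).ceil
end

eta_for(6_000, -200.0) # => 30  (6000 bytes at 200 bytes/s)
eta_for(6_000, 150.0)  # => nil (lag is growing)
eta_for(6_000, 0.0)    # => nil (lag is flat)
eta_for(6_000, nil)    # => nil (trend unknown)
```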

#lag_bytes ⇒ Object

Queries the current lag in bytes and records the reading in the rolling window. Returns nil if the slot is not found or the query raises Connection::Error.



# File 'lib/pcrd/monitor/lag.rb', line 22

def lag_bytes
  result = @pool.exec(<<~SQL, [@slot_name])
    SELECT pg_wal_lsn_diff(pg_current_wal_lsn(), confirmed_flush_lsn)::bigint AS lag
    FROM   pg_replication_slots
    WHERE  slot_name = $1
  SQL

  return nil if result.ntuples.zero?

  bytes = result[0]["lag"].to_i
  record(bytes)
  bytes
rescue Connection::Error
  nil
end

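#summary ⇒ Object

Human-readable lag summary string: the lag size, followed by an ETA when the lag is shrinking. Calls #lag_bytes, so each call records a new reading. A nil lag (missing slot or connection error) is reported as "unknown (slot not found)".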



# File 'lib/pcrd/monitor/lag.rb', line 74

def summary
  bytes = lag_bytes
  return "unknown (slot not found)" if bytes.nil?

  parts = ["#{format_bytes(bytes)} behind"]
  eta   = eta_seconds
  parts << "ETA ~#{format_duration(eta)}" if eta
  parts.join("  ")
end
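
The format_bytes and format_duration helpers used above are private and not shown on this page. The sketch below is one possible shape for them, under the assumption that they produce binary-unit sizes and short h/m/s durations. The example_ names and the exact output format are hypothetical.

```ruby
# Hypothetical formatting helpers; the real private helpers may differ.
def example_format_bytes(bytes)
  units = %w[B KiB MiB GiB TiB]
  value = bytes.to_f
  unit  = units.shift
  # Step up one unit for every factor of 1024.
  while value >= 1024 && !units.empty?
    value /= 1024
    unit = units.shift
  end
  unit == "B" ? "#{bytes} B" : format("%.1f %s", value, unit)
end

def example_format_duration(seconds)
  hours, rest   = seconds.divmod(3600)
  minutes, secs = rest.divmod(60)
  return "#{hours}h #{minutes}m" if hours > 0
  return "#{minutes}m #{secs}s"  if minutes > 0

  "#{secs}s"
end

# Assemble the parts the same way summary does.
parts = ["#{example_format_bytes(5_242_880)} behind"]
parts << "ETA ~#{example_format_duration(95)}"
line = parts.join("  ")
# line => "5.0 MiB behind  ETA ~1m 35s"
```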

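#trend_bytes_per_sec ⇒ Object

Returns the rate of lag change in bytes per second (negative means the lag is shrinking), computed from the oldest and newest readings in the window. Returns nil if fewer than two readings are available or if no time has elapsed between them.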



# File 'lib/pcrd/monitor/lag.rb', line 50

def trend_bytes_per_sec
  return nil if @history.size < 2

  oldest = @history.first
  newest = @history.last
  elapsed = newest.taken_at - oldest.taken_at
  return nil if elapsed <= 0

  (newest.bytes - oldest.bytes) / elapsed
end
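
A worked example of the trend calculation on a small in-memory history. Sample and trend_for are hypothetical stand-ins for the Reading class and the method above. Timestamps are assumed to be floats in seconds.

```ruby
# Hypothetical stand-in for Reading, with the same two accessors.
Sample = Struct.new(:bytes, :taken_at)

# Same arithmetic as trend_bytes_per_sec, taking the history as an argument.
def trend_for(history)
  return nil if history.size < 2

  oldest  = history.first
  newest  = history.last
  elapsed = newest.taken_at - oldest.taken_at
  return nil if elapsed <= 0

  (newest.bytes - oldest.bytes) / elapsed
end

history = [
  Sample.new(50_000, 0.0),
  Sample.new(47_000, 5.0),
  Sample.new(44_000, 10.0)
]

trend_for(history)           # => -600.0 (lag shrinks by 600 bytes per second)
trend_for(history.first(1))  # => nil (a single reading has no trend)
```

Only the two endpoints of the window enter the calculation, so intermediate readings affect the result only by determining which reading is currently the oldest.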