Class: Pcrd::Monitor::Lag
- Inherits: Object
- Defined in: lib/pcrd/monitor/lag.rb
Overview
Queries the source database for replication lag on a named slot.
Lag is reported in bytes (WAL bytes the slot has not yet confirmed). A rolling window of recent readings is maintained so callers can compute rate-of-change and estimated time to zero.
Defined Under Namespace
Classes: Reading
Constant Summary
- WINDOW_SIZE = 10 (readings to keep for trend analysis)
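The private code that maintains the window is not shown on this page. A minimal sketch of how a bounded history might be kept, assuming a Reading value with bytes and taken_at fields (the two attributes the methods on this page read) and a hypothetical record helper that takes the history explicitly:

```ruby
# Hypothetical stand-ins: the real Reading class and record method are not
# shown on this page, so their exact shape is an assumption.
Reading = Struct.new(:bytes, :taken_at)
WINDOW_SIZE = 10

# Append a reading, then drop the oldest entries beyond WINDOW_SIZE.
def record(history, bytes, now: Time.now)
  history << Reading.new(bytes, now)
  history.shift while history.size > WINDOW_SIZE
  history
end

history = []
12.times { |i| record(history, 1_000 - i * 10, now: Time.at(i)) }

puts history.size         # 10: the window is capped at WINDOW_SIZE
puts history.first.bytes  # 980: the two oldest readings were dropped
```

A small window keeps the trend responsive to recent behavior, at the cost of more noise in the estimate.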
Instance Method Summary
- #confirmed_lsn ⇒ Object: Returns the confirmed_flush_lsn for the slot as an “X/Y” string.
- #eta_seconds ⇒ Object: Estimated seconds until lag reaches zero at the current trend.
- #initialize(source_pool:, slot_name:) ⇒ Lag (constructor): A new instance of Lag.
- #lag_bytes ⇒ Object: Queries the current lag in bytes.
- #summary ⇒ Object: Human-readable lag summary string.
- #trend_bytes_per_sec ⇒ Object: Returns bytes/second rate of lag change (negative = lag is shrinking).
Constructor Details
#initialize(source_pool:, slot_name:) ⇒ Lag
Returns a new instance of Lag.
    # File 'lib/pcrd/monitor/lag.rb', line 15
    def initialize(source_pool:, slot_name:)
      @pool = source_pool
      @slot_name = slot_name
      @history = []
    end
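Judging from the method bodies on this page, source_pool only needs to respond to exec(sql, params) and return an object that answers ntuples and [] with string-keyed rows. A sketch of test doubles with that shape follows; FakeResult, FakePool, and the slot name "my_slot" are hypothetical and not part of Pcrd:

```ruby
# Hypothetical test doubles; the names are illustrative and not part of Pcrd.
class FakeResult
  def initialize(rows)
    @rows = rows
  end

  # Number of rows, as the query methods expect from a result object.
  def ntuples
    @rows.size
  end

  # Row access by index; each row is a Hash keyed by column name.
  def [](index)
    @rows[index]
  end
end

class FakePool
  attr_reader :calls

  def initialize(rows)
    @rows = rows
    @calls = []
  end

  # Mirrors the exec(sql, params) call shape used by Lag's query methods.
  def exec(sql, params)
    @calls << [sql, params]
    FakeResult.new(@rows)
  end
end

pool = FakePool.new([{ "lag" => "4096" }])

# Runs only where the library is loaded; otherwise the doubles stand alone.
if defined?(Pcrd::Monitor::Lag)
  lag = Pcrd::Monitor::Lag.new(source_pool: pool, slot_name: "my_slot")
  puts lag.lag_bytes
end
```

Doubles like these make it possible to exercise the trend and ETA logic without a live database.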
Instance Method Details
#confirmed_lsn ⇒ Object
Returns the confirmed_flush_lsn for the slot as an “X/Y” string. Returns nil if the slot is not found or the query raises Connection::Error.

    # File 'lib/pcrd/monitor/lag.rb', line 39
    def confirmed_lsn
      result = @pool.exec(<<~SQL, [@slot_name])
        SELECT confirmed_flush_lsn FROM pg_replication_slots WHERE slot_name = $1
      SQL
      result.ntuples > 0 ? result[0]["confirmed_flush_lsn"] : nil
    rescue Connection::Error
      nil
    end
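The “X/Y” string is PostgreSQL's textual LSN form: two hexadecimal numbers giving the high and low 32 bits of a 64-bit WAL position. A sketch of comparing two such strings on the client side follows; lsn_to_i and lsn_diff are hypothetical helper names, not part of this class, and the sample LSNs are made up:

```ruby
# Hypothetical helpers, not part of Pcrd::Monitor::Lag.

# Convert an "X/Y" LSN string into a single integer byte position.
def lsn_to_i(lsn)
  high, low = lsn.split("/", 2).map { |part| Integer(part, 16) }
  (high << 32) | low
end

# Byte distance between two LSN strings (positive when newer is ahead).
def lsn_diff(newer, older)
  lsn_to_i(newer) - lsn_to_i(older)
end

puts lsn_diff("0/3000060", "0/3000000")  # 96 (0x60 bytes apart)
puts lsn_diff("1/0", "0/FFFFFFFF")       # 1 (crossing the 32-bit boundary)
```

This is the same kind of subtraction that pg_wal_lsn_diff performs server-side in #lag_bytes.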
#eta_seconds ⇒ Object
Estimated seconds until lag reaches zero at the current trend. Returns nil if the trend is not converging (zero, positive, or unknown).

    # File 'lib/pcrd/monitor/lag.rb', line 63
    def eta_seconds
      trend = trend_bytes_per_sec
      return nil unless trend&.negative?

      current = @history.last&.bytes
      return nil unless current

      -(current / trend).ceil
    end
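A worked example of the ETA arithmetic, using illustrative numbers. The hypothetical eta_for helper below takes the current lag and trend as arguments instead of reading instance state, and writes the grouping of the final expression out explicitly. Because ceil is applied to a negative quotient, the value rounds toward zero, so the estimate appears to be rounded down to whole seconds:

```ruby
# Hypothetical standalone version of the eta_seconds arithmetic.
def eta_for(current_bytes, trend)
  # Only a negative trend (shrinking lag) yields an estimate.
  return nil unless trend&.negative?

  # Method calls bind tighter than unary minus, so ceil runs before negation.
  -((current_bytes / trend).ceil)
end

puts eta_for(1_000, -300.0).inspect  # 3   (1000 / 300 = 3.33 s, rounded down)
puts eta_for(1_000, 50.0).inspect    # nil (lag is growing)
puts eta_for(1_000, nil).inspect     # nil (trend unknown)
```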
#lag_bytes ⇒ Object
Queries the current lag in bytes and records the reading for trend analysis. Returns nil if the slot is not found or the query raises Connection::Error.

    # File 'lib/pcrd/monitor/lag.rb', line 22
    def lag_bytes
      result = @pool.exec(<<~SQL, [@slot_name])
        SELECT pg_wal_lsn_diff(pg_current_wal_lsn(), confirmed_flush_lsn)::bigint AS lag
        FROM pg_replication_slots
        WHERE slot_name = $1
      SQL
      return nil if result.ntuples.zero?

      bytes = result[0]["lag"].to_i
      record(bytes)
      bytes
    rescue Connection::Error
      nil
    end
#summary ⇒ Object
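Human-readable lag summary string. Calls #lag_bytes, so each call records a new reading, and appends an ETA when lag is shrinking. Reports "unknown (slot not found)" whenever #lag_bytes returns nil, which includes connection errors.

    # File 'lib/pcrd/monitor/lag.rb', line 74
    def summary
      bytes = lag_bytes
      return "unknown (slot not found)" if bytes.nil?

      parts = ["#{format_bytes(bytes)} behind"]
      eta = eta_seconds
      parts << "ETA ~#{format_duration(eta)}" if eta
      parts.join(" ")
    end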
#trend_bytes_per_sec ⇒ Object
Returns the rate of lag change in bytes/second (negative = lag is shrinking), computed from the oldest and newest readings in the window. Returns nil if fewer than 2 readings are available or no time has elapsed between them.

    # File 'lib/pcrd/monitor/lag.rb', line 50
    def trend_bytes_per_sec
      return nil if @history.size < 2

      oldest = @history.first
      newest = @history.last
      elapsed = newest.taken_at - oldest.taken_at
      return nil if elapsed <= 0

      (newest.bytes - oldest.bytes) / elapsed
    end
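A worked example of the trend calculation with made-up readings. This sketch assumes Reading exposes bytes and taken_at and that taken_at is a Time (so subtraction yields seconds as a Float). The standalone method takes the history as an argument instead of reading instance state:

```ruby
# Hypothetical stand-in for the Reading class, which is not shown on this page.
Reading = Struct.new(:bytes, :taken_at)

# Same endpoint-to-endpoint slope as the method above, over an explicit history.
def trend_bytes_per_sec(history)
  return nil if history.size < 2

  elapsed = history.last.taken_at - history.first.taken_at
  return nil if elapsed <= 0

  (history.last.bytes - history.first.bytes) / elapsed
end

t0 = Time.at(0)
history = [
  Reading.new(5_000, t0),
  Reading.new(4_200, t0 + 5),
  Reading.new(3_000, t0 + 10)
]

puts trend_bytes_per_sec(history)                   # -200.0 (2000 bytes drained in 10 s)
puts trend_bytes_per_sec(history.first(1)).inspect  # nil (a single reading has no trend)
```

Only the first and last readings affect the result; intermediate readings are ignored, so this is an average over the window and not a fitted slope.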