Module: Pgbus::TableMaintenance
- Defined in:
- lib/pgbus/table_maintenance.rb
Overview
Proactive table maintenance to reduce bloat on PGMQ queue tables.
PGMQ’s read operation UPDATEs three columns (vt, read_ct, last_read_at) on every message read. With the default fillfactor of 100, every UPDATE creates a new heap tuple AND a new index entry — the dead tuple and its old index pointer remain until VACUUM. Under sustained load, autovacuum can’t keep up and B-tree indexes bloat.
Setting fillfactor=70 on queue tables reserves 30% of each page for update churn. Because ‘vt` is indexed and changes on every read, these writes are not HOT updates, but leaving headroom on heap pages still reduces page density for a table that is updated heavily between vacuum passes.
More importantly, this module provides targeted VACUUM: instead of relying solely on autovacuum’s global heuristics, the dispatcher periodically checks pg_stat_user_tables for tables with high dead tuple ratios and vacuums them explicitly. This is inspired by pgque’s philosophy of measuring bloat before acting.
Constant Summary collapse
- FILLFACTOR =
70- BLOAT_THRESHOLD =
0.1- MAINTENANCE_INTERVAL =
6 hours
6 * 3600
Class Method Summary collapse
- .fillfactor_sql_for_all_queues ⇒ Object
- .fillfactor_sql_for_queue(queue_name) ⇒ Object
- .reindex_sql(table) ⇒ Object
- .run_maintenance(conn, threshold: BLOAT_THRESHOLD, reindex: true) ⇒ Object
- .vacuum_candidates(conn, threshold: BLOAT_THRESHOLD) ⇒ Object
- .vacuum_sql(table) ⇒ Object
Class Method Details
.fillfactor_sql_for_all_queues ⇒ Object
33 34 35 36 37 38 39 40 41 42 43 44 |
# File 'lib/pgbus/table_maintenance.rb', line 33 def fillfactor_sql_for_all_queues <<~SQL DO $$ DECLARE q RECORD; BEGIN FOR q IN SELECT queue_name FROM pgmq.meta LOOP EXECUTE format('ALTER TABLE pgmq.q_%I SET (fillfactor = #{FILLFACTOR})', q.queue_name); END LOOP; END $$; SQL end |
.fillfactor_sql_for_queue(queue_name) ⇒ Object
29 30 31 |
# File 'lib/pgbus/table_maintenance.rb', line 29 def fillfactor_sql_for_queue(queue_name) "ALTER TABLE pgmq.q_#{queue_name} SET (fillfactor = #{FILLFACTOR});" end |
.reindex_sql(table) ⇒ Object
78 79 80 81 |
# File 'lib/pgbus/table_maintenance.rb', line 78 def reindex_sql(table) schema, relname = table.split(".", 2) "REINDEX TABLE CONCURRENTLY \"#{schema}\".\"#{relname}\"" end |
.run_maintenance(conn, threshold: BLOAT_THRESHOLD, reindex: true) ⇒ Object
83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 |
# File 'lib/pgbus/table_maintenance.rb', line 83 def run_maintenance(conn, threshold: BLOAT_THRESHOLD, reindex: true) candidates = vacuum_candidates(conn, threshold: threshold) return 0 if candidates.empty? maintained = 0 candidates.each do |candidate| table = candidate[:table] Pgbus.logger.info do "[Pgbus::TableMaintenance] Vacuuming #{table} " \ "(dead_ratio=#{candidate[:dead_ratio]}, dead=#{candidate[:dead_tuples]})" end conn.exec(vacuum_sql(table)) if reindex Pgbus.logger.info { "[Pgbus::TableMaintenance] Reindexing #{table}" } conn.exec(reindex_sql(table)) end maintained += 1 rescue StandardError => e Pgbus.logger.error { "[Pgbus::TableMaintenance] Failed to maintain #{table}: #{e.}" } end maintained end |
.vacuum_candidates(conn, threshold: BLOAT_THRESHOLD) ⇒ Object
46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 |
# File 'lib/pgbus/table_maintenance.rb', line 46 def vacuum_candidates(conn, threshold: BLOAT_THRESHOLD) rows = conn.exec(<<~SQL) SELECT schemaname, relname, n_dead_tup, n_live_tup FROM pg_stat_user_tables WHERE schemaname = 'pgmq' AND relname LIKE 'q_%' ORDER BY n_dead_tup DESC SQL rows.each_with_object([]) do |row, candidates| dead = row["n_dead_tup"].to_i live = row["n_live_tup"].to_i total = dead + live next if total.zero? ratio = dead.to_f / total next unless ratio > threshold candidates << { table: "#{row["schemaname"]}.#{row["relname"]}", dead_tuples: dead, live_tuples: live, dead_ratio: ratio.round(4) } end end |
.vacuum_sql(table) ⇒ Object
73 74 75 76 |
# File 'lib/pgbus/table_maintenance.rb', line 73 def vacuum_sql(table) schema, relname = table.split(".", 2) "VACUUM \"#{schema}\".\"#{relname}\"" end |