Module: Pgbus::TableMaintenance
- Defined in:
- lib/pgbus/table_maintenance.rb
Overview
Proactive table maintenance to reduce bloat on PGMQ queue tables.
PGMQ's read operation UPDATEs three columns (vt, read_ct, last_read_at) on every message read. With the default fillfactor of 100, every UPDATE creates a new heap tuple AND a new index entry — the dead tuple and its old index pointer remain until VACUUM. Under sustained load, autovacuum can't keep up and B-tree indexes bloat.
Setting fillfactor=70 on queue tables reserves 30% of each page for
update churn. Because vt is indexed and changes on every read, these
writes are not HOT updates, but leaving headroom on heap pages still
reduces page density for a table that is updated heavily between vacuum
passes.
More importantly, this module provides targeted VACUUM: instead of relying solely on autovacuum's global heuristics, the dispatcher periodically checks pg_stat_user_tables for tables with high dead tuple ratios and vacuums them explicitly. This is inspired by pgque's philosophy of measuring bloat before acting.
Constant Summary collapse
- FILLFACTOR =
70- BLOAT_THRESHOLD =
0.1- MAINTENANCE_INTERVAL =
6 hours
6 * 3600
Class Method Summary collapse
- .fillfactor_sql_for_all_queues ⇒ Object
- .fillfactor_sql_for_queue(queue_name) ⇒ Object
- .reindex_sql(table) ⇒ Object
- .run_maintenance(conn, threshold: BLOAT_THRESHOLD, reindex: true) ⇒ Object
- .vacuum_candidates(conn, threshold: BLOAT_THRESHOLD) ⇒ Object
- .vacuum_sql(table) ⇒ Object
Class Method Details
.fillfactor_sql_for_all_queues ⇒ Object
33 34 35 36 37 38 39 40 41 42 43 44 |
# File 'lib/pgbus/table_maintenance.rb', line 33 def fillfactor_sql_for_all_queues <<~SQL DO $$ DECLARE q RECORD; BEGIN FOR q IN SELECT queue_name FROM pgmq.meta LOOP EXECUTE format('ALTER TABLE pgmq.q_%I SET (fillfactor = #{FILLFACTOR})', q.queue_name); END LOOP; END $$; SQL end |
.fillfactor_sql_for_queue(queue_name) ⇒ Object
29 30 31 |
# File 'lib/pgbus/table_maintenance.rb', line 29 def fillfactor_sql_for_queue(queue_name) "ALTER TABLE pgmq.q_#{queue_name} SET (fillfactor = #{FILLFACTOR});" end |
.reindex_sql(table) ⇒ Object
78 79 80 81 |
# File 'lib/pgbus/table_maintenance.rb', line 78 def reindex_sql(table) schema, relname = table.split(".", 2) "REINDEX TABLE CONCURRENTLY \"#{schema}\".\"#{relname}\"" end |
.run_maintenance(conn, threshold: BLOAT_THRESHOLD, reindex: true) ⇒ Object
83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 |
# File 'lib/pgbus/table_maintenance.rb', line 83 def run_maintenance(conn, threshold: BLOAT_THRESHOLD, reindex: true) candidates = vacuum_candidates(conn, threshold: threshold) return 0 if candidates.empty? maintained = 0 candidates.each do |candidate| table = candidate[:table] Pgbus.logger.info do "[Pgbus::TableMaintenance] Vacuuming #{table} " \ "(dead_ratio=#{candidate[:dead_ratio]}, dead=#{candidate[:dead_tuples]})" end conn.exec(vacuum_sql(table)) if reindex Pgbus.logger.info { "[Pgbus::TableMaintenance] Reindexing #{table}" } conn.exec(reindex_sql(table)) end maintained += 1 rescue StandardError => e Pgbus.logger.error { "[Pgbus::TableMaintenance] Failed to maintain #{table}: #{e.}" } end maintained end |
.vacuum_candidates(conn, threshold: BLOAT_THRESHOLD) ⇒ Object
46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 |
# File 'lib/pgbus/table_maintenance.rb', line 46 def vacuum_candidates(conn, threshold: BLOAT_THRESHOLD) rows = conn.exec(<<~SQL) SELECT schemaname, relname, n_dead_tup, n_live_tup FROM pg_stat_user_tables WHERE schemaname = 'pgmq' AND relname LIKE 'q_%' ORDER BY n_dead_tup DESC SQL rows.each_with_object([]) do |row, candidates| dead = row["n_dead_tup"].to_i live = row["n_live_tup"].to_i total = dead + live next if total.zero? ratio = dead.to_f / total next unless ratio > threshold candidates << { table: "#{row["schemaname"]}.#{row["relname"]}", dead_tuples: dead, live_tuples: live, dead_ratio: ratio.round(4) } end end |
.vacuum_sql(table) ⇒ Object
73 74 75 76 |
# File 'lib/pgbus/table_maintenance.rb', line 73 def vacuum_sql(table) schema, relname = table.split(".", 2) "VACUUM \"#{schema}\".\"#{relname}\"" end |