Class: Pcrd::Checkpoint::Store
- Inherits: Object (ancestry: Object → Pcrd::Checkpoint::Store)
- Defined in: lib/pcrd/checkpoint/store.rb
Overview
SQLite-backed store for migration progress.
Tracks two things:
1. Metadata — key/value rows holding the phase, the current LSN watermark, the backfill start LSN, the replication slot and publication names, and the start time.
2. Completed batches — one row per successfully copied batch,
with start/end key, row count, duration, and timestamp.
The per-batch log is what makes resumption safe and auditable: on resume, `last_completed_key` returns the end_key of the most recently recorded batch for the table, and the backfill skips straight past it. The same log also powers throughput stats and ETA estimates.
Constant Summary
# A PostgreSQL LSN is two hex segments joined by a slash, e.g. "16/B374D848".
# Each segment is the hex form of one 32-bit half of the 64-bit WAL position,
# so it is 1-8 hex digits -- the same bound PostgreSQL's own pg_lsn input
# parser enforces. Rejecting longer segments keeps a corrupt value from being
# persisted as a replication watermark. \A/\z anchor the whole string (so a
# trailing newline is rejected too).
LSN_FORMAT = %r{\A[0-9A-Fa-f]{1,8}/[0-9A-Fa-f]{1,8}\z}

# Idempotent (IF NOT EXISTS) schema, applied every time the store is opened.
#   metadata -- key/value rows (phase, LSN watermarks, replication object
#               names, start time).
#   batches  -- one row per completed batch; start_key/end_key hold JSON.
# idx_batches_table backs the per-table "WHERE table_name = ? ORDER BY id DESC"
# lookups.
SCHEMA_SQL = <<~SQL.freeze
  CREATE TABLE IF NOT EXISTS metadata (
    key   TEXT PRIMARY KEY,
    value TEXT NOT NULL
  );

  CREATE TABLE IF NOT EXISTS batches (
    id           INTEGER PRIMARY KEY AUTOINCREMENT,
    table_name   TEXT    NOT NULL,
    start_key    TEXT    NOT NULL,
    end_key      TEXT    NOT NULL,
    row_count    INTEGER NOT NULL,
    duration_ms  INTEGER NOT NULL,
    completed_at TEXT    NOT NULL
  );

  CREATE INDEX IF NOT EXISTS idx_batches_table ON batches (table_name, id DESC);
SQL
Instance Method Summary
- #backfill_start_lsn ⇒ Object
- #batch_stats(table:) ⇒ Object
  Returns aggregate stats for a table's completed batches.
- #batches(table:, limit: 100) ⇒ Object
  Completed batches for a table, newest first (at most `limit`, default 100).
- #close ⇒ Object
- #initialize(path) ⇒ Store (constructor)
  A new instance of Store.
- #last_completed_key(table:) ⇒ Object
  Returns the end_key of the most recently recorded batch for a table, decoded from JSON.
- #lsn ⇒ Object
- #phase ⇒ Object
  Phase & LSN metadata: the current migration phase as a Symbol (:new if none has been recorded).
- #publication ⇒ Object
- #record_batch(table:, start_key:, end_key:, row_count:, duration_ms:) ⇒ Object
  Record a successfully completed batch.
- #replication_slot ⇒ Object
  Replication objects this migration created, recorded so a resume can be cross-checked against the config and cleanup knows what to remove.
- #set_backfill_start_lsn(lsn) ⇒ Object
- #set_lsn(lsn) ⇒ Object
- #set_phase(phase) ⇒ Object
- #set_publication(name) ⇒ Object
- #set_replication_slot(name) ⇒ Object
- #set_started_at(ts) ⇒ Object
- #started_at ⇒ Object
- #total_rows_copied(table:) ⇒ Object
Constructor Details
#initialize(path) ⇒ Store
Returns a new instance of Store.
43 44 45 46 47 48 49 50 51 52 53 54 55 |
# File 'lib/pcrd/checkpoint/store.rb', line 43
# Opens (creating it if needed) the SQLite checkpoint database at +path+
# and applies SCHEMA_SQL.
#
# @param path [String] filesystem path of the SQLite database file
# @raise [StandardError] whatever SQLite raised while configuring the
#   connection or applying the schema; the handle is closed first
def initialize(path)
  @path = path
  # Backfill (recording batches) and the apply worker (recording LSN) hit
  # this store from two threads at once. SQLite3::Database is a single
  # connection, so serialize every @db access through one mutex. The lock
  # is taken only at the lowest level (get_meta/set_meta and the batch
  # methods) so the public wrappers never re-enter it.
  @mutex = Mutex.new
  @db = SQLite3::Database.new(path)
  begin
    @db.results_as_hash = true
    # Wait up to 5 s (busy_timeout is in milliseconds) for a lock held by
    # another connection instead of failing immediately with SQLITE_BUSY.
    @db.busy_timeout = 5_000
    @db.execute_batch(SCHEMA_SQL)
  rescue StandardError
    # The caller never receives a Store when construction fails, so nothing
    # else could ever close this handle -- release it before re-raising.
    @db.close unless @db.closed?
    raise
  end
end
Instance Method Details
#backfill_start_lsn ⇒ Object
84 85 86 |
# File 'lib/pcrd/checkpoint/store.rb', line 84
# LSN recorded via #set_backfill_start_lsn.
#
# @return [String, nil] the stored LSN, or get_meta's missing-key value
#   (presumably nil -- confirm against get_meta) if it was never set
def backfill_start_lsn
  get_meta("backfill_start_lsn")
end
#batch_stats(table:) ⇒ Object
Returns aggregate stats for a table’s completed batches.
150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 |
# File 'lib/pcrd/checkpoint/store.rb', line 150
# Aggregate stats for a table's completed batches.
#
# Throughput is total rows / total time rather than the mean of per-batch
# rates: averaging rates gives a 1 ms batch the same weight as a 10-minute
# one, which skews any ETA computed from it.
#
# @param table [#to_s] table name as passed to #record_batch
# @return [Hash] :batch_count (Integer), :total_rows (Integer) and
#   :avg_rows_per_sec (Float rounded to 0.1; 0.0 when no time is recorded)
def batch_stats(table:)
  row = @mutex.synchronize do
    @db.get_first_row(
      "SELECT COUNT(*) AS cnt, SUM(row_count) AS total_rows, " \
      "SUM(duration_ms) AS total_ms " \
      "FROM batches WHERE table_name = ?",
      [table.to_s]
    )
  end

  # SUM() is NULL for a table with no batches; nil.to_i turns that into 0.
  total_rows = row["total_rows"].to_i
  total_ms = row["total_ms"].to_i

  {
    batch_count: row["cnt"].to_i,
    total_rows: total_rows,
    # Guard the division so empty or all-zero-duration tables report 0.0.
    avg_rows_per_sec: total_ms.positive? ? (total_rows * 1000.0 / total_ms).round(1) : 0.0
  }
end
#batches(table:, limit: 100) ⇒ Object
Completed batches for a table, newest first — at most `limit` of them (default 100).
167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 |
# File 'lib/pcrd/checkpoint/store.rb', line 167
# Completed batches for a table, newest first.
#
# @param table [#to_s] table name as passed to #record_batch
# @param limit [Integer, nil] maximum number of batches to return
#   (default 100); pass nil to return every recorded batch
# @return [Array<Hash>] one hash per batch with :id, :table_name,
#   :start_key and :end_key (decoded from JSON), :row_count, :duration_ms
#   and :completed_at
def batches(table:, limit: 100)
  # SQLite treats a negative LIMIT as "no upper bound".
  sql_limit = limit || -1

  rows = @mutex.synchronize do
    @db.execute(
      "SELECT * FROM batches WHERE table_name = ? ORDER BY id DESC LIMIT ?",
      [table.to_s, sql_limit]
    )
  end

  rows.map do |row|
    {
      id: row["id"].to_i,
      table_name: row["table_name"],
      start_key: JSON.parse(row["start_key"]),
      end_key: JSON.parse(row["end_key"]),
      row_count: row["row_count"].to_i,
      duration_ms: row["duration_ms"].to_i,
      completed_at: row["completed_at"]
    }
  end
end
#close ⇒ Object
57 58 59 |
# File 'lib/pcrd/checkpoint/store.rb', line 57
# Closes the SQLite handle under the store mutex. Calling it again is a
# no-op: an already-closed handle is left untouched.
def close
  @mutex.synchronize do
    next if @db.closed?

    @db.close
  end
end
#last_completed_key(table:) ⇒ Object
Returns the end_key of the last completed batch for a table, decoded from JSON. Returns nil if no batches have been recorded for this table (fresh start).
139 140 141 142 143 144 145 146 147 |
# File 'lib/pcrd/checkpoint/store.rb', line 139
# End key of the most recently recorded batch for +table+ (highest id),
# decoded from JSON. Returns nil when the table has no batches yet, i.e.
# a fresh start.
def last_completed_key(table:)
  latest = @mutex.synchronize do
    @db.get_first_row(
      "SELECT end_key FROM batches WHERE table_name = ? ORDER BY id DESC LIMIT 1",
      [table.to_s]
    )
  end
  return nil unless latest

  JSON.parse(latest["end_key"])
end
#lsn ⇒ Object
72 73 74 |
# File 'lib/pcrd/checkpoint/store.rb', line 72
# Current LSN watermark, as last stored by #set_lsn.
#
# @return [String, nil] an LSN such as "16/B374D848", or get_meta's
#   missing-key value (presumably nil -- confirm) if none was recorded
def lsn
  get_meta("current_lsn")
end
#phase ⇒ Object
Current migration phase as a Symbol; :new when no phase has been recorded yet.
63 64 65 66 |
# File 'lib/pcrd/checkpoint/store.rb', line 63
# Current migration phase.
#
# @return [Symbol] the phase stored by #set_phase (converted back from its
#   string form), or :new when no phase has been recorded yet
def phase
  stored = get_meta("phase")
  stored ? stored.to_sym : :new
end
#publication ⇒ Object
102 103 104 |
# File 'lib/pcrd/checkpoint/store.rb', line 102
# Name of the publication recorded via #set_publication.
#
# @return [String, nil] the stored name, or get_meta's missing-key value
#   (presumably nil -- confirm) if none was recorded
def publication
  get_meta("publication")
end
#record_batch(table:, start_key:, end_key:, row_count:, duration_ms:) ⇒ Object
Record a successfully completed batch. Keys are JSON-encoded to support multi-column primary keys.
122 123 124 125 126 127 128 129 130 131 132 133 134 135 |
# File 'lib/pcrd/checkpoint/store.rb', line 122
# Records one successfully completed batch. start_key and end_key are
# stored as JSON so multi-column primary keys round-trip intact;
# completed_at is the current local time in ISO 8601.
#
# @param table [#to_s] table the batch was copied from
# @param start_key [Object] JSON-serializable first key of the batch
# @param end_key [Object] JSON-serializable last key of the batch
# @param row_count [#to_i] rows copied in the batch
# @param duration_ms [#to_i] wall time the batch took, in milliseconds
def record_batch(table:, start_key:, end_key:, row_count:, duration_ms:)
  insert_sql = "INSERT INTO batches (table_name, start_key, end_key, row_count, duration_ms, completed_at) " \
               "VALUES (?, ?, ?, ?, ?, ?)"
  values = [
    table.to_s,
    JSON.generate(start_key),
    JSON.generate(end_key),
    row_count.to_i,
    duration_ms.to_i,
    Time.now.iso8601
  ]

  @mutex.synchronize { @db.execute(insert_sql, values) }
end
#replication_slot ⇒ Object
Replication objects this migration created, recorded so a resume can be cross-checked against the config and cleanup knows what to remove.
94 95 96 |
# File 'lib/pcrd/checkpoint/store.rb', line 94
# Name of the replication slot recorded via #set_replication_slot, kept so
# a resume can be cross-checked against the config and cleanup knows what
# to remove.
#
# @return [String, nil] the stored name, or get_meta's missing-key value
#   (presumably nil -- confirm) if none was recorded
def replication_slot
  get_meta("replication_slot")
end
#set_backfill_start_lsn(lsn) ⇒ Object
88 89 90 |
# File 'lib/pcrd/checkpoint/store.rb', line 88
# Records the LSN at which the backfill started.
#
# Validated against LSN_FORMAT exactly like #set_lsn, so a malformed value
# is rejected here rather than persisted and read back later.
#
# @param lsn [String] e.g. "16/B374D848"
# @raise [ArgumentError] if +lsn+ is not a String matching LSN_FORMAT;
#   nothing is written in that case
def set_backfill_start_lsn(lsn)
  unless lsn.is_a?(String) && lsn.match?(LSN_FORMAT)
    raise ArgumentError, "invalid LSN: #{lsn.inspect}"
  end

  set_meta("backfill_start_lsn", lsn)
end
#set_lsn(lsn) ⇒ Object
76 77 78 79 80 81 82 |
# File 'lib/pcrd/checkpoint/store.rb', line 76
# Stores the current LSN watermark.
#
# @param lsn [String] e.g. "16/B374D848"
# @raise [ArgumentError] if +lsn+ is not a String matching LSN_FORMAT;
#   nothing is written in that case
def set_lsn(lsn)
  unless lsn.is_a?(String) && lsn.match?(LSN_FORMAT)
    raise ArgumentError, "invalid LSN: #{lsn.inspect}"
  end

  set_meta("current_lsn", lsn)
end
#set_phase(phase) ⇒ Object
68 69 70 |
# File 'lib/pcrd/checkpoint/store.rb', line 68
# Stores the migration phase. Symbols are persisted as strings; #phase
# converts them back with to_sym.
#
# @param phase [#to_s] e.g. :backfill
def set_phase(phase)
  set_meta("phase", phase.to_s)
end
#set_publication(name) ⇒ Object
106 107 108 |
# File 'lib/pcrd/checkpoint/store.rb', line 106
# Records the name of the publication this migration created.
#
# @param name [String]
def set_publication(name)
  set_meta("publication", name)
end
#set_replication_slot(name) ⇒ Object
98 99 100 |
# File 'lib/pcrd/checkpoint/store.rb', line 98
# Records the name of the replication slot this migration created.
#
# @param name [String]
def set_replication_slot(name)
  set_meta("replication_slot", name)
end
#set_started_at(ts) ⇒ Object
114 115 116 |
# File 'lib/pcrd/checkpoint/store.rb', line 114
# Records when the migration started. +ts+ is passed through unchanged;
# any conversion of a non-String value is up to set_meta (not visible
# here -- confirm), since metadata.value is a TEXT column.
#
# @param ts [Object] start timestamp
def set_started_at(ts)
  set_meta("started_at", ts)
end
#started_at ⇒ Object
110 111 112 |
# File 'lib/pcrd/checkpoint/store.rb', line 110
# Start timestamp recorded via #set_started_at.
#
# @return [Object, nil] whatever get_meta returns for "started_at"
#   (presumably nil when it was never set -- confirm)
def started_at
  get_meta("started_at")
end
#total_rows_copied(table:) ⇒ Object
187 188 189 190 191 192 193 194 195 |
# File 'lib/pcrd/checkpoint/store.rb', line 187
# Rows copied so far for +table+, summed across its recorded batches.
# COALESCE makes a table with no batches report 0 rather than NULL.
#
# @param table [#to_s] table name as passed to #record_batch
# @return [Integer]
def total_rows_copied(table:)
  sql = "SELECT COALESCE(SUM(row_count), 0) AS total FROM batches WHERE table_name = ?"
  result = @mutex.synchronize { @db.get_first_row(sql, [table.to_s]) }
  result["total"].to_i
end