Class: Pcrd::Checkpoint::Store

Inherits:
Object
  • Object
show all
Defined in:
lib/pcrd/checkpoint/store.rb

Overview

SQLite-backed store for migration progress.

Tracks two things:

1. Metadata (phase, LSN watermark, start time) — key/value rows
2. Completed batches — one row per successfully copied batch,
   with start/end key, row count, duration, and timestamp.

The per-batch log is what makes resumption safe and auditable: on resume, `last_completed_key` returns the end_key of the most recently recorded batch, and the backfill skips straight past it. It also powers throughput stats and ETA estimates.

Constant Summary collapse

LSN_FORMAT =

A PostgreSQL LSN is two hex segments joined by a slash, e.g. “16/B374D848”.

/\A[0-9A-Fa-f]+\/[0-9A-Fa-f]+\z/
# DDL run by #initialize (via execute_batch) every time the store is opened.
# Every statement uses IF NOT EXISTS, so reopening an existing checkpoint
# file leaves its tables, index and rows untouched.
#
# metadata - key/value rows; presumably the backing table for the
#            get_meta/set_meta helpers (not shown here), which are called
#            with keys such as "phase", "current_lsn", "backfill_start_lsn",
#            "replication_slot", "publication" and "started_at".
# batches  - one row inserted per copied batch by #record_batch.
#            start_key/end_key hold JSON-encoded keys (JSON.generate), so
#            multi-column keys fit in a TEXT column; duration_ms is in
#            milliseconds; completed_at is a Time#iso8601 string.
#
# idx_batches_table serves the per-table "newest first" lookups
# (WHERE table_name = ? ORDER BY id DESC) issued by #last_completed_key
# and #batches.
SCHEMA_SQL =
<<~SQL.freeze
  CREATE TABLE IF NOT EXISTS metadata (
    key   TEXT PRIMARY KEY,
    value TEXT NOT NULL
  );

  CREATE TABLE IF NOT EXISTS batches (
    id           INTEGER PRIMARY KEY AUTOINCREMENT,
    table_name   TEXT    NOT NULL,
    start_key    TEXT    NOT NULL,
    end_key      TEXT    NOT NULL,
    row_count    INTEGER NOT NULL,
    duration_ms  INTEGER NOT NULL,
    completed_at TEXT    NOT NULL
  );

  CREATE INDEX IF NOT EXISTS idx_batches_table
    ON batches (table_name, id DESC);
SQL

Instance Method Summary collapse

Constructor Details

#initialize(path) ⇒ Store

Returns a new instance of Store.



43
44
45
46
47
48
49
50
51
52
53
54
55
# File 'lib/pcrd/checkpoint/store.rb', line 43

# Open the SQLite checkpoint database at +path+ and make sure the schema
# (SCHEMA_SQL) exists.
#
# @param path [String] filesystem path of the SQLite database file
# @raise [StandardError] whatever SQLite raises while opening or preparing
#   the database; the connection is closed before the error propagates
def initialize(path)
  @path  = path
  # Backfill (recording batches) and the apply worker (recording LSN) hit
  # this store from two threads at once. SQLite3::Database is a single
  # connection, so serialize every @db access through one mutex. The lock
  # is taken only at the lowest level (get_meta/set_meta and the batch
  # methods) so the public wrappers never re-enter it.
  @mutex = Mutex.new
  @db    = SQLite3::Database.new(path)
  begin
    @db.results_as_hash = true
    # Wait up to 5 s for a competing lock instead of failing immediately.
    @db.busy_timeout = 5_000
    @db.execute_batch(SCHEMA_SQL)
  rescue StandardError
    # The constructor never returns on this path, so no caller could ever
    # reach #close; release the handle here and re-raise the original error.
    @db.close
    raise
  end
end

Instance Method Details

#backfill_start_lsnObject



84
85
86
# File 'lib/pcrd/checkpoint/store.rb', line 84

# LSN stored by #set_backfill_start_lsn, returned exactly as get_meta
# yields it (presumably nil when it was never stored).
def backfill_start_lsn
  meta_key = "backfill_start_lsn"
  get_meta(meta_key)
end

#batch_stats(table:) ⇒ Object

Returns aggregate stats for a table’s completed batches.



150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
# File 'lib/pcrd/checkpoint/store.rb', line 150

# Aggregate stats for a table's completed batches.
#
# @param table [#to_s] table name as passed to #record_batch
# @return [Hash] with
#   :batch_count      - number of recorded batches,
#   :total_rows       - sum of their row counts (0 when none),
#   :avg_rows_per_sec - overall throughput (total rows / total copy time),
#                       rounded to one decimal; 0.0 when there are no
#                       batches or every recorded duration is 0 ms.
def batch_stats(table:)
  row = @mutex.synchronize do
    # Throughput is SUM(rows) / SUM(seconds) rather than the mean of
    # per-batch rates: averaging ratios gives a tiny, fast batch the same
    # weight as a large, slow one, which distorts the rate (and any ETA
    # derived from it) whenever batch sizes or speeds vary.
    @db.get_first_row(
      "SELECT COUNT(*) AS cnt, SUM(row_count) AS total_rows, " \
      "SUM(row_count) * 1000.0 / NULLIF(SUM(duration_ms), 0) AS avg_rps " \
      "FROM batches WHERE table_name = ?",
      [table.to_s]
    )
  end
  {
    batch_count:      row["cnt"].to_i,
    total_rows:       row["total_rows"].to_i,
    avg_rows_per_sec: row["avg_rps"]&.round(1) || 0.0
  }
end

#batches(table:, limit: 100) ⇒ Object

Completed batches for a table, newest first, capped at `limit` rows (default 100).



167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
# File 'lib/pcrd/checkpoint/store.rb', line 167

# Completed batches for +table+, newest first (highest id first), at most
# +limit+ of them.
#
# @param table [#to_s] table name as passed to #record_batch
# @param limit [Integer] maximum number of rows to return (default 100)
# @return [Array<Hash>] one hash per batch; start_key/end_key are decoded
#   back from the JSON written by #record_batch
def batches(table:, limit: 100)
  records = @mutex.synchronize do
    @db.execute(
      "SELECT * FROM batches WHERE table_name = ? ORDER BY id DESC LIMIT ?",
      [table.to_s, limit]
    )
  end

  records.map do |rec|
    id, name, start_json, end_json, count, elapsed, finished =
      rec.values_at("id", "table_name", "start_key", "end_key",
                    "row_count", "duration_ms", "completed_at")
    {
      id: id.to_i,
      table_name: name,
      start_key: JSON.parse(start_json),
      end_key: JSON.parse(end_json),
      row_count: count.to_i,
      duration_ms: elapsed.to_i,
      completed_at: finished
    }
  end
end

#closeObject



57
58
59
# File 'lib/pcrd/checkpoint/store.rb', line 57

# Close the underlying SQLite connection. Safe to call more than once:
# later calls see the connection already closed and do nothing.
def close
  @mutex.synchronize do
    next if @db.closed?

    @db.close
  end
end

#last_completed_key(table:) ⇒ Object

Returns the end_key of the last completed batch for a table, decoded from JSON. Returns nil if no batches have been recorded for this table (fresh start).



139
140
141
142
143
144
145
146
147
# File 'lib/pcrd/checkpoint/store.rb', line 139

# end_key of the most recently recorded batch (highest id) for +table+,
# JSON-decoded; nil when no batch has been recorded for it yet.
def last_completed_key(table:)
  latest = @mutex.synchronize do
    @db.get_first_row(
      "SELECT end_key FROM batches WHERE table_name = ? ORDER BY id DESC LIMIT 1",
      [table.to_s]
    )
  end
  return nil unless latest

  JSON.parse(latest["end_key"])
end

#lsnObject



72
73
74
# File 'lib/pcrd/checkpoint/store.rb', line 72

# Current LSN watermark stored by #set_lsn, returned exactly as get_meta
# yields it (presumably nil when it was never stored).
def lsn
  meta_key = "current_lsn"
  get_meta(meta_key)
end

#phaseObject

── phase & LSN metadata ─────────────────────────────────────────────



63
64
65
66
# File 'lib/pcrd/checkpoint/store.rb', line 63

# ── phase & LSN metadata ─────────────────────────────────────────────
#
# Current migration phase as a Symbol, or :new when get_meta returns a
# falsy value for "phase" (no phase recorded yet).
def phase
  stored = get_meta("phase")
  return :new unless stored

  stored.to_sym
end

#publicationObject



102
103
104
# File 'lib/pcrd/checkpoint/store.rb', line 102

# Name of the publication recorded by #set_publication, returned exactly
# as get_meta yields it (presumably nil when never recorded).
def publication
  meta_key = "publication"
  get_meta(meta_key)
end

#record_batch(table:, start_key:, end_key:, row_count:, duration_ms:) ⇒ Object

Record a successfully completed batch. Keys are JSON-encoded to support multi-column primary keys.



122
123
124
125
126
127
128
129
130
131
132
133
134
135
# File 'lib/pcrd/checkpoint/store.rb', line 122

# Append one row to the batches log for a successfully copied batch.
#
# start_key/end_key are serialized with JSON.generate so composite
# (multi-column) primary keys round-trip through a TEXT column. row_count
# and duration_ms are coerced with #to_i. completed_at is Time.now
# formatted with #iso8601, taken while the lock is held.
#
# @return whatever @db.execute returns for the INSERT
def record_batch(table:, start_key:, end_key:, row_count:, duration_ms:)
  insert_sql =
    "INSERT INTO batches (table_name, start_key, end_key, row_count, duration_ms, completed_at) " \
    "VALUES (?, ?, ?, ?, ?, ?)"

  @mutex.synchronize do
    bind = [table.to_s]
    bind << JSON.generate(start_key) << JSON.generate(end_key)
    bind << row_count.to_i << duration_ms.to_i
    bind << Time.now.iso8601
    @db.execute(insert_sql, bind)
  end
end

#replication_slotObject

Replication objects this migration created, recorded so a resume can be cross-checked against the config and cleanup knows what to remove.



94
95
96
# File 'lib/pcrd/checkpoint/store.rb', line 94

# Name of the replication slot recorded by #set_replication_slot, returned
# exactly as get_meta yields it (presumably nil when never recorded).
def replication_slot
  meta_key = "replication_slot"
  get_meta(meta_key)
end

#set_backfill_start_lsn(lsn) ⇒ Object



88
89
90
# File 'lib/pcrd/checkpoint/store.rb', line 88

# Persist the LSN recorded when the backfill started.
#
# Validated exactly like #set_lsn. Without the check, a malformed
# watermark would be written silently and only discovered when it is
# later read back.
#
# @param lsn [String] PostgreSQL LSN such as "16/B374D848"
# @raise [ArgumentError] if +lsn+ is not a String matching LSN_FORMAT;
#   nothing is written in that case
def set_backfill_start_lsn(lsn)
  unless lsn.is_a?(String) && lsn.match?(LSN_FORMAT)
    raise ArgumentError, "invalid LSN: #{lsn.inspect}"
  end

  set_meta("backfill_start_lsn", lsn)
end

#set_lsn(lsn) ⇒ Object



76
77
78
79
80
81
82
# File 'lib/pcrd/checkpoint/store.rb', line 76

# Record +lsn+ as the current LSN watermark.
#
# @param lsn [String] PostgreSQL LSN such as "16/B374D848"
# @raise [ArgumentError] when +lsn+ is not a String matching LSN_FORMAT;
#   nothing is written in that case
def set_lsn(lsn)
  well_formed = lsn.is_a?(String) && LSN_FORMAT.match?(lsn)
  raise ArgumentError, "invalid LSN: #{lsn.inspect}" unless well_formed

  set_meta("current_lsn", lsn)
end

#set_phase(phase) ⇒ Object



68
69
70
# File 'lib/pcrd/checkpoint/store.rb', line 68

# Persist the migration phase. The value is stored via #to_s, so Symbols
# and Strings both work and #phase can turn it back into a Symbol.
def set_phase(phase)
  label = phase.to_s
  set_meta("phase", label)
end

#set_publication(name) ⇒ Object



106
107
108
# File 'lib/pcrd/checkpoint/store.rb', line 106

# Remember the publication this migration created so a resume can
# cross-check it and cleanup knows what to remove.
def set_publication(name)
  meta_key = "publication"
  set_meta(meta_key, name)
end

#set_replication_slot(name) ⇒ Object



98
99
100
# File 'lib/pcrd/checkpoint/store.rb', line 98

# Remember the replication slot this migration created so a resume can
# cross-check it and cleanup knows what to remove.
def set_replication_slot(name)
  meta_key = "replication_slot"
  set_meta(meta_key, name)
end

#set_started_at(ts) ⇒ Object



114
115
116
# File 'lib/pcrd/checkpoint/store.rb', line 114

# Record the migration start time. +ts+ is passed to set_meta unchanged.
# NOTE(review): the metadata column is TEXT, so callers presumably pass a
# String timestamp; confirm against callers.
def set_started_at(ts)
  meta_key = "started_at"
  set_meta(meta_key, ts)
end

#started_atObject



110
111
112
# File 'lib/pcrd/checkpoint/store.rb', line 110

# Migration start time recorded by #set_started_at, returned exactly as
# get_meta yields it (presumably nil when never recorded).
def started_at
  meta_key = "started_at"
  get_meta(meta_key)
end

#total_rows_copied(table:) ⇒ Object



187
188
189
190
191
192
193
194
195
# File 'lib/pcrd/checkpoint/store.rb', line 187

# Sum of row_count over every recorded batch for +table+; 0 when none
# (COALESCE turns the empty SUM into 0).
def total_rows_copied(table:)
  sql = "SELECT COALESCE(SUM(row_count), 0) AS total FROM batches WHERE table_name = ?"
  summary = @mutex.synchronize { @db.get_first_row(sql, [table.to_s]) }
  summary["total"].to_i
end