Class: Pcrd::Backfill::Engine
- Inherits: Object
- Defined in: lib/pcrd/backfill/engine.rb
Overview
Drives the full backfill loop for all tables in the migration spec.
For each table:
1. Reads last_completed_key from the checkpoint store (nil = fresh start)
2. Loops: execute one Batch, record it in checkpoint, call on_batch
3. Stops when the batch returns no rows (end of table) or stop! is called
Thread safety: stop! can be called from any thread; the engine checks the stop flag between batches.
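The three steps above can be sketched with in-memory stand-ins. This is only an illustration of the checkpointed loop, not the engine's actual code: MemoryCheckpoint, backfill_table, the :id key column, and the stopped: lambda are all hypothetical names, and the real engine reads from connection pools rather than an Array.

```ruby
# Hypothetical in-memory checkpoint store (not part of Pcrd).
class MemoryCheckpoint
  def initialize
    @keys = {}
  end

  # nil means no batch has completed yet (fresh start).
  def last_completed_key(table)
    @keys[table]
  end

  def record(table, key)
    @keys[table] = key
  end
end

# Copies `rows` (assumed sorted by :id) in batches and returns the number
# of rows handled in this run. `stopped` stands in for the engine's stop flag.
def backfill_table(table, rows, checkpoint, batch_size:, stopped: -> { false }, on_batch: nil)
  last_key = checkpoint.last_completed_key(table) # step 1: resume point
  batch_num = 0
  copied = 0
  until stopped.call                              # step 3: stop requested
    # Step 2: take the next batch strictly after the last completed key.
    batch = rows.select { |row| last_key.nil? || row[:id] > last_key }.first(batch_size)
    break if batch.empty?                         # step 3: end of table
    last_key = batch.last[:id]
    checkpoint.record(table, last_key)
    batch_num += 1
    copied += batch.size
    stats = { table: table, batch_num: batch_num, row_count: batch.size,
              rows_so_far: copied, last_key: last_key }
    on_batch&.call(stats)
  end
  copied
end

demo_rows = (1..7).map { |id| { id: id } }
demo_checkpoint = MemoryCheckpoint.new
puts backfill_table("users", demo_rows, demo_checkpoint, batch_size: 3) # first run copies every row
puts backfill_table("users", demo_rows, demo_checkpoint, batch_size: 3) # resumed run finds nothing left
```

Because the checkpoint is written after every batch, a second call picks up from the recorded key instead of starting over, which is the behavior step 1 describes.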
Defined Under Namespace
Classes: Result
Class Method Summary
- .throttle_delay(row_count, duration_ms, cap) ⇒ Object
  Seconds to pause after a batch to hold the average copy rate at or below `cap` rows/sec.
Instance Method Summary
- #initialize(source_pool:, target_pool:, config:, checkpoint:, source_schema: {}) ⇒ Engine (constructor)
  A new instance of Engine.
- #run(on_batch: nil) ⇒ Object
  Runs backfill for all configured tables.
- #stop! ⇒ Object
  Signal the engine to stop cleanly after the current batch.
- #stopped? ⇒ Boolean
Constructor Details
#initialize(source_pool:, target_pool:, config:, checkpoint:, source_schema: {}) ⇒ Engine
Returns a new instance of Engine.
    # File 'lib/pcrd/backfill/engine.rb', line 27
    def initialize(source_pool:, target_pool:, config:, checkpoint:, source_schema: {})
      @source_pool = source_pool
      @target_pool = target_pool
      @config = config
      @checkpoint = checkpoint
      @source_schema = source_schema # Hash<table_name, { columns:, pk_columns: }>
      @stop = false
      @mutex = Mutex.new
    end
Class Method Details
.throttle_delay(row_count, duration_ms, cap) ⇒ Object
Seconds to pause after a batch to hold the average copy rate at or below `cap` rows/sec. Returns 0 when unthrottled or already slower than the cap.
    # File 'lib/pcrd/backfill/engine.rb', line 19
    def self.throttle_delay(row_count, duration_ms, cap)
      return 0.0 unless cap && cap.positive?
      needed = row_count.to_f / cap
      elapsed = duration_ms / 1000.0
      [needed - elapsed, 0.0].max
    end
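A worked example may make the arithmetic easier to follow. The method below is a standalone copy of the body shown above, defined at top level so it runs without the gem; the row counts, durations, and caps are made-up sample values.

```ruby
# Standalone copy of Engine.throttle_delay for illustration only.
def throttle_delay(row_count, duration_ms, cap)
  return 0.0 unless cap && cap.positive?
  needed = row_count.to_f / cap   # seconds the batch should take at the cap
  elapsed = duration_ms / 1000.0  # seconds the batch actually took
  [needed - elapsed, 0.0].max     # never sleep a negative amount
end

# 1000 rows at a 2000 rows/sec cap should take 0.5 s; the batch took 0.25 s,
# so the remaining 0.25 s is the pause.
puts throttle_delay(1000, 250, 2000)

# 100 rows in a full second is already slower than the cap: no pause.
puts throttle_delay(100, 1000, 2000)

# A nil cap means unthrottled: no pause.
puts throttle_delay(1000, 250, nil)
```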
Instance Method Details
#run(on_batch: nil) ⇒ Object
Runs backfill for all configured tables.
on_batch: optional Proc called after each batch with a stats Hash:
{ table:, batch_num:, row_count:, rows_so_far:, duration_ms:, last_key: }
Returns Array<Result>.
    # File 'lib/pcrd/backfill/engine.rb', line 43
    def run(on_batch: nil)
      @checkpoint.set_phase(:backfill)
      @checkpoint.set_started_at(Time.now.iso8601)
      @config.migrate.tables.map do |table_config|
        run_table(table_config, on_batch: on_batch)
      end
    end
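As a rough sketch of what an on_batch callback could do with the stats Hash, the helper below formats one progress line. progress_line and SAMPLE_STATS are hypothetical names, and the sample values are invented; only the Hash keys come from the documentation above.

```ruby
# Hypothetical helper: turns one on_batch stats Hash into a progress line.
def progress_line(stats)
  seconds = stats[:duration_ms] / 1000.0
  # Guard against a zero-duration batch to avoid dividing by zero.
  rate = seconds.positive? ? (stats[:row_count] / seconds).round : 0
  format("%s batch %d: %d rows (%d so far, ~%d rows/s, last_key=%s)",
         stats[:table], stats[:batch_num], stats[:row_count],
         stats[:rows_so_far], rate, stats[:last_key].inspect)
end

# Invented sample with the documented keys.
SAMPLE_STATS = {
  table: "users", batch_num: 3, row_count: 500,
  rows_so_far: 1500, duration_ms: 250, last_key: 1500
}.freeze

puts progress_line(SAMPLE_STATS)
```

With an Engine instance in hand (here assumed to be named engine), the same helper could be wired in as engine.run(on_batch: ->(stats) { puts progress_line(stats) }).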
#stop! ⇒ Object
Signal the engine to stop cleanly after the current batch.
    # File 'lib/pcrd/backfill/engine.rb', line 53
    def stop!
      @mutex.synchronize { @stop = true }
    end
#stopped? ⇒ Boolean
Returns true once stop! has been called.
    # File 'lib/pcrd/backfill/engine.rb', line 57
    def stopped?
      @mutex.synchronize { @stop }
    end
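The stop!/stopped? pair can be exercised from a second thread. The sketch below uses a hypothetical StoppableLoop class that copies only the mutex-guarded flag from the listings above; its run method is a placeholder for the batch loop and is an assumption, not the engine's code.

```ruby
# Hypothetical stand-in that reuses the engine's flag pattern.
class StoppableLoop
  def initialize
    @stop = false
    @mutex = Mutex.new
  end

  def stop!
    @mutex.synchronize { @stop = true }
  end

  def stopped?
    @mutex.synchronize { @stop }
  end

  # Placeholder batch loop: checks the flag before each "batch" and
  # returns how many batches ran.
  def run
    batches = 0
    until stopped?
      batches += 1
      yield batches if block_given?
      sleep 0.01 # pretend to do some work
    end
    batches
  end
end

# Starts the loop on a worker thread, waits for the first batch,
# then requests a stop from the calling thread.
def demo_stop
  worker_loop = StoppableLoop.new
  started = Queue.new
  worker = Thread.new { worker_loop.run { |n| started << n if n == 1 } }
  started.pop                 # block until at least one batch has run
  worker_loop.stop!           # safe to call from this thread
  [worker.value, worker_loop.stopped?]
end

batches, stopped = demo_stop
puts "ran #{batches} batch(es), stopped? = #{stopped}"
```

One caveat when wiring this to signals: Ruby raises ThreadError if Mutex#synchronize is called directly inside a Signal.trap handler, so a handler would likely need to hand the stop! call off to a regular thread.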