Class: Pcrd::Backfill::Engine

Inherits:
Object
  • Object
show all
Defined in:
lib/pcrd/backfill/engine.rb

Overview

Drives the full backfill loop for all tables in the migration spec.

For each table:

1. Reads last_completed_key from the checkpoint store (nil = fresh start)
2. Loops: execute one Batch, record it in checkpoint, call on_batch
3. Stops when the batch returns no rows (end of table) or stop! is called

Thread safety: stop! can be called from any thread; the engine checks

Defined Under Namespace

Classes: Result

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(source_pool:, target_pool:, config:, checkpoint:, source_schema: {}) ⇒ Engine

Returns a new instance of Engine.



27
28
29
30
31
32
33
34
35
# File 'lib/pcrd/backfill/engine.rb', line 27

def initialize(source_pool:, target_pool:, config:, checkpoint:, source_schema: {})
  @source_pool   = source_pool
  @target_pool   = target_pool
  @config        = config
  @checkpoint    = checkpoint
  @source_schema = source_schema  # Hash<table_name, { columns:, pk_columns: }>
  @stop          = false
  @mutex         = Mutex.new
end

Class Method Details

.throttle_delay(row_count, duration_ms, cap) ⇒ Object

Seconds to pause after a batch to hold the average copy rate at or below ‘cap` rows/sec. Returns 0 when unthrottled or already slower than the cap.



19
20
21
22
23
24
25
# File 'lib/pcrd/backfill/engine.rb', line 19

def self.throttle_delay(row_count, duration_ms, cap)
  return 0.0 unless cap && cap.positive?

  needed  = row_count.to_f / cap
  elapsed = duration_ms / 1000.0
  [needed - elapsed, 0.0].max
end

Instance Method Details

#run(on_batch: nil) ⇒ Object

Runs backfill for all configured tables.

on_batch: optional Proc called after each batch with a stats Hash:

{ table:, batch_num:, row_count:, rows_so_far:, duration_ms:, last_key: }

Returns Array<Result>.



43
44
45
46
47
48
49
50
# File 'lib/pcrd/backfill/engine.rb', line 43

def run(on_batch: nil)
  @checkpoint.set_phase(:backfill)
  @checkpoint.set_started_at(Time.now.iso8601)

  @config.migrate.tables.map do |table_config|
    run_table(table_config, on_batch: on_batch)
  end
end

#stop!Object

Signal the engine to stop cleanly after the current batch.



53
54
55
# File 'lib/pcrd/backfill/engine.rb', line 53

def stop!
  @mutex.synchronize { @stop = true }
end

#stopped?Boolean

Returns:

  • (Boolean)


57
58
59
# File 'lib/pcrd/backfill/engine.rb', line 57

def stopped?
  @mutex.synchronize { @stop }
end