Class: Pcrd::Migration::Orchestrator

Inherits:
Object
  • Object
show all
Defined in:
lib/pcrd/migration/orchestrator.rb

Overview

Drives the full migrate flow: setup, concurrent backfill + WAL apply, and the streaming monitor, with cleanup guaranteed. Extracted from the CLI so the orchestration is testable on its own and free of Thor.

The CLI stays a thin adapter: it runs preflight, confirms with the operator, installs signal traps that call #request_stop, and renders the result. All progress output goes through an injected Reporter.

#run assumes preflight has passed and the operator has confirmed; it returns an outcome symbol (:completed | :interrupted | :backfill_only) and raises Pcrd::Error subclasses (Replication::Error, Connection::Error, …) for the CLI to translate.

Constant Summary collapse

LAG_CHECK_INTERVAL =

seconds between lag readings in streaming mode

2

Instance Method Summary collapse

Constructor Details

#initialize(config:, options: {}, reporter: Reporter::Console.new) ⇒ Orchestrator

Returns a new instance of Orchestrator.



20
21
22
23
24
25
26
# File 'lib/pcrd/migration/orchestrator.rb', line 20

def initialize(config:, options: {}, reporter: Reporter::Console.new)
  @config   = config
  @options  = Options.normalize(options)
  @reporter = reporter
  @mutex    = Mutex.new
  @stop     = false
end

Instance Method Details

#request_stopObject

Safe to call from a signal handler / another thread.



29
30
31
32
33
# File 'lib/pcrd/migration/orchestrator.rb', line 29

def request_stop
  @mutex.synchronize { @stop = true }
  @backfill_engine&.stop!
  @reporter.info("\nStopping...")
end

#runObject



35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
# File 'lib/pcrd/migration/orchestrator.rb', line 35

def run
  @source_pool = Connection::Client.new(@config.source)
  @target_pool = Connection::Client.new(@config.target)
  @checkpoint  = Checkpoint::Store.new(@config.migrate.checkpoint_db)
  setup        = Schema::Setup.new(source_pool: @source_pool, target_pool: @target_pool, config: @config)

  report_session_settings
  acquire_lock!

  start_lsn = prepare_replication(setup)
  start_streaming(start_lsn) unless backfill_only?

  return :interrupted if run_backfill == :interrupted

  if backfill_only?
    @reporter.info("Run `pcrd verify` to check row counts, then `pcrd cutover` when ready.")
    return :backfill_only
  end

  stream_until_stopped
ensure
  cleanup
end