Class: Pcrd::Migration::Orchestrator
- Inherits:
-
Object
- Object
- Pcrd::Migration::Orchestrator
- Defined in:
- lib/pcrd/migration/orchestrator.rb
Overview
Drives the full migrate flow: setup, concurrent backfill + WAL apply, and the streaming monitor, with cleanup guaranteed. Extracted from the CLI so the orchestration is testable on its own and free of Thor.
The CLI stays a thin adapter: it runs preflight, confirms with the operator, installs signal traps that call #request_stop, and renders the result. All progress output goes through an injected Reporter.
#run assumes preflight has passed and the operator has confirmed; it returns an outcome symbol (:completed | :interrupted | :backfill_only) and raises Pcrd::Error subclasses (Replication::Error, Connection::Error, …) for the CLI to translate.
Constant Summary collapse
- LAG_CHECK_INTERVAL =
seconds between lag readings in streaming mode
2
Instance Method Summary collapse
-
#initialize(config:, options: {}, reporter: Reporter::Console.new) ⇒ Orchestrator
constructor
A new instance of Orchestrator.
-
#request_stop ⇒ Object
Safe to call from a signal handler / another thread.
- #run ⇒ Object
Constructor Details
#initialize(config:, options: {}, reporter: Reporter::Console.new) ⇒ Orchestrator
Returns a new instance of Orchestrator.
20 21 22 23 24 25 26 |
# File 'lib/pcrd/migration/orchestrator.rb', line 20 def initialize(config:, options: {}, reporter: Reporter::Console.new) @config = config @options = Options.normalize() @reporter = reporter @mutex = Mutex.new @stop = false end |
Instance Method Details
#request_stop ⇒ Object
Safe to call from a signal handler / another thread.
29 30 31 32 33 |
# File 'lib/pcrd/migration/orchestrator.rb', line 29 def request_stop @mutex.synchronize { @stop = true } @backfill_engine&.stop! @reporter.info("\nStopping...") end |
#run ⇒ Object
35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 |
# File 'lib/pcrd/migration/orchestrator.rb', line 35 def run @source_pool = Connection::Client.new(@config.source) @target_pool = Connection::Client.new(@config.target) @checkpoint = Checkpoint::Store.new(@config.migrate.checkpoint_db) setup = Schema::Setup.new(source_pool: @source_pool, target_pool: @target_pool, config: @config) report_session_settings acquire_lock! start_lsn = prepare_replication(setup) start_streaming(start_lsn) unless backfill_only? return :interrupted if run_backfill == :interrupted if backfill_only? @reporter.info("Run `pcrd verify` to check row counts, then `pcrd cutover` when ready.") return :backfill_only end stream_until_stopped ensure cleanup end |