Class: Pcrd::Schema::Setup
- Inherits: Object
  - Object
  - Pcrd::Schema::Setup
- Defined in: lib/pcrd/schema/setup.rb
Overview
Creates target tables from the migration spec. Called at the start of `pcrd migrate` (after preflight passes).
In the full streaming flow (Phase 9+), Setup also creates the publication and replication slot on the source. For --backfill-only, those are skipped.
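The flow described above can be sketched as a small driver. This is only an illustration: `run_fresh_setup` and `FakeSetup` are hypothetical names that do not exist in pcrd, and the ordering (tables first, then publication and slot) is an assumption rather than something this page confirms. The stand-in class lets the sketch run without a database.

```ruby
# Hypothetical driver, not part of pcrd: shows how the documented Setup
# methods might be called for a fresh (non-resume) run.
def run_fresh_setup(setup, pub_name:, slot_name:, backfill_only: false, force_overwrite: false)
  ddls = setup.create_target_tables(force_overwrite: force_overwrite)

  # --backfill-only never streams, so the publication and slot are skipped.
  start_lsn =
    if backfill_only
      nil
    else
      setup.create_publication_and_slot(pub_name: pub_name, slot_name: slot_name)
    end

  { ddls: ddls, start_lsn: start_lsn }
end

# Stand-in for Pcrd::Schema::Setup that records calls instead of touching
# a database. The return values are made-up sample data.
class FakeSetup
  attr_reader :calls

  def initialize
    @calls = []
  end

  def create_target_tables(force_overwrite: false)
    @calls << :create_target_tables
    { "users" => "CREATE TABLE users (id bigint PRIMARY KEY)" }
  end

  def create_publication_and_slot(pub_name:, slot_name:)
    @calls << :create_publication_and_slot
    "0/16B3748"
  end
end

result = run_fresh_setup(FakeSetup.new, pub_name: "pcrd_pub", slot_name: "pcrd_slot")
puts result[:start_lsn]
```

With `backfill_only: true` the driver returns a `nil` LSN and never calls `create_publication_and_slot`.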
Instance Method Summary
- #create_publication_and_slot(pub_name:, slot_name:) ⇒ Object
  Creates the publication and replication slot on the source.
- #create_target_tables(force_overwrite: false) ⇒ Object
  Creates all target tables and returns a Hash<table_name, ddl_string>.
- #drop_publication_and_slot(pub_name:, slot_name:) ⇒ Object
  Drops the publication and replication slot (cleanup phase).
- #initialize(source_pool:, target_pool:, config:) ⇒ Setup (constructor)
  A new instance of Setup.
- #validate_resumable!(pub_name:, slot_name:) ⇒ Object
  Validates that a --resume run has the slot and publication it needs.
Constructor Details
#initialize(source_pool:, target_pool:, config:) ⇒ Setup
Returns a new instance of Setup.
# File 'lib/pcrd/schema/setup.rb', line 11

def initialize(source_pool:, target_pool:, config:)
  @source_pool = source_pool
  @target_pool = target_pool
  @config = config
end
Instance Method Details
#create_publication_and_slot(pub_name:, slot_name:) ⇒ Object
Ensures the publication exists and creates the replication slot on the source for a fresh migration. Returns the slot's starting LSN as an "X/Y" string. Pass this LSN to the consumer so that streaming begins from a point that covers all of backfill.
A leftover publication from a partial prior run is reused if it covers exactly the configured tables (it is just a definition). A leftover slot is NOT reused: its WAL position is unknown relative to backfill, so the method raises SetupError and points the operator at --resume or `pcrd cleanup`.
# File 'lib/pcrd/schema/setup.rb', line 27

def create_publication_and_slot(pub_name:, slot_name:)
  ensure_publication(pub_name)

  if slot_exists?(slot_name)
    raise SetupError, "Replication slot '#{slot_name}' already exists. Resume the existing " \
                      "migration with --resume, or remove it with `pcrd cleanup` to start over."
  end

  result = @source_pool.exec(
    "SELECT lsn FROM pg_create_logical_replication_slot($1, 'pgoutput')",
    [slot_name]
  )
  result[0]["lsn"]
end
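The returned LSN is a PostgreSQL `pg_lsn` value printed as two hexadecimal numbers separated by a slash. If a caller needs to compare positions, for example to check that a later LSN lies past the slot's starting point, one option is to convert the string to an integer. The helper below is a hypothetical sketch and is not part of pcrd; the sample LSN values are made up.

```ruby
# Hypothetical helper: converts an "X/Y" LSN string into a single integer
# (high 32 bits from X, low 32 bits from Y) so positions can be compared.
def lsn_to_i(lsn)
  match = %r{\A(\h{1,8})/(\h{1,8})\z}.match(lsn.to_s)
  raise ArgumentError, "invalid LSN: #{lsn.inspect}" unless match

  (match[1].to_i(16) << 32) | match[2].to_i(16)
end

start_lsn = "0/16B3748" # sample value, e.g. what create_publication_and_slot might return
later_lsn = "1/0"       # sample value from further along the WAL

puts lsn_to_i(start_lsn) < lsn_to_i(later_lsn)
```

Comparing the raw strings would be unreliable because the hex fields are not zero-padded, which is why the sketch compares integers instead.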
#create_target_tables(force_overwrite: false) ⇒ Object
Creates all target tables and returns a Hash<table_name, ddl_string>. Raises SetupError if a target table already exists (pass --force-overwrite to drop and recreate it).
# File 'lib/pcrd/schema/setup.rb', line 70

def create_target_tables(force_overwrite: false)
  reader = Reader.new(@source_pool)
  ddls = {}

  @config.migrate.tables.each do |table_config|
    name = table_config.name
    source_cols = reader.read(name)
    pk_cols = reader.primary_key_columns(name)

    ddl = DDL.generate(
      source_columns: source_cols,
      table_config: table_config,
      primary_key_columns: pk_cols
    )

    target_reader = Reader.new(@target_pool)
    if target_reader.table_exists?(name)
      if force_overwrite
        @target_pool.exec_sql("DROP TABLE IF EXISTS #{Sql.quote_table(name)} CASCADE")
      else
        raise SetupError, "Table '#{name}' already exists on target. " \
                          "Pass --force-overwrite to drop and recreate."
      end
    end

    @target_pool.exec_sql("#{ddl};")
    ddls[name] = ddl
  end

  ddls
end
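The returned Hash maps each table name to the DDL string that was executed, without a trailing semicolon (the method appends one only when executing). A caller could, for instance, render it as a reviewable SQL script. `ddl_script` below is a hypothetical helper, not part of pcrd, and the sample Hash stands in for a real return value.

```ruby
# Hypothetical helper: renders the Hash<table_name, ddl_string> returned by
# create_target_tables as one SQL script, one commented section per table.
def ddl_script(ddls)
  ddls.map { |table, ddl| "-- #{table}\n#{ddl};" }.join("\n\n")
end

# Sample data standing in for the return value of create_target_tables.
sample = {
  "users"  => "CREATE TABLE users (id bigint PRIMARY KEY)",
  "orders" => "CREATE TABLE orders (id bigint PRIMARY KEY, user_id bigint)"
}

puts ddl_script(sample)
```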
#drop_publication_and_slot(pub_name:, slot_name:) ⇒ Object
Drops the publication and replication slot (cleanup phase).
# File 'lib/pcrd/schema/setup.rb', line 57

def drop_publication_and_slot(pub_name:, slot_name:)
  @source_pool.exec_sql(
    "DROP PUBLICATION IF EXISTS #{@source_pool.quote_ident(pub_name)}"
  )
  @source_pool.exec(
    "SELECT pg_drop_replication_slot($1) WHERE EXISTS (" \
    "  SELECT 1 FROM pg_replication_slots WHERE slot_name = $1)",
    [slot_name]
  )
end
#validate_resumable!(pub_name:, slot_name:) ⇒ Object
Validates that a --resume run has the slot and publication it needs. Raises SetupError with a clear message if either is missing.
# File 'lib/pcrd/schema/setup.rb', line 44

def validate_resumable!(pub_name:, slot_name:)
  unless slot_exists?(slot_name)
    raise SetupError, "Cannot resume: replication slot '#{slot_name}' does not exist on the source. " \
                      "Start a fresh migration (without --resume)."
  end

  unless publication_exists?(pub_name)
    raise SetupError, "Cannot resume: publication '#{pub_name}' does not exist on the source. " \
                      "Start a fresh migration (without --resume)."
  end
end
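The private `slot_exists?` and `publication_exists?` helpers are not shown on this page. A minimal sketch of what such checks could look like, assuming they query the standard PostgreSQL catalogs `pg_replication_slots` and `pg_publication`, follows. The function bodies and `FakePool` are hypothetical; the only thing taken from the source is that the pool exposes `exec(sql, params)`.

```ruby
# Hypothetical stand-ins for the private helpers, written as plain functions.
# The SQL is an assumption about how the checks might be implemented.
def slot_exists?(pool, slot_name)
  rows = pool.exec("SELECT 1 FROM pg_replication_slots WHERE slot_name = $1", [slot_name])
  !rows.to_a.empty?
end

def publication_exists?(pool, pub_name)
  rows = pool.exec("SELECT 1 FROM pg_publication WHERE pubname = $1", [pub_name])
  !rows.to_a.empty?
end

# In-memory pool so the sketch runs without a database. It answers the two
# queries above from fixed lists instead of real catalog tables.
class FakePool
  def initialize(slots:, publications:)
    @slots = slots
    @publications = publications
  end

  def exec(sql, params)
    known = sql.include?("pg_replication_slots") ? @slots : @publications
    known.include?(params.first) ? [{ "?column?" => "1" }] : []
  end
end

pool = FakePool.new(slots: ["pcrd_slot"], publications: [])
puts slot_exists?(pool, "pcrd_slot")
puts publication_exists?(pool, "pcrd_pub")
```

In this sample state the slot is present but the publication is missing, which is the situation in which `validate_resumable!` raises its second error.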