Class: Pcrd::Schema::Setup

Inherits:
Object
  • Object
show all
Defined in:
lib/pcrd/schema/setup.rb

Overview

Creates target tables from the migration spec. Called at the start of ‘pcrd migrate` (after preflight passes).

In the full streaming flow (Phase 9+), Setup also creates the publication and replication slot on source. For –backfill-only those are skipped.

Instance Method Summary collapse

Constructor Details

#initialize(source_pool:, target_pool:, config:) ⇒ Setup

Returns a new instance of Setup.



11
12
13
14
15
# File 'lib/pcrd/schema/setup.rb', line 11

def initialize(source_pool:, target_pool:, config:)
  @source_pool = source_pool
  @target_pool = target_pool
  @config      = config
end

Instance Method Details

#create_publication_and_slot(pub_name:, slot_name:) ⇒ Object

Creates the publication and replication slot on the source. Returns the slot’s starting LSN as a “X/Y” string — pass this to the consumer so streaming begins from a point that covers all of backfill. Idempotently ensures the publication and replication slot exist for a fresh migration, returning the slot’s starting LSN (“X/Y”).

A leftover publication from a partial prior run is reused if it covers exactly the configured tables (it is just a definition). A leftover slot is NOT reused: its WAL position is unknown relative to backfill, so we refuse and point the operator at –resume or ‘pcrd cleanup`.



27
28
29
30
31
32
33
34
35
36
37
38
39
40
# File 'lib/pcrd/schema/setup.rb', line 27

def create_publication_and_slot(pub_name:, slot_name:)
  ensure_publication(pub_name)

  if slot_exists?(slot_name)
    raise SetupError, "Replication slot '#{slot_name}' already exists. Resume the existing " \
                      "migration with --resume, or remove it with `pcrd cleanup` to start over."
  end

  result = @source_pool.exec(
    "SELECT lsn FROM pg_create_logical_replication_slot($1, 'pgoutput')",
    [slot_name]
  )
  result[0]["lsn"]
end

#create_target_tables(force_overwrite: false) ⇒ Object

Creates all target tables and returns a Hash<table_name, ddl_string>. Raises if a target table already exists (use –force-overwrite to drop first).



70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
# File 'lib/pcrd/schema/setup.rb', line 70

def create_target_tables(force_overwrite: false)
  reader = Reader.new(@source_pool)
  ddls   = {}

  @config.migrate.tables.each do |table_config|
    name        = table_config.name
    source_cols = reader.read(name)
    pk_cols     = reader.primary_key_columns(name)

    ddl = DDL.generate(
      source_columns:      source_cols,
      table_config:        table_config,
      primary_key_columns: pk_cols
    )

    target_reader = Reader.new(@target_pool)
    if target_reader.table_exists?(name)
      if force_overwrite
        @target_pool.exec_sql("DROP TABLE IF EXISTS #{Sql.quote_table(name)} CASCADE")
      else
        raise SetupError, "Table '#{name}' already exists on target. " \
                          "Pass --force-overwrite to drop and recreate."
      end
    end

    @target_pool.exec_sql("#{ddl};")
    ddls[name] = ddl
  end

  ddls
end

#drop_publication_and_slot(pub_name:, slot_name:) ⇒ Object

Drops the publication and replication slot (cleanup phase).



57
58
59
60
61
62
63
64
65
66
# File 'lib/pcrd/schema/setup.rb', line 57

def drop_publication_and_slot(pub_name:, slot_name:)
  @source_pool.exec_sql(
    "DROP PUBLICATION IF EXISTS #{@source_pool.quote_ident(pub_name)}"
  )
  @source_pool.exec(
    "SELECT pg_drop_replication_slot($1) WHERE EXISTS (" \
    "  SELECT 1 FROM pg_replication_slots WHERE slot_name = $1)",
    [slot_name]
  )
end

#validate_resumable!(pub_name:, slot_name:) ⇒ Object

Validates that a –resume run has the slot and publication it needs. Raises with a clear message if either is missing.



44
45
46
47
48
49
50
51
52
53
54
# File 'lib/pcrd/schema/setup.rb', line 44

def validate_resumable!(pub_name:, slot_name:)
  unless slot_exists?(slot_name)
    raise SetupError, "Cannot resume: replication slot '#{slot_name}' does not exist on the source. " \
                      "Start a fresh migration (without --resume)."
  end

  unless publication_exists?(pub_name)
    raise SetupError, "Cannot resume: publication '#{pub_name}' does not exist on the source. " \
                      "Start a fresh migration (without --resume)."
  end
end