Class: Pcrd::Apply::Engine

Inherits:
Object
Defined in:
lib/pcrd/apply/engine.rb

Overview

Applies transactions from the WAL consumer queue to the target cluster.

Each transaction is a Replication::Consumer::Transaction containing a list of Insert/Update/Delete events. Events for tables not in the migration spec are silently skipped (other tables may be in the publication).
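The skip rule above can be sketched as a filter over a transaction's events. This is only an illustration: the `Event` struct, the `PLANS` hash, and the `applicable_events` helper are hypothetical stand-ins, since the real event and TablePlan classes are not shown on this page.

```ruby
# Hypothetical stand-in for an Insert/Update/Delete event.
Event = Struct.new(:kind, :table, :new_tuple, :old_tuple, keyword_init: true)

# Illustrative plan data: one entry per table named in the migration spec.
PLANS = { "users" => { key_columns: ["id"] } }.freeze

# Keeps only the events whose table has a plan. Other events are dropped
# without raising, because the publication may include unrelated tables.
def applicable_events(events, plans)
  events.select { |event| plans.key?(event.table) }
end

events = [
  Event.new(kind: :insert, table: "users", new_tuple: { "id" => 1 }),
  Event.new(kind: :insert, table: "audit_log", new_tuple: { "id" => 9 })
]

kept = applicable_events(events, PLANS)
puts kept.map(&:table).inspect
```

Only the "users" event survives the filter; the "audit_log" event is ignored rather than treated as an error.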

INSERT events use ON CONFLICT DO UPDATE, so they are safe during the backfill/streaming overlap window: if backfill has already written a row, the WAL replay updates it to the latest version instead of failing.
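A minimal sketch of how such an upsert statement could be assembled is given below. The helper names `quote_ident` and `upsert_sql`, the use of `$n` placeholders, and the simplified identifier quoting are assumptions made for illustration; the engine's actual SQL builder is not shown on this page.

```ruby
# Simplified identifier quoting: wraps in double quotes and doubles any
# embedded double quote. Assumed helper, not part of the documented API.
def quote_ident(name)
  '"' + name.to_s.gsub('"', '""') + '"'
end

# Builds INSERT ... ON CONFLICT (keys) DO UPDATE with positional placeholders.
def upsert_sql(table, columns, key_columns)
  cols         = columns.map { |c| quote_ident(c) }
  placeholders = (1..columns.size).map { |i| "$#{i}" }
  non_key      = columns - key_columns
  conflict     = key_columns.map { |c| quote_ident(c) }.join(", ")

  action =
    if non_key.empty?
      "DO NOTHING" # every column is a key column, so there is nothing to update
    else
      sets = non_key.map { |c| "#{quote_ident(c)} = EXCLUDED.#{quote_ident(c)}" }
      "DO UPDATE SET #{sets.join(', ')}"
    end

  "INSERT INTO #{quote_ident(table)} (#{cols.join(', ')}) " \
    "VALUES (#{placeholders.join(', ')}) " \
    "ON CONFLICT (#{conflict}) #{action}"
end

puts upsert_sql("users", %w[id email name], %w[id])
```

With a statement of this shape, a row that backfill has already written is overwritten with the values from the WAL event rather than raising a unique-violation error.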

UPDATE events are implemented as upserts (same SQL as INSERT) because:

- The row may not yet exist on the target (if the WAL event precedes
  the backfill batch that covers that key range).
- An upsert produces the correct final row whether or not the row already exists on the target.
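The difference between a plain UPDATE and an upsert when the row is missing can be illustrated with an in-memory hash standing in for the target table. The `plain_update` and `upsert` helpers are hypothetical and only model the row-level effect, not the engine's SQL.

```ruby
# Plain UPDATE semantics: changes nothing when the row is absent and
# reports zero affected rows.
def plain_update(table, key, row)
  return 0 unless table.key?(key)

  table[key] = table[key].merge(row)
  1
end

# Upsert semantics: writes the row whether or not it already exists.
def upsert(table, key, row)
  table[key] = (table[key] || {}).merge(row)
  1
end

target       = {} # backfill has not reached this key range yet
update_event = { "id" => 7, "email" => "new@example.com" }

affected = plain_update(target, 7, update_event)
puts "plain UPDATE affected #{affected} row(s); target holds #{target.size} row(s)"

upsert(target, 7, update_event)
puts "after upsert the target holds #{target.size} row(s)"
```

In this model the plain UPDATE silently loses the change, while the upsert leaves the target holding the row carried by the event.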

DELETE events locate the target row by the primary-key values carried in old_tuple (the key columns).
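As a rough sketch, a DELETE keyed on old_tuple might be built as follows. It assumes old_tuple behaves like a Hash from column name to value and that the key column names are known per table; `delete_sql` and `delete_params` are illustrative names, not methods documented here.

```ruby
# Builds DELETE ... WHERE key1 = $1 AND key2 = $2 ... for the key columns.
def delete_sql(table, key_columns)
  predicates = key_columns.each_with_index.map do |col, i|
    "\"#{col}\" = $#{i + 1}"
  end
  "DELETE FROM \"#{table}\" WHERE #{predicates.join(' AND ')}"
end

# Extracts the bind values from old_tuple in key-column order.
# fetch raises KeyError if a key column is missing from the tuple.
def delete_params(old_tuple, key_columns)
  key_columns.map { |col| old_tuple.fetch(col) }
end

key_columns = %w[tenant_id id]
old_tuple   = { "tenant_id" => 3, "id" => 42 }

puts delete_sql("users", key_columns)
puts delete_params(old_tuple, key_columns).inspect
```

Using `fetch` rather than `[]` makes a missing key column fail loudly instead of binding NULL and matching no rows.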

Defined Under Namespace

Classes: TablePlan

Instance Method Summary

Constructor Details

#initialize(target_pool:, config:, parser:, source_schema:) ⇒ Engine

Returns a new instance of Engine.



# File 'lib/pcrd/apply/engine.rb', line 32

def initialize(target_pool:, config:, parser:, source_schema:)
  @target_pool   = target_pool
  @parser        = parser
  @plans         = build_plans(config, source_schema)
end

Instance Method Details

#apply(txn) ⇒ Object

Applies one complete transaction to the target inside a single DB transaction. Returns the commit LSN string.



# File 'lib/pcrd/apply/engine.rb', line 40

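def apply(txn)
  # Apply every event of the source transaction inside one target transaction.
  @target_pool.transaction do
    txn.events.each { |event| apply_event(event) }
  end
  # Reached only if the block above did not raise.
  txn.commit_lsn
end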
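The all-or-nothing behaviour of #apply can be sketched with stand-in classes. Everything below is hypothetical: `FakePool`, `SketchEngine`, and `Txn` only imitate the shape of the real pool, engine, and Replication::Consumer::Transaction, and passing a connection into the block is an assumption made so the sketch is self-contained.

```ruby
# Hypothetical stand-in for a consumer transaction.
Txn = Struct.new(:events, :commit_lsn)

# Hypothetical pool whose transaction keeps staged writes only if the
# block finishes without raising.
class FakePool
  attr_reader :committed

  def initialize
    @committed = []
  end

  def transaction
    staged = []
    yield staged
    @committed.concat(staged)
  end
end

class SketchEngine
  def initialize(target_pool:)
    @target_pool = target_pool
  end

  # Same shape as Engine#apply: all events in one transaction, then the LSN.
  def apply(txn)
    @target_pool.transaction do |conn|
      txn.events.each { |event| apply_event(conn, event) }
    end
    txn.commit_lsn
  end

  private

  def apply_event(conn, event)
    raise ArgumentError, "bad event" if event == :bad

    conn << event
  end
end

pool   = FakePool.new
engine = SketchEngine.new(target_pool: pool)

lsn = engine.apply(Txn.new(%i[a b], "0/16B3748"))
puts "applied up to #{lsn}"

begin
  engine.apply(Txn.new(%i[c bad], "0/16B3800"))
rescue ArgumentError
  puts "second transaction failed; committed events: #{pool.committed.inspect}"
end
```

In this sketch the failed transaction contributes nothing to the committed set and no LSN is returned for it, so a caller that records progress from the return value would not advance past a transaction that was only partly applied.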