Class: Pcrd::Backfill::Batch

Inherits:
Object
Defined in:
lib/pcrd/backfill/batch.rb

Overview

Executes one backfill batch: SELECT a page of rows from source, transform them, and COPY to target.

Returns a result hash with row_count, duration_ms, start_key, end_key. Returns nil when the source page is empty (signals end-of-table to Engine).
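As a rough sketch of how a caller such as Engine might consume this contract, the loop below feeds each batch's end_key back in as the next after_key and stops when nil comes back. The `drain` helper and the `FakeBatch` stand-in are hypothetical and not part of Pcrd; only the `execute(after_key:)` signature and the result keys are taken from the description above.

```ruby
# Hypothetical stand-in for Pcrd::Backfill::Batch so the loop runs without a database.
# It pages through an in-memory list of integer keys.
class FakeBatch
  def initialize(keys, batch_size)
    @keys = keys.sort
    @batch_size = batch_size
  end

  # Mirrors the documented contract: a result Hash, or nil when the page is empty.
  def execute(after_key:)
    page = @keys.select { |k| after_key.nil? || k > after_key }.first(@batch_size)
    return nil if page.empty?

    { row_count: page.size, duration_ms: 0, start_key: page.first, end_key: page.last }
  end
end

# Hypothetical driver: calls execute until it returns nil, resuming after each end_key.
def drain(batch)
  total = 0
  after_key = nil
  while (result = batch.execute(after_key: after_key))
    total += result[:row_count]
    after_key = result[:end_key]
  end
  total
end

puts drain(FakeBatch.new((1..10).to_a, 4)) # prints 10
```

Treating nil as the only end-of-table signal keeps the driver free of any row counting or "last page" heuristics.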

Constant Summary

NULL_MARKER =

PostgreSQL COPY TEXT format: tab-delimited, \N for NULL. Using text format avoids CSV quoting edge cases and is marginally faster.

"\\N"

DELIMITER =

"\t"

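To illustrate how these two constants fit into the COPY TEXT format, here is a minimal sketch of row encoding. The helper names `encode_copy_field` and `encode_copy_row` and the `COPY_ESCAPES` table are hypothetical; the actual encoding code in batch.rb is not shown on this page. The escaping rules themselves (backslash, tab, newline, carriage return) are those of PostgreSQL's text format.

```ruby
NULL_MARKER = "\\N" # backslash followed by N
DELIMITER   = "\t"

# Characters that must be backslash-escaped inside a COPY TEXT field.
COPY_ESCAPES = { "\\" => "\\\\", "\t" => "\\t", "\n" => "\\n", "\r" => "\\r" }.freeze

# Hypothetical helper: encodes one value as a COPY TEXT field.
def encode_copy_field(value)
  return NULL_MARKER if value.nil?

  # The Hash form of gsub looks up each matched character as a key.
  value.to_s.gsub(/[\\\t\n\r]/, COPY_ESCAPES)
end

# Hypothetical helper: encodes one row as a tab-delimited, newline-terminated line.
def encode_copy_row(values)
  values.map { |v| encode_copy_field(v) }.join(DELIMITER) + "\n"
end
```

Because backslashes in data are doubled, a string value that happens to be a literal backslash followed by N stays distinguishable from the NULL marker.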
Instance Method Summary

Constructor Details

#initialize(source_pool:, target_pool:, transformer:, table_name:, pk_columns:, batch_size:, schema_name: "public") ⇒ Batch

Returns a new instance of Batch.



# File 'lib/pcrd/backfill/batch.rb', line 16

def initialize(source_pool:, target_pool:, transformer:, table_name:,
               pk_columns:, batch_size:, schema_name: "public")
  @source_pool  = source_pool
  @target_pool  = target_pool
  @transformer  = transformer
  @table_name   = table_name
  @pk_columns   = pk_columns
  @batch_size   = batch_size
  @schema_name  = schema_name
  # Schema-qualified, identifier-quoted table name, computed once at construction.
  @quoted_table = "#{source_pool.quote_ident(schema_name)}.#{source_pool.quote_ident(table_name)}"
end
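The pool's `quote_ident` is not shown on this page. Assuming it follows standard PostgreSQL identifier quoting (wrap in double quotes, double any embedded double quote), the construction of `@quoted_table` can be sketched as follows; both helpers here are illustrative stand-ins rather than the pool's real implementation.

```ruby
# Hypothetical stand-in for the pool's quote_ident, assuming standard
# PostgreSQL identifier quoting.
def quote_ident(name)
  %("#{name.to_s.gsub('"', '""')}")
end

# Builds a schema-qualified table name the same way the constructor does.
def qualified_table(schema_name, table_name)
  "#{quote_ident(schema_name)}.#{quote_ident(table_name)}"
end

puts qualified_table("public", "users") # prints "public"."users"
```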

Instance Method Details

#execute(after_key:) ⇒ Object

Copies one page starting after `after_key`. `after_key` is nil (first page), a scalar, or an Array for composite PKs.

Returns Hash or nil.



# File 'lib/pcrd/backfill/batch.rb', line 32

def execute(after_key:)
  t0   = monotonic_ms
  rows = fetch_source_rows(after_key)
  return nil if rows.empty?

  transformed = rows.map { |r| @transformer.transform(r) }
  copy_to_target(transformed)

  duration_ms = monotonic_ms - t0

  {
    row_count:   rows.size,
    duration_ms: duration_ms,
    start_key:   extract_key(rows.first),
    end_key:     extract_key(rows.last)
  }
end
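`fetch_source_rows` is not shown on this page. One plausible way to page by key, sketched here under that caveat, is keyset pagination with a row-value comparison, which handles scalar and composite keys with the same query shape. The `build_page_query` helper, its parameters, and the `SELECT *` column list are assumptions made for illustration.

```ruby
# Hypothetical query builder for one keyset-paginated page.
# Returns [sql, params]; params is empty for the first page (after_key nil).
def build_page_query(quoted_table, pk_columns, batch_size, after_key)
  # Quote each key column using standard PostgreSQL identifier quoting.
  cols   = pk_columns.map { |c| %("#{c.to_s.gsub('"', '""')}") }.join(", ")
  params = after_key.nil? ? [] : Array(after_key)

  where = ""
  unless params.empty?
    placeholders = (1..params.size).map { |i| "$#{i}" }.join(", ")
    # Row-value comparison: (a, b) > ($1, $2) orders lexicographically.
    where = " WHERE (#{cols}) > (#{placeholders})"
  end

  sql = "SELECT * FROM #{quoted_table}#{where} ORDER BY #{cols} LIMIT #{Integer(batch_size)}"
  [sql, params]
end

sql, params = build_page_query('"public"."events"', %w[tenant_id id], 500, [7, 1200])
puts sql
puts params.inspect
```

Ordering by the same columns used in the comparison is what makes the last row's key a valid `after_key` for the next call.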