Class: Pcrd::Backfill::Batch
- Inherits:
-
Object
- Object
- Pcrd::Backfill::Batch
- Defined in:
- lib/pcrd/backfill/batch.rb
Overview
Executes one backfill batch: SELECT a page of rows from source, transform them, and COPY to target.
Returns a result hash with row_count, duration_ms, start_key, end_key. Returns nil when the source page is empty (signals end-of-table to Engine).
Constant Summary collapse
- NULL_MARKER =
PostgreSQL COPY TEXT format: tab-delimited, N for NULL. Using text format avoids CSV quoting edge cases and is marginally faster.
"\\N"- DELIMITER =
"\t"
Instance Method Summary collapse
-
#execute(after_key:) ⇒ Object
Copies one page starting after ‘after_key`.
-
#initialize(source_pool:, target_pool:, transformer:, table_name:, pk_columns:, batch_size:, schema_name: "public") ⇒ Batch
constructor
A new instance of Batch.
Constructor Details
#initialize(source_pool:, target_pool:, transformer:, table_name:, pk_columns:, batch_size:, schema_name: "public") ⇒ Batch
Returns a new instance of Batch.
16 17 18 19 20 21 22 23 24 25 26 |
# File 'lib/pcrd/backfill/batch.rb', line 16 def initialize(source_pool:, target_pool:, transformer:, table_name:, pk_columns:, batch_size:, schema_name: "public") @source_pool = source_pool @target_pool = target_pool @transformer = transformer @table_name = table_name @pk_columns = pk_columns @batch_size = batch_size @schema_name = schema_name @quoted_table = "#{source_pool.quote_ident(schema_name)}.#{source_pool.quote_ident(table_name)}" end |
Instance Method Details
#execute(after_key:) ⇒ Object
Copies one page starting after ‘after_key`. after_key: nil (first page), a scalar, or an Array for composite PKs.
Returns Hash or nil.
32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 |
# File 'lib/pcrd/backfill/batch.rb', line 32 def execute(after_key:) t0 = monotonic_ms rows = fetch_source_rows(after_key) return nil if rows.empty? transformed = rows.map { |r| @transformer.transform(r) } copy_to_target(transformed) duration_ms = monotonic_ms - t0 { row_count: rows.size, duration_ms: duration_ms, start_key: extract_key(rows.first), end_key: extract_key(rows.last) } end |