Class: DataDrain::Engine

Inherits:
Object
  • Object
show all
Includes:
Observability, Observability::Timing
Defined in:
lib/data_drain/engine.rb

Overview

Motor principal de extracción y purga de datos (DataDrain).

Orquesta el flujo ETL desde PostgreSQL hacia un Data Lake analítico delegando la interacción del almacenamiento al adaptador configurado.

Instance Method Summary collapse

Constructor Details

#initialize(options) ⇒ Engine

Inicializa una nueva instancia del motor de extracción.

Parameters:

  • options (Hash)

    Diccionario de configuración para la extracción.

Options Hash (options):

  • :start_date (Time, DateTime, Date)

    Fecha y hora de inicio.

  • :end_date (Time, DateTime, Date)

    Fecha y hora de fin.

  • :table_name (String)

    Nombre de la tabla en PostgreSQL.

  • :folder_name (String) — default: Opcional

    Nombre de la carpeta destino.

  • :select_sql (String) — default: Opcional

    Sentencia SELECT personalizada.

  • :partition_keys (Array<String, Symbol>)

    Columnas para particionar.

  • :primary_key (String) — default: Opcional

    Clave primaria para borrado. Por defecto ‘id’.

  • :where_clause (String) — default: Opcional

    Condición SQL extra que filtra export, count e integrity check. Define “qué se archiva”.

  • :purge_where_clause (String) — default: Opcional

    Condición SQL para el DELETE. Si se omite, usa :where_clause (backwards compatible). Pasar nil explícito para desactivar purga. Pasar ” (vacío) para purgar todo el rango de fechas sin filtro adicional (útil para archivar subset y borrar superset). Puede ser más amplia que :where_clause; filas que matchean :purge_where_clause pero no :where_clause se borran sin archivar ni verificar. Útil para limpieza de orphans/trash que no debe respaldarse.

  • :skip_export (Boolean) — default: Opcional

    Si true, no exporta a Parquet — solo valida y purga (para uso con GlueRunner).



36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
# File 'lib/data_drain/engine.rb', line 36

def initialize(options)
  @start_date = options.fetch(:start_date).beginning_of_day

  @end_date = options.fetch(:end_date).to_date.next_day.beginning_of_day

  @table_name = options.fetch(:table_name)
  Validations.validate_identifier!(:table_name, @table_name)

  @folder_name = options.fetch(:folder_name, @table_name)
  @select_sql = options.fetch(:select_sql, "*")
  @partition_keys = options.fetch(:partition_keys)
  @primary_key = options.fetch(:primary_key, "id")
  Validations.validate_identifier!(:primary_key, @primary_key)
  @where_clause = options[:where_clause]
  @purge_where_clause = options.fetch(:purge_where_clause, @where_clause)
  @bucket = options[:bucket]
  @skip_export = options.fetch(:skip_export, false)

  @config = DataDrain.configuration
  @config.validate_for_engine!
  @logger = @config.logger
  @adapter = DataDrain::Storage.adapter

  database = DuckDB::Database.open(":memory:")
  @duckdb = database.connect
end

Instance Method Details

#callBoolean

Returns true si el flujo completó exitosamente, false si falló.

Returns:

  • (Boolean)

    true si el flujo completó exitosamente, false si falló



64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
# File 'lib/data_drain/engine.rb', line 64

def call
  @durations = {}
  start_time = monotonic
  log_start

  setup_duckdb
  return skip_empty(start_time) if step_count.zero?

  if @skip_export
    safe_log(:info, "engine.skip_export", { table: @table_name })
  else
    step_export
  end
  return integrity_failed(start_time) unless step_verify

  step_purge
  log_complete(start_time)
  true
end