Class: DataDrain::Engine

Inherits:
Object
  • Object
show all
Defined in:
lib/data_drain/engine.rb

Overview

Motor principal de extracción y purga de datos (DataDrain).

Orquesta el flujo ETL desde PostgreSQL hacia un Data Lake analítico delegando la interacción del almacenamiento al adaptador configurado.

Instance Method Summary collapse

Constructor Details

#initialize(options) ⇒ Engine

Inicializa una nueva instancia del motor de extracción.

Parameters:

  • options (Hash)

    Diccionario de configuración para la extracción.

Options Hash (options):

  • :start_date (Time, DateTime, Date)

    Fecha y hora de inicio.

  • :end_date (Time, DateTime, Date)

    Fecha y hora de fin.

  • :table_name (String)

    Nombre de la tabla en PostgreSQL.

  • :folder_name (String) — default: Opcional

    Nombre de la carpeta destino.

  • :select_sql (String) — default: Opcional

    Sentencia SELECT personalizada.

  • :partition_keys (Array<String, Symbol>)

    Columnas para particionar.

  • :primary_key (String) — default: Opcional

    Clave primaria para borrado. Por defecto ‘id’.

  • :where_clause (String) — default: Opcional

    Condición SQL extra.



23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
# File 'lib/data_drain/engine.rb', line 23

def initialize(options)
  @start_date     = options.fetch(:start_date).beginning_of_day
  @end_date       = options.fetch(:end_date).end_of_day
  @table_name     = options.fetch(:table_name)
  @folder_name    = options.fetch(:folder_name, @table_name)
  @select_sql     = options.fetch(:select_sql, "*")
  @partition_keys = options.fetch(:partition_keys)
  @primary_key    = options.fetch(:primary_key, "id")
  @where_clause   = options[:where_clause]
  @bucket         = options[:bucket]

  @config  = DataDrain.configuration
  @logger  = @config.logger
  @adapter = DataDrain::Storage.adapter

  database = DuckDB::Database.open(":memory:")
  @duckdb  = database.connect
end

Instance Method Details

#callBoolean

Ejecuta el flujo completo del motor: Setup, Conteo, Exportación, Verificación y Purga.

Returns:

  • (Boolean)

    ‘true` si el proceso finalizó con éxito, `false` si falló la integridad.



45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
# File 'lib/data_drain/engine.rb', line 45

def call
  @logger.info "[DataDrain Engine] 🚀 Preparando '#{@table_name}' (#{@start_date.to_date} a #{@end_date.to_date})..."

  setup_duckdb

  @pg_count = get_postgres_count

  if @pg_count.zero?
    @logger.info "[DataDrain Engine] ⏭️ No hay registros que cumplan las condiciones."
    return true
  end

  @logger.info "[DataDrain Engine] 📦 Exportando #{@pg_count} registros a Parquet..."
  export_to_parquet

  if verify_integrity
    purge_from_postgres
    @logger.info "[DataDrain Engine] ✅ Proceso completado exitosamente para '#{@table_name}'."
    true
  else
    @logger.error "[DataDrain Engine] ❌ ERROR de integridad en '#{@table_name}'. Abortando purga."
    false
  end
end