Class: DataDrain::Engine

Inherits:
Object
Includes:
Observability
Defined in:
lib/data_drain/engine.rb

Overview

Main data extraction and purge engine (DataDrain).

Orchestrates the ETL flow from PostgreSQL to an analytical Data Lake, delegating storage interaction to the configured adapter.

Constant Summary

Constants included from Observability

Observability::SENSITIVE_KEY_PATTERN

Instance Method Summary

Constructor Details

#initialize(options) ⇒ Engine

Initializes a new instance of the extraction engine.

Parameters:

  • options (Hash)

    Configuration hash for the extraction.

Options Hash (options):

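  • :start_date (Time, DateTime, Date)

    Start of the range. It is truncated to the beginning of its day.

  • :end_date (Time, DateTime, Date)

    End of the range, inclusive by day. It is converted to a date and moved to the beginning of the following day, so the whole end date is covered.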

  • :table_name (String)

    Name of the source table in PostgreSQL. It is validated as an SQL identifier.

  • :folder_name (String), optional, default: the value of :table_name

    Name of the destination folder.

  • :select_sql (String), optional, default: "*"

    Custom SELECT expression; by default all columns are selected.

  • :partition_keys (Array<String, Symbol>)

    Columns used to partition the exported data. Required.

  • :primary_key (String), optional, default: "id"

    Primary key used when deleting rows. It is validated as an SQL identifier.

  • :where_clause (String), optional, default: nil

    Additional SQL condition.

  • :skip_export (Boolean), optional, default: false

    If true, the Parquet export is skipped and the engine only verifies and purges (intended for use with GlueRunner).
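The option defaults above can be summarized with a plain-Ruby sketch that mirrors how the constructor resolves its options hash. `resolve_options` and `IDENTIFIER_PATTERN` are hypothetical: the rule applied by `Validations.validate_identifier!` is not shown in this section, so the regex below is only an assumption of what an SQL identifier check might accept.

```ruby
# Hypothetical identifier rule; the real Validations.validate_identifier!
# check is not documented in this section.
IDENTIFIER_PATTERN = /\A[a-zA-Z_][a-zA-Z0-9_]*\z/

# Sketch of how Engine#initialize resolves its options (assumption-based).
def resolve_options(options)
  table_name = options.fetch(:table_name) # required: raises KeyError if missing
  primary_key = options.fetch(:primary_key, "id")

  [[:table_name, table_name], [:primary_key, primary_key]].each do |name, value|
    raise ArgumentError, "invalid #{name}: #{value.inspect}" unless IDENTIFIER_PATTERN.match?(value.to_s)
  end

  {
    table_name: table_name,
    folder_name: options.fetch(:folder_name, table_name), # defaults to the table name
    select_sql: options.fetch(:select_sql, "*"),           # all columns by default
    partition_keys: options.fetch(:partition_keys),        # required
    primary_key: primary_key,
    where_clause: options[:where_clause],                  # nil when absent
    skip_export: options.fetch(:skip_export, false)
  }
end

resolved = resolve_options(table_name: "events", partition_keys: %w[year month])
puts resolved[:folder_name] # => events
puts resolved[:primary_key] # => id
```

Required options use `fetch` without a default, so a missing key fails fast with `KeyError` instead of surfacing later as a `nil` in generated SQL.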

# File 'lib/data_drain/engine.rb', line 27

def initialize(options)
  @start_date = options.fetch(:start_date).beginning_of_day

  @end_date = options.fetch(:end_date).to_date.next_day.beginning_of_day

  @table_name = options.fetch(:table_name)
  Validations.validate_identifier!(:table_name, @table_name)

  @folder_name = options.fetch(:folder_name, @table_name)
  @select_sql = options.fetch(:select_sql, "*")
  @partition_keys = options.fetch(:partition_keys)
  @primary_key = options.fetch(:primary_key, "id")
  Validations.validate_identifier!(:primary_key, @primary_key)
  @where_clause = options[:where_clause]
  @bucket = options[:bucket]
  @skip_export = options.fetch(:skip_export, false)

  @config = DataDrain.configuration
  @config.validate_for_engine!
  @logger = @config.logger
  @adapter = DataDrain::Storage.adapter

  database = DuckDB::Database.open(":memory:")
  @duckdb = database.connect
end
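The constructor normalizes the date range into a half-open window: the start is truncated to the beginning of its day, and the end becomes the beginning of the following day, so every record on the end date falls inside the range. The sketch below reproduces that arithmetic with Ruby's standard `date` library rather than ActiveSupport's `beginning_of_day`; `date_window` and `in_window?` are hypothetical helpers, since the actual query predicate and column are not shown here.

```ruby
require "date"

# Hypothetical helper mirroring Engine#initialize's date normalization,
# using Date instead of ActiveSupport's #beginning_of_day.
# Returns [inclusive_start, exclusive_end] as Date values (midnight boundaries).
def date_window(start_date, end_date)
  [start_date.to_date, end_date.to_date.next_day]
end

# True when a timestamp falls inside the half-open window [start, end).
def in_window?(time, window)
  start_day, end_exclusive = window
  day = time.to_date
  day >= start_day && day < end_exclusive
end

window = date_window(Date.new(2024, 1, 1), DateTime.new(2024, 1, 31, 15, 30))
p window.map(&:to_s) # => ["2024-01-01", "2024-02-01"]
```

Using an exclusive upper bound avoids having to express "23:59:59.999..." for the last day, which would otherwise depend on the timestamp precision of the column.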

Instance Method Details

#call ⇒ Boolean

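Runs the engine's full flow: DuckDB setup, row count in PostgreSQL, Parquet export (skipped when :skip_export is true), integrity verification, and purge from PostgreSQL. If the count is zero, it returns early without exporting or purging; the purge runs only when verification succeeds.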

Returns:

  • (Boolean)

    `true` if the run completed successfully (including the early return when the range has no rows), `false` if the integrity check failed.
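The return contract of `#call` can be illustrated with a self-contained sketch in which each step is a stub lambda. `run_pipeline` and the `steps` hash are hypothetical stand-ins for the private methods (`get_postgres_count`, `export_to_parquet`, `verify_integrity`, `purge_from_postgres`), whose implementations are not shown here; the sketch only models the branching.

```ruby
# Hypothetical model of Engine#call's control flow; each step is a stub.
def run_pipeline(steps, skip_export: false)
  count = steps[:count].call
  return true if count.zero? # nothing to archive or purge

  steps[:export].call unless skip_export
  return false unless steps[:verify].call # never purge unverified data

  steps[:purge].call
  true
end

calls = []
steps = {
  count:  -> { 3 },
  export: -> { calls << :export },
  verify: -> { true },
  purge:  -> { calls << :purge }
}

p run_pipeline(steps) # => true
p calls               # => [:export, :purge]
```

The key property is that the destructive step (purge) sits behind the integrity check, so a `false` return always means the source rows were left in place.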



114
# File 'lib/data_drain/engine.rb', line 56

def call
  start_time = Process.clock_gettime(Process::CLOCK_MONOTONIC)
  safe_log(:info, "engine.start",
           { table: @table_name, start_date: @start_date.to_date, end_date: @end_date.to_date })

  setup_duckdb

  # 1. Initial row count in Postgres
  step_start = Process.clock_gettime(Process::CLOCK_MONOTONIC)
  @pg_count = get_postgres_count
  db_query_duration = Process.clock_gettime(Process::CLOCK_MONOTONIC) - step_start

  if @pg_count.zero?
    duration = Process.clock_gettime(Process::CLOCK_MONOTONIC) - start_time
    safe_log(:info, "engine.skip_empty",
             { table: @table_name, duration_s: duration.round(2), db_query_duration_s: db_query_duration.round(2) })
    return true
  end

  # 2. Export
  export_duration = 0.0
  if @skip_export
    safe_log(:info, "engine.skip_export", { table: @table_name })
  else
    safe_log(:info, "engine.export_start", { table: @table_name, count: @pg_count })
    step_start = Process.clock_gettime(Process::CLOCK_MONOTONIC)
    export_to_parquet
    export_duration = Process.clock_gettime(Process::CLOCK_MONOTONIC) - step_start
  end

  # 3. Integrity verification
  step_start = Process.clock_gettime(Process::CLOCK_MONOTONIC)
  integrity_ok = verify_integrity
  integrity_duration = Process.clock_gettime(Process::CLOCK_MONOTONIC) - step_start

  if integrity_ok
    # 4. Purge from Postgres
    step_start = Process.clock_gettime(Process::CLOCK_MONOTONIC)
    purge_from_postgres
    purge_duration = Process.clock_gettime(Process::CLOCK_MONOTONIC) - step_start

    duration = Process.clock_gettime(Process::CLOCK_MONOTONIC) - start_time
    safe_log(:info, "engine.complete", {
               table: @table_name,
               duration_s: duration.round(2),
               db_query_duration_s: db_query_duration.round(2),
               export_duration_s: export_duration.round(2),
               integrity_duration_s: integrity_duration.round(2),
               purge_duration_s: purge_duration.round(2),
               count: @pg_count
             })
    true
  else
    duration = Process.clock_gettime(Process::CLOCK_MONOTONIC) - start_time
    safe_log(:error, "engine.integrity_error",
             { table: @table_name, duration_s: duration.round(2), count: @pg_count })
    false
  end
end
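Each step in `#call` is timed with the same pattern: read `Process::CLOCK_MONOTONIC` before and after the step, then subtract. A monotonic clock is used because, unlike wall-clock time, it is not affected by system clock adjustments. One possible way to factor that pattern out is a small block helper; `timed` is hypothetical and not part of DataDrain.

```ruby
# Hypothetical helper: runs the block and returns [block_result, elapsed_seconds],
# measured with the monotonic clock so system clock changes cannot skew it.
def timed
  started = Process.clock_gettime(Process::CLOCK_MONOTONIC)
  result = yield
  [result, Process.clock_gettime(Process::CLOCK_MONOTONIC) - started]
end

count, db_query_duration = timed { 42 } # stand-in for get_postgres_count
puts "count=#{count} db_query_duration_s=#{db_query_duration.round(2)}"
```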