Class: DataDrain::Engine

Inherits:
Object
  • Object
show all
Includes:
Observability, Observability::Timing
Defined in:
lib/data_drain/engine.rb

Overview

Motor principal de extracción y purga de datos (DataDrain).

Orquesta el flujo ETL desde PostgreSQL hacia un Data Lake analítico delegando la interacción del almacenamiento al adaptador configurado.

Instance Method Summary collapse

Constructor Details

#initialize(options) ⇒ Engine

Inicializa una nueva instancia del motor de extracción.

Parameters:

  • options (Hash)

    Diccionario de configuración para la extracción.

Options Hash (options):

  • :start_date (Time, DateTime, Date)

    Fecha y hora de inicio.

  • :end_date (Time, DateTime, Date)

    Fecha y hora de fin.

  • :table_name (String)

    Nombre de la tabla en PostgreSQL.

  • :folder_name (String) — default: Opcional

    Nombre de la carpeta destino.

  • :select_sql (String) — default: Opcional

    Sentencia SELECT personalizada.

  • :partition_keys (Array<String, Symbol>)

    Columnas para particionar.

  • :primary_key (String) — default: Opcional

    Clave primaria para borrado. Por defecto ‘id’.

  • :where_clause (String) — default: Opcional

    Condición SQL extra.

  • :skip_export (Boolean) — default: Opcional

    Si true, no exporta a Parquet — solo valida y purga (para uso con GlueRunner).



27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
# File 'lib/data_drain/engine.rb', line 27

def initialize(options)
  @start_date = options.fetch(:start_date).beginning_of_day

  @end_date = options.fetch(:end_date).to_date.next_day.beginning_of_day

  @table_name = options.fetch(:table_name)
  Validations.validate_identifier!(:table_name, @table_name)

  @folder_name = options.fetch(:folder_name, @table_name)
  @select_sql = options.fetch(:select_sql, "*")
  @partition_keys = options.fetch(:partition_keys)
  @primary_key = options.fetch(:primary_key, "id")
  Validations.validate_identifier!(:primary_key, @primary_key)
  @where_clause = options[:where_clause]
  @bucket = options[:bucket]
  @skip_export = options.fetch(:skip_export, false)

  @config = DataDrain.configuration
  @config.validate_for_engine!
  @logger = @config.logger
  @adapter = DataDrain::Storage.adapter

  database = DuckDB::Database.open(":memory:")
  @duckdb = database.connect
end

Instance Method Details

#callBoolean

Returns true si el flujo completó exitosamente, false si falló.

Returns:

  • (Boolean)

    true si el flujo completó exitosamente, false si falló



54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
# File 'lib/data_drain/engine.rb', line 54

def call
  @durations = {}
  start_time = monotonic
  log_start

  setup_duckdb
  return skip_empty(start_time) if step_count.zero?

  if @skip_export
    safe_log(:info, "engine.skip_export", { table: @table_name })
  else
    step_export
  end
  return integrity_failed(start_time) unless step_verify

  step_purge
  log_complete(start_time)
  true
end