Class: DataDrain::Engine
- Inherits: Object
- Includes: Observability
- Defined in: lib/data_drain/engine.rb
Overview
Main data extraction and purge engine (DataDrain).
Orchestrates the ETL flow from PostgreSQL to an analytical data lake, delegating all storage interaction to the configured adapter.
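The engine itself never talks to a concrete storage backend; it hands that work to whatever adapter is configured. The sketch below is a minimal illustration of that delegation pattern in plain Ruby, under stated assumptions. `InMemoryAdapter`, `TinyEngine`, the `write`/`exists?`/`read` methods and the Hive-style partition path are all hypothetical. They are not DataDrain's actual adapter interface.

```ruby
# Hypothetical adapter: keeps "files" in a Hash instead of S3 or local disk.
class InMemoryAdapter
  def initialize
    @files = {}
  end

  # Stand-in for writing a Parquet file at `path`.
  def write(path, rows)
    @files[path] = rows.dup
  end

  def exists?(path)
    @files.key?(path)
  end

  def read(path)
    @files.fetch(path)
  end
end

# Hypothetical engine: decides *what* to export and *where* it belongs
# logically, but leaves the physical write to the injected adapter.
class TinyEngine
  def initialize(adapter:, folder_name:)
    @adapter = adapter
    @folder_name = folder_name
  end

  def export(rows, partition)
    path = "#{@folder_name}/#{partition}/data.parquet"
    @adapter.write(path, rows)
    path
  end
end

adapter = InMemoryAdapter.new
engine = TinyEngine.new(adapter: adapter, folder_name: "events")
path = engine.export([{ id: 1 }, { id: 2 }], "year=2024/month=1")
puts path # => events/year=2024/month=1/data.parquet
```

You can swap the in-memory adapter for a disk- or cloud-backed one without touching the engine, which is the benefit the overview describes.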
Instance Method Summary
- #call ⇒ Boolean
  Runs the engine's full flow: setup, count, export (optional), integrity verification, and purge.
- #initialize(options) ⇒ Engine (constructor)
  Initializes a new instance of the extraction engine.
Constructor Details
#initialize(options) ⇒ Engine
Initializes a new instance of the extraction engine.
```ruby
# File 'lib/data_drain/engine.rb', line 27
def initialize(options)
  @start_date = options.fetch(:start_date).beginning_of_day
  @end_date = options.fetch(:end_date).to_date.next_day.beginning_of_day
  @table_name = options.fetch(:table_name)
  Validations.validate_identifier!(:table_name, @table_name)
  @folder_name = options.fetch(:folder_name, @table_name)
  @select_sql = options.fetch(:select_sql, "*")
  @partition_keys = options.fetch(:partition_keys)
  @primary_key = options.fetch(:primary_key, "id")
  Validations.validate_identifier!(:primary_key, @primary_key)
  @where_clause = options[:where_clause]
  @bucket = options[:bucket]
  @skip_export = options.fetch(:skip_export, false)
  @config = DataDrain.configuration
  @logger = @config.logger
  @adapter = DataDrain::Storage.adapter
  database = DuckDB::Database.open(":memory:")
  @duckdb = database.connect
end
```
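The constructor moves `end_date` to the start of the following day. It presumably does this so that a strict `<` comparison includes the whole last day, which gives a half-open window [start_date, end_date + 1 day). The sketch below reproduces that window in dependency-free Ruby, using `Date#to_time` (local midnight) in place of ActiveSupport's `beginning_of_day`. It also mirrors the required and optional option keys. The identifier rule behind `Validations.validate_identifier!` is not shown on this page, so the regex here is an assumed stand-in. `normalize_options` and `date_window` are hypothetical helpers.

```ruby
require "date"

# Assumed identifier rule (hypothetical): letters, digits and underscores,
# not starting with a digit. The real Validations module may differ.
IDENTIFIER_PATTERN = /\A[A-Za-z_][A-Za-z0-9_]*\z/

def validate_identifier!(name, value)
  return if value.is_a?(String) && value.match?(IDENTIFIER_PATTERN)

  raise ArgumentError, "invalid #{name}: #{value.inspect}"
end

# Half-open window [start of start_date, start of the day after end_date).
# Date#to_time yields local midnight, standing in for beginning_of_day.
def date_window(start_date, end_date)
  [start_date.to_time, end_date.next_day.to_time]
end

# Hypothetical helper mirroring the constructor's required/optional keys.
def normalize_options(options)
  table_name = options.fetch(:table_name) # raises KeyError when missing
  validate_identifier!(:table_name, table_name)
  primary_key = options.fetch(:primary_key, "id")
  validate_identifier!(:primary_key, primary_key)
  start_time, end_time = date_window(options.fetch(:start_date), options.fetch(:end_date))

  {
    table_name: table_name,
    folder_name: options.fetch(:folder_name, table_name),
    select_sql: options.fetch(:select_sql, "*"),
    partition_keys: options.fetch(:partition_keys),
    primary_key: primary_key,
    where_clause: options[:where_clause],
    skip_export: options.fetch(:skip_export, false),
    start_time: start_time,
    end_time: end_time
  }
end

opts = normalize_options(
  table_name: "events",
  partition_keys: %w[year month],
  start_date: Date.new(2024, 1, 1),
  end_date: Date.new(2024, 1, 31)
)
puts opts[:end_time].to_date # => 2024-02-01
```

Validating `table_name` and `primary_key` up front matters because both are later interpolated into SQL as identifiers, where bind parameters cannot be used.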
Instance Method Details
#call ⇒ Boolean
Runs the engine's full flow: setup, count, export (optional), integrity verification, and purge.
```ruby
# File 'lib/data_drain/engine.rb', line 55
def call
  start_time = Process.clock_gettime(Process::CLOCK_MONOTONIC)
  safe_log(:info, "engine.start", { table: @table_name, start_date: @start_date.to_date, end_date: @end_date.to_date })

  setup_duckdb

  # 1. Initial row count in Postgres
  step_start = Process.clock_gettime(Process::CLOCK_MONOTONIC)
  @pg_count = get_postgres_count
  db_query_duration = Process.clock_gettime(Process::CLOCK_MONOTONIC) - step_start

  if @pg_count.zero?
    duration = Process.clock_gettime(Process::CLOCK_MONOTONIC) - start_time
    safe_log(:info, "engine.skip_empty", {
      table: @table_name,
      duration_s: duration.round(2),
      db_query_duration_s: db_query_duration.round(2)
    })
    return true
  end

  # 2. Export
  export_duration = 0.0
  if @skip_export
    safe_log(:info, "engine.skip_export", { table: @table_name })
  else
    safe_log(:info, "engine.export_start", { table: @table_name, count: @pg_count })
    step_start = Process.clock_gettime(Process::CLOCK_MONOTONIC)
    export_to_parquet
    export_duration = Process.clock_gettime(Process::CLOCK_MONOTONIC) - step_start
  end

  # 3. Integrity verification
  step_start = Process.clock_gettime(Process::CLOCK_MONOTONIC)
  integrity_ok = verify_integrity
  integrity_duration = Process.clock_gettime(Process::CLOCK_MONOTONIC) - step_start

  if integrity_ok
    # 4. Purge from Postgres
    step_start = Process.clock_gettime(Process::CLOCK_MONOTONIC)
    purge_from_postgres
    purge_duration = Process.clock_gettime(Process::CLOCK_MONOTONIC) - step_start

    duration = Process.clock_gettime(Process::CLOCK_MONOTONIC) - start_time
    safe_log(:info, "engine.complete", {
      table: @table_name,
      duration_s: duration.round(2),
      db_query_duration_s: db_query_duration.round(2),
      export_duration_s: export_duration.round(2),
      integrity_duration_s: integrity_duration.round(2),
      purge_duration_s: purge_duration.round(2),
      count: @pg_count
    })
    true
  else
    duration = Process.clock_gettime(Process::CLOCK_MONOTONIC) - start_time
    safe_log(:error, "engine.integrity_error", { table: @table_name, duration_s: duration.round(2), count: @pg_count })
    false
  end
end
```
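The control flow of `#call` can be condensed into a dependency-free sketch with stubbed steps. `timed`, `run_pipeline` and `FakeSteps` are hypothetical names. The sketch omits `setup_duckdb`, the real SQL and most log fields. It illustrates three behaviors: an empty source short-circuits with `true`, `skip_export` skips only the export, and the purge runs only after verification succeeds.

```ruby
# Returns [block_result, elapsed_seconds] using the monotonic clock,
# the same clock #call reads before and after each step.
def timed
  start = Process.clock_gettime(Process::CLOCK_MONOTONIC)
  result = yield
  [result, Process.clock_gettime(Process::CLOCK_MONOTONIC) - start]
end

# Sketch of #call's branching; `steps` is a hypothetical object that
# responds to count, export, verify and purge.
def run_pipeline(steps, skip_export: false, log: [])
  count, count_s = timed { steps.count }
  if count.zero?
    log << [:info, "engine.skip_empty", { db_query_duration_s: count_s.round(2) }]
    return true
  end

  export_s = 0.0
  if skip_export
    log << [:info, "engine.skip_export", {}]
  else
    _, export_s = timed { steps.export }
  end

  ok, integrity_s = timed { steps.verify }
  unless ok
    log << [:error, "engine.integrity_error", { count: count }]
    return false # purge never runs when verification fails
  end

  _, purge_s = timed { steps.purge }
  log << [:info, "engine.complete", {
    count: count,
    export_duration_s: export_s.round(2),
    integrity_duration_s: integrity_s.round(2),
    purge_duration_s: purge_s.round(2)
  }]
  true
end

# Hypothetical fake steps that record which phases ran.
class FakeSteps
  attr_reader :calls

  def initialize(count:, ok:)
    @count = count
    @ok = ok
    @calls = []
  end

  def count
    @calls << :count
    @count
  end

  def export
    @calls << :export
  end

  def verify
    @calls << :verify
    @ok
  end

  def purge
    @calls << :purge
  end
end

steps = FakeSteps.new(count: 10, ok: false)
result = run_pipeline(steps)
```

With `ok: false`, `result` is `false` and `steps.calls` stops at `:verify`, mirroring the `engine.integrity_error` branch. The source rows stay in Postgres whenever the exported copy cannot be confirmed.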