Class: DataDrain::FileIngestor
- Inherits: Object
- Hierarchy: Object → DataDrain::FileIngestor
- Includes:
- Observability
- Defined in:
- lib/data_drain/file_ingestor.rb
Overview
Clase encargada de ingerir archivos locales (CSV, JSON, Parquet) generados por otros servicios (ej. Netflow) y subirlos al Data Lake aplicando compresión ZSTD y particionamiento Hive.
Constant Summary
Constants included from Observability
Observability::SENSITIVE_KEY_PATTERN
Instance Method Summary collapse
-
#call ⇒ Boolean
Ejecuta el flujo de ingestión.
-
#initialize(options) ⇒ FileIngestor
constructor
A new instance of FileIngestor.
Constructor Details
#initialize(options) ⇒ FileIngestor
Returns a new instance of FileIngestor.
18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 |
# File 'lib/data_drain/file_ingestor.rb', line 18 def initialize() @source_path = .fetch(:source_path) @folder_name = .fetch(:folder_name) Validations.validate_identifier!(:folder_name, @folder_name) @partition_keys = .fetch(:partition_keys, []) @select_sql = .fetch(:select_sql, "*") @delete_after_upload = .fetch(:delete_after_upload, true) @bucket = [:bucket] @config = DataDrain.configuration @config.validate! @logger = @config.logger @adapter = DataDrain::Storage.adapter database = DuckDB::Database.open(":memory:") @duckdb = database.connect end |
Instance Method Details
#call ⇒ Boolean
Ejecuta el flujo de ingestión.
38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 |
# File 'lib/data_drain/file_ingestor.rb', line 38 def call start_time = Process.clock_gettime(Process::CLOCK_MONOTONIC) safe_log(:info, "file_ingestor.start", { source_path: @source_path }) unless File.exist?(@source_path) safe_log(:error, "file_ingestor.file_not_found", { source_path: @source_path }) return false end @duckdb.query("SET max_memory='#{@config.limit_ram}';") if @config.limit_ram.present? @duckdb.query("SET temp_directory='#{@config.tmp_directory}'") if @config.tmp_directory.present? @adapter.setup_duckdb(@duckdb) # Determinamos la función lectora de DuckDB según la extensión del archivo reader_function = determine_reader # 1. Conteo de seguridad step_start = Process.clock_gettime(Process::CLOCK_MONOTONIC) source_count = @duckdb.query("SELECT COUNT(*) FROM #{reader_function}").first.first source_query_duration = Process.clock_gettime(Process::CLOCK_MONOTONIC) - step_start safe_log(:info, "file_ingestor.count", { source_path: @source_path, count: source_count, source_query_duration_s: source_query_duration.round(2) }) if source_count.zero? cleanup_local_file duration = Process.clock_gettime(Process::CLOCK_MONOTONIC) - start_time safe_log(:info, "file_ingestor.skip_empty", { source_path: @source_path, duration_s: duration.round(2) }) return true end # 2. Exportación / Subida @adapter.prepare_export_path(@bucket, @folder_name) dest_path = if @config.storage_mode.to_sym == :s3 "s3://#{@bucket}/#{@folder_name}/" else File.join(@bucket, @folder_name, "") end partition_clause = @partition_keys.any? ? 
"PARTITION_BY (#{@partition_keys.join(", ")})," : "" query = <<~SQL COPY ( SELECT #{@select_sql} FROM #{reader_function} ) TO '#{dest_path}' ( FORMAT PARQUET, #{partition_clause} COMPRESSION 'ZSTD', OVERWRITE_OR_IGNORE 1 ); SQL safe_log(:info, "file_ingestor.export_start", { dest_path: dest_path }) step_start = Process.clock_gettime(Process::CLOCK_MONOTONIC) @duckdb.query(query) export_duration = Process.clock_gettime(Process::CLOCK_MONOTONIC) - step_start duration = Process.clock_gettime(Process::CLOCK_MONOTONIC) - start_time safe_log(:info, "file_ingestor.complete", { source_path: @source_path, duration_s: duration.round(2), source_query_duration_s: source_query_duration.round(2), export_duration_s: export_duration.round(2), count: source_count }) cleanup_local_file true rescue DuckDB::Error => e duration = Process.clock_gettime(Process::CLOCK_MONOTONIC) - start_time safe_log(:error, "file_ingestor.duckdb_error", { source_path: @source_path }.merge((e)).merge(duration_s: duration.round(2))) false ensure @duckdb&.close end |