Class: DataDrain::FileIngestor
- Inherits: Object
  - Object
    - DataDrain::FileIngestor
- Includes: Observability, Observability::Timing
- Defined in: lib/data_drain/file_ingestor.rb
Overview
Class responsible for ingesting local files (CSV, JSON, Parquet) produced by other services (e.g. Netflow) and uploading them to the Data Lake with ZSTD compression and Hive-style partitioning.
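This page does not show the SQL that the ingestion runs. As a minimal sketch, assuming the export is a DuckDB `COPY ... TO` statement, the three input formats map naturally onto DuckDB's `read_csv_auto`, `read_json_auto` and `read_parquet` table functions, and ZSTD plus Hive partitioning onto the `COMPRESSION ZSTD` and `PARTITION_BY` copy options. Those DuckDB names are real; `READERS`, `reader_for`, `quote_literal` and `build_copy_sql` are hypothetical and are not DataDrain's API.

```ruby
# Hypothetical sketch, not DataDrain's implementation: choose a DuckDB
# table function from the file extension and build a COPY statement that
# writes ZSTD-compressed Parquet into Hive-style key=value/ directories.

# Extension => DuckDB reader table function (assumed mapping).
READERS = {
  ".csv" => "read_csv_auto",
  ".json" => "read_json_auto",
  ".parquet" => "read_parquet"
}.freeze

def reader_for(path)
  ext = File.extname(path).downcase
  READERS.fetch(ext) { raise ArgumentError, "unsupported file type: #{ext.inspect}" }
end

# Quote a value as a SQL string literal, doubling embedded single quotes.
def quote_literal(value)
  "'#{value.to_s.gsub("'", "''")}'"
end

# select_sql is interpolated as-is, so it must come from trusted code.
def build_copy_sql(source_path:, destination:, select_sql: "*", partition_keys: [])
  options = ["FORMAT PARQUET", "COMPRESSION ZSTD"]
  options << "PARTITION_BY (#{partition_keys.join(', ')})" unless partition_keys.empty?

  "COPY (SELECT #{select_sql} FROM #{reader_for(source_path)}(#{quote_literal(source_path)})) " \
    "TO #{quote_literal(destination)} (#{options.join(', ')})"
end

puts build_copy_sql(
  source_path: "/tmp/netflow.csv",
  destination: "s3://my-bucket/netflow",
  partition_keys: %w[year month]
)
```

With `partition_keys: %w[year month]`, DuckDB writes one directory level per key (`year=.../month=.../`), which is the Hive layout that query engines can prune on.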
Constant Summary
Constants included from Observability
Observability::SENSITIVE_KEY_PATTERN
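`Observability::SENSITIVE_KEY_PATTERN` is listed by name only. Its name suggests it flags log-field keys whose values should not be written to logs. Assuming it is a `Regexp` matched against key names (both its value and how `safe_log` uses it are assumptions here), a hypothetical redaction helper could look like this:

```ruby
# Hypothetical pattern; the real SENSITIVE_KEY_PATTERN value is not shown.
SENSITIVE_KEY_PATTERN = /password|secret|token|api_key/i

# Returns a copy of +fields+ with the values of sensitive-looking keys masked.
def redact(fields)
  fields.to_h do |key, value|
    [key, SENSITIVE_KEY_PATTERN.match?(key.to_s) ? "[FILTERED]" : value]
  end
end

fields = redact({ source_path: "/tmp/netflow.csv", aws_secret: "abc123" })
puts fields[:source_path] # prints /tmp/netflow.csv
puts fields[:aws_secret]  # prints [FILTERED]
```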
Instance Method Summary
- #call ⇒ Boolean
  Runs the ingestion flow.
- #initialize(options) ⇒ FileIngestor (constructor)
  Returns a new instance of FileIngestor.
Constructor Details
#initialize(options) ⇒ FileIngestor
Returns a new instance of FileIngestor.
    # File 'lib/data_drain/file_ingestor.rb', line 17
    def initialize(options)
      # Required options: Hash#fetch raises KeyError if either is missing.
      @source_path = options.fetch(:source_path)
      @folder_name = options.fetch(:folder_name)
      Validations.validate_identifier!(:folder_name, @folder_name)

      # Optional options with defaults.
      @partition_keys = options.fetch(:partition_keys, [])
      @select_sql = options.fetch(:select_sql, "*")
      @delete_after_upload = options.fetch(:delete_after_upload, true)
      @bucket = options[:bucket] # nil when not given

      @config = DataDrain.configuration
      @config.validate!
      @logger = @config.logger
      @adapter = DataDrain::Storage.adapter

      # In-memory DuckDB database; #call closes the connection in its ensure block.
      database = DuckDB::Database.open(":memory:")
      @duckdb = database.connect
    end
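The constructor's option handling follows plain `Hash` semantics: `fetch` without a default raises `KeyError` for a missing required key, `fetch` with a default fills optional keys, and `[]` yields `nil` for an absent `:bucket`. The sketch below reproduces only those semantics in isolation. `IngestOptions`, `parse_options` and `IDENTIFIER_RE` are hypothetical; in particular, the identifier rule is an assumption standing in for `Validations.validate_identifier!`, whose actual check is not shown on this page.

```ruby
# Hypothetical value object holding the parsed constructor options.
IngestOptions = Struct.new(:source_path, :folder_name, :partition_keys,
                           :select_sql, :delete_after_upload, :bucket,
                           keyword_init: true)

# Assumed identifier rule: letters, digits and underscores, not starting with a digit.
IDENTIFIER_RE = /\A[a-zA-Z_][a-zA-Z0-9_]*\z/

def parse_options(options)
  # Same order as the constructor: source_path is fetched first.
  source_path = options.fetch(:source_path)
  folder_name = options.fetch(:folder_name)
  unless IDENTIFIER_RE.match?(folder_name.to_s)
    raise ArgumentError, "invalid folder_name: #{folder_name.inspect}"
  end

  IngestOptions.new(
    source_path: source_path,
    folder_name: folder_name,
    partition_keys: options.fetch(:partition_keys, []),
    select_sql: options.fetch(:select_sql, "*"),
    delete_after_upload: options.fetch(:delete_after_upload, true),
    bucket: options[:bucket]
  )
end

opts = parse_options(source_path: "/tmp/netflow.csv", folder_name: "netflow")
p opts.select_sql          # prints "*"
p opts.delete_after_upload # prints true
p opts.bucket              # prints nil
```

Note that `delete_after_upload` defaults to `true`, so a caller who wants to keep the local file must pass `delete_after_upload: false` explicitly.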
Instance Method Details
#call ⇒ Boolean
Runs the ingestion flow.
    # File 'lib/data_drain/file_ingestor.rb', line 37
    def call
      @durations = {}
      start_time = monotonic
      safe_log(:info, "file_ingestor.start", { source_path: @source_path })

      return file_not_found(start_time) unless step_validate_file

      step_setup_duckdb
      @reader_function = determine_reader
      @source_count = step_count_source
      # Nothing to export: stop early.
      return skip_empty(start_time) if @source_count.zero?

      step_export
      log_complete(start_time)
      cleanup_local_file
      true
    rescue DuckDB::Error => e
      # Only DuckDB errors are logged and turned into false; other exceptions propagate.
      duration = monotonic - start_time
      safe_log(:error, "file_ingestor.duckdb_error",
               { source_path: @source_path }.merge((e)).merge(duration_s: duration.round(2)))
      false
    ensure
      # Runs on every exit path, including the early returns.
      @duckdb&.close
    end
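`#call` runs its steps in sequence, returns early when the file is missing or the source has no rows, converts `DuckDB::Error` into a logged `false`, and closes the DuckDB connection in `ensure` on every path. A minimal sketch of that control flow, using stand-ins so it runs without DuckDB: `StepError`, `FakeConnection`, `run_pipeline` and the step lambdas are hypothetical, and the return values chosen for the early exits are assumptions, since this page does not show what `file_not_found` and `skip_empty` return. The sketch's `monotonic` uses `Process.clock_gettime(Process::CLOCK_MONOTONIC)`; whether `Observability::Timing` does the same is also an assumption.

```ruby
# Hypothetical stand-ins; not DataDrain or ruby-duckdb classes.
class StepError < StandardError; end

class FakeConnection
  attr_reader :closed

  def close
    @closed = true
  end
end

# Monotonic clock: unaffected by wall-clock changes, so suitable for durations.
def monotonic
  Process.clock_gettime(Process::CLOCK_MONOTONIC)
end

def run_pipeline(conn, file_exists:, count_rows:, export:, log: ->(_event, _fields) {})
  start_time = monotonic
  log.call("start", {})

  return false unless file_exists.call # assumed result for a missing file

  return true if count_rows.call.zero? # assumed result for an empty source

  export.call
  log.call("complete", { duration_s: (monotonic - start_time).round(2) })
  true
rescue StepError => e
  # Only the expected error class becomes false; anything else propagates.
  log.call("error", { error: e.message, duration_s: (monotonic - start_time).round(2) })
  false
ensure
  # Runs on every exit path, including early returns and propagating errors.
  conn&.close
end

conn = FakeConnection.new
result = run_pipeline(conn,
                      file_exists: -> { true },
                      count_rows: -> { 10 },
                      export: -> { raise StepError, "COPY failed" })
p result      # prints false
p conn.closed # prints true
```

Rescuing only one error class keeps unexpected bugs visible to the caller, while `ensure` still guarantees the connection is released.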