Class: DataDrain::FileIngestor

Inherits:
Object
  • Object
show all
Includes:
Observability, Observability::Timing
Defined in:
lib/data_drain/file_ingestor.rb

Overview

Clase encargada de ingerir archivos locales (CSV, JSON, Parquet) generados por otros servicios (ej. Netflow) y subirlos al Data Lake aplicando compresión ZSTD y particionamiento Hive.

Instance Method Summary collapse

Constructor Details

#initialize(options) ⇒ FileIngestor

Returns a new instance of FileIngestor.

Parameters:

  • options (Hash)

    Opciones de ingestión.

Options Hash (options):

  • :source_path (String)

    Ruta absoluta al archivo local.

  • :folder_name (String)

    Nombre de la carpeta destino en el Data Lake.

  • :partition_keys (Array<String, Symbol>) — default: Opcional

    Columnas para particionar.

  • :select_sql (String) — default: Opcional

    Sentencia SELECT para transformar datos al vuelo.

  • :delete_after_upload (Boolean) — default: Opcional

    Borra el archivo local al terminar. Por defecto true.



17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
# File 'lib/data_drain/file_ingestor.rb', line 17

def initialize(options)
  @source_path = options.fetch(:source_path)
  @folder_name = options.fetch(:folder_name)
  Validations.validate_identifier!(:folder_name, @folder_name)
  @partition_keys = options.fetch(:partition_keys, [])
  @select_sql = options.fetch(:select_sql, "*")
  @delete_after_upload = options.fetch(:delete_after_upload, true)
  @bucket = options[:bucket]

  @config = DataDrain.configuration
  @config.validate!
  @logger = @config.logger
  @adapter = DataDrain::Storage.adapter

  database = DuckDB::Database.open(":memory:")
  @duckdb = database.connect
end

Instance Method Details

#callBoolean

Ejecuta el flujo de ingestión.

Returns:

  • (Boolean)

    true si el proceso fue exitoso.



37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
# File 'lib/data_drain/file_ingestor.rb', line 37

def call
  @durations = {}
  start_time = monotonic
  safe_log(:info, "file_ingestor.start", { source_path: @source_path })

  return file_not_found(start_time) unless step_validate_file

  step_setup_duckdb
  @reader_function = determine_reader
  @source_count = step_count_source

  return skip_empty(start_time) if @source_count.zero?

  step_export
  log_complete(start_time)
  cleanup_local_file
  true
rescue DuckDB::Error => e
  duration = monotonic - start_time
  safe_log(:error, "file_ingestor.duckdb_error",
           { source_path: @source_path }.merge((e)).merge(duration_s: duration.round(2)))
  false
ensure
  @duckdb&.close
end