Class: DataDrain::FileIngestor

Inherits:
Object
  • Object
show all
Includes:
Observability
Defined in:
lib/data_drain/file_ingestor.rb

Overview

Class responsible for ingesting local files (CSV, JSON, Parquet) generated by other services (e.g. Netflow) and uploading them to the Data Lake, applying ZSTD compression and Hive partitioning.

Constant Summary

Constants included from Observability

Observability::SENSITIVE_KEY_PATTERN

Instance Method Summary collapse

Constructor Details

#initialize(options) ⇒ FileIngestor

Returns a new instance of FileIngestor.

Parameters:

  • options (Hash)

    Opciones de ingestión.

Options Hash (options):

  • :source_path (String)

    Ruta absoluta al archivo local.

  • :folder_name (String)

    Nombre de la carpeta destino en el Data Lake.

  • :partition_keys (Array<String, Symbol>) — default: Opcional

    Columnas para particionar.

  • :select_sql (String) — default: Opcional

    Sentencia SELECT para transformar datos al vuelo.

  • :delete_after_upload (Boolean) — default: Opcional

    Borra el archivo local al terminar. Por defecto true.



18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
# File 'lib/data_drain/file_ingestor.rb', line 18

# Builds a new ingestor from an options hash.
#
# @param options [Hash] ingestion options.
# @option options [String] :source_path absolute path to the local file (required).
# @option options [String] :folder_name destination folder in the Data Lake (required).
# @option options [Array<String, Symbol>] :partition_keys columns to partition by (optional).
# @option options [String] :select_sql SELECT expression applied on the fly (optional, defaults to "*").
# @option options [Boolean] :delete_after_upload remove the local file when done (defaults to true).
def initialize(options)
  # Required options: fetch without a default raises KeyError when absent.
  @source_path = options.fetch(:source_path)
  @folder_name = options.fetch(:folder_name)
  Validations.validate_identifier!(:folder_name, @folder_name)

  # Optional knobs and their defaults.
  @partition_keys = options.fetch(:partition_keys, [])
  @select_sql = options.fetch(:select_sql, "*")
  @delete_after_upload = options.fetch(:delete_after_upload, true)
  @bucket = options[:bucket]

  # Global configuration, logging and the storage backend.
  @config = DataDrain.configuration
  @config.validate!
  @logger = @config.logger
  @adapter = DataDrain::Storage.adapter

  # In-memory DuckDB session used for reading and exporting the file.
  db = DuckDB::Database.open(":memory:")
  @duckdb = db.connect
end

Instance Method Details

#call ⇒ Boolean

Runs the ingestion flow.

Returns:

  • (Boolean)

    true si el proceso fue exitoso.



38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
# File 'lib/data_drain/file_ingestor.rb', line 38

# Runs the ingestion flow: verifies the source file exists, counts its
# rows, exports them to the Data Lake as ZSTD-compressed Parquet
# (optionally Hive-partitioned), and deletes the local file afterwards.
#
# @return [Boolean] true when the process succeeded (including the
#   empty-file skip), false on a missing file or a DuckDB error.
def call
  start_time = Process.clock_gettime(Process::CLOCK_MONOTONIC)
  safe_log(:info, "file_ingestor.start", { source_path: @source_path })

  unless File.exist?(@source_path)
    safe_log(:error, "file_ingestor.file_not_found", { source_path: @source_path })
    return false
  end

  # Resource limits come from operator-controlled configuration, not user input.
  @duckdb.query("SET max_memory='#{@config.limit_ram}';") if @config.limit_ram.present?
  @duckdb.query("SET temp_directory='#{@config.tmp_directory}'") if @config.tmp_directory.present?

  @adapter.setup_duckdb(@duckdb)

  # Pick the DuckDB reader function according to the file extension.
  reader_function = determine_reader

  # 1. Safety count: avoid running the export at all for empty files.
  step_start = Process.clock_gettime(Process::CLOCK_MONOTONIC)
  source_count = @duckdb.query("SELECT COUNT(*) FROM #{reader_function}").first.first
  source_query_duration = Process.clock_gettime(Process::CLOCK_MONOTONIC) - step_start
  safe_log(:info, "file_ingestor.count", {
             source_path: @source_path,
             count: source_count,
             source_query_duration_s: source_query_duration.round(2)
           })

  if source_count.zero?
    cleanup_local_file
    duration = Process.clock_gettime(Process::CLOCK_MONOTONIC) - start_time
    safe_log(:info, "file_ingestor.skip_empty", { source_path: @source_path, duration_s: duration.round(2) })
    return true
  end

  # 2. Export / upload.
  @adapter.prepare_export_path(@bucket, @folder_name)
  dest_path = if @config.storage_mode.to_sym == :s3
                "s3://#{@bucket}/#{@folder_name}/"
              else
                File.join(@bucket,
                          @folder_name, "")
              end

  partition_clause = @partition_keys.any? ? "PARTITION_BY (#{@partition_keys.join(", ")})," : ""

  # NOTE(review): @select_sql and @partition_keys are interpolated directly
  # into SQL; folder_name is validated above, but these are assumed to come
  # from trusted service configuration — confirm they are never user input.
  query = <<~SQL
    COPY (
      SELECT #{@select_sql}
      FROM #{reader_function}
    ) TO '#{dest_path}'
    (
      FORMAT PARQUET,
      #{partition_clause}
      COMPRESSION 'ZSTD',
      OVERWRITE_OR_IGNORE 1
    );
  SQL

  safe_log(:info, "file_ingestor.export_start", { dest_path: dest_path })
  step_start = Process.clock_gettime(Process::CLOCK_MONOTONIC)
  @duckdb.query(query)
  export_duration = Process.clock_gettime(Process::CLOCK_MONOTONIC) - step_start

  duration = Process.clock_gettime(Process::CLOCK_MONOTONIC) - start_time
  safe_log(:info, "file_ingestor.complete", {
             source_path: @source_path,
             duration_s: duration.round(2),
             source_query_duration_s: source_query_duration.round(2),
             export_duration_s: export_duration.round(2),
             count: source_count
           })

  cleanup_local_file
  true
rescue DuckDB::Error => e
  duration = Process.clock_gettime(Process::CLOCK_MONOTONIC) - start_time
  # Hash#merge requires an object responding to #to_hash; merging the
  # exception object itself (the previous `.merge((e))`) raised TypeError
  # inside the rescue and masked the original error. Log explicit fields.
  safe_log(:error, "file_ingestor.duckdb_error",
           { source_path: @source_path,
             error_class: e.class.name,
             error_message: e.message,
             duration_s: duration.round(2) })
  false
ensure
  # Always release the DuckDB connection, even on failure.
  @duckdb&.close
end