Class: DataDrain::GlueRunner

Inherits:
Object
Extended by:
Observability
Defined in:
lib/data_drain/glue_runner.rb

Overview

Orchestrator for AWS Glue. Starts and monitors Glue Jobs in AWS so that bulk data movement (e.g. 1 TB tables) can be delegated to them.

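Class Method Summary

  • .run_and_wait(job_name, arguments = {}, polling_interval: 30) ⇒ Boolean

    Starts a Glue Job run and waits for it to finish successfully.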

Class Method Details

.run_and_wait(job_name, arguments = {}, polling_interval: 30) ⇒ Boolean

Starts a Glue Job run and waits for it to finish successfully.

Parameters:

  • job_name (String)

    Name of the Job as it appears in the AWS console.

  • arguments (Hash) (defaults to: {})

    Job run arguments (each key must start with --).

  • polling_interval (Integer) (defaults to: 30)

    Seconds to wait between status checks.
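Because every argument key must start with "--", a caller might normalize a plain Ruby hash before passing it in. The sketch below is only an illustration: `normalize_glue_arguments` is a hypothetical helper, not part of DataDrain, and it assumes values should be sent as strings, since the Glue StartJobRun API types `Arguments` as a string-to-string map.

```ruby
# Hypothetical helper (not part of DataDrain): prefixes each key with "--"
# unless it already has it, and converts keys and values to strings,
# because Glue job arguments are a string-to-string map.
def normalize_glue_arguments(args)
  args.each_with_object({}) do |(key, value), out|
    name = key.to_s
    name = "--#{name}" unless name.start_with?("--")
    out[name] = value.to_s
  end
end

# Usage: the result can be passed as the `arguments` parameter.
args = normalize_glue_arguments({ source_table: "events", "--batch_size" => 500 })
p args
```

Keys that already carry the prefix are left alone, so the helper is safe to apply to hashes that were partially normalized by hand.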

Returns:

  • (Boolean)

    true if the Job run finished successfully (SUCCEEDED). The method never returns false; any failure raises instead.

Raises:

  • (RuntimeError)

    If the Job run fails, is stopped, or times out.
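The return/raise contract amounts to sorting Glue run states into three outcomes. A minimal sketch of that mapping, as a hypothetical `classify_glue_state` helper (not part of DataDrain). It assumes the AWS Glue `JobRunState` values (STARTING, RUNNING, STOPPING, STOPPED, SUCCEEDED, FAILED, TIMEOUT, ERROR, WAITING, EXPIRED) and treats ERROR and EXPIRED as failures too, so that no terminal state is polled forever.

```ruby
# Hypothetical helper (not part of DataDrain): maps a Glue JobRunState string
# to the outcome a polling loop should produce.
SUCCESS_STATES = %w[SUCCEEDED].freeze
# Assumption: ERROR and EXPIRED are also terminal failure states in the Glue API.
FAILURE_STATES = %w[FAILED STOPPED TIMEOUT ERROR EXPIRED].freeze

def classify_glue_state(state)
  return :success if SUCCESS_STATES.include?(state)
  return :failure if FAILURE_STATES.include?(state)

  :pending # STARTING, RUNNING, STOPPING, WAITING: keep polling
end

%w[RUNNING SUCCEEDED TIMEOUT].each { |s| puts "#{s} -> #{classify_glue_state(s)}" }
```

:success corresponds to returning true, :failure to raising RuntimeError, and :pending to sleeping for polling_interval and checking again.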



# File 'lib/data_drain/glue_runner.rb', line 19

def self.run_and_wait(job_name, arguments = {}, polling_interval: 30)
  config = DataDrain.configuration
  client = Aws::Glue::Client.new(region: config.aws_region)
  start_time = Process.clock_gettime(Process::CLOCK_MONOTONIC)

  # safe_log comes from the extended Observability module and presumably writes
  # through @logger, so point it at the configured logger before the first call.
  @logger = config.logger

  safe_log(:info, "glue_runner.start", { job: job_name })
  resp = client.start_job_run(job_name: job_name, arguments: arguments)
  run_id = resp.job_run_id

  loop do
    run_info = client.get_job_run(job_name: job_name, run_id: run_id).job_run
    status = run_info.job_run_state

    case status
    when "SUCCEEDED"
      duration = Process.clock_gettime(Process::CLOCK_MONOTONIC) - start_time
      safe_log(:info, "glue_runner.complete", { job: job_name, run_id: run_id, duration_s: duration.round(2) })
      return true
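    # ERROR and EXPIRED are also terminal JobRunState values; without them the loop would poll forever.
    when "FAILED", "STOPPED", "TIMEOUT", "ERROR", "EXPIRED"
      duration = Process.clock_gettime(Process::CLOCK_MONOTONIC) - start_time
      log_context = { job: job_name, run_id: run_id, status: status, duration_s: duration.round(2) }

      if run_info.error_message
        # Swap double quotes for single quotes and keep at most 200 characters so the log line stays compact.
        log_context[:error_message] = run_info.error_message.gsub("\"", "'")[0, 200]
      end

      safe_log(:error, "glue_runner.failed", log_context)
      raise "Glue Job #{job_name} (Run ID: #{run_id}) failed with status #{status}."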
    else
      safe_log(:info, "glue_runner.polling", { job: job_name, run_id: run_id, status: status, next_check_in_s: polling_interval })
      sleep polling_interval
    end
  end
end
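To exercise this polling loop without AWS, one option is to inject the client and the sleep function. The following is a hedged sketch, not DataDrain's code: `poll_until_done`, `FakeGlueClient` and the Struct-based responses are hypothetical stand-ins that mimic only the two client calls used above (`start_job_run` and `get_job_run`) and the fields read from them (`job_run_id`, `job_run.job_run_state`, `job_run.error_message`). Treating ERROR and EXPIRED as terminal is an assumption about the Glue state enum.

```ruby
# Hypothetical stand-ins for the Aws::Glue::Client responses used by run_and_wait.
StartResp = Struct.new(:job_run_id)
JobRun    = Struct.new(:job_run_state, :error_message)
GetResp   = Struct.new(:job_run)

# Fake client: returns one scripted state per get_job_run call.
class FakeGlueClient
  def initialize(states, error_message: nil)
    @states = states.dup
    @error_message = error_message
  end

  def start_job_run(job_name:, arguments: {})
    StartResp.new("jr_test")
  end

  def get_job_run(job_name:, run_id:)
    GetResp.new(JobRun.new(@states.shift, @error_message))
  end
end

# Same control flow as run_and_wait, with the client and the sleeper injected.
def poll_until_done(client, job_name, arguments = {}, polling_interval: 30, sleeper: ->(s) { sleep(s) })
  run_id = client.start_job_run(job_name: job_name, arguments: arguments).job_run_id

  loop do
    run_info = client.get_job_run(job_name: job_name, run_id: run_id).job_run
    case run_info.job_run_state
    when "SUCCEEDED"
      return true
    # Assumption: ERROR and EXPIRED are terminal failure states.
    when "FAILED", "STOPPED", "TIMEOUT", "ERROR", "EXPIRED"
      detail = run_info.error_message ? ": #{run_info.error_message[0, 200]}" : ""
      raise "Glue Job #{job_name} (Run ID: #{run_id}) failed with status #{run_info.job_run_state}#{detail}"
    else
      sleeper.call(polling_interval) # still running: wait before the next check
    end
  end
end

sleeps = []
ok = poll_until_done(FakeGlueClient.new(%w[STARTING RUNNING SUCCEEDED]), "demo_job",
                     sleeper: ->(s) { sleeps << s })
puts ok
puts sleeps.inspect
```

Recording the sleep calls instead of actually sleeping keeps such a test instantaneous; in production the default lambda simply calls Kernel#sleep.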