Class: DataDrain::GlueRunner

Inherits:
Object
Extended by:
Observability
Defined in:
lib/data_drain/glue_runner.rb

Overview

Orchestrator for AWS Glue. Triggers and monitors Glue Jobs so that bulk data movement (e.g. 1 TB tables) can be delegated to AWS.

Constant Summary

Constants included from Observability

Observability::SENSITIVE_KEY_PATTERN

Class Method Summary

Class Method Details

.run_and_wait(job_name, arguments = {}, polling_interval: 30, max_wait_seconds: nil) ⇒ Boolean

Starts a Glue Job and waits for it to finish successfully.

Parameters:

  • job_name (String)

    Name of the Job in the AWS console.

  • arguments (Hash) (defaults to: {})

    Job run arguments (names must start with --).

  • polling_interval (Integer) (defaults to: 30)

    Seconds to wait between status checks.

  • max_wait_seconds (Integer, nil) (defaults to: nil)

    Maximum wait time in seconds. nil means no limit (the previous behavior).
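
The requirement that argument names start with -- can be enforced before calling run_and_wait. The following is a minimal sketch, not part of DataDrain: the helper name glue_arguments, the job name, and the argument names are all hypothetical. It assumes Glue expects string values, so values are converted with to_s.

```ruby
# Hypothetical helper (not part of DataDrain): returns a new Hash whose keys
# are Strings starting with "--" and whose values are Strings.
def glue_arguments(params)
  params.each_with_object({}) do |(key, value), args|
    name = key.to_s
    name = "--#{name}" unless name.start_with?("--")
    args[name] = value.to_s
  end
end

# Illustrative input; keys that already carry the prefix are left alone.
args = glue_arguments({ source_table: "events", "--target_path" => "s3://example-bucket/events/" })

# Only runs when the gem is loaded; "my-export-job" is a placeholder name.
if defined?(DataDrain::GlueRunner)
  DataDrain::GlueRunner.run_and_wait("my-export-job", args,
                                     polling_interval: 60, max_wait_seconds: 7200)
end
```

Normalizing the keys in one place avoids a run that starts successfully but ignores an argument whose prefix was forgotten.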

Returns:

  • (Boolean)

    true if the Job finished successfully (SUCCEEDED).

Raises:

  • (DataDrain::Error)

    if max_wait_seconds elapses before the Job reaches SUCCEEDED.

  • (RuntimeError)

    if the Job ends in the FAILED, STOPPED, or TIMEOUT state.
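
A caller will usually want to tell the two failure modes apart. The sketch below is one possible way to do that, under stated assumptions: drain_with_glue is a hypothetical wrapper, the stand-in DataDrain::Error (and its StandardError superclass) exists only so the snippet runs without the gem, and other parts of the gem may also raise DataDrain::Error for reasons unrelated to the timeout.

```ruby
# Stand-in so the sketch runs without the gem. The real superclass of
# DataDrain::Error is an assumption here.
module DataDrain
  Error = Class.new(StandardError) unless const_defined?(:Error, false)
end

# Hypothetical wrapper: maps each outcome of run_and_wait to a Symbol.
# `runner` is any object responding to run_and_wait, e.g. DataDrain::GlueRunner.
def drain_with_glue(runner, job_name, arguments = {}, max_wait_seconds: 3600)
  runner.run_and_wait(job_name, arguments, max_wait_seconds: max_wait_seconds)
  :succeeded
rescue DataDrain::Error
  # Raised when max_wait_seconds elapses (possibly for other gem errors too).
  :timed_out
rescue RuntimeError
  # Raised when the Job ends in FAILED, STOPPED, or TIMEOUT.
  :failed
end

# Only runs when the gem is loaded; "my-export-job" is a placeholder name.
puts drain_with_glue(DataDrain::GlueRunner, "my-export-job") if defined?(DataDrain::GlueRunner)
```

DataDrain::Error is rescued first so the timeout is still recognized if that class happens to descend from RuntimeError.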



# File 'lib/data_drain/glue_runner.rb', line 22

def self.run_and_wait(job_name, arguments = {}, polling_interval: 30, max_wait_seconds: nil)
  config = DataDrain.configuration
  config.validate!
  client = Aws::Glue::Client.new(region: config.aws_region)
  start_time = Process.clock_gettime(Process::CLOCK_MONOTONIC)

  @logger = config.logger

  safe_log(:info, "glue_runner.start", { job: job_name })
  resp = client.start_job_run(job_name: job_name, arguments: arguments)
  run_id = resp.job_run_id

  loop do
    if max_wait_seconds &&
       (Process.clock_gettime(Process::CLOCK_MONOTONIC) - start_time) > max_wait_seconds
      safe_log(:error, "glue_runner.timeout", {
                 job: job_name,
                 run_id: run_id,
                 max_wait_seconds: max_wait_seconds
               })
      raise DataDrain::Error,
            "Glue Job #{job_name} (Run ID: #{run_id}) excedió max_wait_seconds=#{max_wait_seconds}"
    end

    run_info = client.get_job_run(job_name: job_name, run_id: run_id).job_run
    status = run_info.job_run_state

    case status
    when "SUCCEEDED"
      duration = Process.clock_gettime(Process::CLOCK_MONOTONIC) - start_time
      safe_log(:info, "glue_runner.complete", { job: job_name, run_id: run_id, duration_s: duration.round(2) })
      return true
    when "FAILED", "STOPPED", "TIMEOUT"
      duration = Process.clock_gettime(Process::CLOCK_MONOTONIC) - start_time
      fields = { job: job_name, run_id: run_id, status: status, duration_s: duration.round(2) }

      # Attach the error message with double quotes normalized, truncated to 200 characters.
      fields[:error_message] = run_info.error_message.gsub("\"", "'")[0, 200] if run_info.error_message

      safe_log(:error, "glue_runner.failed", fields)
      raise "Glue Job #{job_name} (Run ID: #{run_id}) falló con estado #{status}."
    else
      safe_log(:info, "glue_runner.polling",
               { job: job_name, run_id: run_id, status: status, next_check_in_s: polling_interval })
      sleep polling_interval
    end
  end
end
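
The loop above combines three ideas: a deadline measured on the monotonic clock, a set of terminal states, and a sleep between checks. The sketch below isolates that pattern so it can be exercised without AWS. It is an illustration only: poll_until_terminal, the sleeper parameter, and the plain RuntimeError on timeout are hypothetical and do not exist in DataDrain.

```ruby
TERMINAL_STATES = %w[SUCCEEDED FAILED STOPPED TIMEOUT].freeze

# Hypothetical helper: calls the block until it returns a terminal state.
# Raises RuntimeError if max_wait seconds elapse first (nil means no limit).
# `sleeper` is injectable so tests do not actually sleep.
def poll_until_terminal(interval:, max_wait: nil, sleeper: method(:sleep))
  started = Process.clock_gettime(Process::CLOCK_MONOTONIC)

  loop do
    # The monotonic clock is unaffected by wall-clock adjustments.
    elapsed = Process.clock_gettime(Process::CLOCK_MONOTONIC) - started
    raise "timed out after #{max_wait}s" if max_wait && elapsed > max_wait

    state = yield
    return state if TERMINAL_STATES.include?(state)

    sleeper.call(interval)
  end
end

# Usage with a canned sequence of states instead of a real Glue client.
states = %w[RUNNING RUNNING SUCCEEDED]
naps = []
final = poll_until_terminal(interval: 5, sleeper: ->(s) { naps << s }) { states.shift }
```

As in run_and_wait, the deadline is checked before each status request, so a run can exceed max_wait by at most one polling interval plus the request time.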