Class: DataDrain::GlueRunner
- Inherits: Object
  - Object
    - DataDrain::GlueRunner
- Extended by: Observability
- Defined in: lib/data_drain/glue_runner.rb
Overview
Orchestrator for AWS Glue. Triggers and monitors Jobs in AWS so that bulk data movement (e.g. 1 TB tables) can be delegated to Glue.
Class Method Summary
- .run_and_wait(job_name, arguments = {}, polling_interval: 30) ⇒ Boolean
  Triggers a Glue Job and waits for it to finish successfully.
Class Method Details
.run_and_wait(job_name, arguments = {}, polling_interval: 30) ⇒ Boolean
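Triggers a Glue Job and waits for it to finish successfully. Returns true when the run reaches SUCCEEDED and raises a RuntimeError when it ends in FAILED, STOPPED, or TIMEOUT; in any other state it sleeps for polling_interval seconds and checks again.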
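The arguments parameter is passed straight to Glue's start_job_run call. A minimal sketch of building that hash, assuming the job reads its parameters under the conventional `--name` keys with string values; `glue_arguments`, the option names, and the job name in the comment are hypothetical and not part of DataDrain:

```ruby
# Hypothetical helper (not part of DataDrain): converts Ruby keyword
# options into a Glue-style arguments hash with "--"-prefixed string keys
# and string values.
def glue_arguments(**options)
  options.each_with_object({}) do |(key, value), args|
    args["--#{key}"] = value.to_s
  end
end

# Assumed option names, for illustration only.
args = glue_arguments(source_table: "orders", batch_size: 5000)

# The resulting hash could then be passed as the second parameter, e.g.:
#   DataDrain::GlueRunner.run_and_wait("my-job", args, polling_interval: 60)
```

Converting values with to_s keeps the hash uniform, since Glue job arguments are a string-to-string map.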
# File 'lib/data_drain/glue_runner.rb', line 19

def self.run_and_wait(job_name, arguments = {}, polling_interval: 30)
  config = DataDrain.configuration
  client = Aws::Glue::Client.new(region: config.aws_region)
  start_time = Process.clock_gettime(Process::CLOCK_MONOTONIC)

  # The class extends Observability, so safe_log can be called directly
  # once the configured logger has been assigned.
  @logger = config.logger
  safe_log(:info, "glue_runner.start", { job: job_name })

  resp = client.start_job_run(job_name: job_name, arguments: arguments)
  run_id = resp.job_run_id

  loop do
    run_info = client.get_job_run(job_name: job_name, run_id: run_id).job_run
    status = run_info.job_run_state

    case status
    when "SUCCEEDED"
      duration = Process.clock_gettime(Process::CLOCK_MONOTONIC) - start_time
      safe_log(:info, "glue_runner.complete", { job: job_name, run_id: run_id, duration_s: duration.round(2) })
      return true
    when "FAILED", "STOPPED", "TIMEOUT"
      duration = Process.clock_gettime(Process::CLOCK_MONOTONIC) - start_time
      # Details for the failure log entry. The local variable name is assumed;
      # error_message is the attribute exposed by the AWS SDK's JobRun type.
      failure_context = { job: job_name, run_id: run_id, status: status, duration_s: duration.round(2) }
      if run_info.error_message
        # Swap double quotes for single quotes and cap the message at 200 characters.
        failure_context[:error_message] = run_info.error_message.gsub("\"", "'")[0, 200]
      end
      safe_log(:error, "glue_runner.failed", failure_context)
      # The message reads: "Glue Job ... (Run ID: ...) failed with state ...".
      raise "Glue Job #{job_name} (Run ID: #{run_id}) falló con estado #{status}."
    else
      # Any other state is treated as still in progress: log it and poll again.
      safe_log(:info, "glue_runner.polling", { job: job_name, run_id: run_id, status: status, next_check_in_s: polling_interval })
      sleep polling_interval
    end
  end
end
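The polling loop in the method above can be tried in isolation without AWS access. The following is a minimal sketch, not the library's implementation: `FakeGlueClient`, `wait_for_run`, and the injectable `sleeper` are hypothetical names introduced only for illustration, and the fake client simply replays a scripted list of states.

```ruby
# Hypothetical stand-in for the Glue client: replays a scripted list of states.
class FakeGlueClient
  def initialize(states)
    @states = states.dup
  end

  # Returns the next scripted state; the last state repeats once the script runs out.
  def job_run_state
    @states.size > 1 ? @states.shift : @states.first
  end
end

# Same failure states that run_and_wait treats as terminal.
TERMINAL_FAILURES = %w[FAILED STOPPED TIMEOUT].freeze

# Polls until the run succeeds or reaches a failure state, and returns the
# number of status checks made. `sleeper` is injectable so the sketch can
# run without actually waiting.
def wait_for_run(client, polling_interval: 30, sleeper: ->(seconds) { sleep(seconds) })
  polls = 0
  loop do
    status = client.job_run_state
    polls += 1
    case status
    when "SUCCEEDED"
      return polls
    when *TERMINAL_FAILURES
      raise "run ended with state #{status}"
    else
      sleeper.call(polling_interval)
    end
  end
end

client = FakeGlueClient.new(%w[STARTING RUNNING SUCCEEDED])
polls = wait_for_run(client, sleeper: ->(_seconds) {})
puts "finished after #{polls} status checks"
```

As in the original method, any state outside the success and failure lists is treated as "still running", so a caller that needs an upper bound on total wait time would have to add one separately.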