Class: DataDrain::GlueRunner
- Inherits: Object
  - Object
    - DataDrain::GlueRunner
- Extended by: Observability
- Defined in: lib/data_drain/glue_runner.rb
Overview
Orchestrator for AWS Glue. It starts and monitors Glue jobs in AWS so that bulk data movement (e.g. 1 TB tables) can be delegated to them.
Constant Summary
Constants included from Observability
Observability::SENSITIVE_KEY_PATTERN
Class Method Summary
- .run_and_wait(job_name, arguments = {}, polling_interval: 30, max_wait_seconds: nil) ⇒ Boolean
  Starts a Glue job and waits for it to finish successfully.
Class Method Details
.run_and_wait(job_name, arguments = {}, polling_interval: 30, max_wait_seconds: nil) ⇒ Boolean
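Starts a Glue job run and polls its status every polling_interval seconds until it finishes. Returns true when the run reaches SUCCEEDED. Raises an error if the run ends in FAILED, STOPPED, or TIMEOUT, and raises DataDrain::Error if max_wait_seconds is set and elapses before the run finishes.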
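The arguments hash is passed unchanged to start_job_run. AWS Glue expects job arguments as a map of String to String, with each key prefixed by "--" so the job script can read it. The sketch below is a minimal helper that builds that hash from a plain Ruby hash; to_glue_arguments, the job name "orders-export", and the parameter names are hypothetical and not part of DataDrain.

```ruby
# Hypothetical helper (not part of DataDrain): converts a plain Ruby hash into
# the shape AWS Glue expects for job arguments, i.e. String keys prefixed
# with "--" and String values.
def to_glue_arguments(params)
  params.each_with_object({}) do |(key, value), args|
    name = key.to_s
    name = "--#{name}" unless name.start_with?("--")
    args[name] = value.to_s
  end
end

args = to_glue_arguments(source_table: "orders", partition_date: "2024-01-01", batch_size: 5000)
# args == { "--source_table" => "orders", "--partition_date" => "2024-01-01", "--batch_size" => "5000" }

# With DataDrain configured (AWS region, logger), a call might look like this
# (hypothetical job name; a 4-hour cap on waiting):
# DataDrain::GlueRunner.run_and_wait("orders-export", args, polling_interval: 60, max_wait_seconds: 4 * 3600)
```

Converting values with to_s up front avoids passing Integers or Dates to the SDK, which validates that every argument value is a String.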
# File 'lib/data_drain/glue_runner.rb', line 22

def self.run_and_wait(job_name, arguments = {}, polling_interval: 30, max_wait_seconds: nil)
  config = DataDrain.configuration
  config.validate!
  client = Aws::Glue::Client.new(region: config.aws_region)
  # Monotonic clock: elapsed time is unaffected by wall-clock adjustments.
  start_time = Process.clock_gettime(Process::CLOCK_MONOTONIC)
  @logger = config.logger

  safe_log(:info, "glue_runner.start", { job: job_name })
  resp = client.start_job_run(job_name: job_name, arguments: arguments)
  run_id = resp.job_run_id

  loop do
    # Give up (without stopping the Glue run) once max_wait_seconds has elapsed.
    if max_wait_seconds && (Process.clock_gettime(Process::CLOCK_MONOTONIC) - start_time) > max_wait_seconds
      safe_log(:error, "glue_runner.timeout", { job: job_name, run_id: run_id, max_wait_seconds: max_wait_seconds })
      raise DataDrain::Error, "Glue Job #{job_name} (Run ID: #{run_id}) excedió max_wait_seconds=#{max_wait_seconds}"
    end

    run_info = client.get_job_run(job_name: job_name, run_id: run_id).job_run
    status = run_info.job_run_state

    case status
    when "SUCCEEDED"
      duration = Process.clock_gettime(Process::CLOCK_MONOTONIC) - start_time
      safe_log(:info, "glue_runner.complete", { job: job_name, run_id: run_id, duration_s: duration.round(2) })
      return true
    when "FAILED", "STOPPED", "TIMEOUT"
      duration = Process.clock_gettime(Process::CLOCK_MONOTONIC) - start_time
      log_fields = { job: job_name, run_id: run_id, status: status, duration_s: duration.round(2) }
      # Include Glue's error message, with double quotes swapped and capped at 200 chars.
      log_fields[:error_message] = run_info.error_message.gsub("\"", "'")[0, 200] if run_info.error_message
      safe_log(:error, "glue_runner.failed", log_fields)
      raise "Glue Job #{job_name} (Run ID: #{run_id}) falló con estado #{status}."
    else
      # Any other state (e.g. STARTING, RUNNING): wait and poll again.
      safe_log(:info, "glue_runner.polling", { job: job_name, run_id: run_id, status: status, next_check_in_s: polling_interval })
      sleep polling_interval
    end
  end
end
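The polling loop in run_and_wait can be exercised without AWS by swapping the Glue client for a scripted stand-in. The sketch below is a simplified re-creation of that loop, assuming only that each status check yields a job_run_state and an optional error_message; JobRun, FakeGlueClient, next_job_run, and poll_job_run are hypothetical names, not DataDrain or AWS SDK APIs.

```ruby
# Hypothetical stand-in for the Glue JobRun data used by the loop.
JobRun = Struct.new(:job_run_state, :error_message)

# Hypothetical fake client: returns scripted JobRuns in order and keeps
# returning the last one once the script is exhausted.
class FakeGlueClient
  def initialize(runs)
    @runs = runs.dup
  end

  def next_job_run
    @runs.size > 1 ? @runs.shift : @runs.first
  end
end

# Same terminal failure states that run_and_wait checks for.
TERMINAL_FAILURES = %w[FAILED STOPPED TIMEOUT].freeze

# Polls until SUCCEEDED and returns how many status checks it took.
def poll_job_run(client, polling_interval:, max_wait_seconds: nil)
  start = Process.clock_gettime(Process::CLOCK_MONOTONIC)
  checks = 0
  loop do
    elapsed = Process.clock_gettime(Process::CLOCK_MONOTONIC) - start
    raise "timed out after #{elapsed.round(2)}s" if max_wait_seconds && elapsed > max_wait_seconds

    run = client.next_job_run
    checks += 1
    case run.job_run_state
    when "SUCCEEDED" then return checks
    when *TERMINAL_FAILURES
      # Same sanitization as run_and_wait: swap double quotes, keep 200 chars.
      detail = run.error_message.to_s.gsub("\"", "'")[0, 200]
      raise "job ended in #{run.job_run_state}: #{detail}"
    else
      sleep polling_interval
    end
  end
end

ok = FakeGlueClient.new([JobRun.new("STARTING"), JobRun.new("RUNNING"), JobRun.new("SUCCEEDED")])
puts poll_job_run(ok, polling_interval: 0.01) # prints 3
```

As in run_and_wait, any state outside SUCCEEDED, FAILED, STOPPED, and TIMEOUT keeps the loop polling. The Glue JobRunState enum also defines ERROR, so a run that ends in that state is only caught by max_wait_seconds; setting a timeout is a reasonable safeguard.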