Module: AcidicJob::Awaiting
- Extended by:
- ActiveSupport::Concern
- Defined in:
- lib/acidic_job/awaiting.rb
Instance Method Summary collapse
Instance Method Details
#enqueue_step_parallel_jobs(jobs, run, step_result) ⇒ Object
22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 |
# File 'lib/acidic_job/awaiting.rb', line 22

# Enqueues all jobs for a parallel workflow step inside a Sidekiq Pro batch,
# wiring a `:success` callback back to `#step_done` so the workflow run can
# resume once every job in the step has finished.
#
# @param jobs [Array<Class, String, Symbol>] worker classes (or their names) to enqueue
# @param run [AcidicJob::Run] the workflow run record driving this step
# @param step_result [Object] step result, serialized as YAML into the callback options
# @raise [SidekiqBatchRequired] when Sidekiq::Batch (Sidekiq Pro) is not available
# @raise [TooManyParametersForParallelJob] when a worker's #perform takes more than one argument
def enqueue_step_parallel_jobs(jobs, run, step_result)
  # `batch` is available from Sidekiq::Pro
  raise SidekiqBatchRequired unless defined?(Sidekiq::Batch)

  batch.jobs do
    step_batch = Sidekiq::Batch.new
    # step_batch.description = "AcidicJob::Workflow Step: #{step}"
    step_batch.on(
      :success,
      "#{self.class.name}#step_done",
      # NOTE: options are marshalled through JSON so use only basic types.
      { "run_id" => run.id, "step_result_yaml" => step_result.to_yaml.strip }
    )
    # NOTE: The jobs method is atomic.
    # All jobs created in the block are actually pushed atomically at the end of the block.
    # If an error is raised, none of the jobs will go to Redis.
    step_batch.jobs do
      jobs.each do |worker_name|
        # Accept a Class directly, or a String/Symbol naming the worker class
        # (resolves the former TODO about handling Symbols).
        worker = worker_name.is_a?(Class) ? worker_name : worker_name.to_s.constantize
        if worker.instance_method(:perform).arity.zero?
          worker.perform_async
        elsif worker.instance_method(:perform).arity == 1
          worker.perform_async(run.id)
        else
          raise TooManyParametersForParallelJob
        end
      end
    end
  end
end
#step_done(_status, options) ⇒ Object
55 56 57 58 59 60 61 62 63 64 65 66 |
# File 'lib/acidic_job/awaiting.rb', line 55

# Sidekiq batch `:success` callback: fires once every parallel job in a step
# has succeeded. Re-hydrates the step result, advances the step, and resumes
# processing of the `AcidicJob::Run` record.
#
# NOTE(review): the extracted text had dropped the `options` identifier from
# the signature and body; restored here to match the keys written by
# `enqueue_step_parallel_jobs` ("run_id", "step_result_yaml").
#
# @param _status [Object] Sidekiq batch status (unused)
# @param options [Hash] callback options with "run_id" and "step_result_yaml" keys
def step_done(_status, options)
  run = Run.find(options["run_id"])
  current_step = run.workflow[run.recovery_point.to_s]
  # re-hydrate the `step_result` object
  step_result = YAML.safe_load(options["step_result_yaml"],
                               permitted_classes: [RecoveryPoint, FinishedPoint])
  step = Step.new(current_step, run, self, step_result)
  # TODO: WRITE REGRESSION TESTS FOR PARALLEL JOB FAILING AND RETRYING THE ORIGINAL STEP
  step.progress
  # when a batch of jobs for a step succeeds, we begin processing the `AcidicJob::Run` record again
  process_run(run)
end