Module: AcidicJob::Awaiting
- Extended by:
- ActiveSupport::Concern
- Defined in:
- lib/acidic_job/awaiting.rb
Instance Method Summary
Instance Method Details
#enqueue_step_parallel_jobs(jobs, run, step_result) ⇒ Object
9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 |
# File 'lib/acidic_job/awaiting.rb', line 9

# Enqueues every job of a parallel workflow step inside one Sidekiq batch and
# registers `step_done` as the batch's success callback.
#
# @param jobs [Enumerable<Class, String, Symbol>] workers to enqueue; String
#   and Symbol entries are treated as constant names and resolved with
#   `constantize`, anything else is used as the worker class directly
# @param run [#id] the run record whose id is handed to the callback and to
#   one-argument workers
# @param step_result [#to_yaml] serialized to YAML and handed to the callback
# @return [Object] whatever `Sidekiq::Batch#jobs` returns
# @raise [SidekiqBatchRequired] when `Sidekiq::Batch` is not defined
# @raise [TooManyParametersForParallelJob] when a worker's `perform` arity is
#   neither 0 nor 1
def enqueue_step_parallel_jobs(jobs, run, step_result)
  # `batch` is available from Sidekiq::Pro
  raise SidekiqBatchRequired unless defined?(Sidekiq::Batch)

  step_batch = Sidekiq::Batch.new
  # step_batch.description = "AcidicJob::Workflow Step: #{step}"
  step_batch.on(
    :success,
    "#{self.class.name}#step_done",
    # NOTE: options are marshalled through JSON so use only basic types.
    { "run_id" => run.id,
      "step_result_yaml" => step_result.to_yaml.strip }
  )

  # NOTE: The jobs method is atomic.
  # All jobs created in the block are actually pushed atomically at the end of the block.
  # If an error is raised, none of the jobs will go to Redis.
  step_batch.jobs do
    jobs.each do |worker_name|
      worker = case worker_name
               when String, Symbol then worker_name.to_s.constantize
               else worker_name
               end

      # `arity` is negative for optional or splat parameters, so such
      # `perform` signatures also end up in the `else` branch.
      case worker.instance_method(:perform).arity
      when 0 then worker.perform_async
      when 1 then worker.perform_async(run.id)
      else raise TooManyParametersForParallelJob
      end
    end
  end
end
#step_done(_status, options) ⇒ Object
40 41 42 43 44 45 46 47 48 49 50 51 |
# File 'lib/acidic_job/awaiting.rb', line 40

# Batch success callback registered by `enqueue_step_parallel_jobs`: reloads
# the run, advances the step it was waiting on, and resumes processing.
#
# @param _status [Object] batch status passed by the callback machinery; unused
# @param options [Hash] callback options; reads the "run_id" and
#   "step_result_yaml" keys
# @return [Object] whatever `process_run` returns
def step_done(_status, options)
  run = Run.find(options["run_id"])
  current_step = run.workflow[run.recovery_point.to_s]
  # re-hydrate the `step_result` object
  step_result = YAML.safe_load(
    options["step_result_yaml"],
    permitted_classes: [RecoveryPoint, FinishedPoint]
  )
  step = Step.new(current_step, run, self, step_result)

  # TODO: WRITE REGRESSION TESTS FOR PARALLEL JOB FAILING AND RETRYING THE ORIGINAL STEP
  step.progress
  # when a batch of jobs for a step succeeds, we begin processing the `AcidicJob::Run` record again
  process_run(run)
end