Module: AcidicJob::Awaiting

Extended by:
ActiveSupport::Concern
Defined in:
lib/acidic_job/awaiting.rb

Instance Method Summary collapse

Instance Method Details

#enqueue_step_parallel_jobs(jobs, run, step_result) ⇒ Object



9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
# File 'lib/acidic_job/awaiting.rb', line 9

# Enqueues the given workers inside a single Sidekiq batch and registers
# `#step_done` on this class as the batch's `:success` callback.
#
# @param jobs [Enumerable<String, Class>] workers to enqueue; String entries are
#   resolved with `constantize`, anything else is used as the worker as-is
# @param run [#id] the run whose id is handed to the callback and to
#   single-argument workers
# @param step_result [#to_yaml] serialized to YAML and carried in the callback options
# @raise [SidekiqBatchRequired] when `Sidekiq::Batch` is not defined
# @raise [TooManyParametersForParallelJob] when a worker's `perform` arity is
#   neither 0 nor 1
# @return [Object] whatever the batch's `jobs` call returns
def enqueue_step_parallel_jobs(jobs, run, step_result)
  # `Sidekiq::Batch` comes from Sidekiq::Pro
  raise SidekiqBatchRequired unless defined?(Sidekiq::Batch)

  batch = Sidekiq::Batch.new
  # batch.description = "AcidicJob::Workflow Step: #{step}"
  callback_target = "#{self.class.name}#step_done"
  # NOTE: callback options are marshalled through JSON, so stick to basic types.
  callback_options = {
    "run_id" => run.id,
    "step_result_yaml" => step_result.to_yaml.strip
  }
  batch.on(:success, callback_target, callback_options)

  # NOTE: `jobs` is atomic: everything created inside the block is pushed
  # together when the block ends, and nothing reaches Redis if an error is raised.
  batch.jobs do
    jobs.each do |job|
      # TODO: handle Symbols as well
      worker_class = job.is_a?(String) ? job.constantize : job

      # Workers either take no arguments or exactly the run id.
      case worker_class.instance_method(:perform).arity
      when 0 then worker_class.perform_async
      when 1 then worker_class.perform_async(run.id)
      else raise TooManyParametersForParallelJob
      end
    end
  end
end

#step_done(_status, options) ⇒ Object



40
41
42
43
44
45
46
47
48
49
50
51
# File 'lib/acidic_job/awaiting.rb', line 40

# Batch callback: reloads the run, marks its current step as progressed using
# the step result carried in the callback options, then resumes the run.
#
# @param _status [Object] supplied by the caller; unused here
# @param options [Hash] expects the string keys "run_id" and "step_result_yaml"
# @return [Object] whatever `process_run` returns
def step_done(_status, options)
  run = Run.find(options["run_id"])
  step_definition = run.workflow[run.recovery_point.to_s]

  # Rebuild the step result from YAML; only these two classes may be revived.
  rehydrated_result = YAML.safe_load(
    options["step_result_yaml"],
    permitted_classes: [RecoveryPoint, FinishedPoint]
  )

  # TODO: WRITE REGRESSION TESTS FOR PARALLEL JOB FAILING AND RETRYING THE ORIGINAL STEP
  Step.new(step_definition, run, self, rehydrated_result).progress

  # once the step's batch has succeeded, pick the `AcidicJob::Run` record back up
  process_run(run)
end