Module: AcidicJob::Awaiting

Extended by:
ActiveSupport::Concern
Defined in:
lib/acidic_job/awaiting.rb

Instance Method Summary

Instance Method Details

#enqueue_step_parallel_jobs(jobs, run, step_result) ⇒ Object

[View source]

22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
# File 'lib/acidic_job/awaiting.rb', line 22

# Enqueues the jobs a workflow step is awaiting as a nested Sidekiq batch and
# registers `step_done` as that batch's `:success` callback.
#
# @param jobs [#each] workers to enqueue; each entry may be a class name given
#   as a String or Symbol (resolved with `constantize`), or the worker class itself
# @param run [#id] the run record; its `id` is handed to the success callback and
#   to every worker whose `perform` takes exactly one argument
# @param step_result [#to_yaml] serialized to YAML and handed to the success callback
# @raise [SidekiqBatchRequired] when `Sidekiq::Batch` is not defined
# @raise [TooManyParametersForParallelJob] when a worker's `perform` arity is
#   neither 0 nor 1 (this includes the negative arities Ruby reports for
#   optional or splat parameters)
# @return [Object] whatever the outer `batch.jobs` call returns
def enqueue_step_parallel_jobs(jobs, run, step_result)
  # `batch` is available from Sidekiq::Pro
  raise SidekiqBatchRequired unless defined?(Sidekiq::Batch)

  batch.jobs do
    step_batch = Sidekiq::Batch.new
    # step_batch.description = "AcidicJob::Workflow Step: #{step}"
    step_batch.on(
      :success,
      "#{self.class.name}#step_done",
      # NOTE: options are marshalled through JSON so use only basic types.
      { "run_id" => run.id,
        "step_result_yaml" => step_result.to_yaml.strip }
    )
    # NOTE: The jobs method is atomic.
    # All jobs created in the block are actually pushed atomically at the end of the block.
    # If an error is raised, none of the jobs will go to Redis.
    step_batch.jobs do
      jobs.each do |worker_name|
        # Symbols have no `constantize`, so go through their String form.
        worker = case worker_name
                 when Symbol then worker_name.to_s.constantize
                 when String then worker_name.constantize
                 else worker_name
                 end

        # Look the arity up once; it decides whether the run id is passed along.
        case worker.instance_method(:perform).arity
        when 0 then worker.perform_async
        when 1 then worker.perform_async(run.id)
        else raise TooManyParametersForParallelJob
        end
      end
    end
  end
end

#step_done(_status, options) ⇒ Object

[View source]

55
56
57
58
59
60
61
62
63
64
65
66
# File 'lib/acidic_job/awaiting.rb', line 55

# Callback registered for the `:success` event of a step's batch: marks the
# step as progressed and hands the run back to `process_run`.
#
# @param _status [Object] supplied by the batch callback; not used here
# @param options [Hash] must carry "run_id" and "step_result_yaml", the keys
#   supplied when the callback was registered
# @return [Object] whatever `process_run` returns
def step_done(_status, options)
  run = Run.find(options["run_id"])
  step_definition = run.workflow[run.recovery_point.to_s]
  # Rebuild the step result from its YAML form; only these two classes may be instantiated.
  rehydrated_result = YAML.safe_load(
    options["step_result_yaml"],
    permitted_classes: [RecoveryPoint, FinishedPoint]
  )

  # TODO: WRITE REGRESSION TESTS FOR PARALLEL JOB FAILING AND RETRYING THE ORIGINAL STEP
  Step.new(step_definition, run, self, rehydrated_result).progress
  # The step's batch of jobs succeeded, so resume processing the `AcidicJob::Run` record.
  process_run(run)
end