Class: AcidicJob::Processor

Inherits:
Object
  • Object
show all
Defined in:
lib/acidic_job/processor.rb

Instance Method Summary collapse

Constructor Details

#initialize(run, job) ⇒ Processor

Returns a new instance of Processor.



5
6
7
8
9
# File 'lib/acidic_job/processor.rb', line 5

def initialize(run, job)
  @run = run
  @job = job
  @workflow = Workflow.new(run, job)
end

Instance Method Details

#process_runObject



11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
# File 'lib/acidic_job/processor.rb', line 11

def process_run
  # if the run record is already marked as finished, immediately return its result
  return @run.succeeded? if @run.finished?

  AcidicJob.logger.log_run_event("Processing #{@run.current_step_name}...", @job, @run)
  loop do
    break if @run.finished?

    if !@run.known_recovery_point?
      raise UnknownRecoveryPoint,
            "Defined workflow does not reference this step: #{@run.current_step_name.inspect}"
    elsif (awaited_jobs = jobs_from(@run.current_step_awaits)).any?
      # We only execute the current step, without progressing to the next step.
      # This ensures that any failures in parallel jobs will have this step retried in the main workflow
      step_result = @workflow.execute_current_step
      # We allow the `#step_done` method to manage progressing the recovery_point to the next step,
      # and then calling `process_run` to restart the main workflow on the next step.
      # We pass the `step_result` so that the async callback called after the step-parallel-jobs complete
      # can move on to the appropriate next stage in the workflow.
      enqueue_awaited_jobs(awaited_jobs, step_result)
      # after processing the current step, break the processing loop
      # and stop this method from blocking in the primary worker
      # as it will continue once the background workers all succeed
      # so we want to keep the primary worker queue free to process new work
      # this CANNOT ever be `break` as that wouldn't exit the parent job,
      # only this step in the workflow, blocking as it awaits the next step
      return true
    else
      @workflow.execute_current_step
      @workflow.progress_to_next_step
    end
  end
  AcidicJob.logger.log_run_event("Processed #{@run.current_step_name}.", @job, @run)

  @run.succeeded?
end