Class: AcidicJob::Processor

Inherits:
Object
  • Object
show all
Defined in:
lib/acidic_job/processor.rb

Instance Method Summary collapse

Constructor Details

#initialize(run, job) ⇒ Processor

Returns a new instance of Processor.

[View source]

5
6
7
8
9
# File 'lib/acidic_job/processor.rb', line 5

# Stores the run and job, and builds the Workflow used to execute steps.
#
# @param run the run record this processor drives; forwarded to Workflow.new
# @param job the job the run belongs to; forwarded to Workflow.new
def initialize(run, job)
  @run, @job = run, job
  @workflow = Workflow.new(run, job)
end

Instance Method Details

#process_run ⇒ Object

[View source]

11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
# File 'lib/acidic_job/processor.rb', line 11

# Drives the run through its workflow, one step per loop iteration, until the
# run reports itself finished or the current step hands off to awaited jobs.
#
# @return `true` immediately after awaited jobs have been enqueued; otherwise
#   the value of `@run.succeeded?`
# @raise [UnknownRecoveryPoint] when `@run.known_recovery_point?` is false
def process_run
  # Nothing left to do for a finished run: report its recorded outcome.
  return @run.succeeded? if @run.finished?

  AcidicJob.logger.log_run_event("Processing #{@run.current_step_name}...", @job, @run)
  loop do
    break if @run.finished?

    # Guard: the run points at a step the workflow does not know about.
    unless @run.known_recovery_point?
      raise UnknownRecoveryPoint,
            "Defined workflow does not reference this step: #{@run.current_step_name.inspect}"
    end

    pending_jobs = jobs_from(@run.current_step_awaits)
    if pending_jobs.any?
      # Execute the current step WITHOUT progressing to the next one, so that a
      # failure in any awaited job causes this step to be retried by the main
      # workflow.
      outcome = @workflow.execute_current_step
      # Progressing the recovery point and restarting `process_run` is left to
      # the `#step_done` callback that fires once the awaited jobs complete;
      # the step's outcome is handed over so that callback can pick the
      # appropriate next stage.
      enqueue_awaited_jobs(pending_jobs, outcome)
      # Must be `return`, never `break`: `break` would only leave the loop and
      # keep the parent job going, whereas we want to release the primary
      # worker while the background jobs run.
      return true
    end

    # Ordinary step: run it, then move the run on to the following step.
    @workflow.execute_current_step
    @workflow.progress_to_next_step
  end
  AcidicJob.logger.log_run_event("Processed #{@run.current_step_name}.", @job, @run)

  @run.succeeded?
end