Class: AcidicJob::Processor
- Inherits: Object
- Inheritance hierarchy: Object → AcidicJob::Processor
- Defined in:
- lib/acidic_job/processor.rb
Instance Method Summary
-
#initialize(run, job) ⇒ Processor
constructor
A new instance of Processor.
- #process_run ⇒ Object
Constructor Details
Instance Method Details
#process_run ⇒ Object
11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 |
# File 'lib/acidic_job/processor.rb', line 11

# Drives the run through its workflow steps, one step per loop iteration,
# until the run reports that it is finished or a step with awaited jobs
# hands control over to those jobs.
#
# @return [Object] the value of `@run.succeeded?` when the run is (or becomes)
#   finished; literal `true` when the current step's awaited jobs were enqueued
#   and processing was handed off to them
# @raise [UnknownRecoveryPoint] when `@run.known_recovery_point?` is false for
#   the current step
def process_run
  # A run already marked as finished has nothing left to do: report its result.
  return @run.succeeded? if @run.finished?

  AcidicJob.logger.log_run_event("Processing #{@run.current_step_name}...", @job, @run)

  # NOTE: kept as `loop` (not `until`) so StopIteration handling is unchanged.
  loop do
    break if @run.finished?

    unless @run.known_recovery_point?
      raise UnknownRecoveryPoint,
            "Defined workflow does not reference this step: #{@run.current_step_name.inspect}"
    end

    pending_jobs = jobs_from(@run.current_step_awaits)

    if pending_jobs.any?
      # Run the current step only; do NOT advance to the next one here.
      # That way a failure in any of the parallel jobs causes this same step
      # to be retried by the main workflow.
      outcome = @workflow.execute_current_step

      # Advancing the recovery point and re-entering `process_run` is left to
      # `#step_done`. The step's result is handed along so the async callback
      # that fires once the parallel jobs complete can pick the right next stage.
      enqueue_awaited_jobs(pending_jobs, outcome)

      # Leave the method entirely so the primary worker is not blocked while
      # the background workers run; the workflow resumes once they all succeed.
      # This must be `return`, never `break`: `break` would only leave the loop
      # and keep the parent job alive waiting on the next step.
      return true
    end

    # Default path: no awaited jobs, so run the step and move straight on.
    @workflow.execute_current_step
    @workflow.progress_to_next_step
  end

  AcidicJob.logger.log_run_event("Processed #{@run.current_step_name}.", @job, @run)

  @run.succeeded?
end