Class: AcidicJob::Processor
- Inherits: Object
- Inheritance hierarchy: Object → AcidicJob::Processor
- Defined in:
- lib/acidic_job/processor.rb
Instance Method Summary
-
#initialize(run, job) ⇒ Processor
constructor
A new instance of Processor.
- #process_run ⇒ Object
Constructor Details
#initialize(run, job) ⇒ Processor
Returns a new instance of Processor.
5 6 7 8 9 |
# File 'lib/acidic_job/processor.rb', line 5

# Builds a processor bound to one run record and the job it belongs to.
#
# @param run [Object] the run record; kept in @run and handed to the workflow
# @param job [Object] the job; kept in @job and handed to the workflow
# @return [Processor] a new instance of Processor
def initialize(run, job)
  @run = run
  @job = job
  # The workflow receives the very same run and job objects stored above.
  @workflow = Workflow.new(@run, @job)
end
Instance Method Details
#process_run ⇒ Object
[View source]
11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 |
# File 'lib/acidic_job/processor.rb', line 11

# Drives the run through its workflow steps until the run is finished or
# the current step hands off to awaited jobs.
#
# @return [Object] `@run.succeeded?` when the run is (or becomes) finished;
#   `true` when the current step enqueued awaited jobs and processing was
#   handed off to them
# @raise [UnknownRecoveryPoint] when `@run.known_recovery_point?` is falsy
def process_run
  # A run that is already finished just reports its outcome.
  return @run.succeeded? if @run.finished?

  AcidicJob.logger.log_run_event("Processing #{@run.current_step_name}...", @job, @run)

  loop do
    break if @run.finished?

    unless @run.known_recovery_point?
      raise UnknownRecoveryPoint,
            "Defined workflow does not reference this step: #{@run.current_step_name.inspect}"
    end

    pending_jobs = jobs_from(@run.current_step_awaits)

    if pending_jobs.any?
      # Only the current step is executed here; the recovery point is NOT
      # advanced, so a failure in any awaited job causes this step to be
      # retried in the main workflow.
      outcome = @workflow.execute_current_step

      # Advancing to the next step and re-entering `process_run` is left to
      # the `#step_done` callback that fires once the awaited jobs complete.
      # The step's result is passed along so that callback can choose the
      # appropriate next stage of the workflow.
      enqueue_awaited_jobs(pending_jobs, outcome)

      # Leave the method entirely so the primary worker is freed for new
      # work while the awaited jobs run. This must be `return`, never
      # `break`: `break` would only exit the loop, not the parent job.
      return true
    end

    # Ordinary step: run it, then move the workflow on to the next step.
    @workflow.execute_current_step
    @workflow.progress_to_next_step
  end

  AcidicJob.logger.log_run_event("Processed #{@run.current_step_name}.", @job, @run)

  @run.succeeded?
end