Class: Easykiq::OrchestrationExecutor

Inherits:
Object
  • Object
show all
Defined in:
lib/easykiq/orchestration_executor.rb

Class Method Summary collapse

Instance Method Summary collapse

Class Method Details

.execute(workflow:, parent_batch:) ⇒ Object



5
6
7
# File 'lib/easykiq/orchestration_executor.rb', line 5

def self.execute(workflow:, parent_batch:)
  new.run_step(parent_batch, workflow, 0)
end

Instance Method Details

#nest_under(parent_batch) ⇒ Object



27
28
29
30
31
32
33
34
35
# File 'lib/easykiq/orchestration_executor.rb', line 27

def nest_under(parent_batch)
  if parent_batch
    parent_batch.jobs do
      yield
    end
  else
    yield
  end
end

#on_success(status, options) ⇒ Object



37
38
39
40
41
42
# File 'lib/easykiq/orchestration_executor.rb', line 37

def on_success(status, options)
  return if options["step"] == options["orchestration_workflow"].length

  parent_batch = Sidekiq::Batch.new(status.parent_bid)
  run_step(parent_batch, options["orchestration_workflow"], options["step"])
end

#run_step(parent_batch, workflow, step) ⇒ Object



9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
# File 'lib/easykiq/orchestration_executor.rb', line 9

def run_step(parent_batch, workflow, step)
  nest_under(parent_batch) do
    *jobs = workflow.at(step)
    sidekiq_batch = Sidekiq::Batch.new
    sidekiq_batch.on(
      :success,
      self.class,
      "orchestration_workflow" => workflow, "step" => step + 1
    )

    sidekiq_batch.jobs do
      jobs.each do |job|
        Object.const_get(job["klass"]).perform_async(*job["args"])
      end
    end
  end
end