Class: RubyReactor::Executor::StepExecutor

Inherits:
Object
  • Object
show all
Defined in:
lib/ruby_reactor/executor/step_executor.rb

Instance Method Summary collapse

Constructor Details

#initialize(context:, dependency_graph:, reactor_class:, managers:) ⇒ StepExecutor

Returns a new instance of StepExecutor.



6
7
8
9
10
11
12
13
# File 'lib/ruby_reactor/executor/step_executor.rb', line 6

def initialize(context:, dependency_graph:, reactor_class:, managers:)
  @context = context
  @dependency_graph = dependency_graph
  @reactor_class = reactor_class
  @retry_manager = managers[:retry_manager]
  @result_handler = managers[:result_handler]
  @compensation_manager = managers[:compensation_manager]
end

Instance Method Details

#execute_all_stepsObject



15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
# File 'lib/ruby_reactor/executor/step_executor.rb', line 15

def execute_all_steps
  until @dependency_graph.all_completed? || @context.finished?
    ready_steps = @dependency_graph.ready_steps

    if ready_steps.empty?
      raise Error::DependencyError.new(
        "No ready steps available but execution not complete",
        context: @context
      )
    end

    # Execute steps sequentially
    ready_steps.each do |step_config|
      result = execute_step(step_config)

      # If step execution was handed off to async, return the async result
      return result if result.is_a?(RubyReactor::AsyncResult)

      # If a step returns RetryQueuedResult, we need to stop and return it
      return result if result.is_a?(RetryQueuedResult)

      # If a step returns Skipped, halt the reactor cleanly (no
      # compensation). Must be checked BEFORE Failure / Success because
      # Skipped is a Success subclass.
      return result if result.is_a?(RubyReactor::Skipped)

      # If a step returns Failure, we need to stop execution and return it
      return result if result.is_a?(RubyReactor::Failure)

      # If a step returns InterruptResult, we need to stop execution and return it
      return result if result.is_a?(RubyReactor::InterruptResult)

      # If result is nil, it means async was executed inline (test mode), continue
      next if result.nil?
    end
  end

  # Return the final result
  @result_handler.final_result(@reactor_class)
end

#execute_step(step_config) ⇒ Object



56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
# File 'lib/ruby_reactor/executor/step_executor.rb', line 56

def execute_step(step_config)
  # If we're already in inline async execution mode (inside Worker),
  # treat async steps as sync to avoid infinite recursion

  if @dependency_graph.completed.include?(step_config.name)
    return RubyReactor.Success(@context.get_result(step_config.name))
  end

  if step_config.interrupt?
    handle_interrupt_step(step_config)
  elsif step_config.async? && !@context.inline_async_execution
    handle_async_step(step_config)
  else
    execute_step_with_retry(step_config)
  end
end