Class: RubyReactor::Executor::StepExecutor

Inherits:
Object
  • Object
show all
Defined in:
lib/ruby_reactor/executor/step_executor.rb

Instance Method Summary collapse

Constructor Details

#initialize(context:, dependency_graph:, reactor_class:, managers:) ⇒ StepExecutor

Returns a new instance of StepExecutor.



6
7
8
9
10
11
12
13
14
# File 'lib/ruby_reactor/executor/step_executor.rb', line 6

def initialize(context:, dependency_graph:, reactor_class:, managers:)
  @context = context
  @dependency_graph = dependency_graph
  @reactor_class = reactor_class
  @retry_manager = managers[:retry_manager]
  @result_handler = managers[:result_handler]
  @compensation_manager = managers[:compensation_manager]
  @middlewares = managers[:middlewares] || context.middlewares || Executor.middlewares_for(reactor_class)
end

Instance Method Details

#execute_all_stepsObject



16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
# File 'lib/ruby_reactor/executor/step_executor.rb', line 16

def execute_all_steps
  until @dependency_graph.all_completed? || @context.finished?
    ready_steps = @dependency_graph.ready_steps

    if ready_steps.empty?
      raise Error::DependencyError.new(
        "No ready steps available but execution not complete",
        context: @context
      )
    end

    # Execute steps sequentially
    ready_steps.each do |step_config|
      result = execute_step(step_config)

      # If step execution was handed off to async, return the async result
      return result if result.is_a?(RubyReactor::AsyncResult)

      # If a step returns RetryQueuedResult, we need to stop and return it
      return result if result.is_a?(RetryQueuedResult)

      # If a step returns Skipped, halt the reactor cleanly (no
      # compensation). Must be checked BEFORE Failure / Success because
      # Skipped is a Success subclass.
      return result if result.is_a?(RubyReactor::Skipped)

      # If a step returns Failure, we need to stop execution and return it
      return result if result.is_a?(RubyReactor::Failure)

      # If a step returns InterruptResult, we need to stop execution and return it
      return result if result.is_a?(RubyReactor::InterruptResult)

      # If result is nil, it means async was executed inline (test mode), continue
      next if result.nil?
    end
  end

  # Return the final result
  @result_handler.final_result(@reactor_class)
end

#execute_step(step_config) ⇒ Object



57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
# File 'lib/ruby_reactor/executor/step_executor.rb', line 57

def execute_step(step_config)
  # If we're already in inline async execution mode (inside Worker),
  # treat async steps as sync to avoid infinite recursion

  if @dependency_graph.completed.include?(step_config.name)
    return RubyReactor.Success(@context.get_result(step_config.name))
  end

  resolved_arguments = resolve_arguments(step_config)

  @middlewares.on(:start_step, step_config.name, resolved_arguments, @context)
  completed = false
  begin
    result = if step_config.interrupt?
               handle_interrupt_step(step_config)
             elsif step_config.async? && !@context.inline_async_execution
               handle_async_step(step_config)
             else
               execute_step_with_retry(step_config, resolved_arguments)
             end
    completed = true
    if result.is_a?(RubyReactor::Failure)
      @middlewares.on(:failed_step, step_config.name, result, @context)
    else
      @middlewares.on(:complete_step, step_config.name, result, @context)
    end
    result
  rescue Exception => e # rubocop:disable Lint/RescueException
    @middlewares.on(:failed_step, step_config.name, e, @context) unless completed
    raise
  end
end