Class: RubyReactor::Executor::StepExecutor
- Inherits: Object
- Inheritance hierarchy:
  - Object
  - RubyReactor::Executor::StepExecutor
- Defined in:
- lib/ruby_reactor/executor/step_executor.rb
Instance Method Summary
- #execute_all_steps ⇒ Object
- #execute_step(step_config) ⇒ Object
-
#initialize(context:, dependency_graph:, reactor_class:, managers:) ⇒ StepExecutor
constructor
A new instance of StepExecutor.
Constructor Details
#initialize(context:, dependency_graph:, reactor_class:, managers:) ⇒ StepExecutor
Returns a new instance of StepExecutor.
6 7 8 9 10 11 12 13 |
# File 'lib/ruby_reactor/executor/step_executor.rb', line 6

# Stores the collaborators this executor works with.
#
# @param context [Object] kept as @context
# @param dependency_graph [Object] kept as @dependency_graph
# @param reactor_class [Object] kept as @reactor_class
# @param managers [#[]] looked up with the keys :retry_manager,
#   :result_handler and :compensation_manager; whatever each lookup
#   returns is stored as-is (nil for a plain Hash that lacks the key)
def initialize(context:, dependency_graph:, reactor_class:, managers:)
  @context = context
  @dependency_graph = dependency_graph
  @reactor_class = reactor_class

  # Same three lookups, in the same order, collapsed into one destructuring assignment.
  @retry_manager, @result_handler, @compensation_manager =
    %i[retry_manager result_handler compensation_manager].map { |key| managers[key] }
end
Instance Method Details
#execute_all_steps ⇒ Object
15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 |
# File 'lib/ruby_reactor/executor/step_executor.rb', line 15

# Drives the reactor: repeatedly asks the dependency graph for ready steps
# and runs them one after another via #execute_step, until the graph reports
# all steps completed or the context reports it is finished.
#
# @return [Object] the first step result that halts execution (see the
#   +case+ below); otherwise whatever
#   +@result_handler.final_result(@reactor_class)+ returns
# @raise [Error::DependencyError] when execution is not complete but the
#   graph offers no ready steps
def execute_all_steps
  until @dependency_graph.all_completed? || @context.finished?
    batch = @dependency_graph.ready_steps

    if batch.empty?
      raise Error::DependencyError.new(
        "No ready steps available but execution not complete",
        context: @context
      )
    end

    # Steps in a batch run sequentially, in the order the graph returned them.
    batch.each do |step_config|
      outcome = execute_step(step_config)

      # Any of these outcomes stops the run and is handed straight back:
      # - AsyncResult: the step was handed off to async execution
      # - RetryQueuedResult: a retry was queued
      # - Skipped: halts the reactor cleanly (no compensation); it is a
      #   Success subclass, so it is listed ahead of Failure
      # - Failure: the step failed
      # - InterruptResult: the step interrupted execution
      case outcome
      when RubyReactor::AsyncResult, RetryQueuedResult, RubyReactor::Skipped,
           RubyReactor::Failure, RubyReactor::InterruptResult
        return outcome
      end

      # Everything else lets the batch carry on, including nil, which means
      # async was executed inline (test mode).
    end
  end

  @result_handler.final_result(@reactor_class)
end
#execute_step(step_config) ⇒ Object
56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 |
# File 'lib/ruby_reactor/executor/step_executor.rb', line 56

# Runs a single step, picking the execution path from its configuration.
#
# @param step_config [Object] must respond to +name+, +interrupt?+ and +async?+
# @return [Object] a Success wrapping the stored result when the step is
#   already completed; otherwise the return value of the chosen handler
def execute_step(step_config)
  # A step the graph already lists as completed is not run again; its stored
  # result is read back from the context and wrapped in a Success.
  already_done = @dependency_graph.completed.include?(step_config.name)
  return RubyReactor.Success(@context.get_result(step_config.name)) if already_done

  return handle_interrupt_step(step_config) if step_config.interrupt?

  # When the context is already in inline async execution mode (inside
  # Worker), async steps are treated as sync to avoid infinite recursion.
  hand_off = step_config.async? && !@context.inline_async_execution
  hand_off ? handle_async_step(step_config) : execute_step_with_retry(step_config)
end