Class: RubyReactor::Executor::StepExecutor
- Inherits:
-
Object
- Object
- RubyReactor::Executor::StepExecutor
- Defined in:
- lib/ruby_reactor/executor/step_executor.rb
Instance Method Summary
- #execute_all_steps ⇒ Object
- #execute_step(step_config) ⇒ Object
-
#initialize(context:, dependency_graph:, reactor_class:, managers:) ⇒ StepExecutor
constructor
A new instance of StepExecutor.
Constructor Details
#initialize(context:, dependency_graph:, reactor_class:, managers:) ⇒ StepExecutor
Returns a new instance of StepExecutor.
6 7 8 9 10 11 12 13 14 15 |
# File 'lib/ruby_reactor/executor/step_executor.rb', line 6

# Stores the collaborators needed to run the steps of one reactor execution.
#
# @param context [Object] execution context; must respond to +middlewares+
# @param dependency_graph [Object] graph consulted for step readiness/completion
# @param reactor_class [Class] reactor whose steps are being executed
# @param managers [Hash{Symbol=>Object}] collaborators looked up by key:
#   +:retry_manager+, +:result_handler+, +:compensation_manager+,
#   +:middlewares+ and +:on_step_complete+. A missing key yields +nil+.
# @return [StepExecutor] a new instance of StepExecutor
def initialize(context:, dependency_graph:, reactor_class:, managers:)
  @context = context
  @dependency_graph = dependency_graph
  @reactor_class = reactor_class

  @retry_manager = managers[:retry_manager]
  @result_handler = managers[:result_handler]
  @compensation_manager = managers[:compensation_manager]

  # Middleware precedence: an explicit entry in +managers+ wins, then the
  # context's own middlewares, then whatever Executor resolves for the
  # reactor class.
  @middlewares = managers[:middlewares] ||
                 context.middlewares ||
                 Executor.middlewares_for(reactor_class)

  @on_step_complete = managers[:on_step_complete]
end
Instance Method Details
#execute_all_steps ⇒ Object
17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 |
# File 'lib/ruby_reactor/executor/step_executor.rb', line 17

# Runs ready steps, one at a time, until the dependency graph reports that
# everything is completed or the context reports that it is finished.
#
# @return [Object] the first halting step result (AsyncResult,
#   RetryQueuedResult, Skipped, Failure or InterruptResult), otherwise
#   whatever the result handler returns as the final result
# @raise [Error::DependencyError] when execution is not complete but the
#   graph offers no ready steps
def execute_all_steps
  until @dependency_graph.all_completed? || @context.finished?
    ready_steps = @dependency_graph.ready_steps

    if ready_steps.empty?
      raise Error::DependencyError.new(
        "No ready steps available but execution not complete",
        context: @context
      )
    end

    # Execute steps sequentially
    ready_steps.each do |step_config|
      result = execute_step(step_config)

      case result
      when RubyReactor::AsyncResult, RetryQueuedResult, RubyReactor::Skipped,
           RubyReactor::Failure, RubyReactor::InterruptResult
        # Every one of these stops the loop and is handed back unchanged:
        # async hand-off, queued retry, clean skip (no compensation),
        # failure, or interrupt. Skipped is a Success subclass, so it must
        # be matched here, before the Success branch below.
        return result
      when RubyReactor::Success
        # Only a continue-Success reaches here (nil is inline-async test
        # mode and matches neither branch). It is the one outcome where the
        # loop proceeds to more steps with no other save in between — every
        # terminal/handoff result persists via its own path. Write a durable
        # checkpoint so a crash re-runs at most this one step. Ordering:
        # side-effect -> record result (inside execute_step) -> checkpoint.
        @on_step_complete&.call
      end
    end
  end

  # Return the final result
  @result_handler.final_result(@reactor_class)
end
#execute_step(step_config) ⇒ Object
64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 |
# File 'lib/ruby_reactor/executor/step_executor.rb', line 64

# Executes a single step and reports its lifecycle to the middlewares
# (+:start_step+, then +:complete_step+ or +:failed_step+).
#
# @param step_config [Object] step definition; must respond to +name+,
#   +interrupt?+ and +async?+
# @return [Object] the step's result; for a step already marked completed,
#   a Success wrapping the result stored in the context
# @raise [Exception] re-raises anything raised while running the step
def execute_step(step_config)
  # A step the graph already lists as completed is not run again (and no
  # middleware events fire); its stored result is re-wrapped as a Success.
  if @dependency_graph.completed.include?(step_config.name)
    return RubyReactor.Success(@context.get_result(step_config.name))
  end

  resolved_arguments = resolve_arguments(step_config)
  @middlewares.on(:start_step, step_config.name, resolved_arguments, @context)

  completed = false
  begin
    result =
      if step_config.interrupt?
        handle_interrupt_step(step_config)
      elsif step_config.async? && !@context.inline_async_execution
        handle_async_step(step_config)
      else
        # Also reached by async steps when we're already in inline async
        # execution mode (inside Worker): treat them as sync to avoid
        # infinite recursion.
        execute_step_with_retry(step_config, resolved_arguments)
      end
    completed = true

    outcome_event = result.is_a?(RubyReactor::Failure) ? :failed_step : :complete_step
    @middlewares.on(outcome_event, step_config.name, result, @context)

    result
  rescue Exception => e # rubocop:disable Lint/RescueException
    # Broad rescue is only for notification: the exception is always
    # re-raised. Once +completed+ is set the outcome hook has already been
    # invoked, so an error raised by that hook does not fire :failed_step.
    @middlewares.on(:failed_step, step_config.name, e, @context) unless completed
    raise
  end
end