Class: RubyReactor::Executor
- Inherits:
-
Object
- Object
- RubyReactor::Executor
- Defined in:
- lib/ruby_reactor/executor.rb,
lib/ruby_reactor/executor/graph_manager.rb,
lib/ruby_reactor/executor/retry_manager.rb,
lib/ruby_reactor/executor/step_executor.rb,
lib/ruby_reactor/executor/result_handler.rb,
lib/ruby_reactor/executor/input_validator.rb,
lib/ruby_reactor/executor/compensation_manager.rb
Overview
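Coordinates a single reactor run: it builds the context, the middleware runner, the dependency graph and the compensation, retry, result-handling and step-execution collaborators, then drives them through #execute or #resume_execution.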
Defined Under Namespace
Classes: CompensationManager, GraphManager, InputValidator, ResultHandler, RetryManager, StepExecutor
Instance Attribute Summary collapse
-
#compensation_manager ⇒ Object
readonly
Returns the value of attribute compensation_manager.
-
#context ⇒ Object
readonly
Returns the value of attribute context.
-
#dependency_graph ⇒ Object
readonly
Returns the value of attribute dependency_graph.
-
#middlewares ⇒ Object
readonly
Returns the value of attribute middlewares.
-
#reactor_class ⇒ Object
readonly
Returns the value of attribute reactor_class.
-
#result ⇒ Object
readonly
Returns the value of attribute result.
-
#result_handler ⇒ Object
readonly
Returns the value of attribute result_handler.
-
#retry_manager ⇒ Object
readonly
Returns the value of attribute retry_manager.
-
#step_executor ⇒ Object
readonly
Returns the value of attribute step_executor.
Class Method Summary collapse
Instance Method Summary collapse
-
#execute ⇒ Object
Runs the reactor from the start: checks the period gate, validates inputs, acquires locks, executes all steps and returns the result.
- #execution_trace ⇒ Object
-
#initialize(reactor_class, inputs = {}, context = nil) ⇒ Executor
constructor
A new instance of Executor.
- #persist_context? ⇒ Boolean
-
#resume_execution ⇒ Object
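Resumes a reactor run (also the entry point for a fresh async run): applies the period and rate-limit gates on a first run only, then executes the current or remaining steps.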
- #save_context ⇒ Object
- #undo_all ⇒ Object
- #undo_stack ⇒ Object
- #undo_trace ⇒ Object
Constructor Details
#initialize(reactor_class, inputs = {}, context = nil) ⇒ Executor
Returns a new instance of Executor.
17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 |
# File 'lib/ruby_reactor/executor.rb', line 17
#
# Builds an executor for one run of +reactor_class+ and wires up the
# collaborators it delegates to (middleware runner, dependency graph,
# compensation/retry/result managers and the step executor).
#
# @param reactor_class [Object] the reactor definition to run
# @param inputs [Hash] inputs for a fresh Context; only used when +context+ is nil
# @param context [Object, nil] an existing context to reuse instead of building one
def initialize(reactor_class, inputs = {}, context = nil)
  @reactor_class = reactor_class
  @context = context || Context.new(inputs, reactor_class)

  # The context and every manager share the same middleware runner.
  @middlewares = Executor.middlewares_for(reactor_class)
  @context.middlewares = @middlewares

  @dependency_graph = DependencyGraph.new
  @compensation_manager = CompensationManager.new(@context)
  @retry_manager = RetryManager.new(@context, @middlewares)
  @result_handler = ResultHandler.new(
    context: @context,
    compensation_manager: @compensation_manager,
    dependency_graph: @dependency_graph
  )
  @step_executor = StepExecutor.new(
    context: @context,
    dependency_graph: @dependency_graph,
    reactor_class: @reactor_class,
    managers: {
      retry_manager: @retry_manager,
      result_handler: @result_handler,
      compensation_manager: @compensation_manager,
      middlewares: @middlewares
    }
  )

  # Nothing has run yet and no lock or semaphore is held.
  @result = nil
  @acquired_lock = nil
  @acquired_semaphore = nil
end
Instance Attribute Details
#compensation_manager ⇒ Object (readonly)
Returns the value of attribute compensation_manager.
14 15 16 |
# File 'lib/ruby_reactor/executor.rb', line 14
#
# Read-only accessor.
#
# @return [Object] the CompensationManager built in #initialize from the executor's context
def compensation_manager
  @compensation_manager
end
#context ⇒ Object (readonly)
Returns the value of attribute context.
14 15 16 |
# File 'lib/ruby_reactor/executor.rb', line 14
#
# Read-only accessor.
#
# @return [Object] the context passed to #initialize, or the one #initialize
#   built from the inputs and reactor class when none was passed
def context
  @context
end
#dependency_graph ⇒ Object (readonly)
Returns the value of attribute dependency_graph.
14 15 16 |
# File 'lib/ruby_reactor/executor.rb', line 14
#
# Read-only accessor.
#
# @return [Object] the DependencyGraph created in #initialize
def dependency_graph
  @dependency_graph
end
#middlewares ⇒ Object (readonly)
Returns the value of attribute middlewares.
14 15 16 |
# File 'lib/ruby_reactor/executor.rb', line 14
#
# Read-only accessor.
#
# @return [Object] the runner returned by Executor.middlewares_for(reactor_class) in #initialize
def middlewares
  @middlewares
end
#reactor_class ⇒ Object (readonly)
Returns the value of attribute reactor_class.
14 15 16 |
# File 'lib/ruby_reactor/executor.rb', line 14
#
# Read-only accessor.
#
# @return [Object] the reactor class given to #initialize
def reactor_class
  @reactor_class
end
#result ⇒ Object (readonly)
Returns the value of attribute result.
14 15 16 |
# File 'lib/ruby_reactor/executor.rb', line 14
#
# Read-only accessor.
#
# @return [Object, nil] nil after construction; assigned while #execute or
#   #resume_execution runs
def result
  @result
end
#result_handler ⇒ Object (readonly)
Returns the value of attribute result_handler.
14 15 16 |
# File 'lib/ruby_reactor/executor.rb', line 14
#
# Read-only accessor.
#
# @return [Object] the ResultHandler built in #initialize
def result_handler
  @result_handler
end
#retry_manager ⇒ Object (readonly)
Returns the value of attribute retry_manager.
14 15 16 |
# File 'lib/ruby_reactor/executor.rb', line 14
#
# Read-only accessor.
#
# @return [Object] the RetryManager built in #initialize
def retry_manager
  @retry_manager
end
#step_executor ⇒ Object (readonly)
Returns the value of attribute step_executor.
14 15 16 |
# File 'lib/ruby_reactor/executor.rb', line 14
#
# Read-only accessor.
#
# @return [Object] the StepExecutor built in #initialize
def step_executor
  @step_executor
end
Class Method Details
.middlewares_for(reactor_class) ⇒ Object
66 67 68 |
# File 'lib/ruby_reactor/executor.rb', line 66
#
# Wraps the middlewares resolved for +reactor_class+ in a MiddlewareRunner.
#
# @param reactor_class [Object] reactor whose middlewares should be resolved
# @return [RubyReactor::MiddlewareRunner] runner built from resolve_middlewares(reactor_class)
def self.middlewares_for(reactor_class)
  RubyReactor::MiddlewareRunner.new(resolve_middlewares(reactor_class))
end
.resolve_middlewares(reactor_class) ⇒ Object
46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 |
# File 'lib/ruby_reactor/executor.rb', line 46
#
# Produces the middleware instances for a run: the globally configured
# middlewares first, followed by the ones declared on +reactor_class+.
#
# Each entry is normalised as follows:
# * a Class is instantiated with no arguments;
# * an Array whose first element is a Class is treated as [klass, opts] and
#   instantiated with +opts+ as keyword arguments (no arguments when +opts+
#   is missing);
# * anything else is passed through untouched.
#
# @param reactor_class [Object] reactor; its middlewares are used only when it
#   responds to +middlewares+
# @return [Array<Object>] resolved middleware objects, global entries first
def self.resolve_middlewares(reactor_class)
  global_list = Array(RubyReactor.configuration.middlewares)
  reactor_list = if reactor_class.respond_to?(:middlewares)
                   Array(reactor_class.middlewares)
                 else
                   []
                 end

  (global_list + reactor_list).map do |mw|
    if mw.is_a?(Class)
      mw.new
    elsif mw.is_a?(Array) && mw.first.is_a?(Class)
      klass, opts = mw
      klass.new(**(opts || {}))
    else
      mw
    end
  end
end
Instance Method Details
#execute ⇒ Object
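Runs the reactor from the start. The period gate is checked before and again after lock acquisition, inputs are validated before any lock is taken, and lock, semaphore and rate-limit errors are re-raised to the caller; any other StandardError is converted into a result by the result handler.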
70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 |
# File 'lib/ruby_reactor/executor.rb', line 70
#
# Runs the reactor from the start and returns its result.
#
# Flow: notify middlewares, check the period gate, validate inputs, acquire
# locks, re-check the period gate, build the dependency graph, execute all
# steps. The +ensure+ section always releases locks, persists the context when
# #persist_context? says so, and emits either :complete_reactor or
# :failed_reactor depending on whether the run reached a handled outcome.
#
# @return [Object] the step executor's result, the result produced by the
#   result handler for a rescued error, or the value of finalize_skipped when
#   the period gate skipped the run
# @raise [RubyReactor::Lock::AcquisitionError]
# @raise [RubyReactor::Semaphore::AcquisitionError]
# @raise [RubyReactor::RateLimit::ExceededError]
# @raise [RubyReactor::RateLimitRegistry::UnknownLimitError]
def execute # rubocop:disable Metrics/MethodLength
  middlewares.on(:start_reactor, reactor_class.name, context.inputs, @context)
  completed = false

  if (skipped = check_period_gate)
    completed = true
    return finalize_skipped(skipped)
  end

  # Validate inputs BEFORE consuming a rate-limit slot or grabbing a
  # lock/semaphore: a run that can never start must not burn quota or
  # briefly block other callers.
  input_validator = InputValidator.new(@reactor_class, @context)
  input_validator.validate!

  acquire_locks_with_telemetry

  # Re-check the period gate now that we hold the lock. The pre-lock check
  # is a fast path; this one closes the race where two callers both passed
  # it and then serialized on the lock — without it the second caller would
  # re-run work the first already marked. (No-op when no lock is configured.)
  if (skipped = check_period_gate)
    completed = true
    return finalize_skipped(skipped)
  end

  @context.status = :running
  save_context

  graph_manager = GraphManager.new(@reactor_class, @dependency_graph, @context)
  graph_manager.build_and_validate!
  graph_manager.mark_completed_steps_from_context

  @result = @step_executor.execute_all_steps

  update_context_status(@result)
  mark_period_on_success(@result)
  handle_interrupt(@result) if @result.is_a?(RubyReactor::InterruptResult)

  completed = true
  @result
rescue RubyReactor::Lock::AcquisitionError,
       RubyReactor::Semaphore::AcquisitionError,
       RubyReactor::RateLimit::ExceededError,
       RubyReactor::RateLimitRegistry::UnknownLimitError
  # Concurrency and rate-limit failures go to the caller unchanged; +completed+
  # stays false, so the ensure section reports :failed_reactor.
  raise
rescue StandardError => e
  @result = @result_handler.handle_execution_error(e)
  update_context_status(@result)
  completed = true
  @result
ensure
  release_locks
  save_context if persist_context?
  if completed
    middlewares.on(:complete_reactor, reactor_class.name, @result, @context)
  else
    # NOTE(review): $ERROR_INFO is the English-library alias of $!; it is nil
    # unless `require "English"` has been loaded — confirm at the top of the file.
    middlewares.on(:failed_reactor, reactor_class.name, $ERROR_INFO, @context)
  end
end
#execution_trace ⇒ Object
206 207 208 |
# File 'lib/ruby_reactor/executor.rb', line 206
#
# Delegates to the context.
#
# @return [Object] the context's execution trace
def execution_trace
  @context.execution_trace
end
#persist_context? ⇒ Boolean
219 220 221 222 223 |
# File 'lib/ruby_reactor/executor.rb', line 219
#
# Tells whether the context carries anything worth persisting: it has left
# the "pending" status, or it already has trace entries or intermediate results.
#
# @return [Boolean]
def persist_context?
  @context.status.to_s != "pending" ||
    @context.execution_trace.any? ||
    @context.intermediate_results.any?
end
#resume_execution ⇒ Object
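Resumes a reactor run. A fresh async run also enters here, so on a first run the period and rate-limit gates are applied; a genuine resume skips them. Lock, semaphore and rate-limit errors are re-raised to the caller.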
130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 |
# File 'lib/ruby_reactor/executor.rb', line 130
#
# Resumes a reactor run, or performs the first run of an async reactor.
#
# On a first run the period gate (before and after acquiring concurrency
# primitives) and the rate limit are applied; on a genuine resume they are
# skipped. Execution then continues from the context's current step when one
# is set, otherwise with the remaining steps. The +ensure+ section releases
# locks, saves the context and emits :complete_reactor or :failed_reactor.
#
# @return [Object] the execution result, or the value of finalize_skipped when
#   the period gate skipped a first run
# @raise [RubyReactor::Lock::AcquisitionError]
# @raise [RubyReactor::Semaphore::AcquisitionError]
# @raise [RubyReactor::RateLimit::ExceededError]
# @raise [RubyReactor::RateLimitRegistry::UnknownLimitError]
def resume_execution # rubocop:disable Metrics/MethodLength
  middlewares.on(:start_reactor, reactor_class.name, context.inputs, @context)
  completed = false

  # A fresh async reactor run reaches the worker through resume_execution
  # (it never calls execute), so the period and rate-limit gates that live
  # in execute must be applied here too. Genuine resumes (a step already ran
  # or we paused mid-flight, so current_step is set) must NOT re-gate: a
  # paused reactor must not throttle or skip itself on the way back in.
  first_run = first_execution?

  begin
    @context.status = :running

    if first_run && (skipped = check_period_gate)
      completed = true
      return finalize_skipped(skipped)
    end

    check_rate_limit if first_run

    acquire_concurrency_primitives

    # Post-lock re-check (see execute) — closes the period race for the
    # first run of a locked async reactor.
    if first_run && (skipped = check_period_gate)
      completed = true
      return finalize_skipped(skipped)
    end

    prepare_for_resume
    save_context

    @result = if @context.current_step
                execute_current_step_and_continue
              else
                execute_remaining_steps
              end

    update_context_status(@result)
    mark_period_on_success(@result)
    handle_interrupt(@result) if @result.is_a?(RubyReactor::InterruptResult)

    completed = true
    @result
  rescue RubyReactor::Lock::AcquisitionError,
         RubyReactor::Semaphore::AcquisitionError,
         RubyReactor::RateLimit::ExceededError,
         RubyReactor::RateLimitRegistry::UnknownLimitError
    # Passed to the caller unchanged; +completed+ stays false, so the ensure
    # section reports :failed_reactor.
    raise
  rescue StandardError => e
    # NOTE(review): presumably handle_resume_error assigns @result, since it is
    # read on the next lines — confirm against its definition.
    handle_resume_error(e)
    update_context_status(@result)
    completed = true
    @result
  ensure
    release_locks
    # NOTE(review): saved unconditionally here, whereas execute guards this
    # with persist_context? — confirm the difference is intentional.
    save_context
    if completed
      middlewares.on(:complete_reactor, reactor_class.name, @result, @context)
    else
      # NOTE(review): $ERROR_INFO is nil unless `require "English"` is loaded.
      middlewares.on(:failed_reactor, reactor_class.name, $ERROR_INFO, @context)
    end
  end
end
#save_context ⇒ Object
210 211 212 213 214 215 216 217 |
# File 'lib/ruby_reactor/executor.rb', line 210
#
# Serializes the current context and stores it through the configured storage
# adapter, keyed by the context id and tagged with the reactor class name.
#
# @return [Object] whatever the storage adapter's store_context returns
def save_context
  storage = RubyReactor::Configuration.instance.storage_adapter

  # Anonymous classes have no name, so fall back to a label built from the object id.
  reactor_class_name = @reactor_class.name || "AnonymousReactor-#{@reactor_class.object_id}"

  # Serialize context
  serialized_context = ContextSerializer.serialize(@context)
  storage.store_context(@context.context_id, serialized_context, reactor_class_name)
end
#undo_all ⇒ Object
194 195 196 |
# File 'lib/ruby_reactor/executor.rb', line 194
#
# Delegates to the compensation manager's rollback_completed_steps.
#
# @return [Object] whatever rollback_completed_steps returns
def undo_all
  @compensation_manager.rollback_completed_steps
end
#undo_stack ⇒ Object
198 199 200 |
# File 'lib/ruby_reactor/executor.rb', line 198
#
# Delegates to the compensation manager.
#
# @return [Object] the compensation manager's undo stack
def undo_stack
  @compensation_manager.undo_stack
end
#undo_trace ⇒ Object
202 203 204 |
# File 'lib/ruby_reactor/executor.rb', line 202
#
# Delegates to the compensation manager.
#
# @return [Object] the compensation manager's undo trace
def undo_trace
  @compensation_manager.undo_trace
end