Class: RubyReactor::Executor
- Inherits:
-
Object
- Object
- RubyReactor::Executor
- Includes:
- OrderedLockSupport
- Defined in:
- lib/ruby_reactor/executor.rb,
lib/ruby_reactor/executor/graph_manager.rb,
lib/ruby_reactor/executor/retry_manager.rb,
lib/ruby_reactor/executor/step_executor.rb,
lib/ruby_reactor/executor/result_handler.rb,
lib/ruby_reactor/executor/input_validator.rb,
lib/ruby_reactor/executor/compensation_manager.rb,
lib/ruby_reactor/executor/ordered_lock_support.rb
Overview
rubocop:disable Metrics/ClassLength
Defined Under Namespace
Modules: OrderedLockSupport Classes: CompensationManager, GraphManager, InputValidator, ResultHandler, RetryManager, StepExecutor
Constant Summary
Constants included from OrderedLockSupport
OrderedLockSupport::HEARTBEAT_MIN_INTERVAL, OrderedLockSupport::THREAD_LOCAL_ACTIVE_KEYS
Instance Attribute Summary collapse
-
#compensation_manager ⇒ Object
readonly
Returns the value of attribute compensation_manager.
-
#context ⇒ Object
readonly
Returns the value of attribute context.
-
#dependency_graph ⇒ Object
readonly
Returns the value of attribute dependency_graph.
-
#middlewares ⇒ Object
readonly
Returns the value of attribute middlewares.
-
#reactor_class ⇒ Object
readonly
Returns the value of attribute reactor_class.
-
#result ⇒ Object
readonly
Returns the value of attribute result.
-
#result_handler ⇒ Object
readonly
Returns the value of attribute result_handler.
-
#retry_manager ⇒ Object
readonly
Returns the value of attribute retry_manager.
-
#step_executor ⇒ Object
readonly
Returns the value of attribute step_executor.
Class Method Summary collapse
Instance Method Summary collapse
-
#emit_lifecycle_completion(completed) ⇒ Object
Contention errors (lock/semaphore/rate-limit/ordered-lock wait) are expected “try again later” signals, not failures — the worker snoozes and re-runs.
-
#execute ⇒ Object
rubocop:disable Metrics/MethodLength.
- #execution_trace ⇒ Object
-
#initialize(reactor_class, inputs = {}, context = nil) ⇒ Executor
constructor
A new instance of Executor.
- #persist_context? ⇒ Boolean
-
#resume_execution ⇒ Object
rubocop:disable Metrics/MethodLength,Metrics/PerceivedComplexity.
- #save_context ⇒ Object
- #undo_all ⇒ Object
- #undo_stack ⇒ Object
- #undo_trace ⇒ Object
Methods included from OrderedLockSupport
active_keys, #advance_ordered_lock_if_terminal, advance_with_retry, #check_ordered_lock_gate, #enter_ordered_lock_scope, #fresh_ordered_lock_start?, info_from, #leave_ordered_lock_scope, #ordered_lock_chain_skip?, #ordered_lock_drained_replay?, #ordered_lock_short_circuit, #ordered_lock_stale_batch?, #redelivery_of_terminal?, #short_circuit!, #short_circuit_result, #skip_context_persist?, #start_ordered_lock_heartbeat, #stop_ordered_lock_heartbeat, #stored_context_status, #stored_status_terminal?
Constructor Details
#initialize(reactor_class, inputs = {}, context = nil) ⇒ Executor
Returns a new instance of Executor.
20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 |
# File 'lib/ruby_reactor/executor.rb', line 20 def initialize(reactor_class, inputs = {}, context = nil) @reactor_class = reactor_class @context = context || Context.new(inputs, reactor_class) @middlewares = Executor.middlewares_for(reactor_class) @context.middlewares = @middlewares @dependency_graph = DependencyGraph.new @compensation_manager = CompensationManager.new(@context) @retry_manager = RetryManager.new(@context, @middlewares) @result_handler = ResultHandler.new( context: @context, compensation_manager: @compensation_manager, dependency_graph: @dependency_graph ) @step_executor = StepExecutor.new( context: @context, dependency_graph: @dependency_graph, reactor_class: @reactor_class, managers: { retry_manager: @retry_manager, result_handler: @result_handler, compensation_manager: @compensation_manager, middlewares: @middlewares } ) @result = nil @acquired_lock = nil @acquired_semaphore = nil @contention_snooze = false @skip_context_persist = false end |
Instance Attribute Details
#compensation_manager ⇒ Object (readonly)
Returns the value of attribute compensation_manager.
17 18 19 |
# File 'lib/ruby_reactor/executor.rb', line 17 def compensation_manager @compensation_manager end |
#context ⇒ Object (readonly)
Returns the value of attribute context.
17 18 19 |
# File 'lib/ruby_reactor/executor.rb', line 17 def context @context end |
#dependency_graph ⇒ Object (readonly)
Returns the value of attribute dependency_graph.
17 18 19 |
# File 'lib/ruby_reactor/executor.rb', line 17 def dependency_graph @dependency_graph end |
#middlewares ⇒ Object (readonly)
Returns the value of attribute middlewares.
17 18 19 |
# File 'lib/ruby_reactor/executor.rb', line 17 def middlewares @middlewares end |
#reactor_class ⇒ Object (readonly)
Returns the value of attribute reactor_class.
17 18 19 |
# File 'lib/ruby_reactor/executor.rb', line 17 def reactor_class @reactor_class end |
#result ⇒ Object (readonly)
Returns the value of attribute result.
17 18 19 |
# File 'lib/ruby_reactor/executor.rb', line 17 def result @result end |
#result_handler ⇒ Object (readonly)
Returns the value of attribute result_handler.
17 18 19 |
# File 'lib/ruby_reactor/executor.rb', line 17 def result_handler @result_handler end |
#retry_manager ⇒ Object (readonly)
Returns the value of attribute retry_manager.
17 18 19 |
# File 'lib/ruby_reactor/executor.rb', line 17 def retry_manager @retry_manager end |
#step_executor ⇒ Object (readonly)
Returns the value of attribute step_executor.
17 18 19 |
# File 'lib/ruby_reactor/executor.rb', line 17 def step_executor @step_executor end |
Class Method Details
.middlewares_for(reactor_class) ⇒ Object
71 72 73 |
# File 'lib/ruby_reactor/executor.rb', line 71 def self.middlewares_for(reactor_class) RubyReactor::MiddlewareRunner.new(resolve_middlewares(reactor_class)) end |
.resolve_middlewares(reactor_class) ⇒ Object
51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 |
# File 'lib/ruby_reactor/executor.rb', line 51 def self.resolve_middlewares(reactor_class) global_list = Array(RubyReactor.configuration.middlewares) reactor_list = if reactor_class.respond_to?(:middlewares) Array(reactor_class.middlewares) else [] end (global_list + reactor_list).map do |mw| if mw.is_a?(Class) mw.new elsif mw.is_a?(Array) && mw.first.is_a?(Class) klass, opts = mw klass.new(**(opts || {})) else mw end end end |
Instance Method Details
#emit_lifecycle_completion(completed) ⇒ Object
Contention errors (lock/semaphore/rate-limit/ordered-lock wait) are expected “try again later” signals, not failures — the worker snoozes and re-runs. Emitting ‘failed_reactor` for them floods dashboards with phantom failures (one per snooze round), so route them to a distinct `snooze_reactor` event instead.
143 144 145 146 147 148 149 150 151 |
# File 'lib/ruby_reactor/executor.rb', line 143 def emit_lifecycle_completion(completed) if completed middlewares.on(:complete_reactor, reactor_class.name, @result, @context) elsif @contention_snooze middlewares.on(:snooze_reactor, reactor_class.name, $ERROR_INFO, @context) else middlewares.on(:failed_reactor, reactor_class.name, $ERROR_INFO, @context) end end |
#execute ⇒ Object
rubocop:disable Metrics/MethodLength
75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 |
# File 'lib/ruby_reactor/executor.rb', line 75 def execute # rubocop:disable Metrics/MethodLength middlewares.on(:start_reactor, reactor_class.name, context.inputs, @context) completed = false enter_ordered_lock_scope # short_circuit_result covers both the strict ordered-lock chain skip # and the already-marked period bucket. short = short_circuit_result if short completed = true return short_circuit!(short) end # Validate inputs BEFORE consuming a rate-limit slot or grabbing a # lock/semaphore: a run that can never start must not burn quota or # briefly block other callers. input_validator = InputValidator.new(@reactor_class, @context) input_validator.validate! acquire_locks_with_telemetry # Re-check the period gate now that we hold the lock. The pre-lock check # is a fast path; this one closes the race where two callers both passed # it and then serialized on the lock — without it the second caller would # re-run work the first already marked. (No-op when no lock is configured.) if (skipped = check_period_gate) completed = true return finalize_skipped(skipped) end @context.status = :running save_context graph_manager = GraphManager.new(@reactor_class, @dependency_graph, @context) graph_manager.build_and_validate! graph_manager.mark_completed_steps_from_context @result = @step_executor.execute_all_steps update_context_status(@result) mark_period_on_success(@result) handle_interrupt(@result) if @result.is_a?(RubyReactor::InterruptResult) completed = true @result rescue RubyReactor::Lock::AcquisitionError, RubyReactor::Semaphore::AcquisitionError, RubyReactor::RateLimit::ExceededError, RubyReactor::RateLimitRegistry::UnknownLimitError, RubyReactor::OrderedLock::WaitError => e @contention_snooze = true raise e rescue StandardError => e @result = @result_handler.handle_execution_error(e) update_context_status(@result) completed = true @result ensure release_locks leave_ordered_lock_scope save_context if persist_context? && !skip_context_persist? emit_lifecycle_completion(completed) end |
#execution_trace ⇒ Object
238 239 240 |
# File 'lib/ruby_reactor/executor.rb', line 238 def execution_trace @context.execution_trace end |
#persist_context? ⇒ Boolean
251 252 253 254 255 |
# File 'lib/ruby_reactor/executor.rb', line 251 def persist_context? @context.status.to_s != "pending" || @context.execution_trace.any? || @context.intermediate_results.any? end |
#resume_execution ⇒ Object
rubocop:disable Metrics/MethodLength,Metrics/PerceivedComplexity
153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 |
# File 'lib/ruby_reactor/executor.rb', line 153 def resume_execution # rubocop:disable Metrics/MethodLength,Metrics/PerceivedComplexity middlewares.on(:start_reactor, reactor_class.name, context.inputs, @context) completed = false # A fresh async reactor run reaches the worker through resume_execution # (it never calls execute), so the period and rate-limit gates that live # in execute must be applied here too. Genuine resumes (a step already ran # or we paused mid-flight, so current_step is set) must NOT re-gate: a # paused reactor must not throttle or skip itself on the way back in. first_run = first_execution? enter_ordered_lock_scope # ordered-lock skip applies on any run; the period gate only on a fresh # first run (a genuine resume must not skip itself when its own marker # eventually lands). short = ordered_lock_short_circuit short ||= check_period_gate if first_run if short completed = true return short_circuit!(short) end @context.status = :running check_rate_limit if first_run # Resumes intentionally skip check_rate_limit (a paused run must not # block itself on resume), so acquire lock/semaphore directly rather # than via acquire_locks. acquire_exclusive_lock if @reactor_class.respond_to?(:lock_config) && @reactor_class.lock_config acquire_semaphore if @reactor_class.respond_to?(:semaphore_config) && @reactor_class.semaphore_config # Post-lock re-check (see execute) — closes the period race for the # first run of a locked async reactor. if first_run && (skipped = check_period_gate) completed = true return finalize_skipped(skipped) end prepare_for_resume save_context @result = if @context.current_step execute_current_step_and_continue else execute_remaining_steps end update_context_status(@result) mark_period_on_success(@result) handle_interrupt(@result) if @result.is_a?(RubyReactor::InterruptResult) completed = true @result rescue RubyReactor::Lock::AcquisitionError, RubyReactor::Semaphore::AcquisitionError, RubyReactor::RateLimit::ExceededError, RubyReactor::RateLimitRegistry::UnknownLimitError, RubyReactor::OrderedLock::WaitError => e @contention_snooze = true raise e rescue StandardError => e handle_resume_error(e) update_context_status(@result) completed = true @result ensure release_locks leave_ordered_lock_scope save_context unless skip_context_persist? emit_lifecycle_completion(completed) end |
#save_context ⇒ Object
242 243 244 245 246 247 248 249 |
# File 'lib/ruby_reactor/executor.rb', line 242 def save_context storage = RubyReactor::Configuration.instance.storage_adapter reactor_class_name = @reactor_class.name || "AnonymousReactor-#{@reactor_class.object_id}" # Serialize context serialized_context = ContextSerializer.serialize(@context) storage.store_context(@context.context_id, serialized_context, reactor_class_name) end |
#undo_all ⇒ Object
226 227 228 |
# File 'lib/ruby_reactor/executor.rb', line 226 def undo_all @compensation_manager.rollback_completed_steps end |
#undo_stack ⇒ Object
230 231 232 |
# File 'lib/ruby_reactor/executor.rb', line 230 def undo_stack @compensation_manager.undo_stack end |
#undo_trace ⇒ Object
234 235 236 |
# File 'lib/ruby_reactor/executor.rb', line 234 def undo_trace @compensation_manager.undo_trace end |