Class: RubyReactor::Executor
- Inherits:
-
Object
- Object
- RubyReactor::Executor
- Includes:
- OrderedLockSupport
- Defined in:
- lib/ruby_reactor/executor.rb,
lib/ruby_reactor/executor/graph_manager.rb,
lib/ruby_reactor/executor/retry_manager.rb,
lib/ruby_reactor/executor/step_executor.rb,
lib/ruby_reactor/executor/result_handler.rb,
lib/ruby_reactor/executor/input_validator.rb,
lib/ruby_reactor/executor/compensation_manager.rb,
lib/ruby_reactor/executor/ordered_lock_support.rb
Overview
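Runs a single reactor: resolves middlewares, validates inputs, acquires locks, executes steps through StepExecutor, persists the context, and emits lifecycle events. (The class definition carries a `rubocop:disable Metrics/ClassLength` directive.)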
Defined Under Namespace
Modules: OrderedLockSupport Classes: CompensationManager, GraphManager, InputValidator, ResultHandler, RetryManager, StepExecutor
Constant Summary
Constants included from OrderedLockSupport
OrderedLockSupport::HEARTBEAT_MIN_INTERVAL, OrderedLockSupport::THREAD_LOCAL_ACTIVE_KEYS
Instance Attribute Summary collapse
-
#compensation_manager ⇒ Object
readonly
Returns the value of attribute compensation_manager.
-
#context ⇒ Object
readonly
Returns the value of attribute context.
-
#dependency_graph ⇒ Object
readonly
Returns the value of attribute dependency_graph.
-
#middlewares ⇒ Object
readonly
Returns the value of attribute middlewares.
-
#reactor_class ⇒ Object
readonly
Returns the value of attribute reactor_class.
-
#result ⇒ Object
readonly
Returns the value of attribute result.
-
#result_handler ⇒ Object
readonly
Returns the value of attribute result_handler.
-
#retry_manager ⇒ Object
readonly
Returns the value of attribute retry_manager.
-
#step_executor ⇒ Object
readonly
Returns the value of attribute step_executor.
Class Method Summary collapse
Instance Method Summary collapse
-
#checkpoint!(throttle: false) ⇒ Object
Durable per-step checkpoint.
-
#checkpoint_due? ⇒ Boolean
Whether a throttled (per-step) checkpoint is due.
-
#emit_lifecycle_completion(completed) ⇒ Object
Contention errors (lock/semaphore/rate-limit/ordered-lock wait) are expected “try again later” signals, not failures — the worker snoozes and re-runs.
-
#execute ⇒ Object
Runs the reactor from the start and returns its result (source carries `rubocop:disable Metrics/MethodLength`).
- #execution_trace ⇒ Object
-
#initialize(reactor_class, inputs = {}, context = nil) ⇒ Executor
constructor
A new instance of Executor.
- #persist_context? ⇒ Boolean
-
#resume_execution ⇒ Object
Resumes a paused reactor, or starts a fresh async run, and returns its result (source carries `rubocop:disable Metrics/MethodLength,Metrics/PerceivedComplexity,Metrics/CyclomaticComplexity`).
- #save_context ⇒ Object
- #undo_all ⇒ Object
- #undo_stack ⇒ Object
- #undo_trace ⇒ Object
Methods included from OrderedLockSupport
active_keys, #advance_ordered_lock_if_terminal, advance_with_retry, #check_ordered_lock_gate, #enter_ordered_lock_scope, #fresh_ordered_lock_start?, info_from, #leave_ordered_lock_scope, #ordered_lock_chain_skip?, #ordered_lock_drained_replay?, #ordered_lock_short_circuit, #ordered_lock_stale_batch?, #redelivery_of_terminal?, #short_circuit!, #short_circuit_result, #skip_context_persist?, #start_ordered_lock_heartbeat, #stop_ordered_lock_heartbeat, #stored_context_status, #stored_status_terminal?
Constructor Details
#initialize(reactor_class, inputs = {}, context = nil) ⇒ Executor
Returns a new instance of Executor.
20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 |
# File 'lib/ruby_reactor/executor.rb', line 20 def initialize(reactor_class, inputs = {}, context = nil) @reactor_class = reactor_class @context = context || Context.new(inputs, reactor_class) @middlewares = Executor.middlewares_for(reactor_class) @context.middlewares = @middlewares @dependency_graph = DependencyGraph.new @compensation_manager = CompensationManager.new(@context) @retry_manager = RetryManager.new(@context, @middlewares) @result_handler = ResultHandler.new( context: @context, compensation_manager: @compensation_manager, dependency_graph: @dependency_graph ) @step_executor = StepExecutor.new( context: @context, dependency_graph: @dependency_graph, reactor_class: @reactor_class, managers: { retry_manager: @retry_manager, result_handler: @result_handler, compensation_manager: @compensation_manager, middlewares: @middlewares, # Save-per-step durable checkpoint. checkpoint! resolves the ROOT # context, so this same callback — wired into every executor including # the nested ones ComposeStep builds — always advances the root blob # (F8): a mid-child crash re-runs one sub-step, not the whole child. # `throttle: true` lets checkpoint_min_interval coalesce these mid-run # writes (default 0 = write every step); the terminal save still runs. on_step_complete: -> { checkpoint!(throttle: true) } } ) @result = nil @acquired_lock = nil @acquired_semaphore = nil @acquired_context_lock = nil @context_lock_owner = nil @contention_snooze = false @skip_context_persist = false @last_checkpoint_at = nil end |
Instance Attribute Details
#compensation_manager ⇒ Object (readonly)
Returns the value of attribute compensation_manager.
17 18 19 |
# File 'lib/ruby_reactor/executor.rb', line 17 def compensation_manager @compensation_manager end |
#context ⇒ Object (readonly)
Returns the value of attribute context.
17 18 19 |
# File 'lib/ruby_reactor/executor.rb', line 17 def context @context end |
#dependency_graph ⇒ Object (readonly)
Returns the value of attribute dependency_graph.
17 18 19 |
# File 'lib/ruby_reactor/executor.rb', line 17 def dependency_graph @dependency_graph end |
#middlewares ⇒ Object (readonly)
Returns the value of attribute middlewares.
17 18 19 |
# File 'lib/ruby_reactor/executor.rb', line 17 def middlewares @middlewares end |
#reactor_class ⇒ Object (readonly)
Returns the value of attribute reactor_class.
17 18 19 |
# File 'lib/ruby_reactor/executor.rb', line 17 def reactor_class @reactor_class end |
#result ⇒ Object (readonly)
Returns the value of attribute result.
17 18 19 |
# File 'lib/ruby_reactor/executor.rb', line 17 def result @result end |
#result_handler ⇒ Object (readonly)
Returns the value of attribute result_handler.
17 18 19 |
# File 'lib/ruby_reactor/executor.rb', line 17 def result_handler @result_handler end |
#retry_manager ⇒ Object (readonly)
Returns the value of attribute retry_manager.
17 18 19 |
# File 'lib/ruby_reactor/executor.rb', line 17 def retry_manager @retry_manager end |
#step_executor ⇒ Object (readonly)
Returns the value of attribute step_executor.
17 18 19 |
# File 'lib/ruby_reactor/executor.rb', line 17 def step_executor @step_executor end |
Class Method Details
.middlewares_for(reactor_class) ⇒ Object
81 82 83 |
# File 'lib/ruby_reactor/executor.rb', line 81

# Wraps the middlewares resolved for +reactor_class+ in a MiddlewareRunner.
#
# @param reactor_class [Object] reactor whose middlewares are resolved
# @return [RubyReactor::MiddlewareRunner]
def self.middlewares_for(reactor_class)
  resolved = resolve_middlewares(reactor_class)
  RubyReactor::MiddlewareRunner.new(resolved)
end
.resolve_middlewares(reactor_class) ⇒ Object
61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 |
# File 'lib/ruby_reactor/executor.rb', line 61

# Builds the list of middleware objects for +reactor_class+: the globally
# configured middlewares first, followed by the reactor's own (when the
# reactor responds to +middlewares+).
#
# Each entry is normalized as follows:
# * a Class is instantiated with no arguments;
# * an Array whose first element is a Class is treated as
#   +[klass, opts]+ and instantiated with +opts+ as keyword arguments
#   (a missing +opts+ means no keywords);
# * anything else is passed through unchanged.
#
# @param reactor_class [Object] reactor whose middlewares are appended
# @return [Array<Object>] the normalized middleware entries
def self.resolve_middlewares(reactor_class)
  configured = Array(RubyReactor.configuration.middlewares)
  per_reactor = reactor_class.respond_to?(:middlewares) ? Array(reactor_class.middlewares) : []

  (configured + per_reactor).map do |entry|
    next entry.new if entry.is_a?(Class)
    next entry unless entry.is_a?(Array) && entry.first.is_a?(Class)

    middleware_class, options = entry
    middleware_class.new(**(options || {}))
  end
end
Instance Method Details
#checkpoint!(throttle: false) ⇒ Object
Durable per-step checkpoint. Unlike save_context (which serializes THIS executor’s @context — the observability path, F1), checkpoint! always serializes and stores the ROOT context under the root’s key — the unit the async worker rehydrates by id. For a top-level reactor root == @context; for a composed/nested child it stores the root with the child’s live state embedded via composed_contexts. TTL is re-stamped on every write (Phase 4).
276 277 278 279 280 281 282 283 284 |
# File 'lib/ruby_reactor/executor.rb', line 276

# Serializes the ROOT context (the context's +root_context+, or the context
# itself when it has none) and stores it under the root's id, then records
# the monotonic time of the write.
#
# @param throttle [Boolean] when true, the write is skipped unless
#   #checkpoint_due? says one is due
# @return [Float, nil] the recorded monotonic timestamp, or nil when skipped
def checkpoint!(throttle: false)
  return if throttle && !checkpoint_due?

  root_context = @context.root_context || @context
  adapter = RubyReactor::Configuration.instance.storage_adapter
  storage_name = RubyReactor.reactor_storage_name(root_context.reactor_class)
  root_id = root_context.context_id
  payload = ContextSerializer.serialize(root_context)
  adapter.store_context(root_id, payload, storage_name)

  # Monotonic clock: only used to measure the gap between checkpoints.
  @last_checkpoint_at = Process.clock_gettime(Process::CLOCK_MONOTONIC)
end
#checkpoint_due? ⇒ Boolean
Whether a throttled (per-step) checkpoint is due. With checkpoint_min_interval <= 0 (default) every step checkpoints; otherwise mid-run checkpoints are coalesced to at most one per interval. The first step of a run always writes (@last_checkpoint_at is nil), and the run’s terminal save is never throttled.
290 291 292 293 294 295 |
# File 'lib/ruby_reactor/executor.rb', line 290

# Tells whether a throttled checkpoint should be written now.
#
# Always true when the configured +checkpoint_min_interval+ is zero or
# negative, or when no checkpoint has been recorded yet; otherwise true only
# once at least that many seconds have elapsed since the last one.
#
# @return [Boolean]
def checkpoint_due?
  min_interval = RubyReactor.configuration.checkpoint_min_interval.to_f
  return true if min_interval <= 0
  return true if @last_checkpoint_at.nil?

  elapsed = Process.clock_gettime(Process::CLOCK_MONOTONIC) - @last_checkpoint_at
  elapsed >= min_interval
end
#emit_lifecycle_completion(completed) ⇒ Object
Contention errors (lock/semaphore/rate-limit/ordered-lock wait) are expected “try again later” signals, not failures — the worker snoozes and re-runs. Emitting `failed_reactor` for them floods dashboards with phantom failures (one per snooze round), so route them to a distinct `snooze_reactor` event instead.
153 154 155 156 157 158 159 160 161 |
# File 'lib/ruby_reactor/executor.rb', line 153

# Emits the terminal lifecycle event for this run to the middlewares.
#
# * +completed+ truthy    -> :complete_reactor with the stored result
# * contention snooze set -> :snooze_reactor with the in-flight exception
# * otherwise             -> :failed_reactor with the in-flight exception
#
# The exception is read from $ERROR_INFO, so the two error events carry
# whatever exception is propagating at the time of the call.
#
# @param completed [Boolean] whether the run reached a result
# @return [Object] whatever the middleware runner's +on+ returns
def emit_lifecycle_completion(completed)
  return middlewares.on(:complete_reactor, reactor_class.name, @result, @context) if completed

  # Contention is a "try again later" signal, not a failure.
  event = @contention_snooze ? :snooze_reactor : :failed_reactor
  middlewares.on(event, reactor_class.name, $ERROR_INFO, @context)
end
#execute ⇒ Object
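Runs the reactor from the start: emits `start_reactor`, applies the short-circuit and period gates, validates inputs, acquires locks, executes all steps, and returns the result. Contention errors are re-raised after marking the run as a snooze; any other StandardError is converted to a result by the result handler. (Source carries `rubocop:disable Metrics/MethodLength`.)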
85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 |
# File 'lib/ruby_reactor/executor.rb', line 85

# Runs the reactor from the start and returns its result.
#
# Order of operations: emit :start_reactor, enter the ordered-lock scope,
# short-circuit if applicable, validate inputs, acquire locks, re-check the
# period gate, mark the context :running and persist it, build the
# dependency graph, then execute all steps.
#
# The +ensure+ section always releases locks, leaves the ordered-lock scope,
# conditionally persists the context, and emits the terminal lifecycle event.
#
# @return [Object] the step executor's result, a short-circuit/skip result,
#   or the result produced by the result handler for a rescued StandardError
# @raise [RubyReactor::Lock::AcquisitionError,
#   RubyReactor::Semaphore::AcquisitionError,
#   RubyReactor::RateLimit::ExceededError,
#   RubyReactor::RateLimitRegistry::UnknownLimitError,
#   RubyReactor::OrderedLock::WaitError] re-raised after flagging the run as
#   a contention snooze
def execute # rubocop:disable Metrics/MethodLength
  middlewares.on(:start_reactor, reactor_class.name, context.inputs, @context)
  # Read by the ensure section to choose the terminal lifecycle event.
  completed = false

  enter_ordered_lock_scope
  # short_circuit_result covers both the strict ordered-lock chain skip
  # and the already-marked period bucket.
  short = short_circuit_result
  if short
    completed = true
    return short_circuit!(short)
  end

  # Validate inputs BEFORE consuming a rate-limit slot or grabbing a
  # lock/semaphore: a run that can never start must not burn quota or
  # briefly block other callers.
  input_validator = InputValidator.new(@reactor_class, @context)
  input_validator.validate!

  acquire_locks_with_telemetry

  # Re-check the period gate now that we hold the lock. The pre-lock check
  # is a fast path; this one closes the race where two callers both passed
  # it and then serialized on the lock — without it the second caller would
  # re-run work the first already marked. (No-op when no lock is configured.)
  if (skipped = check_period_gate)
    completed = true
    return finalize_skipped(skipped)
  end

  @context.status = :running
  save_context

  graph_manager = GraphManager.new(@reactor_class, @dependency_graph, @context)
  graph_manager.build_and_validate!
  graph_manager.mark_completed_steps_from_context

  @result = @step_executor.execute_all_steps

  update_context_status(@result)
  mark_period_on_success(@result)

  handle_interrupt(@result) if @result.is_a?(RubyReactor::InterruptResult)

  completed = true
  @result
rescue RubyReactor::Lock::AcquisitionError, RubyReactor::Semaphore::AcquisitionError,
       RubyReactor::RateLimit::ExceededError, RubyReactor::RateLimitRegistry::UnknownLimitError,
       RubyReactor::OrderedLock::WaitError => e
  # Contention: flag it so the ensure section emits :snooze_reactor, then
  # let the error propagate to the caller.
  # NOTE(review): UnknownLimitError is grouped with the contention errors
  # here — confirm it is meant to be snoozed rather than reported as a failure.
  @contention_snooze = true
  raise e
rescue StandardError => e
  # Any other error is turned into a result; the run counts as completed.
  @result = @result_handler.handle_execution_error(e)
  update_context_status(@result)
  completed = true
  @result
ensure
  release_locks
  leave_ordered_lock_scope
  save_context if persist_context? && !skip_context_persist?
  emit_lifecycle_completion(completed)
end
#execution_trace ⇒ Object
257 258 259 |
# File 'lib/ruby_reactor/executor.rb', line 257 def execution_trace @context.execution_trace end |
#persist_context? ⇒ Boolean
297 298 299 300 301 |
# File 'lib/ruby_reactor/executor.rb', line 297

# Tells whether the context holds anything worth persisting: its status is
# no longer "pending", or it already has execution-trace entries or
# intermediate results.
#
# @return [Boolean]
def persist_context?
  return true unless @context.status.to_s == "pending"
  return true if @context.execution_trace.any?

  @context.intermediate_results.any?
end
#resume_execution ⇒ Object
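Resumes a paused reactor, or performs the first run of an async reactor, and returns the result. Period and rate-limit gates are applied only on a fresh first run; the per-context lock, exclusive lock and semaphore are acquired before the remaining steps execute. Contention errors are re-raised after marking the run as a snooze. (Source carries `rubocop:disable Metrics/MethodLength,Metrics/PerceivedComplexity,Metrics/CyclomaticComplexity`.)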
163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 |
# File 'lib/ruby_reactor/executor.rb', line 163

# Resumes a reactor from its context, or performs the first run of an async
# reactor (which, per the comment below, reaches the worker through this
# method rather than #execute).
#
# Period and rate-limit gates apply only when +first_execution?+ is true.
# The +ensure+ section always releases locks (including the per-context
# lock), leaves the ordered-lock scope, persists the context unless
# persistence is being skipped, and emits the terminal lifecycle event.
#
# @return [Object] the execution result, a short-circuit/skip result, or the
#   result left in @result after a rescued StandardError
# @raise [RubyReactor::Lock::AcquisitionError,
#   RubyReactor::Semaphore::AcquisitionError,
#   RubyReactor::RateLimit::ExceededError,
#   RubyReactor::RateLimitRegistry::UnknownLimitError,
#   RubyReactor::OrderedLock::WaitError] re-raised after flagging the run as
#   a contention snooze
def resume_execution # rubocop:disable Metrics/MethodLength,Metrics/PerceivedComplexity,Metrics/CyclomaticComplexity
  middlewares.on(:start_reactor, reactor_class.name, context.inputs, @context)
  # Read by the ensure section to choose the terminal lifecycle event.
  completed = false

  # A fresh async reactor run reaches the worker through resume_execution
  # (it never calls execute), so the period and rate-limit gates that live
  # in execute must be applied here too. Genuine resumes (a step already ran
  # or we paused mid-flight, so current_step is set) must NOT re-gate: a
  # paused reactor must not throttle or skip itself on the way back in.
  first_run = first_execution?

  enter_ordered_lock_scope
  # ordered-lock skip applies on any run; the period gate only on a fresh
  # first run (a genuine resume must not skip itself when its own marker
  # eventually lands).
  short = ordered_lock_short_circuit
  short ||= check_period_gate if first_run
  if short
    completed = true
    return short_circuit!(short)
  end

  @context.status = :running

  check_rate_limit if first_run

  # Per-context liveness lock: serializes duplicate deliveries of the same
  # root context (e.g. a sweeper re-enqueue racing a still-live worker) and
  # doubles as the sweeper's "worker alive" signal. Only the ROOT executor
  # holds it — composed/nested children resume inline under the root worker
  # and must not contend on the root's own key.
  acquire_context_lock

  # Resumes intentionally skip check_rate_limit (a paused run must not
  # block itself on resume), so acquire lock/semaphore directly rather
  # than via acquire_locks.
  acquire_exclusive_lock if @reactor_class.respond_to?(:lock_config) && @reactor_class.lock_config
  acquire_semaphore if @reactor_class.respond_to?(:semaphore_config) && @reactor_class.semaphore_config

  # Post-lock re-check (see execute) — closes the period race for the
  # first run of a locked async reactor.
  if first_run && (skipped = check_period_gate)
    completed = true
    return finalize_skipped(skipped)
  end

  prepare_for_resume
  save_context

  # A set current_step means a step was in flight: re-run it, then continue.
  @result = if @context.current_step
              execute_current_step_and_continue
            else
              execute_remaining_steps
            end

  update_context_status(@result)
  mark_period_on_success(@result)

  handle_interrupt(@result) if @result.is_a?(RubyReactor::InterruptResult)

  completed = true
  @result
rescue RubyReactor::Lock::AcquisitionError, RubyReactor::Semaphore::AcquisitionError,
       RubyReactor::RateLimit::ExceededError, RubyReactor::RateLimitRegistry::UnknownLimitError,
       RubyReactor::OrderedLock::WaitError => e
  # Contention: flag it so the ensure section emits :snooze_reactor, then
  # let the error propagate to the caller.
  @contention_snooze = true
  raise e
rescue StandardError => e
  # NOTE(review): this branch returns @result, so handle_resume_error is
  # presumably responsible for assigning it — confirm.
  handle_resume_error(e)
  update_context_status(@result)
  completed = true
  @result
ensure
  release_locks
  @acquired_context_lock&.release
  @acquired_context_lock = nil
  leave_ordered_lock_scope
  # NOTE(review): unlike #execute, this save is not guarded by
  # persist_context? — confirm the difference is intended.
  save_context unless skip_context_persist?
  emit_lifecycle_completion(completed)
end
#save_context ⇒ Object
261 262 263 264 265 266 267 268 |
# File 'lib/ruby_reactor/executor.rb', line 261

# Serializes THIS executor's context and stores it under the context's own
# id, tagged with the storage name of this executor's reactor class.
#
# @return [Object] whatever the storage adapter's +store_context+ returns
def save_context
  adapter = RubyReactor::Configuration.instance.storage_adapter
  storage_name = RubyReactor.reactor_storage_name(@reactor_class)
  payload = ContextSerializer.serialize(@context)

  adapter.store_context(@context.context_id, payload, storage_name)
end
#undo_all ⇒ Object
245 246 247 |
# File 'lib/ruby_reactor/executor.rb', line 245 def undo_all @compensation_manager.rollback_completed_steps end |
#undo_stack ⇒ Object
249 250 251 |
# File 'lib/ruby_reactor/executor.rb', line 249 def undo_stack @compensation_manager.undo_stack end |
#undo_trace ⇒ Object
253 254 255 |
# File 'lib/ruby_reactor/executor.rb', line 253 def undo_trace @compensation_manager.undo_trace end |