Class: RubyReactor::Executor
- Inherits: Object
- Inheritance hierarchy: Object → RubyReactor::Executor
- Defined in:
- lib/ruby_reactor/executor.rb,
lib/ruby_reactor/executor/graph_manager.rb,
lib/ruby_reactor/executor/retry_manager.rb,
lib/ruby_reactor/executor/step_executor.rb,
lib/ruby_reactor/executor/result_handler.rb,
lib/ruby_reactor/executor/input_validator.rb,
lib/ruby_reactor/executor/compensation_manager.rb
Overview
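Runs a reactor class: the constructor wires together the execution context, the middlewares, the dependency graph and the retry, result-handling, compensation and step-execution helpers; `#execute` and `#resume_execution` are the entry points. (The source annotates the class with `rubocop:disable Metrics/ClassLength`.)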
Defined Under Namespace
Classes: CompensationManager, GraphManager, InputValidator, ResultHandler, RetryManager, StepExecutor
Instance Attribute Summary collapse
-
#compensation_manager ⇒ Object
readonly
Returns the value of attribute compensation_manager.
-
#context ⇒ Object
readonly
Returns the value of attribute context.
-
#dependency_graph ⇒ Object
readonly
Returns the value of attribute dependency_graph.
-
#middlewares ⇒ Object
readonly
Returns the value of attribute middlewares.
-
#reactor_class ⇒ Object
readonly
Returns the value of attribute reactor_class.
-
#result ⇒ Object
readonly
Returns the value of attribute result.
-
#result_handler ⇒ Object
readonly
Returns the value of attribute result_handler.
-
#retry_manager ⇒ Object
readonly
Returns the value of attribute retry_manager.
-
#step_executor ⇒ Object
readonly
Returns the value of attribute step_executor.
Class Method Summary collapse
- .middlewares_for(reactor_class) ⇒ Object
- .resolve_middlewares(reactor_class) ⇒ Object
Instance Method Summary collapse
-
#execute ⇒ Object
Runs the reactor from the start and returns the result. (The source annotates this method with `rubocop:disable Metrics/MethodLength`.)
- #execution_trace ⇒ Object
-
#initialize(reactor_class, inputs = {}, context = nil) ⇒ Executor
constructor
A new instance of Executor.
- #persist_context? ⇒ Boolean
- #resume_execution ⇒ Object
- #save_context ⇒ Object
- #undo_all ⇒ Object
- #undo_stack ⇒ Object
- #undo_trace ⇒ Object
Constructor Details
#initialize(reactor_class, inputs = {}, context = nil) ⇒ Executor
Returns a new instance of Executor.
17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 |
# File 'lib/ruby_reactor/executor.rb', line 17
# Builds an executor for +reactor_class+ and wires up its collaborators.
#
# @param reactor_class [Class] the reactor to run
# @param inputs [Hash] inputs used to build a fresh Context when +context+ is falsy
# @param context [Object, nil] an existing context to reuse instead of building one
# @return [Executor] a new instance of Executor
def initialize(reactor_class, inputs = {}, context = nil)
  @reactor_class = reactor_class

  # Reuse the supplied context; only build one from the inputs when none was given.
  @context = context
  @context ||= Context.new(inputs, reactor_class)

  # The same middleware runner is stored on the executor and on the context.
  @middlewares = Executor.middlewares_for(reactor_class)
  @context.middlewares = @middlewares

  @dependency_graph = DependencyGraph.new
  @compensation_manager = CompensationManager.new(@context)
  @retry_manager = RetryManager.new(@context, @middlewares)
  @result_handler = ResultHandler.new(
    context: @context,
    compensation_manager: @compensation_manager,
    dependency_graph: @dependency_graph
  )

  collaborators = {
    retry_manager: @retry_manager,
    result_handler: @result_handler,
    compensation_manager: @compensation_manager,
    middlewares: @middlewares
  }
  @step_executor = StepExecutor.new(
    context: @context,
    dependency_graph: @dependency_graph,
    reactor_class: @reactor_class,
    managers: collaborators
  )

  # Nothing has run and nothing has been acquired yet.
  @result = @acquired_lock = @acquired_semaphore = nil
end
Instance Attribute Details
#compensation_manager ⇒ Object (readonly)
Returns the value of attribute compensation_manager.
14 15 16 |
# File 'lib/ruby_reactor/executor.rb', line 14
# Read-only accessor for the compensation manager.
#
# @return [Object] the value of the +@compensation_manager+ instance variable
def compensation_manager
  @compensation_manager
end
#context ⇒ Object (readonly)
Returns the value of attribute context.
14 15 16 |
# File 'lib/ruby_reactor/executor.rb', line 14
# Read-only accessor for the execution context.
#
# @return [Object] the value of the +@context+ instance variable
def context
  @context
end
#dependency_graph ⇒ Object (readonly)
Returns the value of attribute dependency_graph.
14 15 16 |
# File 'lib/ruby_reactor/executor.rb', line 14
# Read-only accessor for the dependency graph.
#
# @return [Object] the value of the +@dependency_graph+ instance variable
def dependency_graph
  @dependency_graph
end
#middlewares ⇒ Object (readonly)
Returns the value of attribute middlewares.
14 15 16 |
# File 'lib/ruby_reactor/executor.rb', line 14
# Read-only accessor for the middleware runner.
#
# @return [Object] the value of the +@middlewares+ instance variable
def middlewares
  @middlewares
end
#reactor_class ⇒ Object (readonly)
Returns the value of attribute reactor_class.
14 15 16 |
# File 'lib/ruby_reactor/executor.rb', line 14
# Read-only accessor for the reactor class being executed.
#
# @return [Object] the value of the +@reactor_class+ instance variable
def reactor_class
  @reactor_class
end
#result ⇒ Object (readonly)
Returns the value of attribute result.
14 15 16 |
# File 'lib/ruby_reactor/executor.rb', line 14
# Read-only accessor for the most recent execution result.
#
# @return [Object, nil] the value of the +@result+ instance variable
def result
  @result
end
#result_handler ⇒ Object (readonly)
Returns the value of attribute result_handler.
14 15 16 |
# File 'lib/ruby_reactor/executor.rb', line 14
# Read-only accessor for the result handler.
#
# @return [Object] the value of the +@result_handler+ instance variable
def result_handler
  @result_handler
end
#retry_manager ⇒ Object (readonly)
Returns the value of attribute retry_manager.
14 15 16 |
# File 'lib/ruby_reactor/executor.rb', line 14
# Read-only accessor for the retry manager.
#
# @return [Object] the value of the +@retry_manager+ instance variable
def retry_manager
  @retry_manager
end
#step_executor ⇒ Object (readonly)
Returns the value of attribute step_executor.
14 15 16 |
# File 'lib/ruby_reactor/executor.rb', line 14
# Read-only accessor for the step executor.
#
# @return [Object] the value of the +@step_executor+ instance variable
def step_executor
  @step_executor
end
Class Method Details
.middlewares_for(reactor_class) ⇒ Object
66 67 68 |
# File 'lib/ruby_reactor/executor.rb', line 66
# Wraps the middlewares resolved for +reactor_class+ in a MiddlewareRunner.
#
# @param reactor_class [Class] the reactor whose middlewares are resolved
# @return [RubyReactor::MiddlewareRunner] a runner built from the resolved list
def self.middlewares_for(reactor_class)
  runner_class = RubyReactor::MiddlewareRunner
  resolved = resolve_middlewares(reactor_class)
  runner_class.new(resolved)
end
.resolve_middlewares(reactor_class) ⇒ Object
46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 |
# File 'lib/ruby_reactor/executor.rb', line 46
# Collects the globally configured middlewares followed by those declared on
# +reactor_class+ (when it responds to +middlewares+) and turns each entry
# into a usable object.
#
# Entry handling:
# * a Class is instantiated with no arguments;
# * an Array whose first element is a Class is treated as +[klass, opts]+ and
#   instantiated with +opts+ (or an empty hash) splatted as keyword arguments;
# * anything else is passed through untouched.
#
# @param reactor_class [Class] the reactor whose middlewares are appended
# @return [Array<Object>] resolved middleware entries, global ones first
def self.resolve_middlewares(reactor_class)
  entries = Array(RubyReactor.configuration.middlewares)
  # `+=` builds a new array, so the configured list itself is never mutated.
  entries += Array(reactor_class.middlewares) if reactor_class.respond_to?(:middlewares)

  entries.map do |entry|
    next entry.new if entry.is_a?(Class)
    next entry unless entry.is_a?(Array) && entry.first.is_a?(Class)

    klass, opts = entry
    klass.new(**(opts || {}))
  end
end
Instance Method Details
#execute ⇒ Object
Runs the reactor from the start and returns the result. (The source annotates this method with `rubocop:disable Metrics/MethodLength`.)
70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 |
# File 'lib/ruby_reactor/executor.rb', line 70
# Runs the reactor from the beginning and returns the result.
#
# Flow, in order:
# 1. Emits the +:start_reactor+ middleware event.
# 2. If +check_period_gate+ returns a truthy value, that value becomes the
#    result, the context status is updated and saved, and it is returned.
# 3. Otherwise: acquires locks, validates inputs, marks the context as
#    +:running+ and saves it, builds and validates the dependency graph,
#    marks steps already completed in the context, and executes all steps.
# 4. Always (ensure): releases locks, saves the context when
#    +persist_context?+ is true, and emits +:complete_reactor+ (with the
#    result) or +:failed_reactor+ (with the in-flight exception).
#
# @return [Object] the execution result (also stored in +@result+)
# @raise [RubyReactor::Lock::AcquisitionError] re-raised untouched
# @raise [RubyReactor::Semaphore::AcquisitionError] re-raised untouched
# @raise [RubyReactor::RateLimit::ExceededError] re-raised untouched
def execute # rubocop:disable Metrics/MethodLength
  middlewares.on(:start_reactor, reactor_class.name, context.inputs, @context)
  completed = false

  gate_result = check_period_gate
  if gate_result
    # The period gate produced a result: record it and stop without running steps.
    @result = gate_result
    update_context_status(@result)
    save_context
    completed = true
    return @result
  end

  acquire_locks_with_telemetry
  InputValidator.new(@reactor_class, @context).validate!

  @context.status = :running
  save_context

  graph = GraphManager.new(@reactor_class, @dependency_graph, @context)
  graph.build_and_validate!
  graph.mark_completed_steps_from_context

  @result = @step_executor.execute_all_steps
  update_context_status(@result)
  mark_period_on_success(@result)
  handle_interrupt(@result) if @result.is_a?(RubyReactor::InterruptResult)

  completed = true
  @result
rescue RubyReactor::Lock::AcquisitionError,
       RubyReactor::Semaphore::AcquisitionError,
       RubyReactor::RateLimit::ExceededError
  # These are not converted into a result; they propagate to the caller.
  raise
rescue StandardError => e
  # Any other error becomes a result via the result handler.
  @result = @result_handler.handle_execution_error(e)
  update_context_status(@result)
  completed = true
  @result
ensure
  release_locks
  save_context if persist_context?

  # `completed` stays false only when an exception is propagating out.
  event = completed ? :complete_reactor : :failed_reactor
  payload = completed ? @result : $ERROR_INFO
  middlewares.on(event, reactor_class.name, payload, @context)
end
#execution_trace ⇒ Object
176 177 178 |
# File 'lib/ruby_reactor/executor.rb', line 176
# Exposes the execution trace recorded on the context.
#
# @return [Object] whatever +@context.execution_trace+ returns
def execution_trace
  @context.execution_trace
end
#persist_context? ⇒ Boolean
189 190 191 192 193 |
# File 'lib/ruby_reactor/executor.rb', line 189
# Tells whether the context is worth persisting: its status is anything other
# than "pending", or it already holds execution trace entries or
# intermediate results.
#
# @return [Boolean]
def persist_context?
  return true if @context.status.to_s != "pending"

  @context.execution_trace.any? || @context.intermediate_results.any?
end
#resume_execution ⇒ Object
121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 |
# File 'lib/ruby_reactor/executor.rb', line 121
# Resumes a previously started reactor and returns the result.
#
# Emits +:start_reactor+, marks the context as +:running+, acquires the
# exclusive lock and/or semaphore when the reactor class declares the
# corresponding config, prepares for resume, saves the context, then either
# re-runs the context's current step and continues, or runs the remaining
# steps when there is no current step.
#
# Always (ensure): releases locks, saves the context, and emits
# +:complete_reactor+ (with the result) or +:failed_reactor+ (with the
# in-flight exception).
#
# @return [Object] the execution result (also stored in +@result+)
# @raise [RubyReactor::Lock::AcquisitionError] re-raised untouched
# @raise [RubyReactor::Semaphore::AcquisitionError] re-raised untouched
# @raise [RubyReactor::RateLimit::ExceededError] re-raised untouched
def resume_execution
  # Emitted outside the begin block, so a failure here skips the rescue/ensure below.
  middlewares.on(:start_reactor, reactor_class.name, context.inputs, @context)
  completed = false

  begin
    @context.status = :running

    needs_lock = @reactor_class.respond_to?(:lock_config) && @reactor_class.lock_config
    acquire_exclusive_lock if needs_lock

    needs_semaphore = @reactor_class.respond_to?(:semaphore_config) && @reactor_class.semaphore_config
    acquire_semaphore if needs_semaphore

    prepare_for_resume
    save_context

    @result = @context.current_step ? execute_current_step_and_continue : execute_remaining_steps

    update_context_status(@result)
    mark_period_on_success(@result)
    handle_interrupt(@result) if @result.is_a?(RubyReactor::InterruptResult)

    completed = true
    @result
  rescue RubyReactor::Lock::AcquisitionError,
         RubyReactor::Semaphore::AcquisitionError,
         RubyReactor::RateLimit::ExceededError
    # These are not converted into a result; they propagate to the caller.
    raise
  rescue StandardError => e
    # NOTE(review): presumably handle_resume_error assigns @result; confirm in its definition.
    handle_resume_error(e)
    update_context_status(@result)
    completed = true
    @result
  ensure
    release_locks
    save_context

    # `completed` stays false only when an exception is propagating out.
    event = completed ? :complete_reactor : :failed_reactor
    payload = completed ? @result : $ERROR_INFO
    middlewares.on(event, reactor_class.name, payload, @context)
  end
end
#save_context ⇒ Object
180 181 182 183 184 185 186 187 |
# File 'lib/ruby_reactor/executor.rb', line 180
# Serializes the current context and stores it through the configured
# storage adapter, keyed by the context id and the reactor class name.
#
# @return [Object] whatever the adapter's +store_context+ returns
def save_context
  adapter = RubyReactor::Configuration.instance.storage_adapter

  # Classes without a name get a placeholder derived from their object id.
  label = @reactor_class.name
  label ||= "AnonymousReactor-#{@reactor_class.object_id}"

  payload = ContextSerializer.serialize(@context)
  adapter.store_context(@context.context_id, payload, label)
end
#undo_all ⇒ Object
164 165 166 |
# File 'lib/ruby_reactor/executor.rb', line 164
# Asks the compensation manager to roll back the completed steps.
#
# @return [Object] whatever +rollback_completed_steps+ returns
def undo_all
  @compensation_manager.rollback_completed_steps
end
#undo_stack ⇒ Object
168 169 170 |
# File 'lib/ruby_reactor/executor.rb', line 168
# Exposes the compensation manager's undo stack.
#
# @return [Object] whatever +@compensation_manager.undo_stack+ returns
def undo_stack
  @compensation_manager.undo_stack
end
#undo_trace ⇒ Object
172 173 174 |
# File 'lib/ruby_reactor/executor.rb', line 172
# Exposes the compensation manager's undo trace.
#
# @return [Object] whatever +@compensation_manager.undo_trace+ returns
def undo_trace
  @compensation_manager.undo_trace
end