Class: RubyReactor::Map::ElementExecutor
- Inherits:
-
Object
- Object
- RubyReactor::Map::ElementExecutor
- Extended by:
- Helpers
- Defined in:
- lib/ruby_reactor/map/element_executor.rb
Class Method Summary collapse
- .acquire_element_lock(arguments) ⇒ Object
-
.check_fail_fast?(arguments, storage) ⇒ Boolean
rubocop:enable Style/IdenticalConditionalBranches.
- .finalize_execution(arguments, storage) ⇒ Object
- .handle_result(result, arguments, context, storage, executor) ⇒ Object
-
.hydrate_or_create_context(arguments) ⇒ Object
rubocop:disable Style/IdenticalConditionalBranches.
- .inline_testing_mode? ⇒ Boolean
- .perform(arguments) ⇒ Object
- .perform_element(arguments) ⇒ Object
Methods included from Helpers
apply_collect_block, build_element_inputs, load_parent_context_from_storage, resolve_reactor_class, resume_parent_execution
Class Method Details
.acquire_element_lock(arguments) ⇒ Object
25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 |
# File 'lib/ruby_reactor/map/element_executor.rb', line 25 def self.acquire_element_lock(arguments) # In Sidekiq::Testing.inline! an element's async-retry perform_map_element_in # re-enters synchronously inside this frame; the lock would self-contend. # It only guards concurrent cross-process delivery, impossible inline. return :inline if inline_testing_mode? lock = RubyReactor::Lock.new( "map_element:#{arguments[:map_id]}:#{arguments[:index]}", owner: SecureRandom.uuid, ttl: RubyReactor.configuration.context_lock_ttl, wait: 0, auto_extend: true ) lock.acquire lock rescue RubyReactor::Lock::AcquisitionError RubyReactor.configuration.logger.info( "RubyReactor map element #{arguments[:map_id]}:#{arguments[:index]} already in flight; dropping duplicate" ) :contended end |
.check_fail_fast?(arguments, storage) ⇒ Boolean
rubocop:enable Style/IdenticalConditionalBranches
123 124 125 126 127 128 129 130 131 132 133 134 135 |
# File 'lib/ruby_reactor/map/element_executor.rb', line 123 def self.check_fail_fast?(arguments, storage) return false unless arguments[:fail_fast] map_id = arguments[:map_id] parent_reactor_class_name = arguments[:parent_reactor_class_name] failed_context_id = storage.retrieve_map_failed_context_id(map_id, parent_reactor_class_name) return false unless failed_context_id # Skip execution finalize_execution(arguments, storage) true end |
.finalize_execution(arguments, storage) ⇒ Object
167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 |
# File 'lib/ruby_reactor/map/element_executor.rb', line 167 def self.finalize_execution(arguments, storage) map_id = arguments[:map_id] parent_class = arguments[:parent_reactor_class_name] new_count = storage.decrement_map_counter(map_id, parent_class) trigger_next_batch_if_needed(arguments, arguments[:index], arguments[:batch_size]) return unless new_count.zero? RubyReactor.configuration.async_router.perform_map_collection_async( parent_context_id: arguments[:parent_context_id], map_id: map_id, parent_reactor_class_name: parent_class, step_name: arguments[:step_name], strict_ordering: arguments[:strict_ordering], timeout: 3600 ) end |
.handle_result(result, arguments, context, storage, executor) ⇒ Object
137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 |
# File 'lib/ruby_reactor/map/element_executor.rb', line 137 def self.handle_result(result, arguments, context, storage, executor) return if result.is_a?(RetryQueuedResult) map_id = arguments[:map_id] index = arguments[:index] parent_class = arguments[:parent_reactor_class_name] # Using short name for variable if result.success? storage.store_map_result(map_id, index, ContextSerializer.serialize_value(result.value), parent_class, strict_ordering: arguments[:strict_ordering]) else executor.undo_all storage.store_map_result(map_id, index, { _error: result.error }, parent_class, strict_ordering: arguments[:strict_ordering]) if arguments[:fail_fast] storage.store_map_failed_context_id(map_id, context.context_id, parent_class) # FAST FAIL: Trigger Collector immediately to cancel/fail the map execution RubyReactor.configuration.async_router.perform_map_collection_async( parent_context_id: arguments[:parent_context_id], map_id: map_id, parent_reactor_class_name: parent_class, step_name: arguments[:step_name], strict_ordering: arguments[:strict_ordering], timeout: 3600 ) end end end |
.hydrate_or_create_context(arguments) ⇒ Object
rubocop:disable Style/IdenticalConditionalBranches
102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 |
# File 'lib/ruby_reactor/map/element_executor.rb', line 102 def self.hydrate_or_create_context(arguments) if arguments[:serialized_context] context = ContextSerializer.deserialize(arguments[:serialized_context]) context. = arguments if context.inputs.empty? && arguments[:serialized_inputs] context.inputs = ContextSerializer.deserialize_value(arguments[:serialized_inputs]) end context else inputs = ContextSerializer.deserialize_value(arguments[:serialized_inputs]) reactor_class = resolve_reactor_class(arguments[:reactor_class_info]) context = Context.new(inputs, reactor_class) context.parent_context_id = arguments[:parent_context_id] context. = arguments context end end |
.inline_testing_mode? ⇒ Boolean
45 46 47 |
# File 'lib/ruby_reactor/map/element_executor.rb', line 45 def self.inline_testing_mode? defined?(Sidekiq::Testing) && Sidekiq::Testing.respond_to?(:inline?) && Sidekiq::Testing.inline? end |
.perform(arguments) ⇒ Object
8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 |
# File 'lib/ruby_reactor/map/element_executor.rb', line 8 def self.perform(arguments) arguments = arguments.transform_keys(&:to_sym) # Per-element liveness lock (Phase 5b): its presence is the map sweeper's # "element alive" signal, and it serializes duplicate deliveries so a # re-run can't double-decrement the counter (M3). A duplicate of a live # element is dropped — the live original stores the result and finalizes. lock = acquire_element_lock(arguments) return if lock == :contended begin perform_element(arguments) ensure lock.release if lock.respond_to?(:release) end end |
.perform_element(arguments) ⇒ Object
49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 |
# File 'lib/ruby_reactor/map/element_executor.rb', line 49 def self.perform_element(arguments) context = hydrate_or_create_context(arguments) # The element already runs inside its own background worker, so any async # steps (and async retries) must execute inline here rather than handing # off to a detached Worker that would escape map result/counter tracking. # This mirrors SidekiqWorkers::Worker, which sets the same flag. context.inline_async_execution = true storage = RubyReactor.configuration.storage_adapter storage.store_map_element_context_id(arguments[:map_id], context.context_id, arguments[:parent_reactor_class_name]) return if check_fail_fast?(arguments, storage) executor = Executor.new(context.reactor_class, {}, context) arguments[:serialized_context] ? executor.resume_execution : executor.execute result = executor.result # An async retry requeued this element as a fresh MapElementWorker job, so # it is not finished yet. Do not store a result, decrement the completion # counter, or trigger the next batch — the requeued job will do that when # the element ultimately succeeds or exhausts its retries. return if result.is_a?(RetryQueuedResult) handle_result(result, arguments, context, storage, executor) finalize_execution(arguments, storage) end |