Class: RubyReactor::Map::ElementExecutor
- Inherits:
-
Object
- Object
- RubyReactor::Map::ElementExecutor
- Extended by:
- Helpers
- Defined in:
- lib/ruby_reactor/map/element_executor.rb
Class Method Summary collapse
-
.check_fail_fast?(arguments, storage) ⇒ Boolean
rubocop:enable Style/IdenticalConditionalBranches.
- .finalize_execution(arguments, storage) ⇒ Object
- .handle_result(result, arguments, context, storage, executor) ⇒ Object
-
.hydrate_or_create_context(arguments) ⇒ Object
rubocop:disable Style/IdenticalConditionalBranches.
- .perform(arguments) ⇒ Object
Methods included from Helpers
apply_collect_block, build_element_inputs, load_parent_context_from_storage, resolve_reactor_class, resume_parent_execution
Class Method Details
.check_fail_fast?(arguments, storage) ⇒ Boolean
rubocop:enable Style/IdenticalConditionalBranches
84 85 86 87 88 89 90 91 92 93 94 95 96 |
# File 'lib/ruby_reactor/map/element_executor.rb', line 84 def self.check_fail_fast?(arguments, storage) return false unless arguments[:fail_fast] map_id = arguments[:map_id] parent_reactor_class_name = arguments[:parent_reactor_class_name] failed_context_id = storage.retrieve_map_failed_context_id(map_id, parent_reactor_class_name) return false unless failed_context_id # Skip execution finalize_execution(arguments, storage) true end |
.finalize_execution(arguments, storage) ⇒ Object
128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 |
# File 'lib/ruby_reactor/map/element_executor.rb', line 128 def self.finalize_execution(arguments, storage) map_id = arguments[:map_id] parent_class = arguments[:parent_reactor_class_name] new_count = storage.decrement_map_counter(map_id, parent_class) trigger_next_batch_if_needed(arguments, arguments[:index], arguments[:batch_size]) return unless new_count.zero? RubyReactor.configuration.async_router.perform_map_collection_async( parent_context_id: arguments[:parent_context_id], map_id: map_id, parent_reactor_class_name: parent_class, step_name: arguments[:step_name], strict_ordering: arguments[:strict_ordering], timeout: 3600 ) end |
.handle_result(result, arguments, context, storage, executor) ⇒ Object
98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 |
# File 'lib/ruby_reactor/map/element_executor.rb', line 98 def self.handle_result(result, arguments, context, storage, executor) return if result.is_a?(RetryQueuedResult) map_id = arguments[:map_id] index = arguments[:index] parent_class = arguments[:parent_reactor_class_name] # Using short name for variable if result.success? storage.store_map_result(map_id, index, ContextSerializer.serialize_value(result.value), parent_class, strict_ordering: arguments[:strict_ordering]) else executor.undo_all storage.store_map_result(map_id, index, { _error: result.error }, parent_class, strict_ordering: arguments[:strict_ordering]) if arguments[:fail_fast] storage.store_map_failed_context_id(map_id, context.context_id, parent_class) # FAST FAIL: Trigger Collector immediately to cancel/fail the map execution RubyReactor.configuration.async_router.perform_map_collection_async( parent_context_id: arguments[:parent_context_id], map_id: map_id, parent_reactor_class_name: parent_class, step_name: arguments[:step_name], strict_ordering: arguments[:strict_ordering], timeout: 3600 ) end end end |
.hydrate_or_create_context(arguments) ⇒ Object
rubocop:disable Style/IdenticalConditionalBranches
63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 |
# File 'lib/ruby_reactor/map/element_executor.rb', line 63 def self.hydrate_or_create_context(arguments) if arguments[:serialized_context] context = ContextSerializer.deserialize(arguments[:serialized_context]) context. = arguments if context.inputs.empty? && arguments[:serialized_inputs] context.inputs = ContextSerializer.deserialize_value(arguments[:serialized_inputs]) end context else inputs = ContextSerializer.deserialize_value(arguments[:serialized_inputs]) reactor_class = resolve_reactor_class(arguments[:reactor_class_info]) context = Context.new(inputs, reactor_class) context.parent_context_id = arguments[:parent_context_id] context. = arguments context end end |
.perform(arguments) ⇒ Object
8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 |
# File 'lib/ruby_reactor/map/element_executor.rb', line 8 def self.perform(arguments) arguments = arguments.transform_keys(&:to_sym) context = hydrate_or_create_context(arguments) # The element already runs inside its own background worker, so any async # steps (and async retries) must execute inline here rather than handing # off to a detached Worker that would escape map result/counter tracking. # This mirrors SidekiqWorkers::Worker, which sets the same flag. context.inline_async_execution = true storage = RubyReactor.configuration.storage_adapter storage.store_map_element_context_id(arguments[:map_id], context.context_id, arguments[:parent_reactor_class_name]) return if check_fail_fast?(arguments, storage) executor = Executor.new(context.reactor_class, {}, context) arguments[:serialized_context] ? executor.resume_execution : executor.execute result = executor.result # An async retry requeued this element as a fresh MapElementWorker job, so # it is not finished yet. Do not store a result, decrement the completion # counter, or trigger the next batch — the requeued job will do that when # the element ultimately succeeds or exhausts its retries. return if result.is_a?(RetryQueuedResult) handle_result(result, arguments, context, storage, executor) finalize_execution(arguments, storage) end |