Module: RubyReactor::Map::Helpers
- Included in:
- Collector, Dispatcher, ElementExecutor, Execution
- Defined in:
- lib/ruby_reactor/map/helpers.rb
Overview
Shared helper methods for Map executors
Instance Method Summary collapse
-
#apply_collect_block(results, step_config) ⇒ Object
Applies collect block to results.
-
#build_element_inputs(mappings, parent_context, element) ⇒ Object
Builds mapped inputs for a single element.
-
#load_parent_context_from_storage(parent_context_id, reactor_class_name, storage) ⇒ Object
Loads parent context from storage.
-
#resolve_reactor_class(info) ⇒ Object
Resolves the reactor class from reactor_class_info.
-
#resume_parent_execution(parent_context, step_name, final_result, storage) ⇒ Object
Resumes parent reactor execution after map completion.
Instance Method Details
#apply_collect_block(results, step_config) ⇒ Object
Applies collect block to results
36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 |
# File 'lib/ruby_reactor/map/helpers.rb', line 36 def apply_collect_block(results, step_config) collect_block = step_config.arguments[:collect_block][:source].value if collect_block # Pass all results (Success and Failure) to collect block begin collected = collect_block.call(results) RubyReactor::Success(collected) rescue StandardError => e RubyReactor::Failure(e) end else # Default behavior: fail if any failure first_failure = results.find(&:failure?) first_failure || RubyReactor::Success(results.map(&:value)) end end |
#build_element_inputs(mappings, parent_context, element) ⇒ Object
Builds mapped inputs for a single element
31 32 33 |
# File 'lib/ruby_reactor/map/helpers.rb', line 31 def build_element_inputs(mappings, parent_context, element) RubyReactor::Step::MapStep.build_mapped_inputs(mappings, parent_context, element) end |
#load_parent_context_from_storage(parent_context_id, reactor_class_name, storage) ⇒ Object
Loads parent context from storage
25 26 27 28 |
# File 'lib/ruby_reactor/map/helpers.rb', line 25 def load_parent_context_from_storage(parent_context_id, reactor_class_name, storage) parent_context_data = storage.retrieve_context(parent_context_id, reactor_class_name) RubyReactor::Context.deserialize_from_retry(parent_context_data) end |
#resolve_reactor_class(info) ⇒ Object
Resolves the reactor class from reactor_class_info
8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 |
# File 'lib/ruby_reactor/map/helpers.rb', line 8 def resolve_reactor_class(info) if info["type"] == "class" begin Object.const_get(info["name"]) rescue NameError RubyReactor::Registry.find(info["name"]) end elsif info["type"] == "inline" parent_class = Object.const_get(info["parent"]) step_config = parent_class.steps[info["step"].to_sym] step_config.arguments[:mapped_reactor_class][:source].value else raise "Unknown reactor class info: #{info}" end end |
#resume_parent_execution(parent_context, step_name, final_result, storage) ⇒ Object
Resumes parent reactor execution after map completion
55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 |
# File 'lib/ruby_reactor/map/helpers.rb', line 55 def resume_parent_execution(parent_context, step_name, final_result, storage) executor = RubyReactor::Executor.new(parent_context.reactor_class, {}, parent_context) step_name_sym = step_name.to_sym if final_result.failure? parent_context.current_step = step_name_sym error = RubyReactor::Error::StepFailureError.new( final_result.error, step: step_name_sym, context: parent_context, original_error: final_result.error.is_a?(Exception) ? final_result.error : nil, exception_class: final_result.respond_to?(:exception_class) ? final_result.exception_class : nil ) # Pass backtrace if available if final_result.respond_to?(:backtrace) && final_result.backtrace error.set_backtrace(final_result.backtrace) elsif final_result.error.respond_to?(:backtrace) error.set_backtrace(final_result.error.backtrace) end failure_response = executor.result_handler.handle_execution_error(error) # Manually update context status since we're not running executor loop executor.send(:update_context_status, failure_response) else parent_context.set_result(step_name_sym, final_result.value) # Manually update execution trace to reflect completion # This is necessary because resume_execution continues from the NEXT step # and the async step (which returned AsyncResult) needs to be marked as done with actual value parent_context.execution_trace << { type: :result, step: step_name_sym, timestamp: Time.now, value: final_result.value, status: :success } parent_context.current_step = nil executor.resume_execution end storage.store_context( parent_context.context_id, ContextSerializer.serialize(parent_context), parent_context.reactor_class.name ) end |