Module: RubyReactor::Map::Helpers

Included in:
Collector, Dispatcher, ElementExecutor, Execution
Defined in:
lib/ruby_reactor/map/helpers.rb

Overview

Shared helper methods for Map executors

Instance Method Summary

Instance Method Details

#apply_collect_block(results, step_config) ⇒ Object

Applies collect block to results



36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
# File 'lib/ruby_reactor/map/helpers.rb', line 36

# Combines the per-element results of a map step into one final result.
#
# When the step defines a collect block, every result (successes and
# failures alike) is handed to it and the block's return value is wrapped
# in a Success; a StandardError raised by the block becomes a Failure.
# Without a collect block, the first failed result is returned as-is, or a
# Success holding every result's value when nothing failed.
#
# @param results [Enumerable] result objects responding to #failure? and #value
# @param step_config [Object] step configuration exposing #arguments
# @return [Object] a Success/Failure built by RubyReactor, or the first failed result
def apply_collect_block(results, step_config)
  # The collect block is optional: a missing argument entry or a missing
  # source is treated as "no block" instead of raising NoMethodError on nil.
  collect_argument = step_config.arguments[:collect_block]
  collect_source = collect_argument && collect_argument[:source]
  collect_block = collect_source&.value

  unless collect_block
    # Default behavior: fail if any failure
    first_failure = results.find(&:failure?)
    return first_failure || RubyReactor::Success(results.map(&:value))
  end

  begin
    # Pass all results (Success and Failure) to collect block
    RubyReactor::Success(collect_block.call(results))
  rescue StandardError => e
    RubyReactor::Failure(e)
  end
end

#build_element_inputs(mappings, parent_context, element) ⇒ Object

Builds mapped inputs for a single element



31
32
33
# File 'lib/ruby_reactor/map/helpers.rb', line 31

# Builds the mapped inputs for a single element by delegating to
# RubyReactor::Step::MapStep.build_mapped_inputs.
#
# @param mappings [Object] input mappings, forwarded unchanged
# @param parent_context [Object] parent context, forwarded unchanged
# @param element [Object] the element the inputs are built for
# @return [Object] whatever MapStep.build_mapped_inputs returns
def build_element_inputs(mappings, parent_context, element)
  input_builder = RubyReactor::Step::MapStep
  input_builder.build_mapped_inputs(mappings, parent_context, element)
end

#load_parent_context_from_storage(parent_context_id, reactor_class_name, storage) ⇒ Object

Loads parent context from storage



25
26
27
28
# File 'lib/ruby_reactor/map/helpers.rb', line 25

# Fetches the stored parent context data and turns it back into a context
# via RubyReactor::Context.deserialize_from_retry.
#
# @param parent_context_id [Object] identifier passed to storage.retrieve_context
# @param reactor_class_name [Object] reactor class name passed to storage.retrieve_context
# @param storage [Object] storage object responding to #retrieve_context
# @return [Object] the result of Context.deserialize_from_retry
def load_parent_context_from_storage(parent_context_id, reactor_class_name, storage)
  # Read from storage first, then resolve the deserializer.
  serialized = storage.retrieve_context(parent_context_id, reactor_class_name)
  context_class = RubyReactor::Context
  context_class.deserialize_from_retry(serialized)
end

#resolve_reactor_class(info) ⇒ Object

Resolves the reactor class from reactor_class_info



8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
# File 'lib/ruby_reactor/map/helpers.rb', line 8

# Resolves a reactor class from a reactor_class_info hash.
#
# - "class": looks the constant up by info["name"]; on NameError it asks
#   RubyReactor::Registry.find for the same name.
# - "inline": looks up the parent class by info["parent"], finds the step
#   named info["step"], and reads the value of its :mapped_reactor_class
#   argument source.
#
# @param info [Hash] string-keyed hash with "type" plus "name" or "parent"/"step"
# @return [Object] the resolved reactor class
# @raise [RuntimeError] when info["type"] is neither "class" nor "inline"
def resolve_reactor_class(info)
  case info["type"]
  when "class"
    begin
      Object.const_get(info["name"])
    rescue NameError
      # Not a loadable constant; fall back to the registry lookup.
      RubyReactor::Registry.find(info["name"])
    end
  when "inline"
    owner = Object.const_get(info["parent"])
    inline_step = owner.steps[info["step"].to_sym]
    reactor_source = inline_step.arguments[:mapped_reactor_class][:source]
    reactor_source.value
  else
    raise "Unknown reactor class info: #{info}"
  end
end

#resume_parent_execution(parent_context, step_name, final_result, storage) ⇒ Object

Resumes parent reactor execution after map completion



55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
# File 'lib/ruby_reactor/map/helpers.rb', line 55

# Resumes the parent reactor once the map step has produced its final result,
# then stores the parent context.
#
# On failure, the result's error is wrapped in a StepFailureError, passed to
# the executor's result handler, and the context status is updated from the
# handler's response. On success, the value is recorded as the step's result,
# a :result entry is appended to the execution trace, and the executor
# resumes execution. In both cases the serialized parent context is written
# to storage as the last step.
#
# @param parent_context [Object] parent reactor context (presumably a RubyReactor::Context — confirm)
# @param step_name [String, Symbol] name of the map step; converted with #to_sym
# @param final_result [Object] result responding to #failure?, plus #error or #value
# @param storage [Object] storage object responding to #store_context
# @return [Object] the return value of storage.store_context
def resume_parent_execution(parent_context, step_name, final_result, storage)
  executor = RubyReactor::Executor.new(parent_context.reactor_class, {}, parent_context)
  step_name_sym = step_name.to_sym

  if final_result.failure?
    # Point the context at the map step so the failure is attributed to it.
    parent_context.current_step = step_name_sym

    # original_error is only set when the failure carries a real Exception;
    # exception_class is only read when the result exposes it.
    error = RubyReactor::Error::StepFailureError.new(
      final_result.error,
      step: step_name_sym,
      context: parent_context,
      original_error: final_result.error.is_a?(Exception) ? final_result.error : nil,
      exception_class: final_result.respond_to?(:exception_class) ? final_result.exception_class : nil
    )

    # Pass backtrace if available: prefer the one carried by the result,
    # otherwise fall back to the backtrace of the underlying error.
    if final_result.respond_to?(:backtrace) && final_result.backtrace
      error.set_backtrace(final_result.backtrace)
    elsif final_result.error.respond_to?(:backtrace)
      error.set_backtrace(final_result.error.backtrace)
    end

    failure_response = executor.result_handler.handle_execution_error(error)
    # Manually update context status since we're not running executor loop
    # NOTE(review): invoked via #send, presumably because update_context_status
    # is not public on the executor — confirm.
    executor.send(:update_context_status, failure_response)
  else
    parent_context.set_result(step_name_sym, final_result.value)

    # Manually update execution trace to reflect completion
    # This is necessary because resume_execution continues from the NEXT step
    # and the async step (which returned AsyncResult) needs to be marked as done with actual value
    parent_context.execution_trace << {
      type: :result,
      step: step_name_sym,
      timestamp: Time.now,
      value: final_result.value,
      status: :success
    }

    # Clear the current step before handing control back to the executor.
    parent_context.current_step = nil
    executor.resume_execution
  end

  # Store the context after either branch, keyed by context id and reactor class name.
  storage.store_context(
    parent_context.context_id,
    ContextSerializer.serialize(parent_context),
    parent_context.reactor_class.name
  )
end