Class: RubyReactor::Map::ElementExecutor

Inherits:
Object
  • Object
show all
Extended by:
Helpers
Defined in:
lib/ruby_reactor/map/element_executor.rb

Class Method Summary collapse

Methods included from Helpers

apply_collect_block, build_element_inputs, load_parent_context_from_storage, resolve_reactor_class, resume_parent_execution

Class Method Details

.check_fail_fast?(arguments, storage) ⇒ Boolean

rubocop:enable Style/IdenticalConditionalBranches

Returns:

  • (Boolean)


70
71
72
73
74
75
76
77
78
79
80
81
82
# File 'lib/ruby_reactor/map/element_executor.rb', line 70

def self.check_fail_fast?(arguments, storage)
  return false unless arguments[:fail_fast]

  map_id = arguments[:map_id]
  parent_reactor_class_name = arguments[:parent_reactor_class_name]

  failed_context_id = storage.retrieve_map_failed_context_id(map_id, parent_reactor_class_name)
  return false unless failed_context_id

  # Skip execution
  finalize_execution(arguments, storage)
  true
end

.finalize_execution(arguments, storage) ⇒ Object



114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
# File 'lib/ruby_reactor/map/element_executor.rb', line 114

def self.finalize_execution(arguments, storage)
  map_id = arguments[:map_id]
  parent_class = arguments[:parent_reactor_class_name]

  new_count = storage.decrement_map_counter(map_id, parent_class)
  trigger_next_batch_if_needed(arguments, arguments[:index], arguments[:batch_size])

  return unless new_count.zero?

  RubyReactor.configuration.async_router.perform_map_collection_async(
    parent_context_id: arguments[:parent_context_id],
    map_id: map_id,
    parent_reactor_class_name: parent_class,
    step_name: arguments[:step_name],
    strict_ordering: arguments[:strict_ordering],
    timeout: 3600
  )
end

.handle_result(result, arguments, context, storage, executor) ⇒ Object



84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
# File 'lib/ruby_reactor/map/element_executor.rb', line 84

def self.handle_result(result, arguments, context, storage, executor)
  return if result.is_a?(RetryQueuedResult)

  map_id = arguments[:map_id]
  index = arguments[:index]
  parent_class = arguments[:parent_reactor_class_name] # Using short name for variable

  if result.success?
    storage.store_map_result(map_id, index, ContextSerializer.serialize_value(result.value),
                             parent_class, strict_ordering: arguments[:strict_ordering])
  else
    executor.undo_all
    storage.store_map_result(map_id, index, { _error: result.error }, parent_class,
                             strict_ordering: arguments[:strict_ordering])

    if arguments[:fail_fast]
      storage.store_map_failed_context_id(map_id, context.context_id, parent_class)
      # FAST FAIL: Trigger Collector immediately to cancel/fail the map execution
      RubyReactor.configuration.async_router.perform_map_collection_async(
        parent_context_id: arguments[:parent_context_id],
        map_id: map_id,
        parent_reactor_class_name: parent_class,
        step_name: arguments[:step_name],
        strict_ordering: arguments[:strict_ordering],
        timeout: 3600
      )
    end
  end
end

.hydrate_or_create_context(arguments) ⇒ Object

rubocop:disable Style/IdenticalConditionalBranches



49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
# File 'lib/ruby_reactor/map/element_executor.rb', line 49

def self.hydrate_or_create_context(arguments)
  if arguments[:serialized_context]
    context = ContextSerializer.deserialize(arguments[:serialized_context])
    context. = arguments

    if context.inputs.empty? && arguments[:serialized_inputs]
      context.inputs = ContextSerializer.deserialize_value(arguments[:serialized_inputs])
    end
    context
  else
    inputs = ContextSerializer.deserialize_value(arguments[:serialized_inputs])
    reactor_class = resolve_reactor_class(arguments[:reactor_class_info])

    context = Context.new(inputs, reactor_class)
    context.parent_context_id = arguments[:parent_context_id]
    context. = arguments
    context
  end
end

.perform(arguments) ⇒ Object



8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
# File 'lib/ruby_reactor/map/element_executor.rb', line 8

def self.perform(arguments)
  arguments = arguments.transform_keys(&:to_sym)

  context = hydrate_or_create_context(arguments)
  storage = RubyReactor.configuration.storage_adapter
  storage.store_map_element_context_id(arguments[:map_id], context.context_id,
                                       arguments[:parent_reactor_class_name])

  return if check_fail_fast?(arguments, storage)

  executor = Executor.new(context.reactor_class, {}, context)
  arguments[:serialized_context] ? executor.resume_execution : executor.execute

  handle_result(executor.result, arguments, context, storage, executor)
  finalize_execution(arguments, storage)
end