Class: RubyReactor::Map::ElementExecutor

Inherits:
Object
  • Object
show all
Extended by:
Helpers
Defined in:
lib/ruby_reactor/map/element_executor.rb

Class Method Summary collapse

Methods included from Helpers

apply_collect_block, build_element_inputs, load_parent_context_from_storage, resolve_reactor_class, resume_parent_execution

Class Method Details

.acquire_element_lock(arguments) ⇒ Object



25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
# File 'lib/ruby_reactor/map/element_executor.rb', line 25

# Takes the per-element lock that guards a single map element run.
#
# @param arguments [Hash] element job arguments; reads :map_id and :index
# @return [Symbol, RubyReactor::Lock] :inline when Sidekiq inline testing is
#   active, :contended when the lock could not be acquired, otherwise the
#   acquired lock
def self.acquire_element_lock(arguments)
  # Under Sidekiq::Testing.inline! an async-retry perform_map_element_in
  # re-enters synchronously within this very frame, so taking the lock would
  # make the element contend with itself. The lock only protects against
  # concurrent cross-process delivery, which cannot happen inline.
  return :inline if inline_testing_mode?

  lock_key = "map_element:#{arguments[:map_id]}:#{arguments[:index]}"
  lock_options = {
    owner: SecureRandom.uuid,
    ttl: RubyReactor.configuration.context_lock_ttl,
    wait: 0, # do not block waiting for the current holder
    auto_extend: true
  }

  RubyReactor::Lock.new(lock_key, **lock_options).tap(&:acquire)
rescue RubyReactor::Lock::AcquisitionError
  # Someone else holds the element lock: report it and let the caller drop
  # this delivery.
  duplicate_notice =
    "RubyReactor map element #{arguments[:map_id]}:#{arguments[:index]} already in flight; dropping duplicate"
  RubyReactor.configuration.logger.info(duplicate_notice)
  :contended
end

.check_fail_fast?(arguments, storage) ⇒ Boolean

rubocop:enable Style/IdenticalConditionalBranches

Returns:

  • (Boolean)


123
124
125
126
127
128
129
130
131
132
133
134
135
# File 'lib/ruby_reactor/map/element_executor.rb', line 123

# Decides whether this element should be skipped because the map has already
# recorded a failed context under fail-fast.
#
# When a failed context id is found, the element is finalized without being
# executed.
#
# @param arguments [Hash] reads :fail_fast, :map_id and
#   :parent_reactor_class_name
# @param storage [Object] adapter responding to
#   retrieve_map_failed_context_id
# @return [Boolean] true when the element was skipped, false otherwise
def self.check_fail_fast?(arguments, storage)
  # Short-circuit keeps the storage lookup from running unless fail-fast is on.
  already_failed = arguments[:fail_fast] &&
                   storage.retrieve_map_failed_context_id(
                     arguments[:map_id],
                     arguments[:parent_reactor_class_name]
                   )
  return false unless already_failed

  # Skip execution, but still run the usual end-of-element bookkeeping.
  finalize_execution(arguments, storage)
  true
end

.finalize_execution(arguments, storage) ⇒ Object



167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
# File 'lib/ruby_reactor/map/element_executor.rb', line 167

# End-of-element bookkeeping: decrements the map counter, asks for the next
# batch, and enqueues the map collection once the counter reaches zero.
#
# @param arguments [Hash] reads :map_id, :parent_reactor_class_name, :index,
#   :batch_size, :parent_context_id, :step_name and :strict_ordering
# @param storage [Object] adapter responding to decrement_map_counter
# @return [Object, nil] nil while the counter is non-zero, otherwise whatever
#   perform_map_collection_async returns
def self.finalize_execution(arguments, storage)
  map_id = arguments[:map_id]
  parent_name = arguments[:parent_reactor_class_name]

  remaining = storage.decrement_map_counter(map_id, parent_name)
  trigger_next_batch_if_needed(arguments, arguments[:index], arguments[:batch_size])

  # Only the element that brings the counter to zero schedules collection.
  return unless remaining.zero?

  collection_job = {
    parent_context_id: arguments[:parent_context_id],
    map_id: map_id,
    parent_reactor_class_name: parent_name,
    step_name: arguments[:step_name],
    strict_ordering: arguments[:strict_ordering],
    timeout: 3600
  }
  RubyReactor.configuration.async_router.perform_map_collection_async(**collection_job)
end

.handle_result(result, arguments, context, storage, executor) ⇒ Object



137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
# File 'lib/ruby_reactor/map/element_executor.rb', line 137

# Persists the outcome of one map element.
#
# A successful result is serialized and stored. A failed result is undone,
# stored as `{ _error: ... }`, and — when fail-fast is enabled — recorded as
# the map's failed context before the collector is enqueued right away.
#
# @param result [Object] executor result; RetryQueuedResult instances are
#   ignored
# @param arguments [Hash] reads :map_id, :index, :parent_reactor_class_name,
#   :strict_ordering, :fail_fast, :parent_context_id and :step_name
# @param context [Object] element context; its context_id is stored on
#   fail-fast failure
# @param storage [Object] adapter responding to store_map_result and
#   store_map_failed_context_id
# @param executor [Object] responds to undo_all
# @return [Object, nil]
def self.handle_result(result, arguments, context, storage, executor)
  # A queued retry means the element is not finished; nothing to record.
  return if result.is_a?(RetryQueuedResult)

  map_id = arguments[:map_id]
  position = arguments[:index]
  parent_name = arguments[:parent_reactor_class_name]

  if result.success?
    serialized = ContextSerializer.serialize_value(result.value)
    return storage.store_map_result(map_id, position, serialized, parent_name,
                                    strict_ordering: arguments[:strict_ordering])
  end

  # Failure path: roll back first, then record the error for this index.
  executor.undo_all
  storage.store_map_result(map_id, position, { _error: result.error }, parent_name,
                           strict_ordering: arguments[:strict_ordering])
  return unless arguments[:fail_fast]

  storage.store_map_failed_context_id(map_id, context.context_id, parent_name)
  # FAST FAIL: enqueue the collector immediately to cancel/fail the map.
  RubyReactor.configuration.async_router.perform_map_collection_async(
    parent_context_id: arguments[:parent_context_id],
    map_id: map_id,
    parent_reactor_class_name: parent_name,
    step_name: arguments[:step_name],
    strict_ordering: arguments[:strict_ordering],
    timeout: 3600
  )
end

.hydrate_or_create_context(arguments) ⇒ Object

rubocop:disable Style/IdenticalConditionalBranches



102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
# File 'lib/ruby_reactor/map/element_executor.rb', line 102

# Produces the context an element runs against.
#
# When arguments[:serialized_context] is present the context is deserialized
# from it; otherwise a new Context is built from the deserialized
# arguments[:serialized_inputs] and the reactor class resolved from
# arguments[:reactor_class_info].
#
# @param arguments [Hash] element job arguments, read with symbol keys
# @return [Object] the deserialized or newly created context
def self.hydrate_or_create_context(arguments)
  if arguments[:serialized_context]
    context = ContextSerializer.deserialize(arguments[:serialized_context])
    # NOTE(review): the attribute name between `context.` and `=` is missing
    # in this listing, so which attribute receives `arguments` cannot be told
    # from here — confirm against lib/ruby_reactor/map/element_executor.rb.
    context. = arguments

    # Fall back to the separately serialized inputs only when the
    # deserialized context carries none.
    if context.inputs.empty? && arguments[:serialized_inputs]
      context.inputs = ContextSerializer.deserialize_value(arguments[:serialized_inputs])
    end
    context
  else
    inputs = ContextSerializer.deserialize_value(arguments[:serialized_inputs])
    reactor_class = resolve_reactor_class(arguments[:reactor_class_info])

    context = Context.new(inputs, reactor_class)
    context.parent_context_id = arguments[:parent_context_id]
    # NOTE(review): same missing attribute name as in the branch above —
    # confirm against the original source file.
    context. = arguments
    context
  end
end

.inline_testing_mode? ⇒ Boolean

Returns:

  • (Boolean)


45
46
47
# File 'lib/ruby_reactor/map/element_executor.rb', line 45

# Reports whether Sidekiq's testing harness is loaded and in inline mode.
#
# @return [Boolean, nil] nil when Sidekiq::Testing is not defined, false when
#   it does not respond to inline?, otherwise the value of
#   Sidekiq::Testing.inline?
def self.inline_testing_mode?
  return unless defined?(Sidekiq::Testing)

  testing = Sidekiq::Testing
  return false unless testing.respond_to?(:inline?)

  testing.inline?
end

.perform(arguments) ⇒ Object



8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
# File 'lib/ruby_reactor/map/element_executor.rb', line 8

# Entry point for running one map element.
#
# Keys are symbolized, the element lock is requested, and the element is run
# unless the lock is contended. Whatever was returned in place of a lock is
# released afterwards when it responds to release.
#
# @param arguments [Hash] element job arguments with String or Symbol keys
# @return [Object, nil] nil when the delivery is dropped as a duplicate,
#   otherwise the return value of perform_element
def self.perform(arguments)
  element_args = arguments.transform_keys(&:to_sym)

  # Per-element liveness lock (Phase 5b). While held it tells the map sweeper
  # the element is alive, and it serializes duplicate deliveries so a re-run
  # cannot decrement the counter twice (M3). A duplicate of a live element is
  # simply dropped; the live original stores the result and finalizes.
  liveness_lock = acquire_element_lock(element_args)
  return if liveness_lock == :contended

  begin
    perform_element(element_args)
  ensure
    # Non-lock markers such as :inline have nothing to release.
    liveness_lock.release if liveness_lock.respond_to?(:release)
  end
end

.perform_element(arguments) ⇒ Object



49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
# File 'lib/ruby_reactor/map/element_executor.rb', line 49

# Runs a single map element from context setup through finalization.
#
# @param arguments [Hash] symbol-keyed element job arguments; reads :map_id,
#   :parent_reactor_class_name and :serialized_context here
# @return [Object, nil] nil when the element is skipped by fail-fast or has
#   been requeued for retry, otherwise the return value of finalize_execution
def self.perform_element(arguments)
  element_context = hydrate_or_create_context(arguments)
  # This element is already inside its own background worker. Async steps and
  # async retries therefore have to run inline here; handing them to a
  # detached Worker would take them out of map result/counter tracking.
  # SidekiqWorkers::Worker sets the same flag.
  element_context.inline_async_execution = true

  adapter = RubyReactor.configuration.storage_adapter
  adapter.store_map_element_context_id(
    arguments[:map_id],
    element_context.context_id,
    arguments[:parent_reactor_class_name]
  )

  return if check_fail_fast?(arguments, adapter)

  runner = Executor.new(element_context.reactor_class, {}, element_context)
  # A serialized context is resumed; anything else starts from scratch.
  if arguments[:serialized_context]
    runner.resume_execution
  else
    runner.execute
  end

  outcome = runner.result

  # When an async retry has requeued this element as a fresh MapElementWorker
  # job, the element is still in progress: storing a result, decrementing the
  # completion counter, or triggering the next batch is left to the requeued
  # job, once the element finally succeeds or runs out of retries.
  return if outcome.is_a?(RetryQueuedResult)

  handle_result(outcome, arguments, element_context, adapter, runner)
  finalize_execution(arguments, adapter)
end