Class: RubyReactor::Map::Collector

Inherits:
Object
Extended by:
Helpers
Defined in:
lib/ruby_reactor/map/collector.rb

Class Method Summary

Methods included from Helpers

apply_collect_block, build_element_inputs, load_parent_context_from_storage, resolve_reactor_class, resume_parent_execution

Class Method Details

.acquire_collect_lock(map_id) ⇒ Object



Lines 27–39
# File 'lib/ruby_reactor/map/collector.rb', line 27

# Tries, without waiting, to take the per-map "map_collect" lock.
#
# @param map_id [Object] interpolated into the lock key "map_collect:<map_id>".
# @return [RubyReactor::Lock, Symbol]
#   * :inline when inline_testing_mode? is truthy (no lock is created);
#   * the acquired lock (random owner, configured context_lock_ttl,
#     wait: 0, auto_extend: true) on success;
#   * :contended when acquisition raises RubyReactor::Lock::AcquisitionError
#     (presumably because another owner already holds it).
def self.acquire_collect_lock(map_id)
  return :inline if inline_testing_mode?

  RubyReactor::Lock.new(
    "map_collect:#{map_id}",
    owner: SecureRandom.uuid,
    ttl: RubyReactor.configuration.context_lock_ttl,
    wait: 0,
    auto_extend: true
  ).tap(&:acquire)
rescue RubyReactor::Lock::AcquisitionError
  :contended
end

.apply_collect_block(results, step_config) ⇒ Object



Lines 108–127
# File 'lib/ruby_reactor/map/collector.rb', line 108

# Runs the map step's optional collect block over the gathered results.
#
# @param results [Enumerable] the mapped results (perform_collection passes a
#   lazy RubyReactor::Map::ResultEnumerator).
# @param step_config [Object] step definition; when arguments[:collect_block]
#   is present, its [:source].value is expected to be a callable.
# @return [Object]
#   * RubyReactor::Success(results), untouched, when no collect block is set;
#   * RubyReactor::Success(collected) with the block's return value;
#   * RubyReactor::Failure(error) when the block (or wrapping its value) raises
#     a StandardError, which is also logged with its backtrace.
def self.apply_collect_block(results, step_config)
  # TODO: Check allow_partial_failure option
  collect_arg = step_config.arguments[:collect_block]
  collect_block = collect_arg ? collect_arg[:source].value : nil

  # No collect block: hand the lazy results straight back. Deciding whether
  # individual element failures matter is left to whoever consumes them.
  return RubyReactor::Success(results) unless collect_block

  begin
    RubyReactor::Success(collect_block.call(results))
  rescue StandardError => e
    RubyReactor.configuration.logger.error("Map collect block raised: #{e.message}")
    RubyReactor.configuration.logger.error(e.backtrace.join("\n")) if e.backtrace
    RubyReactor::Failure(e)
  end
end

.handle_failure(failed_context_id, metadata, storage, parent_context, step_name) ⇒ Object



Lines 129–140
# File 'lib/ruby_reactor/map/collector.rb', line 129

# Propagates a fail-fast element failure of a map step to the parent reactor.
#
# @param failed_context_id [Object] id of the failed element's stored context.
# @param metadata [Hash] map metadata; "reactor_class_info" describes the
#   mapped reactor class that ran the element.
# @param storage [Object] storage adapter (must respond to #retrieve_context).
# @param parent_context [RubyReactor::Context] context of the reactor that owns the map step.
# @param step_name [String, Symbol] name of the map step in the parent.
# @return [Object, nil] nil when the failed context cannot be found (the parent
#   is NOT resumed); otherwise the value of resume_parent_execution.
def self.handle_failure(failed_context_id, metadata, storage, parent_context, step_name)
  # The failed element ran under the mapped reactor class, so its context is
  # looked up under that class's name rather than the parent's.
  reactor_class = resolve_reactor_class(metadata["reactor_class_info"])
  failed_context_data = storage.retrieve_context(failed_context_id, reactor_class.name)

  unless failed_context_data
    # Nothing to propagate. Log instead of returning silently, because the
    # parent is left un-resumed by this collector.
    RubyReactor.configuration.logger.error(
      "Map collector: failed context #{failed_context_id} not found; parent not resumed"
    )
    return
  end

  failed_context = RubyReactor::Context.deserialize_from_retry(failed_context_data)
  reason = failed_context.failure_reason
  # Pass an existing Failure through unchanged; wrap any other reason.
  result = reason.is_a?(RubyReactor::Failure) ? reason : RubyReactor::Failure(reason)
  resume_parent_execution(parent_context, step_name, result, storage)
end

.inline_testing_mode? ⇒ Boolean

Returns:

  • (Boolean)


Lines 41–43
# File 'lib/ruby_reactor/map/collector.rb', line 41

def self.inline_testing_mode?
  defined?(Sidekiq::Testing) && Sidekiq::Testing.respond_to?(:inline?) && Sidekiq::Testing.inline?
end

.perform(arguments) ⇒ Object



Lines 8–25
# File 'lib/ruby_reactor/map/collector.rb', line 8

# Entry point: collects a finished map step and resumes its parent reactor.
#
# @param arguments [Hash] job arguments (string or symbol keys; normalised to
#   symbols before use). :map_id selects the lock; the rest is forwarded to
#   perform_collection.
# @return [Object, nil] nil when another delivery holds the collect lock,
#   otherwise the value of perform_collection.
def self.perform(arguments)
  args = arguments.transform_keys(&:to_sym)

  # Several deliveries can target the same map: the eager queue, the
  # counter-zero trigger and the sweeper re-trigger. Each of them would
  # otherwise resume the parent and write its context. A dedicated per-map
  # "map_collect" lock serialises them. It is deliberately separate from the
  # parent's own context lock, which resume_execution acquires for itself.
  collect_lock = acquire_collect_lock(args[:map_id])
  return if collect_lock == :contended

  perform_collection(args)
ensure
  # :inline / :contended (and nil, if we never got this far) have no #release.
  collect_lock.release if collect_lock.respond_to?(:release)
end

.perform_collection(arguments) ⇒ Object



Lines 45–106
# File 'lib/ruby_reactor/map/collector.rb', line 45

# Collects the results of one map step and resumes the parent reactor.
#
# Flow:
# 1. Load the parent context; skip if it already holds this step's result.
# 2. If a fail-fast failure was recorded for the map, delegate to handle_failure.
# 3. Exit early until the stored result count reaches the metadata "count".
# 4. Run the step's collect block over a lazy ResultEnumerator and resume the
#    parent with the resulting Success/Failure.
#
# @param arguments [Hash{Symbol => Object}] :map_id, :parent_context_id,
#   :parent_reactor_class_name, :step_name, :strict_ordering.
# @return [Object, nil] nil on any early exit; otherwise the value of
#   resume_parent_execution.
# @raise [StandardError] errors outside the collect step are logged and re-raised.
# NOTE(review): confirm the storage adapter's metadata reader is named
#   retrieve_map_metadata.
def self.perform_collection(arguments)
  map_id = arguments[:map_id]
  parent_context_id = arguments[:parent_context_id]
  parent_reactor_class_name = arguments[:parent_reactor_class_name]
  step_name = arguments[:step_name]
  strict_ordering = arguments[:strict_ordering]
  # TODO: arguments[:timeout] is currently ignored.

  storage = RubyReactor.configuration.storage_adapter
  parent_context_data = storage.retrieve_context(parent_context_id, parent_reactor_class_name)
  parent_context = RubyReactor::Context.deserialize_from_retry(parent_context_data)

  # Idempotency: if the parent already recorded this map step's result, a
  # prior collector already resumed it. Re-resuming would double-execute the
  # steps after the map.
  return if parent_context.intermediate_results.key?(step_name.to_sym)

  metadata = storage.retrieve_map_metadata(map_id, parent_reactor_class_name)

  # Fail-fast comes first: surface a recorded failure instead of waiting for,
  # or collecting, the remaining results.
  if (failed_context_id = storage.retrieve_map_failed_context_id(map_id, parent_reactor_class_name))
    handle_failure(failed_context_id, metadata, storage, parent_context, step_name)
    return
  end

  # Completion is judged against the element count in the metadata, not the
  # dispatch offset, which can exceed the count because batching reserves
  # slots. Until every result is stored, leave it to a later collector delivery.
  total_count = metadata ? metadata["count"].to_i : 0
  results_count = storage.count_map_results(map_id, parent_reactor_class_name)
  return if results_count < total_count

  # Results are read lazily; the enumerator is handed to the collect block as-is.
  results = RubyReactor::Map::ResultEnumerator.new(
    map_id,
    parent_reactor_class_name,
    strict_ordering: strict_ordering
  )
  step_config = parent_context.reactor_class.steps[step_name.to_sym]

  final_result =
    begin
      apply_collect_block(results, step_config)
    rescue StandardError => e
      # Errors from the collect block itself are converted by
      # apply_collect_block. This catches anything else raised while applying
      # it (e.g. reading the step's arguments), so the parent still gets a Failure.
      RubyReactor::Failure(e)
    end

  resume_parent_execution(parent_context, step_name, final_result, storage)
rescue StandardError => e
  RubyReactor.configuration.logger.error("Map collector crashed: #{e.message}")
  RubyReactor.configuration.logger.error(e.backtrace.join("\n")) if e.backtrace
  raise
end