Class: RubyReactor::Map::Collector
- Inherits:
-
Object
- Object
- RubyReactor::Map::Collector
- Extended by:
- Helpers
- Defined in:
- lib/ruby_reactor/map/collector.rb
Class Method Summary collapse
- .apply_collect_block(results, step_config) ⇒ Object
- .handle_failure(failed_context_id, metadata, storage, parent_context, step_name) ⇒ Object
- .perform(arguments) ⇒ Object
Methods included from Helpers
apply_collect_block, build_element_inputs, load_parent_context_from_storage, resolve_reactor_class, resume_parent_execution
Class Method Details
.apply_collect_block(results, step_config) ⇒ Object
67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 |
# File 'lib/ruby_reactor/map/collector.rb', line 67 def self.apply_collect_block(results, step_config) collect_block = step_config.arguments[:collect_block][:source].value if step_config.arguments[:collect_block] # TODO: Check allow_partial_failure option if collect_block begin # Pass Enumerator to collect block collected = collect_block.call(results) RubyReactor::Success(collected) rescue StandardError => e puts "COLLECTOR INNER EXCEPTION: #{e.}" puts e.backtrace RubyReactor::Failure(e) end else # Default behavior: Return Success(Enumerator). # Logic for checking failures is deferred to the consumer of the enumerator. RubyReactor::Success(results) end end |
.handle_failure(failed_context_id, metadata, storage, parent_context, step_name) ⇒ Object
88 89 90 91 92 93 94 95 96 97 98 99 |
# File 'lib/ruby_reactor/map/collector.rb', line 88 def self.handle_failure(failed_context_id, , storage, parent_context, step_name) # Resolve the class of the mapped reactor to retrieve its context reactor_class = resolve_reactor_class(["reactor_class_info"]) failed_context_data = storage.retrieve_context(failed_context_id, reactor_class.name) return unless failed_context_data failed_context = RubyReactor::Context.deserialize_from_retry(failed_context_data) reason = failed_context.failure_reason result = reason.is_a?(RubyReactor::Failure) ? reason : RubyReactor::Failure(reason) resume_parent_execution(parent_context, step_name, result, storage) end |
.perform(arguments) ⇒ Object
8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 |
# File 'lib/ruby_reactor/map/collector.rb', line 8 def self.perform(arguments) arguments = arguments.transform_keys(&:to_sym) map_id = arguments[:map_id] parent_context_id = arguments[:parent_context_id] parent_reactor_class_name = arguments[:parent_reactor_class_name] step_name = arguments[:step_name] strict_ordering = arguments[:strict_ordering] # timeout = arguments[:timeout] storage = RubyReactor.configuration.storage_adapter parent_context_data = storage.retrieve_context(parent_context_id, parent_reactor_class_name) parent_context = RubyReactor::Context.deserialize_from_retry(parent_context_data) # Check if all tasks are completed = storage.(map_id, parent_reactor_class_name) total_count = ? ["count"].to_i : 0 results_count = storage.count_map_results(map_id, parent_reactor_class_name) # Not done yet, requeue or wait? # Actually Collector currently assumes we only call it when we expect completion or check progress # Since map_offset tracks dispatching progress and might exceed count due to batching reservation, # we must strictly check against the total count of elements. # Check for fail_fast failure FIRST if (failed_context_id = storage.retrieve_map_failed_context_id(map_id, parent_reactor_class_name)) handle_failure(failed_context_id, , storage, parent_context, step_name) return end return if results_count < total_count # Retrieve results lazily results = RubyReactor::Map::ResultEnumerator.new( map_id, parent_reactor_class_name, strict_ordering: strict_ordering ) # Apply collect block (or default collection) step_config = parent_context.reactor_class.steps[step_name.to_sym] begin final_result = apply_collect_block(results, step_config) if final_result.failure? # Optionally log failure internally or just rely on context status update end rescue StandardError => e final_result = RubyReactor::Failure(e) end # Resume parent execution resume_parent_execution(parent_context, step_name, final_result, storage) rescue StandardError => e puts "COLLECTOR CRASH: #{e.}" puts e.backtrace raise e end |