Class: RubyReactor::Map::ElementExecutor
- Inherits:
-
Object
- Object
- RubyReactor::Map::ElementExecutor
- Extended by:
- Helpers
- Defined in:
- lib/ruby_reactor/map/element_executor.rb
Class Method Summary collapse
-
.check_fail_fast?(arguments, storage) ⇒ Boolean
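Returns true (after finalizing this element) when fail-fast is enabled and a failed context id is already stored for the map; otherwise returns false.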
- .finalize_execution(arguments, storage) ⇒ Object
- .handle_result(result, arguments, context, storage, executor) ⇒ Object
-
.hydrate_or_create_context(arguments) ⇒ Object
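Deserializes the context from arguments[:serialized_context] when present; otherwise builds a new Context from the serialized inputs and the reactor class info.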
- .perform(arguments) ⇒ Object
Methods included from Helpers
apply_collect_block, build_element_inputs, load_parent_context_from_storage, resolve_reactor_class, resume_parent_execution
Class Method Details
.check_fail_fast?(arguments, storage) ⇒ Boolean
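Returns true (after finalizing this element) when fail-fast is enabled and a failed context id is already stored for the map; otherwise returns false.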
70 71 72 73 74 75 76 77 78 79 80 81 82 |
# File 'lib/ruby_reactor/map/element_executor.rb', line 70

# Tells the caller whether this map element should be skipped because a
# failure has already been recorded for the map while fail-fast is on.
#
# When the element is skipped, finalize_execution is still invoked for it
# before returning.
#
# @param arguments [Hash] element arguments; reads :fail_fast, :map_id and
#   :parent_reactor_class_name
# @param storage [Object] storage adapter; must respond to
#   retrieve_map_failed_context_id(map_id, parent_reactor_class_name)
# @return [Boolean] true when the element was skipped, false otherwise
def self.check_fail_fast?(arguments, storage)
  return false unless arguments[:fail_fast]

  recorded_failure = storage.retrieve_map_failed_context_id(
    arguments[:map_id],
    arguments[:parent_reactor_class_name]
  )
  return false unless recorded_failure

  # A failure is already on record: skip the work but still finalize.
  finalize_execution(arguments, storage)
  true
end
.finalize_execution(arguments, storage) ⇒ Object
114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 |
# File 'lib/ruby_reactor/map/element_executor.rb', line 114

# Wraps up one map element: decrements the map counter in storage, lets
# trigger_next_batch_if_needed decide about the next batch, and enqueues the
# map collection once the decremented counter is zero.
#
# @param arguments [Hash] element arguments; reads :map_id,
#   :parent_reactor_class_name, :index, :batch_size, :parent_context_id,
#   :step_name and :strict_ordering
# @param storage [Object] storage adapter; must respond to
#   decrement_map_counter(map_id, parent_class) and return a value that
#   responds to zero?
# @return [Object, nil] whatever perform_map_collection_async returns when the
#   counter reached zero, nil otherwise
def self.finalize_execution(arguments, storage)
  map_key = arguments[:map_id]
  parent_name = arguments[:parent_reactor_class_name]
  remaining = storage.decrement_map_counter(map_key, parent_name)

  trigger_next_batch_if_needed(arguments, arguments[:index], arguments[:batch_size])

  if remaining.zero?
    RubyReactor.configuration.async_router.perform_map_collection_async(
      parent_context_id: arguments[:parent_context_id],
      map_id: map_key,
      parent_reactor_class_name: parent_name,
      step_name: arguments[:step_name],
      strict_ordering: arguments[:strict_ordering],
      timeout: 3600
    )
  end
end
.handle_result(result, arguments, context, storage, executor) ⇒ Object
84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 |
# File 'lib/ruby_reactor/map/element_executor.rb', line 84

# Persists the outcome of one map element.
#
# A RetryQueuedResult is ignored entirely. A successful result is stored as
# its serialized value. A failed result first undoes the executor's work, is
# stored as `{ _error: ... }`, and, when fail-fast is on, records the failing
# context id and enqueues the map collection right away.
#
# @param result [Object] executor result; must respond to success? and to
#   value (on success) or error (on failure)
# @param arguments [Hash] element arguments; reads :map_id, :index,
#   :parent_reactor_class_name, :strict_ordering, :fail_fast,
#   :parent_context_id and :step_name
# @param context [Object] element context; context_id is read on fail-fast
# @param storage [Object] storage adapter used to store the result
# @param executor [Object] executor whose undo_all is called on failure
# @return [Object, nil] the storage call's return value on success, the
#   collection enqueue's return value on a fail-fast failure, nil otherwise
def self.handle_result(result, arguments, context, storage, executor)
  return if result.is_a?(RetryQueuedResult)

  map_key = arguments[:map_id]
  position = arguments[:index]
  parent_name = arguments[:parent_reactor_class_name]

  succeeded = result.success?
  # Roll back before anything is written for a failed element.
  executor.undo_all unless succeeded

  payload = succeeded ? ContextSerializer.serialize_value(result.value) : { _error: result.error }
  stored = storage.store_map_result(
    map_key, position, payload, parent_name,
    strict_ordering: arguments[:strict_ordering]
  )
  return stored if succeeded
  return unless arguments[:fail_fast]

  storage.store_map_failed_context_id(map_key, context.context_id, parent_name)

  # Fail fast: hand over to the collector immediately instead of waiting for
  # the remaining elements.
  RubyReactor.configuration.async_router.perform_map_collection_async(
    parent_context_id: arguments[:parent_context_id],
    map_id: map_key,
    parent_reactor_class_name: parent_name,
    step_name: arguments[:step_name],
    strict_ordering: arguments[:strict_ordering],
    timeout: 3600
  )
end
.hydrate_or_create_context(arguments) ⇒ Object
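Deserializes the context from arguments[:serialized_context] when present; otherwise builds a new Context from the serialized inputs and the reactor class info.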
49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 |
# File 'lib/ruby_reactor/map/element_executor.rb', line 49

# Produces the context an element runs in.
#
# With arguments[:serialized_context] the context is deserialized, and its
# inputs are refilled from arguments[:serialized_inputs] only when the
# deserialized inputs are empty. Without it, a new Context is built from the
# deserialized inputs and the resolved reactor class, and the parent context
# id is copied over. In both cases the full arguments hash is attached to the
# context.
#
# @param arguments [Hash] element arguments; reads :serialized_context,
#   :serialized_inputs, :reactor_class_info and :parent_context_id
# @return [Object] the hydrated or newly created context
def self.hydrate_or_create_context(arguments)
  if arguments[:serialized_context]
    context = ContextSerializer.deserialize(arguments[:serialized_context])
    # NOTE(review): the extracted listing read `context. = arguments` (the
    # attribute name was lost, leaving invalid Ruby). `map_metadata` is the
    # presumed attribute; confirm against the Context class.
    context.map_metadata = arguments

    if context.inputs.empty? && arguments[:serialized_inputs]
      context.inputs = ContextSerializer.deserialize_value(arguments[:serialized_inputs])
    end

    context
  else
    inputs = ContextSerializer.deserialize_value(arguments[:serialized_inputs])
    reactor_class = resolve_reactor_class(arguments[:reactor_class_info])

    context = Context.new(inputs, reactor_class)
    context.parent_context_id = arguments[:parent_context_id]
    # NOTE(review): same reconstructed attribute name as in the branch above.
    context.map_metadata = arguments
    context
  end
end
.perform(arguments) ⇒ Object
8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 |
# File 'lib/ruby_reactor/map/element_executor.rb', line 8

# Entry point for running a single map element.
#
# Symbolizes the argument keys, obtains the element context, records the
# context id in storage, and then either skips the element (when
# check_fail_fast? says so) or runs/resumes the executor, stores its result
# and finalizes the element.
#
# @param arguments [Hash] element arguments; keys must respond to to_sym
# @return [Object, nil] nil when the element was skipped, otherwise the return
#   value of finalize_execution
def self.perform(arguments)
  opts = arguments.transform_keys { |key| key.to_sym }
  element_context = hydrate_or_create_context(opts)
  adapter = RubyReactor.configuration.storage_adapter

  adapter.store_map_element_context_id(
    opts[:map_id],
    element_context.context_id,
    opts[:parent_reactor_class_name]
  )

  return if check_fail_fast?(opts, adapter)

  runner = Executor.new(element_context.reactor_class, {}, element_context)
  # A serialized context means this element is being resumed, not started.
  if opts[:serialized_context]
    runner.resume_execution
  else
    runner.execute
  end

  handle_result(runner.result, opts, element_context, adapter, runner)
  finalize_execution(opts, adapter)
end