Class: Phronomy::Graph::ParallelNode
- Inherits:
-
Object
- Object
- Phronomy::Graph::ParallelNode
- Defined in:
- lib/phronomy/graph/parallel_node.rb
Overview
Represents a set of branches that execute concurrently as a single graph node.
Each branch is a callable (Proc, lambda, or any object responding to #call) that receives the current state and returns either a Hash of field updates or nil. All branches run in separate threads; their results are merged in registration order using the following policy:
:replace fields — last-write-wins (rightmost branch wins) :append fields — all Arrays are concatenated :merge fields — all Hashes are deep-merged (rightmost wins on conflict)
Timeout support Pass +timeout:+ (seconds, Numeric) to limit how long the node may run. If any thread does not finish within the limit, all threads are killed and +Phronomy::Graph::TimeoutError+ is raised (for +:raise+ policy) or recorded in the result hash under +:parallel_errors+ (for +:best_effort+ policy).
Failure policies (+on_error:+) :raise (default) Re-raises the first exception after all threads are joined. Mirrors the original Thread#value semantics. :best_effort Collects successful results and stores any errors in the update Hash under the key +:parallel_errors+ (Array of exception objects). The caller's state class should declare: field :parallel_errors, type: :append, default: -> { [] } Unknown keys are silently ignored by the State#merge machinery.
Instance Method Summary collapse
-
#call(state) ⇒ Hash?
Executes all branches concurrently and merges their results.
-
#initialize(branches, timeout: nil, on_error: :raise) ⇒ ParallelNode
constructor
A new instance of ParallelNode.
Constructor Details
#initialize(branches, timeout: nil, on_error: :raise) ⇒ ParallelNode
Returns a new instance of ParallelNode.
49 50 51 52 53 54 55 56 57 58 |
# File 'lib/phronomy/graph/parallel_node.rb', line 49 def initialize(branches, timeout: nil, on_error: :raise) raise ArgumentError, "branches must be a non-empty Array" if branches.empty? unless %i[raise best_effort].include?(on_error) raise ArgumentError, "on_error must be :raise or :best_effort, got #{on_error.inspect}" end @branches = branches @timeout = timeout @on_error = on_error end |
Instance Method Details
#call(state) ⇒ Hash?
Executes all branches concurrently and merges their results.
64 65 66 67 68 69 70 71 72 73 74 |
# File 'lib/phronomy/graph/parallel_node.rb', line 64 def call(state) threads = @branches.map { |branch| Thread.new { branch.call(state) } } deadline = @timeout ? (Process.clock_gettime(Process::CLOCK_MONOTONIC) + @timeout) : nil state_class = state.class if @on_error == :best_effort gather_best_effort(threads, deadline, state_class) else gather_raise(threads, deadline, state_class) end end |