Class: Phronomy::Graph::ParallelNode

Inherits:
Object
  • Object
show all
Defined in:
lib/phronomy/graph/parallel_node.rb

Overview

Represents a set of branches that execute concurrently as a single graph node.

Each branch is a callable (Proc, lambda, or any object responding to #call) that receives the current state and returns either a Hash of field updates or nil. All branches run in separate threads; their results are merged in registration order using the following policy:

:replace fields — last-write-wins (rightmost branch wins) :append fields — all Arrays are concatenated :merge fields — all Hashes are deep-merged (rightmost wins on conflict)

Timeout support Pass +timeout:+ (seconds, Numeric) to limit how long the node may run. If any thread does not finish within the limit, all threads are killed and +Phronomy::Graph::TimeoutError+ is raised (for +:raise+ policy) or recorded in the result hash under +:parallel_errors+ (for +:best_effort+ policy).

Failure policies (+on_error:+) :raise (default) Re-raises the first exception after all threads are joined. Mirrors the original Thread#value semantics. :best_effort Collects successful results and stores any errors in the update Hash under the key +:parallel_errors+ (Array of exception objects). The caller's state class should declare: field :parallel_errors, type: :append, default: -> { [] } Unknown keys are silently ignored by the State#merge machinery.

Examples:

Basic two-branch node

graph.add_parallel_node(:step,
  ->(state) { { field_a: "from_a" } },
  ->(state) { { field_b: "from_b" } }
)

With timeout and failure policy

graph.add_parallel_node(:step,
  branch_a, branch_b,
  timeout: 10,
  on_error: :best_effort
)

Instance Method Summary collapse

Constructor Details

#initialize(branches, timeout: nil, on_error: :raise) ⇒ ParallelNode

Returns a new instance of ParallelNode.

Parameters:

  • branches (Array<#call>)

    at least one callable branch required

  • timeout (Numeric, nil) (defaults to: nil)

    wall-clock limit in seconds; nil = unlimited

  • on_error (Symbol) (defaults to: :raise)

    :raise (default) or :best_effort

Raises:

  • (ArgumentError)


49
50
51
52
53
54
55
56
57
58
# File 'lib/phronomy/graph/parallel_node.rb', line 49

def initialize(branches, timeout: nil, on_error: :raise)
  raise ArgumentError, "branches must be a non-empty Array" if branches.empty?
  unless %i[raise best_effort].include?(on_error)
    raise ArgumentError, "on_error must be :raise or :best_effort, got #{on_error.inspect}"
  end

  @branches = branches
  @timeout = timeout
  @on_error = on_error
end

Instance Method Details

#call(state) ⇒ Hash?

Executes all branches concurrently and merges their results.

Parameters:

  • state (Object)

    state object (includes Phronomy::Graph::State)

Returns:

  • (Hash, nil)

    merged update hash, or nil when all branches return nil



64
65
66
67
68
69
70
71
72
73
# File 'lib/phronomy/graph/parallel_node.rb', line 64

def call(state)
  threads = @branches.map { |branch| Thread.new { branch.call(state) } }
  deadline = @timeout ? (Process.clock_gettime(Process::CLOCK_MONOTONIC) + @timeout) : nil

  if @on_error == :best_effort
    gather_best_effort(threads, deadline)
  else
    gather_raise(threads, deadline)
  end
end