Class: Phronomy::WorkflowRunner

Inherits:
Object
Includes:
Runnable
Defined in:
lib/phronomy/workflow_runner.rb

Overview

Execution engine for compiled workflows. Manages node execution, phase transitions, halt/resume, and wait states. Instantiated by Phronomy::Workflow and used internally.

Wait states (registered via wait_states:) are virtual nodes that automatically halt execution when reached. They can be resumed with either #resume (generic) or #send_event (event-typed).

Internally, a state_machines-based PhaseTracker class is generated at initialization time. The tracker validates phase transitions during execution; invalid transitions are logged as warnings without halting.
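The "warn, don't halt" policy described above can be sketched in plain Ruby. This is a stand-in, not the generated PhaseTracker: it does not use the state_machines gem, and the `track_transition` helper, the `allowed` table, and the phase names are hypothetical.

```ruby
# Stand-in for the generated tracker (plain Ruby, not the state_machines gem).
# `allowed` maps each phase to the phases it may legally move to.
def track_transition(allowed, from, to, warnings)
  unless allowed.fetch(from, []).include?(to)
    # Record the problem and carry on instead of raising.
    warnings << "invalid phase transition #{from.inspect} -> #{to.inspect}"
  end
  to # assumption: the tracker adopts the new phase either way
end

allowed  = { draft: [:review], review: [:publish] }
warnings = []
phase = track_transition(allowed, :draft, :review, warnings) # declared transition
phase = track_transition(allowed, phase, :draft, warnings)   # undeclared: warns only
```

After the second call one warning has been collected and execution has continued, which is the observable behavior the overview describes.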

Constant Summary

FINISH = :__end__

Sentinel value for the terminal state of a workflow.

Instance Method Summary

Methods included from Runnable

#batch, #trace

Constructor Details

#initialize(state_class:, nodes:, edges:, conditional_edges:, entry_point:, before_callbacks: {}, after_callbacks: {}, wait_states: {}, state_store: nil) ⇒ WorkflowRunner

Returns a new instance of WorkflowRunner.



# File 'lib/phronomy/workflow_runner.rb', line 24

def initialize(state_class:, nodes:, edges:, conditional_edges:, entry_point:,
  before_callbacks: {}, after_callbacks: {}, wait_states: {}, state_store: nil)
  @state_class = state_class
  @nodes = nodes
  @edges = edges
  @conditional_edges = conditional_edges
  @entry_point = entry_point
  @before_callbacks = before_callbacks.dup
  @after_callbacks = after_callbacks.dup
  # { wait_state_name => { resume_event: Symbol, resume_to: Symbol } }
  @wait_states = wait_states.dup
  @state_store_override = state_store
  @phase_machine_class = build_phase_machine_class
end
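The constructor copies before_callbacks, after_callbacks, and wait_states with `dup`, while nodes, edges, and conditional_edges are stored as passed. Because `Hash#dup` is a shallow copy, the following sketch shows what that does and does not protect against. The `snapshot` helper and the wait-state names are hypothetical and only mirror the `wait_states.dup` line.

```ruby
# Mirrors `@wait_states = wait_states.dup` from the constructor.
def snapshot(wait_states)
  wait_states.dup
end

wait_states = { awaiting_approval: { resume_event: :approve, resume_to: :publish } }
stored = snapshot(wait_states)

# Adding a key to the caller's hash afterwards does not reach the copy...
wait_states[:awaiting_review] = { resume_event: :review, resume_to: :edit }
stored.key?(:awaiting_review) # => false

# ...but the nested configuration hashes are shared, so this mutation does.
wait_states[:awaiting_approval][:resume_to] = :archive
stored[:awaiting_approval][:resume_to] # => :archive
```

Freezing the nested hashes before passing them in is one way to rule out the second case.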

Instance Method Details

#invoke(input, config: {}) ⇒ Object

Executes the workflow from the entry point.

Parameters:

  • input (Hash)

    initial context field values

  • config (Hash) (defaults to: {})

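    { thread_id:, recursion_limit:, user_id:, session_id: }; thread_id defaults to a random UUID, recursion_limit defaults to Phronomy.configuration.recursion_limit, and user_id and session_id are attached to the trace when present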

Returns:

  • (Object)

    final context (includes Phronomy::WorkflowContext)



# File 'lib/phronomy/workflow_runner.rb', line 43

def invoke(input, config: {})
  caller_meta = {}
  caller_meta[:user_id] = config[:user_id] if config[:user_id]
  caller_meta[:session_id] = config[:session_id] if config[:session_id]

  trace("graph.invoke", input: input.inspect, **caller_meta) do |_span|
    thread_id = config[:thread_id] || SecureRandom.uuid
    recursion_limit = config.fetch(:recursion_limit, Phronomy.configuration.recursion_limit)
    state = @state_class.new(**input)
    state.(thread_id: thread_id)
    result = run_graph(state, recursion_limit: recursion_limit)
    [result, nil]
  end
end
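The two config lookups in #invoke fall back differently: `config[:thread_id] || ...` replaces a nil value, whereas `config.fetch(:recursion_limit, ...)` only applies the default when the key is absent. A minimal sketch of that difference, where `resolve_config` and the `default_limit` value of 25 are hypothetical stand-ins for the real method body and for Phronomy.configuration.recursion_limit:

```ruby
require "securerandom"

# Hypothetical helper reproducing the two lookups from #invoke.
# default_limit stands in for Phronomy.configuration.recursion_limit.
def resolve_config(config, default_limit: 25)
  {
    thread_id: config[:thread_id] || SecureRandom.uuid,             # nil => new UUID
    recursion_limit: config.fetch(:recursion_limit, default_limit)  # nil is kept
  }
end

resolve_config({})[:recursion_limit]                     # => 25
resolve_config({ recursion_limit: 5 })[:recursion_limit] # => 5
resolve_config({ recursion_limit: nil })[:recursion_limit] # => nil
resolve_config({ thread_id: nil })[:thread_id]           # a freshly generated UUID
```

Callers who build the config hash programmatically may therefore want to omit recursion_limit entirely rather than pass nil.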

#resume(state:, input: nil) ⇒ Object

Generic resume. Delegates to #send_event with the :resume event, which routes on the context's current phase. Equivalent to send_event(state:, event: :resume, input:).

Parameters:

  • state (Object)

    halted context

  • input (Hash, nil) (defaults to: nil)

    optional field updates to merge before resuming

Returns:

  • (Object)

    final context



# File 'lib/phronomy/workflow_runner.rb', line 64

def resume(state:, input: nil)
  send_event(state: state, event: :resume, input: input)
end

#send_event(state:, event:, input: nil) ⇒ Object

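Fires a named event to advance a halted workflow.

The special event :resume is accepted for any registered wait state:

  • Named wait state → resumes at its resume_to node

If the context's current phase is not a registered wait state, :resume raises ArgumentError.

Any other event name must match a resume_event: declared in the wait_states configuration; an unknown event raises ArgumentError and lists the valid events.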

Parameters:

  • state (Object)

    halted context

  • event (Symbol)

    :resume for generic resumption, or a named event

  • input (Hash, nil) (defaults to: nil)

    optional field updates to merge before resuming

Returns:

  • (Object)

    final context



# File 'lib/phronomy/workflow_runner.rb', line 80

def send_event(state:, event:, input: nil)
  state = state.merge(input) if input
  event = event.to_sym
  current_phase = state.phase

  if event == :resume
    # Named wait state: use resume_to
    if @wait_states.key?(current_phase)
      return run_graph(state, from_node: @wait_states[current_phase][:resume_to])
    end
    raise ArgumentError, "State has no wait state registered for phase #{current_phase.inspect}"
  end

  # Named event lookup
  _, wait_cfg = @wait_states.find { |_, c| c[:resume_event] == event }
  unless wait_cfg
    valid = @wait_states.values.filter_map { |c| c[:resume_event] }.uniq
    raise ArgumentError, "Unknown event #{event.inspect}. Valid events: #{valid.inspect}"
  end
  run_graph(state, from_node: wait_cfg[:resume_to])
end
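The routing in #send_event can be exercised in isolation. The sketch below reproduces only the target-node lookup, without run_graph or state merging; `resume_target` and the wait-state names are hypothetical, and the error messages are shortened.

```ruby
# Returns the node at which execution would continue, following the two
# branches of #send_event: generic :resume, then named-event lookup.
def resume_target(wait_states, phase:, event:)
  event = event.to_sym
  if event == :resume
    cfg = wait_states[phase]
    raise ArgumentError, "no wait state registered for phase #{phase.inspect}" unless cfg
    return cfg[:resume_to]
  end
  # Named events are matched against every wait state, in insertion order.
  _, cfg = wait_states.find { |_, c| c[:resume_event] == event }
  raise ArgumentError, "Unknown event #{event.inspect}" unless cfg
  cfg[:resume_to]
end

wait_states = {
  awaiting_approval: { resume_event: :approve, resume_to: :publish },
  awaiting_payment:  { resume_event: :paid,    resume_to: :ship }
}

resume_target(wait_states, phase: :awaiting_approval, event: :resume)  # => :publish
resume_target(wait_states, phase: :awaiting_approval, event: "approve") # => :publish
resume_target(wait_states, phase: :awaiting_approval, event: :paid)    # => :ship
```

The last call illustrates a property of the listed source: the named-event branch does not consult the current phase, so an event declared for a different wait state is still accepted and resumes at that state's resume_to node. If two wait states share a resume_event, the first one registered wins.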

#stream(input, config: {}) {|Hash| ... } ⇒ Object

Streaming execution. Yields { node: Symbol, state: Object } after each node completes.

Parameters:

  • input (Hash)

    initial context field values

  • config (Hash) (defaults to: {})

    { thread_id:, recursion_limit: }

Yields:

  • (Hash)

Returns:

  • (Object)

    final context



# File 'lib/phronomy/workflow_runner.rb', line 107

def stream(input, config: {}, &block)
  thread_id = config[:thread_id] || SecureRandom.uuid
  recursion_limit = config.fetch(:recursion_limit, Phronomy.configuration.recursion_limit)
  state = @state_class.new(**input)
  state.(thread_id: thread_id)
  run_graph(state, recursion_limit: recursion_limit, &block)
end
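The shape of the yielded value can be illustrated with a toy loop. This is not run_graph, whose source is not shown here: `stream_sketch`, the lambda-based nodes, and the flat `edges` table are assumptions made for the example, and it ignores conditional edges, wait states, callbacks, and the recursion limit.

```ruby
# Toy executor: runs nodes in edge order and yields { node:, state: }
# after each node completes, stopping at the finish sentinel.
def stream_sketch(nodes, edges, entry_point, state, finish: :__end__)
  current = entry_point
  until current == finish
    state = nodes.fetch(current).call(state)
    yield({ node: current, state: state }) if block_given?
    current = edges.fetch(current)
  end
  state
end

nodes = {
  fetch:  ->(s) { s.merge(raw: "data") },
  format: ->(s) { s.merge(text: s[:raw].upcase) }
}
edges = { fetch: :format, format: :__end__ }

final = stream_sketch(nodes, edges, :fetch, {}) do |step|
  puts "#{step[:node]} done, keys: #{step[:state].keys.inspect}"
end
final # => { raw: "data", text: "DATA" }
```

A consumer of the real #stream would receive hashes of the same { node:, state: } shape, with state being the workflow context rather than a plain Hash.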