Class: Phronomy::WorkflowRunner
- Inherits:
- Object
- Includes:
- Runnable
- Defined in:
- lib/phronomy/workflow_runner.rb
Overview
Execution engine for compiled workflows. Manages node execution, phase transitions, halt/resume, and wait states. Instantiated by Phronomy::Workflow and used internally.
Wait states (registered via wait_states:) are virtual nodes that automatically halt execution when reached. They can be resumed with either #resume (generic) or #send_event (event-typed).
Internally, a state_machines-based PhaseTracker class is generated at initialization time. The tracker validates phase transitions during execution; invalid transitions are logged as warnings without halting.
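The warn-without-halting behaviour described above can be illustrated with a minimal plain-Ruby sketch. It does not use the state_machines gem and is not the generated PhaseTracker: the helper name `track_transition`, the shape of the allowed-transition table, and the choice to move to the new phase after a warning are all assumptions made for illustration.

```ruby
require "logger"
require "stringio"

# Hypothetical helper (not part of Phronomy): returns the next phase and
# warns, without raising, when the move is missing from the allowed table.
def track_transition(allowed, from, to, logger)
  unless allowed.fetch(from, []).include?(to)
    logger.warn("invalid phase transition #{from.inspect} -> #{to.inspect}")
  end
  to
end

log = StringIO.new
logger = Logger.new(log)
allowed = { draft: [:review], review: [:__end__] }

phase = :draft
phase = track_transition(allowed, phase, :review, logger) # allowed, nothing logged
phase = track_transition(allowed, phase, :draft, logger)  # not allowed: logged, run continues
```

The point of the design is that phase validation is diagnostic only: a bad transition leaves a trace in the log but never interrupts the run.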
Constant Summary
- FINISH = :__end__
Sentinel value for the terminal state of a workflow.
Instance Method Summary
-
#initialize(state_class:, nodes:, edges:, conditional_edges:, entry_point:, before_callbacks: {}, after_callbacks: {}, wait_states: {}, state_store: nil) ⇒ WorkflowRunner
constructor
A new instance of WorkflowRunner.
-
#invoke(input, config: {}) ⇒ Object
Executes the workflow from the entry point.
-
#resume(state:, input: nil) ⇒ Object
Generic resume.
-
#send_event(state:, event:, input: nil) ⇒ Object
Fires a named event to advance a halted workflow.
-
#stream(input, config: {}) {|Hash| ... } ⇒ Object
Streaming execution.
Methods included from Runnable
Constructor Details
#initialize(state_class:, nodes:, edges:, conditional_edges:, entry_point:, before_callbacks: {}, after_callbacks: {}, wait_states: {}, state_store: nil) ⇒ WorkflowRunner
Returns a new instance of WorkflowRunner.
# File 'lib/phronomy/workflow_runner.rb', line 24
def initialize(state_class:, nodes:, edges:, conditional_edges:, entry_point:,
  before_callbacks: {}, after_callbacks: {}, wait_states: {}, state_store: nil)
  @state_class = state_class
  @nodes = nodes
  @edges = edges
  @conditional_edges = conditional_edges
  @entry_point = entry_point
  @before_callbacks = before_callbacks.dup
  @after_callbacks = after_callbacks.dup
  # { wait_state_name => { resume_event: Symbol, resume_to: Symbol } }
  @wait_states = wait_states.dup
  @state_store_override = state_store
  @phase_machine_class = build_phase_machine_class
end
Instance Method Details
#invoke(input, config: {}) ⇒ Object
Executes the workflow from the entry point.
# File 'lib/phronomy/workflow_runner.rb', line 43
def invoke(input, config: {})
   = {}
  [:user_id] = config[:user_id] if config[:user_id]
  [:session_id] = config[:session_id] if config[:session_id]
  trace("graph.invoke", input: input.inspect, **) do |_span|
    thread_id = config[:thread_id] || SecureRandom.uuid
    recursion_limit = config.fetch(:recursion_limit, Phronomy.configuration.recursion_limit)
    state = @state_class.new(**input)
    state.(thread_id: thread_id)
    result = run_graph(state, recursion_limit: recursion_limit)
    [result, nil]
  end
end
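The two config lookups in #invoke use different defaulting idioms, which matters when a key is present but nil. The sketch below isolates just that logic; `resolve_run_options` and the default of 25 are hypothetical stand-ins (the real default comes from Phronomy.configuration.recursion_limit, whose value is not shown in the source).

```ruby
require "securerandom"

# Hypothetical helper mirroring the two lookups in #invoke.
# default_limit stands in for Phronomy.configuration.recursion_limit.
def resolve_run_options(config, default_limit: 25)
  {
    # `||` falls back when the key is missing OR its value is nil/false.
    thread_id: config[:thread_id] || SecureRandom.uuid,
    # `fetch` falls back only when the key is missing.
    recursion_limit: config.fetch(:recursion_limit, default_limit)
  }
end

explicit   = resolve_run_options({ thread_id: "t-1", recursion_limit: 5 })
defaults   = resolve_run_options({})
nil_values = resolve_run_options({ thread_id: nil, recursion_limit: nil })
```

With these idioms, an explicit `recursion_limit: nil` is passed through as nil rather than replaced by the default, while an explicit `thread_id: nil` still gets a generated UUID.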
#resume(state:, input: nil) ⇒ Object
Generic resume. Routes based on the current phase encoding. Equivalent to send_event(state:, event: :resume, input:).
# File 'lib/phronomy/workflow_runner.rb', line 64
def resume(state:, input: nil)
  send_event(state: state, event: :resume, input: input)
end
#send_event(state:, event:, input: nil) ⇒ Object
Fires a named event to advance a halted workflow.
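If input is given, it is merged into the state before execution resumes.
The special event :resume is accepted for any named wait state: execution resumes at that state's resume_to node. If the current phase is not a registered wait state, ArgumentError is raised.
Any other event name must match a resume_event: declared in the wait_states configuration. Execution then resumes at the resume_to node of the first wait state declaring that event; an unknown event raises ArgumentError listing the valid events.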
# File 'lib/phronomy/workflow_runner.rb', line 80
def send_event(state:, event:, input: nil)
  state = state.merge(input) if input
  event = event.to_sym
  current_phase = state.phase

  if event == :resume
    # Named wait state: use resume_to
    if @wait_states.key?(current_phase)
      return run_graph(state, from_node: @wait_states[current_phase][:resume_to])
    end
    raise ArgumentError, "State has no wait state registered for phase #{current_phase.inspect}"
  end

  # Named event lookup
  _, wait_cfg = @wait_states.find { |_, c| c[:resume_event] == event }
  unless wait_cfg
    valid = @wait_states.values.filter_map { |c| c[:resume_event] }.uniq
    raise ArgumentError, "Unknown event #{event.inspect}. Valid events: #{valid.inspect}"
  end
  run_graph(state, from_node: wait_cfg[:resume_to])
end
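The routing rules of #send_event can be exercised in isolation. The sketch below follows the same lookup order as the source above but returns the target node instead of calling run_graph; `resume_target` and the wait-state and node names are hypothetical, and only the `{ resume_event:, resume_to: }` shape is taken from the constructor comment.

```ruby
# Hypothetical helper mirroring the lookup order of #send_event.
# Returns the node at which execution would restart.
def resume_target(wait_states, current_phase, event)
  event = event.to_sym

  if event == :resume
    # Generic resume: routed by the current phase.
    cfg = wait_states[current_phase]
    raise ArgumentError, "no wait state registered for phase #{current_phase.inspect}" unless cfg
    return cfg[:resume_to]
  end

  # Named event: routed by the event name alone; the current phase is not consulted.
  _, cfg = wait_states.find { |_, c| c[:resume_event] == event }
  unless cfg
    valid = wait_states.values.filter_map { |c| c[:resume_event] }.uniq
    raise ArgumentError, "Unknown event #{event.inspect}. Valid events: #{valid.inspect}"
  end
  cfg[:resume_to]
end

# Illustrative configuration; these names do not come from Phronomy.
wait_states = {
  awaiting_approval: { resume_event: :approve, resume_to: :publish },
  awaiting_payment:  { resume_event: :paid,    resume_to: :ship }
}

by_phase = resume_target(wait_states, :awaiting_approval, :resume)
by_event = resume_target(wait_states, :awaiting_approval, "paid")
```

Note the asymmetry this makes visible: a named event is matched against every registered wait state, so in this sketch "paid" routes to the awaiting_payment target even though the current phase is awaiting_approval.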
#stream(input, config: {}) {|Hash| ... } ⇒ Object
Streaming execution. Yields { node: Symbol, state: Object } after each node completes.
# File 'lib/phronomy/workflow_runner.rb', line 107
def stream(input, config: {}, &block)
  thread_id = config[:thread_id] || SecureRandom.uuid
  recursion_limit = config.fetch(:recursion_limit, Phronomy.configuration.recursion_limit)
  state = @state_class.new(**input)
  state.(thread_id: thread_id)
  run_graph(state, recursion_limit: recursion_limit, &block)
end
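To show what a caller of #stream receives, here is a toy runner that yields `{ node:, state: }` after each node and stops at the FINISH sentinel. It is a sketch, not Phronomy's run_graph: `run_sketch`, the Hash-based state, the lambda nodes, and the way the recursion limit is enforced are all assumptions for illustration.

```ruby
# Toy runner (hypothetical). Nodes are callables taking and returning a Hash
# state; edges map each node to its successor; `finish` plays the role of
# WorkflowRunner::FINISH (:__end__).
def run_sketch(nodes, edges, entry_point, state, recursion_limit: 25, finish: :__end__)
  current = entry_point
  steps = 0
  until current == finish
    steps += 1
    raise "recursion limit #{recursion_limit} exceeded" if steps > recursion_limit
    state = nodes.fetch(current).call(state)
    # Yield after the node completes, as #stream is documented to do.
    yield({ node: current, state: state }) if block_given?
    current = edges.fetch(current)
  end
  state
end

nodes = {
  double: ->(s) { s.merge(value: s[:value] * 2) },
  incr:   ->(s) { s.merge(value: s[:value] + 1) }
}
edges = { double: :incr, incr: :__end__ }

seen = []
final = run_sketch(nodes, edges, :double, { value: 3 }) { |step| seen << step }
```

Each yielded Hash carries the state as it stood right after that node ran, so a consumer can render progress incrementally without waiting for the final result.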