Class: Phronomy::WorkflowRunner
- Inherits:
-
Object
- Object
- Phronomy::WorkflowRunner
- Includes:
- Runnable
- Defined in:
- lib/phronomy/workflow_runner.rb
Overview
Execution engine for compiled workflows. Manages node execution, phase transitions, halt/resume, and wait states. Instantiated by Phronomy::Workflow and used internally.
== Design principle
State transitions are driven entirely by state_machines. The PhaseTracker holds a reference to the current WorkflowContext via +attr_accessor :context+, and guard lambdas evaluate +m.context+ (the WorkflowContext) rather than the PhaseTracker itself. This ensures that "what happens next" is always determined by the declared state machine topology, never by Phronomy internals.
== Three transition categories registered in PhaseTracker
advance_
— automatic, unconditional after-transitions fired when an action state's action completes (declared with +after :foo, to: :bar+) route — a single event that carries all guarded transitions (declared with +event :route, from: :foo, guard: ..., to: :bar+) Guards are evaluated in declaration order; first match wins. An unguarded fallback, if declared, is evaluated last.
— external events triggered by human input, originating from wait states (declared with +event :approve, from: :awaiting, to: :run+)
Constant Summary collapse
- FINISH =
Sentinel value for the terminal state of a workflow.
:__end__
Instance Method Summary collapse
-
#initialize(state_class:, nodes:, after_transitions:, route_transitions:, external_events:, entry_point:, wait_state_names: [], before_callbacks: {}, after_callbacks: {}, state_store: nil) ⇒ WorkflowRunner
constructor
A new instance of WorkflowRunner.
-
#invoke(input, config: {}) ⇒ Object
Executes the workflow from the initial state.
-
#resume(state:, input: nil) ⇒ Object
Generic resume.
-
#send_event(state:, event:, input: nil) ⇒ Object
Fires a named event to advance a halted workflow.
-
#stream(input, config: {}) {|Hash| ... } ⇒ Object
Streaming execution.
Methods included from Runnable
Constructor Details
#initialize(state_class:, nodes:, after_transitions:, route_transitions:, external_events:, entry_point:, wait_state_names: [], before_callbacks: {}, after_callbacks: {}, state_store: nil) ⇒ WorkflowRunner
Returns a new instance of WorkflowRunner.
39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 |
# File 'lib/phronomy/workflow_runner.rb', line 39 def initialize(state_class:, nodes:, after_transitions:, route_transitions:, external_events:, entry_point:, wait_state_names: [], before_callbacks: {}, after_callbacks: {}, state_store: nil) @state_class = state_class @nodes = nodes @after_transitions = after_transitions # { from => to } @route_transitions = route_transitions # { from => [{guard:, to:}, ...] } @external_events = external_events # { name => [{from:, to:, guard:}, ...] } @entry_point = entry_point @wait_state_names = wait_state_names @before_callbacks = before_callbacks.dup @after_callbacks = after_callbacks.dup @state_store_override = state_store @phase_machine_class = build_phase_machine_class end |
Instance Method Details
#invoke(input, config: {}) ⇒ Object
Executes the workflow from the initial state.
59 60 61 62 63 64 65 66 67 68 69 70 71 72 |
# File 'lib/phronomy/workflow_runner.rb', line 59 def invoke(input, config: {}) = {} [:user_id] = config[:user_id] if config[:user_id] [:session_id] = config[:session_id] if config[:session_id] trace("graph.invoke", input: input.inspect, **) do |_span| thread_id = config[:thread_id] || SecureRandom.uuid recursion_limit = config.fetch(:recursion_limit, Phronomy.configuration.recursion_limit) state = @state_class.new(**input) state.(thread_id: thread_id) result = run_graph(state, recursion_limit: recursion_limit) [result, nil] end end |
#resume(state:, input: nil) ⇒ Object
Generic resume. Equivalent to +send_event(state:, event: :resume, input:)+.
78 79 80 |
# File 'lib/phronomy/workflow_runner.rb', line 78 def resume(state:, input: nil) send_event(state: state, event: :resume, input: input) end |
#send_event(state:, event:, input: nil) ⇒ Object
Fires a named event to advance a halted workflow.
The special event +:resume+ selects the first external event registered for the current wait state and fires it.
91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 |
# File 'lib/phronomy/workflow_runner.rb', line 91 def send_event(state:, event:, input: nil) state = state.merge(input) if input event = event.to_sym current_phase = state.phase tracker = new_phase_machine(current_phase) tracker.context = state ev_to_fire = if event == :resume # Find the first external event that can originate from the current wait state. name, = @external_events.find { |_, ts| ts.any? { |t| t[:from] == current_phase } } unless name raise ArgumentError, "No external event registered for wait state #{current_phase.inspect}" end name else unless @external_events.key?(event) raise ArgumentError, "Unknown event #{event.inspect}. Valid events: #{@external_events.keys.inspect}" end event end fire_event!(tracker, ev_to_fire, current_phase) next_phase = tracker.phase.to_sym next_node = (next_phase == :__end__) ? FINISH : next_phase run_graph(state, from_node: next_node) end |
#stream(input, config: {}) {|Hash| ... } ⇒ Object
Streaming execution. Yields { node: Symbol, state: Object } after each node completes.
127 128 129 130 131 132 133 |
# File 'lib/phronomy/workflow_runner.rb', line 127 def stream(input, config: {}, &block) thread_id = config[:thread_id] || SecureRandom.uuid recursion_limit = config.fetch(:recursion_limit, Phronomy.configuration.recursion_limit) state = @state_class.new(**input) state.(thread_id: thread_id) run_graph(state, recursion_limit: recursion_limit, &block) end |