Class: Phronomy::WorkflowRunner

Inherits:
Object
Includes:
Runnable
Defined in:
lib/phronomy/workflow_runner.rb

Overview

Execution engine for compiled workflows. Manages node execution, phase transitions, halt/resume, and wait states. Instantiated by Phronomy::Workflow and used internally.

== Design principle

State transitions are driven entirely by state_machines. The PhaseTracker holds a reference to the current WorkflowContext via +attr_accessor :context+, and guard lambdas evaluate +m.context+ (the WorkflowContext) rather than the PhaseTracker itself. This ensures that "what happens next" is always determined by the declared state machine topology, never by Phronomy internals.

== Three transition categories registered in PhaseTracker

  1. advance_: automatic, unconditional after-transitions fired when an action state's action completes (declared with +after :foo, to: :bar+).

  2. route: a single event that carries all guarded transitions (declared with +event :route, from: :foo, guard: ..., to: :bar+). Guards are evaluated in declaration order; the first match wins. An unguarded fallback, if declared, is evaluated last.

  3. named external events: events triggered by human input, originating from wait states (declared with +event :approve, from: :awaiting, to: :run+).
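
The route rule above (guards in declaration order, first match wins, unguarded fallback last) can be sketched in plain Ruby. This is a minimal illustration, not Phronomy's implementation: +resolve_route+ is a hypothetical helper, the context is a plain Hash, and representing the unguarded fallback as +guard: nil+ is an assumption.

```ruby
# Hypothetical helper (not part of Phronomy): picks the target state of the
# :route event for one source state. `transitions` mirrors the documented
# shape [{guard:, to:}, ...]; a nil guard is assumed to mark the fallback.
def resolve_route(transitions, context)
  guarded, fallback = transitions.partition { |t| t[:guard] }
  # Guards run in declaration order; the first one returning truthy wins.
  match = guarded.find { |t| t[:guard].call(context) }
  # The unguarded fallback is consulted only after every guard has failed,
  # regardless of where it was declared.
  match ||= fallback.first
  match && match[:to]
end

route_transitions = {
  review: [
    { guard: nil, to: :reject }, # fallback, deliberately declared first
    { guard: ->(ctx) { ctx[:score] >= 80 }, to: :publish },
    { guard: ->(ctx) { ctx[:score] >= 50 }, to: :revise }
  ]
}

resolve_route(route_transitions[:review], { score: 90 }) # => :publish
resolve_route(route_transitions[:review], { score: 60 }) # => :revise
resolve_route(route_transitions[:review], { score: 10 }) # => :reject
```

A score of 90 satisfies both guards, but only the first declared one (+:publish+) is taken.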

Constant Summary

FINISH = :__end__

Sentinel value for the terminal state of a workflow.

Instance Method Summary

Methods included from Runnable

#batch, #trace

Constructor Details

#initialize(state_class:, nodes:, after_transitions:, route_transitions:, external_events:, entry_point:, wait_state_names: [], before_callbacks: {}, after_callbacks: {}, state_store: nil) ⇒ WorkflowRunner

Returns a new instance of WorkflowRunner.



# File 'lib/phronomy/workflow_runner.rb', line 39

def initialize(state_class:, nodes:, after_transitions:, route_transitions:,
  external_events:, entry_point:, wait_state_names: [],
  before_callbacks: {}, after_callbacks: {}, state_store: nil)
  @state_class = state_class
  @nodes = nodes
  @after_transitions = after_transitions  # { from => to }
  @route_transitions = route_transitions  # { from => [{guard:, to:}, ...] }
  @external_events = external_events    # { name => [{from:, to:, guard:}, ...] }
  @entry_point = entry_point
  @wait_state_names = wait_state_names
  @before_callbacks = before_callbacks.dup
  @after_callbacks = after_callbacks.dup
  @state_store_override = state_store
  @phase_machine_class = build_phase_machine_class
end
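
To make the comments in the constructor concrete, here is a sketch of what the three transition tables might look like for a small workflow. Only the hash shapes come from the source comments; the state names, the guard, and the use of +guard: nil+ for unguarded entries are hypothetical.

```ruby
# Hypothetical tables for a draft -> review -> approval workflow.
# Only the hash shapes are taken from the constructor comments.

# { from => to }
after_transitions = { draft: :review }

# { from => [{guard:, to:}, ...] }
route_transitions = {
  review: [
    { guard: ->(ctx) { ctx[:needs_approval] }, to: :awaiting },
    { guard: nil, to: :__end__ } # assumed representation of a fallback
  ]
}

# { name => [{from:, to:, guard:}, ...] }
external_events = {
  approve: [{ from: :awaiting, to: :run, guard: nil }],
  reject: [{ from: :awaiting, to: :__end__, guard: nil }]
}

# In this example every external event originates from a wait state.
wait_state_names = [:awaiting]
```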

Instance Method Details

#invoke(input, config: {}) ⇒ Object

Executes the workflow from the initial state.

Parameters:

  • input (Hash)

    initial context field values

  • config (Hash) (defaults to: {})

    { thread_id:, recursion_limit:, user_id:, session_id: }

Returns:

  • (Object)

    final context (includes Phronomy::WorkflowContext)



# File 'lib/phronomy/workflow_runner.rb', line 59

def invoke(input, config: {})
  caller_meta = {}
  caller_meta[:user_id] = config[:user_id] if config[:user_id]
  caller_meta[:session_id] = config[:session_id] if config[:session_id]

  trace("graph.invoke", input: input.inspect, **caller_meta) do |_span|
    thread_id = config[:thread_id] || SecureRandom.uuid
    recursion_limit = config.fetch(:recursion_limit, Phronomy.configuration.recursion_limit)
    state = @state_class.new(**input)
    state.(thread_id: thread_id)
    result = run_graph(state, recursion_limit: recursion_limit)
    [result, nil]
  end
end
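
The way +invoke+ reads +config+ can be isolated in a small standalone sketch. +normalize_config+ and +DEFAULT_RECURSION_LIMIT+ are hypothetical names; the constant stands in for +Phronomy.configuration.recursion_limit+, whose actual value is not shown in this page.

```ruby
require "securerandom"

# Hypothetical stand-in for Phronomy.configuration.recursion_limit.
DEFAULT_RECURSION_LIMIT = 25

# Hypothetical helper mirroring how #invoke reads its config hash.
def normalize_config(config)
  {
    # A missing (or nil) thread_id is replaced by a fresh UUID.
    thread_id: config[:thread_id] || SecureRandom.uuid,
    # fetch falls back only when the key is absent, not when it is nil.
    recursion_limit: config.fetch(:recursion_limit, DEFAULT_RECURSION_LIMIT),
    # Only truthy caller fields are forwarded as trace metadata.
    caller_meta: config.slice(:user_id, :session_id).select { |_, v| v }
  }
end

normalized = normalize_config({ thread_id: "t-1", user_id: "u-7", session_id: nil })
normalized[:thread_id]       # => "t-1"
normalized[:recursion_limit] # => 25
normalized[:caller_meta]     # => { user_id: "u-7" }
```

One consequence of using +fetch+ is that an explicit +recursion_limit: nil+ is passed through as +nil+ rather than replaced by the default.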

#resume(state:, input: nil) ⇒ Object

Generic resume. Equivalent to +send_event(state:, event: :resume, input:)+.

Parameters:

  • state (Object)

    halted context

  • input (Hash, nil) (defaults to: nil)

    optional field updates to merge before resuming

Returns:

  • (Object)

    final context



# File 'lib/phronomy/workflow_runner.rb', line 78

def resume(state:, input: nil)
  send_event(state: state, event: :resume, input: input)
end

#send_event(state:, event:, input: nil) ⇒ Object

Fires a named event to advance a halted workflow.

The special event +:resume+ selects the first external event registered for the current wait state and fires it.

Parameters:

  • state (Object)

    halted context

  • event (Symbol)

    named event or +:resume+ for generic resumption

  • input (Hash, nil) (defaults to: nil)

    optional field updates to merge before resuming

Returns:

  • (Object)

    final context



# File 'lib/phronomy/workflow_runner.rb', line 91

def send_event(state:, event:, input: nil)
  state = state.merge(input) if input
  event = event.to_sym
  current_phase = state.phase

  tracker = new_phase_machine(current_phase)
  tracker.context = state

  ev_to_fire = if event == :resume
    # Find the first external event that can originate from the current wait state.
    name, = @external_events.find { |_, ts| ts.any? { |t| t[:from] == current_phase } }
    unless name
      raise ArgumentError,
        "No external event registered for wait state #{current_phase.inspect}"
    end
    name
  else
    unless @external_events.key?(event)
      raise ArgumentError,
        "Unknown event #{event.inspect}. Valid events: #{@external_events.keys.inspect}"
    end
    event
  end

  fire_event!(tracker, ev_to_fire, current_phase)

  next_phase = tracker.phase.to_sym
  next_node = (next_phase == :__end__) ? FINISH : next_phase
  run_graph(state, from_node: next_node)
end
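
The event-selection step of +send_event+ can be tried in isolation. The sketch below copies that branch into a hypothetical free function, +resolve_event+, operating on a hand-written +external_events+ table; the event and state names are illustrative only.

```ruby
# Hypothetical standalone copy of the event-selection branch of #send_event.
def resolve_event(external_events, event, current_phase)
  event = event.to_sym
  if event == :resume
    # Hashes keep insertion order, so "first" means first declared.
    name, = external_events.find { |_, ts| ts.any? { |t| t[:from] == current_phase } }
    unless name
      raise ArgumentError,
        "No external event registered for wait state #{current_phase.inspect}"
    end
    name
  else
    unless external_events.key?(event)
      raise ArgumentError,
        "Unknown event #{event.inspect}. Valid events: #{external_events.keys.inspect}"
    end
    event
  end
end

external_events = {
  escalate: [{ from: :triage, to: :manager }],
  approve: [{ from: :awaiting, to: :run }],
  reject: [{ from: :awaiting, to: :__end__ }]
}

resolve_event(external_events, :resume, :awaiting)  # => :approve
resolve_event(external_events, "reject", :awaiting) # => :reject
```

Because +:resume+ picks the first declared event that can leave the current wait state, declaration order decides between +:approve+ and +:reject+ here; callers that need a specific outcome should name the event explicitly.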

#stream(input, config: {}) {|Hash| ... } ⇒ Object

Streaming execution. Yields { node: Symbol, state: Object } after each node completes.

Parameters:

  • input (Hash)
  • config (Hash) (defaults to: {})

Yields:

  • (Hash)

Returns:

  • (Object)

    final context



# File 'lib/phronomy/workflow_runner.rb', line 127

def stream(input, config: {}, &block)
  thread_id = config[:thread_id] || SecureRandom.uuid
  recursion_limit = config.fetch(:recursion_limit, Phronomy.configuration.recursion_limit)
  state = @state_class.new(**input)
  state.(thread_id: thread_id)
  run_graph(state, recursion_limit: recursion_limit, &block)
end
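
The yield contract of +stream+ (one +{ node:, state: }+ hash after each node, final context as the return value) can be illustrated with a toy runner. +toy_stream+ is hypothetical and is not how +run_graph+ works internally: it runs nodes in a fixed order, uses a plain Hash as the context, and assumes each node returns a Hash of field updates.

```ruby
# Hypothetical toy runner: NOT Phronomy's run_graph, only the yield contract.
def toy_stream(nodes, order, state)
  order.each do |name|
    # Assumption: each node returns a Hash of field updates to merge.
    state = state.merge(nodes.fetch(name).call(state))
    # The block sees the node name and the context after that node ran.
    yield({ node: name, state: state }) if block_given?
  end
  state
end

nodes = {
  fetch: ->(_s) { { doc: "raw text" } },
  summarize: ->(s) { { summary: s[:doc].upcase } }
}

seen = []
final = toy_stream(nodes, %i[fetch summarize], {}) { |step| seen << step[:node] }

seen  # => [:fetch, :summarize]
final # => { doc: "raw text", summary: "RAW TEXT" }
```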