Class: Phronomy::WorkflowRunner Private

Inherits:
Object
Includes:
Runnable
Defined in:
lib/phronomy/workflow_runner.rb

Overview

This class is part of a private API. You should avoid using this class if possible, as it may be removed or be changed in the future.

Execution engine for compiled workflows. Manages state entry/exit action execution, phase transitions, halt/resume, and wait states. Instantiated by Phronomy::Workflow and used internally.

== Design principle

State transitions are driven entirely by state_machines. The PhaseTracker holds a reference to the current WorkflowContext via +attr_accessor :context+, and guard lambdas evaluate +m.context+ (the WorkflowContext) rather than the PhaseTracker itself. This ensures that "what happens next" is always determined by the declared state machine topology, never by Phronomy internals.

Entry and exit actions are registered as state_machines +after_transition to:+ and +before_transition from:+ callbacks respectively. Entry actions may either mutate the context in place or return a new context (e.g. via +s.merge(...)+). When an entry action returns a Phronomy::WorkflowContext, that value replaces the current context; otherwise the return value is ignored. Exit actions are always mutation-in-place; their return value is ignored.

The sole exception is the initial state: state_machines does not fire transition callbacks on initialization, so the entry action for the entry point is invoked directly by WorkflowRunner before the main execution loop begins.
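The return-value rule for entry actions described above can be sketched in isolation. This is a minimal illustration, not the library's implementation: +SketchContext+ and +apply_entry_actions+ are hypothetical names standing in for Phronomy::WorkflowContext and whatever WorkflowRunner does internally.

```ruby
# Hypothetical stand-in for Phronomy::WorkflowContext (not the library's class).
class SketchContext
  attr_reader :fields

  def initialize(fields = {})
    @fields = fields
  end

  # Returns a new context; the receiver is left unchanged.
  def merge(updates)
    SketchContext.new(@fields.merge(updates))
  end

  # In-place mutation of a single field.
  def []=(key, value)
    @fields[key] = value
  end
end

# Runs entry actions in order. A returned SketchContext replaces the current
# context; any other return value is ignored.
def apply_entry_actions(context, actions)
  actions.reduce(context) do |current, action|
    result = action.call(current)
    result.is_a?(SketchContext) ? result : current
  end
end

mutating  = ->(s) { s[:visited] = true; :ignored } # mutates in place; return value ignored
replacing = ->(s) { s.merge(count: 1) }            # returns a new context that replaces the old one

final = apply_entry_actions(SketchContext.new, [mutating, replacing])
```

Both styles end up visible in +final.fields+, which is why the two kinds of entry action can be mixed freely under this rule.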

== Two transition categories registered in PhaseTracker

  1. state_completed: all auto-fire transitions, with or without guards, fired when an action state's action completes. Guards are evaluated in declaration order and the first match wins. Declared with +transition from: :foo, to: :bar+ or +transition from: :foo, guard: ..., to: :bar+.

  2. external events: named events triggered by human input, originating from wait states. Declared with +transition from: :awaiting, on: :approve, to: :run+.
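The "declaration order, first match wins" rule for guarded auto-fire transitions can be illustrated with a plain lookup. The sketch below assumes transitions are hashes shaped like +{from:, to:, guard:}+ (the shape noted in the constructor comments for external events); the state names, guards, and the +next_state+ helper are invented for illustration and do not reflect how state_machines evaluates guards internally.

```ruby
# Hypothetical transition table; order of declaration matters.
TRANSITIONS = [
  {from: :review, to: :publish, guard: ->(ctx) { ctx[:score] >= 80 }},
  {from: :review, to: :revise,  guard: ->(ctx) { ctx[:score] >= 50 }},
  {from: :review, to: :reject,  guard: nil} # unguarded fallback
].freeze

# Returns the target of the first transition from `from` whose guard is
# absent or evaluates truthy for `ctx`; nil when nothing matches.
def next_state(transitions, from, ctx)
  match = transitions.find do |t|
    t[:from] == from && (t[:guard].nil? || t[:guard].call(ctx))
  end
  match && match[:to]
end

high = next_state(TRANSITIONS, :review, {score: 90})
mid  = next_state(TRANSITIONS, :review, {score: 60})
low  = next_state(TRANSITIONS, :review, {score: 10})
```

A score of 90 satisfies both guards, but only the first declared transition is taken, so placing the unguarded fallback last is what makes it a fallback.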

Constant Summary

FINISH = :__end__

This constant is part of a private API. You should avoid using this constant if possible, as it may be removed or be changed in the future.

Sentinel value for the terminal state of a workflow.

Instance Method Summary

Methods included from Runnable

#batch, #trace

Constructor Details

#initialize(state_class:, entry_actions:, declared_states:, auto_transitions:, external_events:, entry_point:, exit_actions: {}, wait_state_names: [], state_store: nil) ⇒ WorkflowRunner

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Returns a new instance of WorkflowRunner.



# File 'lib/phronomy/workflow_runner.rb', line 48

def initialize(state_class:, entry_actions:, declared_states:, auto_transitions:, external_events:, entry_point:, exit_actions: {}, wait_state_names: [], state_store: nil)
  @state_class = state_class
  @entry_actions = entry_actions   # { state_name => [callable, ...] }
  @declared_states = declared_states
  # Lookup set: states with at least one auto-fire transition declared.
  @auto_state_set = auto_transitions.each_with_object({}) { |t, h| h[t[:from]] = true }
  @external_events = external_events    # { name => [{from:, to:, guard:}, ...] }
  @entry_point = entry_point
  @wait_state_names = wait_state_names
  @state_store = state_store
  @phase_machine_class = build_phase_machine_class(auto_transitions, exit_actions)
end
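As a small illustration of the +@auto_state_set+ line in the constructor, the snippet below builds the same lookup from a made-up +auto_transitions+ list. The state names are assumptions; only the +each_with_object+ expression is taken from the source.

```ruby
# Hypothetical auto-fire transitions; two of them share the same :from state.
auto_transitions = [
  {from: :draft,  to: :review},
  {from: :review, to: :publish},
  {from: :review, to: :revise}
]

# Same expression as in the constructor: a Hash used as a set of the states
# that have at least one auto-fire transition declared.
auto_state_set = auto_transitions.each_with_object({}) { |t, h| h[t[:from]] = true }

has_auto_from_review   = auto_state_set.key?(:review)
has_auto_from_awaiting = auto_state_set.key?(:awaiting)
```

Duplicate +:from+ values collapse into a single key, so membership checks stay constant-time regardless of how many transitions leave a state.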

Instance Method Details

#invoke(input, config: {}) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Executes the workflow from the initial state.

Parameters:

  • input (Hash)

    initial context field values

  • config (Hash) (defaults to: {})

    { thread_id:, recursion_limit:, user_id:, session_id:, state_store: }

Returns:

  • (Object)

    final context (includes Phronomy::WorkflowContext)



# File 'lib/phronomy/workflow_runner.rb', line 66

def invoke(input, config: {})
  caller_meta = {}
  caller_meta[:user_id] = config[:user_id] if config[:user_id]
  caller_meta[:session_id] = config[:session_id] if config[:session_id]

  trace("workflow.invoke", input: input.inspect, **caller_meta) do |_span|
    thread_id = config[:thread_id] || SecureRandom.uuid
    recursion_limit = config.fetch(:recursion_limit, Phronomy.configuration.recursion_limit)

    store = config.fetch(:state_store, @state_store) || Phronomy.configuration.state_store
    snapshot = (store && config[:thread_id]) ? store.load(thread_id) : nil
    initial_fields = if snapshot && snapshot[:fields]
      snapshot[:fields].transform_keys(&:to_sym).merge(input.transform_keys(&:to_sym))
    else
      input
    end

    state = @state_class.new(**initial_fields)
    state.(thread_id: thread_id)
    result = if Phronomy.configuration.event_loop
      run_via_event_loop(state, recursion_limit: recursion_limit)
    else
      run_workflow(state, recursion_limit: recursion_limit)
    end
    store&.save(thread_id, {fields: result.to_h, phase: result.phase.to_s}) if config[:thread_id]
    [result, nil]
  end
end
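The way +invoke+ combines a stored snapshot with fresh input can be tried on plain hashes. The field names below are invented; the merge expression is the one from the method body.

```ruby
# Hypothetical snapshot as it might come back from a state store, with
# string keys, plus new input that mixes string and symbol keys.
snapshot = {fields: {"draft" => "v1", "approved" => false}}
input    = {"approved" => true, note: "ok"}

# Same expression as in invoke: both sides are symbolized, then the input
# is merged over the snapshot so that input values win on conflict.
initial_fields =
  snapshot[:fields].transform_keys(&:to_sym).merge(input.transform_keys(&:to_sym))
```

Here +approved+ ends up +true+ because the input overrides the stored value, while +draft+ is carried over from the snapshot. Note that, as written in the source, keys are only symbolized on this snapshot path; without a snapshot the input hash is passed through unchanged.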

#resume(state:, input: nil) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Generic resume. Equivalent to +send_event(state:, event: :resume, input:)+.

Parameters:

  • state (Object)

    halted context

  • input (Hash, nil) (defaults to: nil)

    optional field updates to merge before resuming

Returns:

  • (Object)

    final context



# File 'lib/phronomy/workflow_runner.rb', line 100

def resume(state:, input: nil)
  send_event(state: state, event: :resume, input: input)
end

#send_event(state:, event:, input: nil) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Fires a named event to advance a halted workflow.

The special event +:resume+ selects the first external event registered for the current wait state and fires it.

Parameters:

  • state (Object)

    halted context

  • event (Symbol)

    named event or +:resume+ for generic resumption

  • input (Hash, nil) (defaults to: nil)

    optional field updates to merge before resuming

Returns:

  • (Object)

    final context



# File 'lib/phronomy/workflow_runner.rb', line 114

def send_event(state:, event:, input: nil)
  state = state.merge(input) if input
  event = event.to_sym
  current_phase = state.phase

  ev_to_fire = if event == :resume
    # Find the first external event that can originate from the current wait state.
    name, = @external_events.find { |_, ts| ts.any? { |t| t[:from] == current_phase } }
    unless name
      raise ArgumentError,
        "No external event registered for wait state #{current_phase.inspect}"
    end
    name
  else
    unless @external_events.key?(event)
      raise ArgumentError,
        "Unknown event #{event.inspect}. Valid events: #{@external_events.keys.inspect}"
    end
    event
  end

  if Phronomy.configuration.event_loop
    run_via_event_loop(state,
      recursion_limit: Phronomy.configuration.recursion_limit,
      resume_event: ev_to_fire, resume_phase: current_phase)
  else
    run_workflow(state, resume_event: ev_to_fire, resume_phase: current_phase)
  end
end
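The event-resolution step at the top of +send_event+ can be exercised on its own. In this sketch the event table and the +resolve_event+ helper are hypothetical; the lookup logic mirrors the method body, with the error message for unknown events shortened.

```ruby
# Hypothetical external-event table in the documented shape:
# { name => [{from:, to:, guard:}, ...] }
EXTERNAL_EVENTS = {
  approve: [{from: :awaiting, to: :run,       guard: nil}],
  reject:  [{from: :awaiting, to: :cancelled, guard: nil}],
  retry:   [{from: :failed,   to: :run,       guard: nil}]
}.freeze

# Resolves :resume to the first registered event that can leave the current
# phase; any other event only has to exist in the table.
def resolve_event(external_events, event, current_phase)
  event = event.to_sym
  if event == :resume
    name, = external_events.find { |_, ts| ts.any? { |t| t[:from] == current_phase } }
    unless name
      raise ArgumentError,
        "No external event registered for wait state #{current_phase.inspect}"
    end
    name
  else
    raise ArgumentError, "Unknown event #{event.inspect}" unless external_events.key?(event)
    event
  end
end

from_awaiting = resolve_event(EXTERNAL_EVENTS, :resume, :awaiting)
from_failed   = resolve_event(EXTERNAL_EVENTS, :resume, :failed)
explicit      = resolve_event(EXTERNAL_EVENTS, "reject", :awaiting)
```

Because Ruby hashes preserve insertion order, +:resume+ from +:awaiting+ picks +:approve+ rather than +:reject+. In this step a named event is checked only for existence, not for whether it can originate from the current phase.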

#stream(input, config: {}) {|Hash| ... } ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Streaming execution. Yields { state: Symbol, context: Object } after each state action completes.

Parameters:

  • input (Hash)
  • config (Hash) (defaults to: {})

Yields:

  • (Hash)

Returns:

  • (Object)

    final context



# File 'lib/phronomy/workflow_runner.rb', line 150

def stream(input, config: {}, &block)
  thread_id = config[:thread_id] || SecureRandom.uuid
  recursion_limit = config.fetch(:recursion_limit, Phronomy.configuration.recursion_limit)
  state = @state_class.new(**input)
  state.(thread_id: thread_id)
  run_workflow(state, recursion_limit: recursion_limit, &block)
end
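A caller of +stream+ receives one Hash per completed state action. The sketch below imitates that contract with a hypothetical +sketch_stream+ method so the block-handling pattern can be run without the library; only the yielded shape, +{ state:, context: }+, is taken from the documentation above.

```ruby
# Hypothetical stand-in for a streaming runner. It visits the given states in
# order, yields after each one, and returns the final context.
def sketch_stream(states, context)
  states.each do |name|
    context = context.merge(last_state: name)
    yield({state: name, context: context}) if block_given?
  end
  context
end

seen = []
final = sketch_stream(%i[draft review publish], {}) do |step|
  seen << step[:state] # record each state as it completes
end
```

The block sees intermediate contexts as they are produced, while the method's return value is still the final context, matching the documented return of +stream+.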