Class: Phronomy::Graph::CompiledGraph
- Inherits:
-
Object
- Object
- Phronomy::Graph::CompiledGraph
- Includes:
- Runnable
- Defined in:
- lib/phronomy/graph/compiled_graph.rb
Overview
Executable graph produced by StateGraph#compile. Includes Runnable so it can be embedded in a larger pipeline.
Instance Method Summary collapse
-
#initialize(state_class:, nodes:, edges:, conditional_edges:, entry_point:, before_callbacks: {}, after_callbacks: {}, state_store: nil) ⇒ CompiledGraph
constructor
A new instance of CompiledGraph.
-
#interrupt_after(node) {|state| ... } ⇒ self
Registers a callback to run after the given node completes.
-
#interrupt_before(node) {|state| ... } ⇒ self
Registers a callback to run before the given node executes.
-
#invoke(input, config: {}) ⇒ Object
Executes the graph from the entry point.
-
#resume(state:, input: nil) ⇒ Object
Resumes a halted graph from the state returned by a previous invoke/resume.
-
#stream(input, config: {}) {|Hash| ... } ⇒ Object
Streaming execution.
Methods included from Runnable
Constructor Details
#initialize(state_class:, nodes:, edges:, conditional_edges:, entry_point:, before_callbacks: {}, after_callbacks: {}, state_store: nil) ⇒ CompiledGraph
Returns a new instance of CompiledGraph.
12 13 14 15 16 17 18 19 20 21 22 |
# File 'lib/phronomy/graph/compiled_graph.rb', line 12 def initialize(state_class:, nodes:, edges:, conditional_edges:, entry_point:, before_callbacks: {}, after_callbacks: {}, state_store: nil) @state_class = state_class @nodes = nodes @edges = edges @conditional_edges = conditional_edges @entry_point = entry_point @before_callbacks = before_callbacks @after_callbacks = after_callbacks @state_store_override = state_store end |
Instance Method Details
#interrupt_after(node) {|state| ... } ⇒ self
Registers a callback to run after the given node completes. Return :halt from the block to pause execution; any other value continues.
39 40 41 42 |
# File 'lib/phronomy/graph/compiled_graph.rb', line 39 def interrupt_after(node, &block) @after_callbacks[node] = block self end |
#interrupt_before(node) {|state| ... } ⇒ self
Registers a callback to run before the given node executes. Return :halt from the block to pause execution; any other value continues.
29 30 31 32 |
# File 'lib/phronomy/graph/compiled_graph.rb', line 29 def interrupt_before(node, &block) @before_callbacks[node] = block self end |
#invoke(input, config: {}) ⇒ Object
Executes the graph from the entry point. Automatically assigns a thread_id if not supplied via config.
50 51 52 53 54 55 56 57 58 59 60 61 62 63 |
# File 'lib/phronomy/graph/compiled_graph.rb', line 50 def invoke(input, config: {}) = {} [:user_id] = config[:user_id] if config[:user_id] [:session_id] = config[:session_id] if config[:session_id] trace("graph.invoke", input: input.inspect, **) do |_span| thread_id = config[:thread_id] || SecureRandom.uuid recursion_limit = config.fetch(:recursion_limit, Phronomy.configuration.recursion_limit) state = @state_class.new(**input) state.(thread_id: thread_id, current_nodes: [], halted_before: false) result = execute_graph(state, recursion_limit: recursion_limit) [result, nil] end end |
#resume(state:, input: nil) ⇒ Object
Resumes a halted graph from the state returned by a previous invoke/resume.
69 70 71 72 73 74 75 76 |
# File 'lib/phronomy/graph/compiled_graph.rb', line 69 def resume(state:, input: nil) state = state.merge(input) if input from_nodes = state.current_nodes raise ArgumentError, "State has no pending nodes to resume from" if from_nodes.nil? || from_nodes.empty? execute_graph(state, from_node: from_nodes.first, skip_first_before: state.halted_before) end |
#stream(input, config: {}) {|Hash| ... } ⇒ Object
Streaming execution. Yields { node: Symbol, state: State } after each node completes.
83 84 85 86 87 88 89 |
# File 'lib/phronomy/graph/compiled_graph.rb', line 83 def stream(input, config: {}, &block) thread_id = config[:thread_id] || SecureRandom.uuid recursion_limit = config.fetch(:recursion_limit, Phronomy.configuration.recursion_limit) state = @state_class.new(**input) state.(thread_id: thread_id, current_nodes: [], halted_before: false) execute_graph(state, recursion_limit: recursion_limit, &block) end |