Class: KairosMcp::Daemon::TaskDag
- Inherits:
- Object
- Defined in:
- lib/kairos_mcp/daemon/task_dag.rb
Overview
TaskDag — dependency-aware, single-threaded task graph for agent cycles.
Design (v0.2 §2, P3.1):
A mandate may fan out into multiple concrete tool invocations with
dependencies between them (e.g. "fetch inputs", then "transform",
then "summarize"). TaskDag orders them without introducing
concurrency — #next_runnable returns AT MOST ONE node per call,
chosen from nodes whose dependencies are all :completed.
Invariants:
* Graph is acyclic (validated via Kahn's algorithm at construction).
* No self-dependency (depends_on must not contain the node's own id).
* All referenced dependency ids exist in the node set.
* Status transitions are forward-only per node:
    :pending → :running | :completed | :failed | :cancelled
    :running → :completed | :failed | :cancelled
  Terminal states (:completed, :failed, :cancelled) are sticky.
Failure propagation policies (attached to each node):
  :halt            - on failure, cancel ALL remaining :pending nodes
  :skip_dependents - cancel only the transitive dependents of the failed node; unrelated siblings are left :pending
  :continue        - failure is absorbed locally; other nodes unaffected
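The three policies can be compared side by side with a small standalone sketch. It does not use TaskDag itself: `PolicyNode`, `transitive_dependents`, `cancelled_by`, and `sample_nodes` are hypothetical names introduced for illustration, and the sketch assumes that only :pending nodes are ever cancelled.

```ruby
# Hypothetical stand-in for TaskDag's nodes: id, dependency ids, status.
PolicyNode = Struct.new(:id, :depends_on, :status)

# Ids of every node that depends on `failed_id`, directly or indirectly.
def transitive_dependents(nodes, failed_id)
  found = []
  frontier = [failed_id]
  until frontier.empty?
    current = frontier.shift
    nodes.each do |n|
      next unless n.depends_on.include?(current)
      next if found.include?(n.id)

      found << n.id
      frontier << n.id
    end
  end
  found
end

# Ids that would be cancelled when `failed_id` fails under `policy`.
# Assumption: only :pending nodes are candidates for cancellation.
def cancelled_by(nodes, failed_id, policy)
  pending = nodes.select { |n| n.status == :pending }.map(&:id)
  case policy
  when :halt            then pending - [failed_id]
  when :skip_dependents then pending & transitive_dependents(nodes, failed_id)
  when :continue        then []
  else raise ArgumentError, "unknown policy: #{policy}"
  end
end

# 'fetch' has already failed; 'audit' is an unrelated sibling.
def sample_nodes
  [
    PolicyNode.new('fetch', [], :failed),
    PolicyNode.new('transform', ['fetch'], :pending),
    PolicyNode.new('summarize', ['transform'], :pending),
    PolicyNode.new('audit', [], :pending)
  ]
end

p cancelled_by(sample_nodes, 'fetch', :halt)
p cancelled_by(sample_nodes, 'fetch', :skip_dependents)
p cancelled_by(sample_nodes, 'fetch', :continue)
```

Under :skip_dependents the unrelated 'audit' node is left alone, while :halt cancels it along with the dependents.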
Defined Under Namespace
Classes: CyclicGraphError, InvalidNodeError, InvalidTransitionError, Node
Constant Summary
- VALID_STATUSES = %i[pending running completed failed cancelled].freeze
- VALID_POLICIES = %i[halt skip_dependents continue].freeze
- ALLOWED_TRANSITIONS =
  Forward-only transition table. Terminal states have no outgoing edges.
  { pending: %i[running completed failed cancelled], running: %i[completed failed cancelled], completed: [], failed: [], cancelled: [] }.freeze
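As a rough illustration of how the transition table can be consulted, the sketch below copies the ALLOWED_TRANSITIONS value from this page into a standalone script. The helpers `transition_allowed?` and `terminal_status?` are hypothetical and are not part of TaskDag.

```ruby
# Transition table copied from the constant summary on this page.
ALLOWED_TRANSITIONS = {
  pending:   %i[running completed failed cancelled],
  running:   %i[completed failed cancelled],
  completed: [],
  failed:    [],
  cancelled: []
}.freeze

# Hypothetical helper: true when moving `from` -> `to` is a legal step.
def transition_allowed?(from, to)
  ALLOWED_TRANSITIONS.fetch(from).include?(to)
end

# Hypothetical helper: a status is terminal when it has no outgoing edges.
def terminal_status?(status)
  ALLOWED_TRANSITIONS.fetch(status).empty?
end

puts transition_allowed?(:pending, :running)    # a forward move
puts transition_allowed?(:completed, :running)  # leaving a terminal state
puts terminal_status?(:cancelled)
```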
Instance Method Summary
- #all_completed? ⇒ Boolean
  True when every node is in a terminal state (no more work possible).
- #initialize(nodes) ⇒ TaskDag (constructor)
  A new instance of TaskDag.
- #mark(id, status, error: nil) ⇒ Node
  Transition a node to a new status.
- #next_runnable ⇒ Object
  Return the first :pending node whose every dependency is :completed.
- #node(id) ⇒ Object
- #nodes ⇒ Object
  Non-destructive read of the current node set, in insertion order.
- #size ⇒ Object
- #to_plan_steps(plan_id: 'plan_dag', cycle: 1) ⇒ Object
  Linearize the DAG into an Array of WAL-step Hashes.
- #topological_order ⇒ Object
  Kahn’s topological sort.
Constructor Details
#initialize(nodes) ⇒ TaskDag
Returns a new instance of TaskDag.
    # File 'lib/kairos_mcp/daemon/task_dag.rb', line 67
    def initialize(nodes)
      @nodes = {}
      Array(nodes).each { |n| add_node(n) }
      validate_references!
      validate_no_self_dep!
      validate_acyclic!
    end
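The bodies of validate_references! and validate_no_self_dep! are not shown on this page. The sketch below is one plausible way to enforce the two invariants they are named after, under the assumption that each node exposes `id` and `depends_on`. `SketchNode`, `SketchInvalidNodeError`, and `validate_sketch!` are hypothetical names, and the error class the real methods raise may differ.

```ruby
# Hypothetical error class and node shape; the real ones live under TaskDag.
class SketchInvalidNodeError < StandardError; end
SketchNode = Struct.new(:id, :depends_on)

# Raises unless every dependency id names a node in the set and no node
# lists itself as a dependency. Returns true when both checks pass.
def validate_sketch!(nodes)
  ids = nodes.map(&:id)
  nodes.each do |n|
    if n.depends_on.include?(n.id)
      raise SketchInvalidNodeError, "node #{n.id} depends on itself"
    end

    missing = n.depends_on - ids
    unless missing.empty?
      raise SketchInvalidNodeError, "node #{n.id}: unknown deps #{missing.join(', ')}"
    end
  end
  true
end

puts validate_sketch!([SketchNode.new('fetch', []), SketchNode.new('transform', ['fetch'])])
```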
Instance Method Details
#all_completed? ⇒ Boolean
True when every node is in a terminal state (no more work possible).
    # File 'lib/kairos_mcp/daemon/task_dag.rb', line 129
    def all_completed?
      @nodes.each_value.all?(&:terminal?)
    end
#mark(id, status, error: nil) ⇒ Node
Transition a node to a new status. When `status` is :failed, the failure is propagated according to the node’s failure_policy.
    # File 'lib/kairos_mcp/daemon/task_dag.rb', line 111
    def mark(id, status, error: nil)
      status = status.to_sym
      raise InvalidNodeError, "unknown node: #{id}" unless @nodes.key?(id.to_s)
      raise InvalidTransitionError, "invalid status: #{status}" unless VALID_STATUSES.include?(status)

      node = @nodes[id.to_s]
      unless ALLOWED_TRANSITIONS[node.status].include?(status)
        raise InvalidTransitionError,
              "node #{id}: #{node.status} -> #{status} not allowed"
      end

      node.status = status
      node.error = error if error
      propagate_failure!(node) if status == :failed
      node
    end
#next_runnable ⇒ Object
Return the first :pending node whose every dependency is :completed.
Single-threaded by design: the caller executes the returned node, calls #mark, then calls #next_runnable again. Returns nil if no node is currently runnable: the graph is empty, every node is terminal, or each remaining :pending node has a dependency that is not :completed (for example, an in-flight :running node).
    # File 'lib/kairos_mcp/daemon/task_dag.rb', line 94
    def next_runnable
      @nodes.each_value do |node|
        next unless node.pending?
        next unless deps_satisfied?(node)

        return node
      end
      nil
    end
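The execute, mark, ask-again cycle described above might look roughly like the loop below. To keep the sketch runnable on its own, it uses a reduced stand-in instead of the real class: `MiniDag`, `LoopNode`, `drain`, and `sample_dag` are hypothetical names, the `executor` callable is an assumption about how a caller would run a node, and the stand-in performs no transition checks or failure propagation.

```ruby
# Hypothetical stand-in for TaskDag, reduced to what the loop needs.
LoopNode = Struct.new(:id, :depends_on, :status)

class MiniDag
  def initialize(nodes)
    @nodes = nodes.to_h { |n| [n.id, n] } # Hash preserves insertion order
  end

  # First :pending node whose dependencies are all :completed, else nil.
  def next_runnable
    @nodes.each_value.find do |n|
      n.status == :pending &&
        n.depends_on.all? { |d| @nodes[d].status == :completed }
    end
  end

  # No transition validation here; the real #mark enforces the table.
  def mark(id, status)
    @nodes.fetch(id).status = status
  end
end

# Runs nodes one at a time until nothing is runnable. `executor` is a
# hypothetical callable returning true on success. Returns ids in run order.
def drain(dag, executor)
  ran = []
  while (node = dag.next_runnable)
    dag.mark(node.id, :running)
    ok = executor.call(node)
    dag.mark(node.id, ok ? :completed : :failed)
    ran << node.id
  end
  ran
end

# Inserted out of dependency order on purpose.
def sample_dag
  MiniDag.new([
    LoopNode.new('summarize', ['transform'], :pending),
    LoopNode.new('fetch', [], :pending),
    LoopNode.new('transform', ['fetch'], :pending)
  ])
end

p drain(sample_dag, ->(_node) { true })
```

Because only one node is handed out per call, the loop never has two nodes in flight, whatever order the nodes were inserted in.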
#node(id) ⇒ Object
    # File 'lib/kairos_mcp/daemon/task_dag.rb', line 80
    def node(id)
      @nodes[id.to_s]
    end
#nodes ⇒ Object
Non-destructive read of the current node set, in insertion order.
    # File 'lib/kairos_mcp/daemon/task_dag.rb', line 76
    def nodes
      @nodes.values
    end
#size ⇒ Object
    # File 'lib/kairos_mcp/daemon/task_dag.rb', line 84
    def size
      @nodes.size
    end
#to_plan_steps(plan_id: 'plan_dag', cycle: 1) ⇒ Object
Linearize the DAG into an Array of WAL-step Hashes. Each step has the same shape Planner emits so WalPhaseRecorder / wal.commit_plan can ingest the result unchanged.
    # File 'lib/kairos_mcp/daemon/task_dag.rb', line 167
    def to_plan_steps(plan_id: 'plan_dag', cycle: 1)
      topological_order.each_with_index.map do |id, idx|
        n = @nodes[id]
        params = {
          node_id: n.id,
          tool: n.tool,
          args: n.args,
          depends_on: n.depends_on,
          order: idx,
          cycle: cycle,
          plan_id: plan_id
        }
        {
          step_id: n.id,
          tool: n.tool,
          params_hash: Canonical.sha256_json(params),
          pre_hash: Canonical.sha256_json({ node: n.id, state: 'pre', cycle: cycle }),
          expected_post_hash: Canonical.sha256_json({ node: n.id, state: 'post', cycle: cycle })
        }
      end
    end
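Canonical.sha256_json is not documented on this page. The sketch below assumes it hashes a key-sorted JSON rendering with SHA-256, which is a common way to make such digests independent of key order; the real implementation may differ. `canonicalize`, `sha256_json_sketch`, `plan_step_sketch`, and the `http_get` tool name are hypothetical.

```ruby
require 'json'
require 'digest'

# Assumed canonical form: Hash keys become strings and are sorted
# recursively, so key order does not affect the digest.
def canonicalize(value)
  case value
  when Hash  then value.map { |k, v| [k.to_s, canonicalize(v)] }.sort_by(&:first).to_h
  when Array then value.map { |v| canonicalize(v) }
  else value
  end
end

# Hypothetical stand-in for Canonical.sha256_json.
def sha256_json_sketch(value)
  Digest::SHA256.hexdigest(JSON.generate(canonicalize(value)))
end

# Builds one step in the shape shown above from a plain Hash with the
# keys :node_id, :tool, :args, and :depends_on.
def plan_step_sketch(node, order:, plan_id: 'plan_dag', cycle: 1)
  params = node.merge(order: order, cycle: cycle, plan_id: plan_id)
  {
    step_id: node[:node_id],
    tool: node[:tool],
    params_hash: sha256_json_sketch(params),
    pre_hash: sha256_json_sketch({ node: node[:node_id], state: 'pre', cycle: cycle }),
    expected_post_hash: sha256_json_sketch({ node: node[:node_id], state: 'post', cycle: cycle })
  }
end

step = plan_step_sketch(
  { node_id: 'fetch', tool: 'http_get', args: { url: 'https://example.com' }, depends_on: [] },
  order: 0
)
puts step.keys.inspect
```

Since `order` is part of `params`, a node's params_hash depends on its position in the topological order, while pre_hash and expected_post_hash depend only on the node id and cycle.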
#topological_order ⇒ Object
Kahn’s topological sort. Ties are broken by insertion order, so a fixed input always yields the same ordering across runs. This matters because the ordering is fed into the WAL as step order.
    # File 'lib/kairos_mcp/daemon/task_dag.rb', line 136
    def topological_order
      in_degree = @nodes.transform_values { |n| n.depends_on.size }
      # Children map: dep_id → [ids that depend on it]
      children = Hash.new { |h, k| h[k] = [] }
      @nodes.each_value do |n|
        n.depends_on.each { |d| children[d] << n.id }
      end

      order = []
      # Seed with zero-in-degree nodes in insertion order.
      ready = @nodes.each_value.select { |n| in_degree[n.id].zero? }.map(&:id)

      until ready.empty?
        id = ready.shift
        order << id
        children[id].each do |child_id|
          in_degree[child_id] -= 1
          ready << child_id if in_degree[child_id].zero?
        end
      end

      raise CyclicGraphError, 'cycle detected' unless order.size == @nodes.size

      order
    end
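To see the insertion-order tie-break in isolation, the same algorithm can be run over a plain Hash of id => dependency ids. `kahn_order` is a hypothetical standalone helper that mirrors the method above; it raises ArgumentError where the real method raises CyclicGraphError.

```ruby
# Kahn's algorithm over a Hash of id => dependency ids.
# Hash insertion order supplies the tie-break between ready nodes.
def kahn_order(deps)
  in_degree = deps.transform_values(&:size)

  # Children map: dep_id => [ids that depend on it]
  children = Hash.new { |h, k| h[k] = [] }
  deps.each { |id, ds| ds.each { |d| children[d] << id } }

  # Seed with zero-in-degree ids in insertion order.
  ready = deps.keys.select { |id| in_degree[id].zero? }
  order = []
  until ready.empty?
    id = ready.shift
    order << id
    children[id].each do |child|
      in_degree[child] -= 1
      ready << child if in_degree[child].zero?
    end
  end

  # Any node left unvisited sits on a cycle.
  raise ArgumentError, 'cycle detected' unless order.size == deps.size

  order
end

# Same edges, different insertion order of the two independent roots.
p kahn_order({ 'b' => [], 'a' => [], 'c' => %w[a b] })
p kahn_order({ 'a' => [], 'b' => [], 'c' => %w[a b] })
```

The two calls describe the same graph, yet the roots come out in the order they were inserted, which is why the result is deterministic only for a fixed input order.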