Class: KairosMcp::Daemon::TaskDag

Inherits:
Object
  • Object
Defined in:
lib/kairos_mcp/daemon/task_dag.rb

Overview

TaskDag — dependency-aware, single-threaded task graph for agent cycles.

Design (v0.2 §2, P3.1):

A mandate may fan out into multiple concrete tool invocations with
dependencies between them (e.g. "fetch inputs", then "transform",
then "summarize"). TaskDag orders them without introducing
concurrency — #next_runnable returns AT MOST ONE node per call,
chosen from nodes whose dependencies are all :completed.

Invariants:

* Graph is acyclic (validated via Kahn's algorithm at construction).
* No self-dependency (depends_on must not contain the node's own id).
* All referenced dependency ids exist in the node set.
* Status transitions are forward-only per node:
    :pending  → :running | :completed | :failed | :cancelled
    :running  → :completed | :failed | :cancelled
    Terminal states (:completed, :failed, :cancelled) are sticky.

Failure propagation policies (attached to each node):

:halt             — on failure, cancel ALL remaining :pending nodes
:skip_dependents  — cancel only the transitive dependents of the failed
                    node; unrelated siblings are left :pending
:continue         — failure is absorbed locally; other nodes unaffected
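The three policies above can be illustrated with a small standalone sketch. It does not reproduce the library's private propagation code, which is not shown on this page; `SketchNode`, `transitive_dependents`, and `propagate_failure` are hypothetical names introduced only for this example.

```ruby
# Hypothetical stand-in for TaskDag::Node; only the fields needed here.
SketchNode = Struct.new(:id, :depends_on, :failure_policy, :status)

# Ids of every node that directly or transitively depends on failed_id.
def transitive_dependents(nodes, failed_id)
  found = []
  frontier = [failed_id]
  until frontier.empty?
    current = frontier.shift
    nodes.each_value do |n|
      next unless n.depends_on.include?(current)
      next if found.include?(n.id)

      found << n.id
      frontier << n.id
    end
  end
  found
end

# Apply the failed node's policy to the nodes that are still :pending.
def propagate_failure(nodes, failed)
  case failed.failure_policy
  when :halt
    nodes.each_value { |n| n.status = :cancelled if n.status == :pending }
  when :skip_dependents
    transitive_dependents(nodes, failed.id).each do |id|
      nodes[id].status = :cancelled if nodes[id].status == :pending
    end
  when :continue
    # Nothing to do: the failure stays local to the failed node.
  end
end

nodes = {
  'fetch'     => SketchNode.new('fetch', [], :skip_dependents, :failed),
  'transform' => SketchNode.new('transform', ['fetch'], :halt, :pending),
  'summarize' => SketchNode.new('summarize', ['transform'], :halt, :pending),
  'audit'     => SketchNode.new('audit', [], :halt, :pending)
}
propagate_failure(nodes, nodes['fetch'])
statuses = nodes.transform_values(&:status)
# 'transform' and 'summarize' are cancelled; the unrelated 'audit' stays :pending.
```

Note that the policy consulted is the one attached to the failed node, not the policies of its dependents.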

Defined Under Namespace

Classes: CyclicGraphError, InvalidNodeError, InvalidTransitionError, Node

Constant Summary

VALID_STATUSES =
%i[pending running completed failed cancelled].freeze
VALID_POLICIES =
%i[halt skip_dependents continue].freeze
ALLOWED_TRANSITIONS =

Forward-only transition table. Terminal states have no outgoing edges.

{
  pending:   %i[running completed failed cancelled],
  running:   %i[completed failed cancelled],
  completed: [],
  failed:    [],
  cancelled: []
}.freeze
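As a minimal illustration of how a forward-only table like this can be consulted, the sketch below copies the documented hash into a local constant so that it runs on its own. `ALLOWED` and `transition_allowed?` are hypothetical names, not part of the library.

```ruby
# Copy of the documented table so the snippet is self-contained.
ALLOWED = {
  pending:   %i[running completed failed cancelled],
  running:   %i[completed failed cancelled],
  completed: [],
  failed:    [],
  cancelled: []
}.freeze

# Hypothetical helper: true when moving from `from` to `to` is a legal step.
def transition_allowed?(from, to)
  ALLOWED.fetch(from).include?(to)
end

transition_allowed?(:pending, :running)   # => true
transition_allowed?(:running, :pending)   # => false, no backward moves
transition_allowed?(:completed, :failed)  # => false, terminal states are sticky
```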

Constructor Details

#initialize(nodes) ⇒ TaskDag

Returns a new instance of TaskDag.

Parameters:

  • nodes (Array<Hash>)

    each Hash becomes a Node. Required keys: :id, :tool. Optional: :args (default {}), :depends_on (default []), :failure_policy (default :halt), :status (default :pending).



# File 'lib/kairos_mcp/daemon/task_dag.rb', line 67

def initialize(nodes)
  @nodes = {}
  Array(nodes).each { |n| add_node(n) }
  validate_references!
  validate_no_self_dep!
  validate_acyclic!
end
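The shape of the `nodes` argument and the documented defaults can be sketched as follows. The private `add_node` method is not shown on this page, so `with_defaults` is a hypothetical helper that only mirrors the parameter description; the tool names are made up for the example.

```ruby
# Hypothetical helper mirroring the documented required keys and defaults.
def with_defaults(raw)
  missing = %i[id tool].reject { |key| raw.key?(key) }
  raise ArgumentError, "missing required keys: #{missing.join(', ')}" unless missing.empty?

  { args: {}, depends_on: [], failure_policy: :halt, status: :pending }.merge(raw)
end

specs = [
  { id: 'fetch', tool: 'http_get' },
  { id: 'transform', tool: 'jq', depends_on: ['fetch'], failure_policy: :skip_dependents }
]
normalized = specs.map { |spec| with_defaults(spec) }
normalized[0][:failure_policy]  # => :halt
normalized[1][:depends_on]      # => ["fetch"]
```

An Array of Hashes in this shape is what would be passed to `TaskDag.new`. Since the public accessors look nodes up with `id.to_s`, String ids are the safest choice.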

Instance Method Details

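#all_completed? ⇒ Boolean

True when every node is in a terminal state (no more work possible). Despite the name, this is also true when some nodes ended :failed or :cancelled, since those states are terminal too.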

Returns:

  • (Boolean)


# File 'lib/kairos_mcp/daemon/task_dag.rb', line 129

def all_completed?
  @nodes.each_value.all?(&:terminal?)
end

#mark(id, status, error: nil) ⇒ Node

Transition a node to a new status. When `status` is :failed, the failure is propagated according to the node's failure_policy.

Parameters:

  • id (String)
  • status (Symbol)

    one of VALID_STATUSES.

  • error (String, nil) (defaults to: nil)

    optional error message for :failed.

Returns:

  • (Node)

    the updated node.

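Raises:

  • (InvalidNodeError)

    if id does not match a known node.

  • (InvalidTransitionError)

    if status is not one of VALID_STATUSES, or if ALLOWED_TRANSITIONS does not permit the move from the node's current status.
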
# File 'lib/kairos_mcp/daemon/task_dag.rb', line 111

def mark(id, status, error: nil)
  status = status.to_sym
  raise InvalidNodeError, "unknown node: #{id}" unless @nodes.key?(id.to_s)
  raise InvalidTransitionError, "invalid status: #{status}" unless VALID_STATUSES.include?(status)

  node = @nodes[id.to_s]
  unless ALLOWED_TRANSITIONS[node.status].include?(status)
    raise InvalidTransitionError,
          "node #{id}: #{node.status} -> #{status} not allowed"
  end
  node.status = status
  node.error  = error if error

  propagate_failure!(node) if status == :failed
  node
end

#next_runnable ⇒ Node, nil

Return the first :pending node whose every dependency is :completed.

Single-threaded by design: the caller executes the returned node, calls #mark, then calls #next_runnable again. Returns nil if no node is currently runnable (either empty, all terminal, or blocked by in-flight :running nodes).



# File 'lib/kairos_mcp/daemon/task_dag.rb', line 94

def next_runnable
  @nodes.each_value do |node|
    next unless node.pending?
    next unless deps_satisfied?(node)

    return node
  end
  nil
end
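The execute, mark, ask-again loop described above can be sketched with a miniature stand-in. `MiniDag` is a hypothetical class that models only the documented contract of #next_runnable and #mark (no transition checks, no failure policies); it is not the real TaskDag.

```ruby
# Hypothetical miniature of the documented contract, not the real TaskDag.
class MiniDag
  Node = Struct.new(:id, :depends_on, :status)

  def initialize(specs)
    @nodes = {}
    specs.each do |s|
      @nodes[s[:id]] = Node.new(s[:id], s.fetch(:depends_on, []), :pending)
    end
  end

  # First :pending node whose dependencies are all :completed, else nil.
  def next_runnable
    @nodes.each_value.find do |n|
      n.status == :pending && n.depends_on.all? { |d| @nodes[d].status == :completed }
    end
  end

  def mark(id, status)
    @nodes.fetch(id).status = status
  end
end

dag = MiniDag.new([
  { id: 'summarize', depends_on: ['transform'] },
  { id: 'fetch' },
  { id: 'transform', depends_on: ['fetch'] }
])

executed = []
while (node = dag.next_runnable)
  dag.mark(node.id, :running)
  executed << node.id # stand-in for the real tool invocation
  dag.mark(node.id, :completed)
end
executed # => ["fetch", "transform", "summarize"]
```

Even though 'summarize' was inserted first, it is skipped until its dependency chain has completed, and at most one node is handed out per call.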

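#node(id) ⇒ Node, nil

Look up a single node by id. The id is coerced with to_s before the lookup; returns nil when no such node exists.
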
# File 'lib/kairos_mcp/daemon/task_dag.rb', line 80

def node(id)
  @nodes[id.to_s]
end

#nodes ⇒ Array<Node>

Non-destructive read of the current node set, in insertion order.



# File 'lib/kairos_mcp/daemon/task_dag.rb', line 76

def nodes
  @nodes.values
end

#size ⇒ Integer

Number of nodes in the graph.

# File 'lib/kairos_mcp/daemon/task_dag.rb', line 84

def size
  @nodes.size
end

#to_plan_steps(plan_id: 'plan_dag', cycle: 1) ⇒ Object

Linearize the DAG into an Array of WAL-step Hashes. Each step has the same shape Planner emits so WalPhaseRecorder / wal.commit_plan can ingest the result unchanged.

Parameters:

  • plan_id (String) (defaults to: 'plan_dag')

    propagated into per-step params for hashing.

  • cycle (Integer) (defaults to: 1)


# File 'lib/kairos_mcp/daemon/task_dag.rb', line 167

def to_plan_steps(plan_id: 'plan_dag', cycle: 1)
  topological_order.each_with_index.map do |id, idx|
    n = @nodes[id]
    params = {
      node_id:    n.id,
      tool:       n.tool,
      args:       n.args,
      depends_on: n.depends_on,
      order:      idx,
      cycle:      cycle,
      plan_id:    plan_id
    }
    {
      step_id:            n.id,
      tool:               n.tool,
      params_hash:        Canonical.sha256_json(params),
      pre_hash:           Canonical.sha256_json({ node: n.id, state: 'pre', cycle: cycle }),
      expected_post_hash: Canonical.sha256_json({ node: n.id, state: 'post', cycle: cycle })
    }
  end
end
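`Canonical.sha256_json` is defined elsewhere and is not documented on this page. As a rough sketch of what a canonical JSON digest could look like, the example below assumes that "canonical" means recursively sorted keys followed by SHA-256 over the serialized JSON. That is an assumption, and `canonical` and `sha256_json_sketch` are hypothetical names, so the real helper may produce different digests.

```ruby
require 'json'
require 'digest'

# Hypothetical canonicalization: stringify and sort Hash keys recursively.
def canonical(value)
  case value
  when Hash  then value.map { |k, v| [k.to_s, canonical(v)] }.sort_by(&:first).to_h
  when Array then value.map { |v| canonical(v) }
  else value
  end
end

# Hypothetical stand-in for Canonical.sha256_json.
def sha256_json_sketch(value)
  Digest::SHA256.hexdigest(JSON.generate(canonical(value)))
end

a = sha256_json_sketch({ node: 'fetch', state: 'pre', cycle: 1 })
b = sha256_json_sketch({ cycle: 1, state: 'pre', node: 'fetch' })
a == b    # => true, key order does not affect the digest
a.length  # => 64
```

In the method above, pre_hash and expected_post_hash are computed from the node id, a state label, and cycle only, while params_hash also covers plan_id and the topological position (order).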

#topological_order ⇒ Array<String>

Kahn's topological sort. Ties are broken by insertion order, so a fixed input always yields the same ordering across runs. That determinism matters because the ordering is fed into the WAL as step order.

Raises:

  • (CyclicGraphError)

    if the sort cannot place every node, which means the graph contains a cycle.

# File 'lib/kairos_mcp/daemon/task_dag.rb', line 136

def topological_order
  in_degree = @nodes.transform_values { |n| n.depends_on.size }
  # Children map: dep_id → [ids that depend on it]
  children = Hash.new { |h, k| h[k] = [] }
  @nodes.each_value do |n|
    n.depends_on.each { |d| children[d] << n.id }
  end

  order = []
  # Seed with zero-in-degree nodes in insertion order.
  ready = @nodes.each_value.select { |n| in_degree[n.id].zero? }.map(&:id)
  until ready.empty?
    id = ready.shift
    order << id
    children[id].each do |child_id|
      in_degree[child_id] -= 1
      ready << child_id if in_degree[child_id].zero?
    end
  end

  raise CyclicGraphError, 'cycle detected' unless order.size == @nodes.size

  order
end
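To see the insertion-order tie-breaking and the cycle check on concrete input, the same algorithm can be run against a plain Hash. `kahn_order` is a hypothetical standalone adaptation of the method above; it raises ArgumentError instead of CyclicGraphError so that it has no dependency on the library.

```ruby
# deps maps each id to the ids it depends on; Hash order stands in for
# node insertion order.
def kahn_order(deps)
  in_degree = deps.transform_values(&:size)
  children = Hash.new { |h, k| h[k] = [] }
  deps.each { |id, ds| ds.each { |d| children[d] << id } }

  ready = deps.keys.select { |id| in_degree[id].zero? }
  order = []
  until ready.empty?
    id = ready.shift
    order << id
    children[id].each do |child|
      in_degree[child] -= 1
      ready << child if in_degree[child].zero?
    end
  end
  raise ArgumentError, 'cycle detected' unless order.size == deps.size

  order
end

kahn_order({ 'b' => [], 'a' => [], 'c' => %w[a b] })  # => ["b", "a", "c"]
```

Both 'b' and 'a' start with in-degree zero, and 'b' comes first only because it was inserted first. A graph such as `{ 'x' => ['y'], 'y' => ['x'] }` has no zero-in-degree seed, so the result stays empty and the size check raises.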