Class: RubynCode::Tasks::DAG

Inherits:
Object
Defined in:
lib/rubyn_code/tasks/dag.rb

Overview

Directed acyclic graph tracking task dependencies. Edges are persisted in the SQLite table task_dependencies; forward (task → its dependencies) and reverse (task → its dependents) adjacency lists are kept in memory for fast traversal.
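A minimal sketch of the two in-memory lists, leaving out the SQLite half. `MiniAdjacency` is a hypothetical name used only for illustration; the point is that each edge is recorded in both maps, so a lookup in either direction is a single hash access.

```ruby
require 'set'

# Hypothetical model of DAG's in-memory half only (no persistence).
class MiniAdjacency
  def initialize
    @forward = Hash.new { |h, k| h[k] = Set.new } # task_id -> ids it depends on
    @reverse = Hash.new { |h, k| h[k] = Set.new } # task_id -> ids that depend on it
  end

  # Every edge is written to both maps so either direction is one lookup.
  def add(task_id, depends_on_id)
    @forward[task_id].add(depends_on_id)
    @reverse[depends_on_id].add(task_id)
  end

  def dependencies_for(task_id)
    @forward.fetch(task_id, Set.new).to_a
  end

  def dependents_of(task_id)
    @reverse.fetch(task_id, Set.new).to_a
  end
end

g = MiniAdjacency.new
g.add('deploy', 'build')
g.add('deploy', 'test')
g.add('test', 'build')
p g.dependencies_for('deploy') # => ["build", "test"]
p g.dependents_of('build')     # => ["deploy", "test"]
```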


Constructor Details

#initialize(db) ⇒ DAG

Returns a new instance of DAG. Ensures the dependency table exists and loads any persisted edges into memory.

Parameters:

  • db: database connection; must respond to #execute and #query

# File 'lib/rubyn_code/tasks/dag.rb', line 10

def initialize(db)
  @db = db
  @forward  = Hash.new { |h, k| h[k] = Set.new } # task_id -> depends_on ids
  @reverse  = Hash.new { |h, k| h[k] = Set.new } # task_id -> dependent ids
  ensure_table
  load_from_db
end
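The `Hash.new { |h, k| h[k] = Set.new }` idiom used for `@forward` and `@reverse` creates a key on first access. The snippet below is plain Ruby, independent of the DAG class; it shows that an ordinary read also inserts the key, while `Hash#fetch` ignores the default proc and leaves the hash unchanged.

```ruby
require 'set'

# Same default-proc pattern as the constructor.
forward = Hash.new { |h, k| h[k] = Set.new }

forward['a'].add('b')   # creates key 'a' and stores the edge a -> b
p forward.keys          # => ["a"]

# A plain read also inserts the key, with an empty Set.
puts "read returned #{forward['never_added'].size} deps"
p forward.keys          # => ["a", "never_added"]

# Hash#fetch does not call the default proc, so nothing is inserted.
puts "fetch returned #{forward.fetch('other', Set.new).size} deps"
p forward.keys          # => ["a", "never_added"]
```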

Instance Method Details

#add_dependency(task_id, depends_on_id) ⇒ void

This method returns an undefined value.

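Declares that task_id depends on depends_on_id. The edge is persisted to task_dependencies and mirrored in the in-memory forward and reverse lists; adding an edge that already exists is a no-op.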

Parameters:

  • task_id (String)
  • depends_on_id (String)

Raises:

  • (ArgumentError)

    if task_id equals depends_on_id, or if the edge would create a cycle



# File 'lib/rubyn_code/tasks/dag.rb', line 24

def add_dependency(task_id, depends_on_id)
  raise ArgumentError, 'A task cannot depend on itself' if task_id == depends_on_id
  raise ArgumentError, 'Cycle detected' if reachable?(depends_on_id, task_id)

  return if @forward[task_id].include?(depends_on_id)

  @db.execute(
    'INSERT OR IGNORE INTO task_dependencies (task_id, depends_on_id) VALUES (?, ?)',
    [task_id, depends_on_id]
  )
  @forward[task_id].add(depends_on_id)
  @reverse[depends_on_id].add(task_id)
end
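`reachable?` is private and not shown on this page, so the following is a sketch under the assumption that it answers "can `to` be reached from `from` by following depends-on edges". Adding `task_id → depends_on_id` closes a cycle exactly when `task_id` is already reachable from `depends_on_id`, which is why the guard passes the arguments in that order. This hypothetical version is an iterative depth-first search that takes the forward map as an explicit argument.

```ruby
require 'set'

# Hypothetical reachable?: iterative DFS over a forward map
# (task_id -> ids it depends on). Not the actual private helper.
def reachable?(forward, from, to)
  stack = [from]
  seen = Set.new
  until stack.empty?
    node = stack.pop
    return true if node == to
    next unless seen.add?(node) # skip nodes already expanded

    stack.concat(forward.fetch(node, Set.new).to_a)
  end
  false
end

forward = {
  'deploy' => Set['test'],
  'test' => Set['build']
}

# add_dependency('build', 'deploy') would close build -> deploy -> test -> build:
p reachable?(forward, 'deploy', 'build') # => true
# add_dependency('deploy', 'lint') is safe: nothing is reachable from 'lint'.
p reachable?(forward, 'lint', 'deploy')  # => false
```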

#blocked?(task_id) ⇒ Boolean

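Returns true if any direct dependency of task_id has a status other than 'completed' in the tasks table. Dependency IDs with no row in tasks are not counted.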

Parameters:

  • task_id (String)

Returns:

  • (Boolean)


# File 'lib/rubyn_code/tasks/dag.rb', line 72

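def blocked?(task_id)
  # fetch bypasses the default proc, so an unknown task_id is not inserted as a key
  deps = @forward.fetch(task_id, nil)
  return false if deps.nil? || deps.empty?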

  rows = @db.query(
    "SELECT id FROM tasks WHERE id IN (#{placeholders(deps.size)}) AND status != 'completed'",
    deps.to_a
  ).to_a
  !rows.empty?
end
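`placeholders` is also private and not shown. A plausible version, labeled hypothetical here, emits one `?` per dependency so the IDs are bound as query parameters instead of being interpolated into the SQL string. The early `return false` for an empty dependency set also means the query is never issued with an empty `IN ()` list.

```ruby
# Hypothetical placeholders(n): one "?" per bound value, comma-separated.
def placeholders(count)
  Array.new(count, '?').join(', ')
end

deps = %w[build test lint]
sql = "SELECT id FROM tasks WHERE id IN (#{placeholders(deps.size)}) AND status != 'completed'"
puts sql
# prints: SELECT id FROM tasks WHERE id IN (?, ?, ?) AND status != 'completed'
```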

#dependencies_for(task_id) ⇒ Array<String>

Returns the IDs of tasks that task_id directly depends on.

Parameters:

  • task_id (String)

Returns:

  • (Array<String>)


# File 'lib/rubyn_code/tasks/dag.rb', line 56

def dependencies_for(task_id)
  @forward.fetch(task_id, []).to_a # fetch avoids auto-creating an empty entry
end

#dependents_of(task_id) ⇒ Array<String>

Returns the IDs of tasks that directly depend on task_id.

Parameters:

  • task_id (String)

Returns:

  • (Array<String>)


# File 'lib/rubyn_code/tasks/dag.rb', line 64

def dependents_of(task_id)
  @reverse.fetch(task_id, []).to_a # fetch avoids auto-creating an empty entry
end

#remove_dependency(task_id, depends_on_id) ⇒ void

This method returns an undefined value.

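Removes a dependency edge from both task_dependencies and the in-memory lists. Removing an edge that does not exist is a no-op.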

Parameters:

  • task_id (String)
  • depends_on_id (String)


# File 'lib/rubyn_code/tasks/dag.rb', line 43

def remove_dependency(task_id, depends_on_id)
  @db.execute(
    'DELETE FROM task_dependencies WHERE task_id = ? AND depends_on_id = ?',
    [task_id, depends_on_id]
  )
  @forward[task_id].delete(depends_on_id)
  @reverse[depends_on_id].delete(task_id)
end

#topological_sort ⇒ Array<String>

Returns all task IDs known to the graph in a valid execution order (each task after all of its dependencies), computed with Kahn's algorithm.

Returns:

  • (Array<String>)

Raises:

  • (RuntimeError)

    if the graph contains a cycle



# File 'lib/rubyn_code/tasks/dag.rb', line 115

def topological_sort
  all_nodes = collect_all_nodes
  in_degree = compute_in_degrees(all_nodes)

  sorted = kahn_sort(all_nodes, in_degree)
  raise 'Cycle detected in task dependency graph' if sorted.size != all_nodes.size

  sorted
end
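The three helpers (`collect_all_nodes`, `compute_in_degrees`, `kahn_sort`) are private and not shown. The stand-in below folds them into one hypothetical `topo_sort` method over a forward map, assuming a task's in-degree is the number of its direct dependencies. Nodes that never reach in-degree zero lie on a cycle, which is what the final size comparison detects.

```ruby
require 'set'

# Hypothetical Kahn's-algorithm sketch; forward maps task -> its dependencies.
def topo_sort(forward)
  # Every ID that appears as a task or as a dependency.
  nodes = Set.new(forward.keys)
  forward.each_value { |deps| nodes.merge(deps) }

  # In-degree = number of direct dependencies; reverse maps dep -> dependents.
  in_degree = nodes.to_h { |n| [n, forward.fetch(n, Set.new).size] }
  reverse = Hash.new { |h, k| h[k] = [] }
  forward.each { |task, deps| deps.each { |d| reverse[d] << task } }

  queue = nodes.to_a.select { |n| in_degree[n].zero? } # no dependencies: ready
  sorted = []
  until queue.empty?
    node = queue.shift
    sorted << node
    reverse[node].each do |dependent|
      in_degree[dependent] -= 1
      queue << dependent if in_degree[dependent].zero?
    end
  end

  # Any node left out still has unmet dependencies, i.e. sits on a cycle.
  raise 'Cycle detected in task dependency graph' if sorted.size != nodes.size

  sorted
end

graph = { 'deploy' => Set['build', 'test'], 'test' => Set['build'] }
order = topo_sort(graph)
p order # => ["build", "test", "deploy"]
```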

#unblock_cascade(completed_task_id) ⇒ Array<String>

Called after a task has been marked completed. Re-checks each direct dependent of completed_task_id and flips it from ‘blocked’ to ‘pending’ once all of its dependencies are completed. Dependency edges are left in place; only task statuses change.

Parameters:

  • completed_task_id (String)

Returns:

  • (Array<String>)

    IDs of tasks that were unblocked



# File 'lib/rubyn_code/tasks/dag.rb', line 89

def unblock_cascade(completed_task_id)
  unblocked = []

  dependents_of(completed_task_id).each do |dep_id|
    next if blocked?(dep_id)

    rows = @db.query('SELECT status FROM tasks WHERE id = ?', [dep_id]).to_a
    next if rows.empty?

    current_status = rows.first['status']
    next unless current_status == 'blocked'

    @db.execute(
      "UPDATE tasks SET status = 'pending', updated_at = datetime('now') WHERE id = ?",
      [dep_id]
    )
    unblocked << dep_id
  end

  unblocked
end
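A hypothetical in-memory model of the cascade, with a `status` hash standing in for the `tasks` table and a lambda mirroring `blocked?` (dependencies with no status entry are ignored, as in the SQL). Only direct dependents are re-checked: a task that becomes 'pending' is not yet completed, so its own dependents stay blocked until `unblock_cascade` is called for it.

```ruby
require 'set'

# Hypothetical stand-ins for the DAG maps and the tasks table.
forward = { 'deploy' => Set['build', 'test'], 'test' => Set['build'] }
reverse = { 'build' => Set['deploy', 'test'], 'test' => Set['deploy'] }
status  = { 'build' => 'completed', 'test' => 'blocked', 'deploy' => 'blocked' }

# Mirrors blocked?: some known direct dependency is not completed.
blocked = lambda do |id|
  forward.fetch(id, Set.new).any? { |d| status.key?(d) && status[d] != 'completed' }
end

# Mirrors unblock_cascade: flip 'blocked' dependents whose deps are all met.
def cascade(id, reverse, status, blocked)
  reverse.fetch(id, Set.new).each_with_object([]) do |dep_id, unblocked|
    next if blocked.call(dep_id)
    next unless status[dep_id] == 'blocked'

    status[dep_id] = 'pending'
    unblocked << dep_id
  end
end

first = cascade('build', reverse, status, blocked)
p first  # => ["test"]   ('deploy' still waits on 'test')

status['test'] = 'completed'
second = cascade('test', reverse, status, blocked)
p second # => ["deploy"]
```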