Class: RubynCode::Tasks::DAG
- Inherits:
-
Object
- Object
- RubynCode::Tasks::DAG
- Defined in:
- lib/rubyn_code/tasks/dag.rb
Overview
Directed acyclic graph tracking task dependencies. Backed by a SQLite table for persistence; keeps an in-memory adjacency list for fast traversal.
Instance Method Summary collapse
-
#add_dependency(task_id, depends_on_id) ⇒ void
Declares that
task_iddepends ondepends_on_id. -
#blocked?(task_id) ⇒ Boolean
Returns true if
task_idhas any incomplete dependency. -
#dependencies_for(task_id) ⇒ Array<String>
Returns the IDs of tasks that
task_iddirectly depends on. -
#dependents_of(task_id) ⇒ Array<String>
Returns the IDs of tasks that directly depend on
task_id. -
#initialize(db) ⇒ DAG
constructor
A new instance of DAG.
-
#remove_dependency(task_id, depends_on_id) ⇒ void
Removes a dependency edge.
-
#topological_sort ⇒ Array<String>
Returns all known task IDs in a valid execution order (dependencies first).
-
#unblock_cascade(completed_task_id) ⇒ Array<String>
Called when a task is completed.
Constructor Details
#initialize(db) ⇒ DAG
Returns a new instance of DAG.
10 11 12 13 14 15 16 |
# File 'lib/rubyn_code/tasks/dag.rb', line 10 def initialize(db) @db = db @forward = Hash.new { |h, k| h[k] = Set.new } # task_id -> depends_on ids @reverse = Hash.new { |h, k| h[k] = Set.new } # task_id -> dependent ids ensure_table load_from_db end |
Instance Method Details
#add_dependency(task_id, depends_on_id) ⇒ void
This method returns an undefined value.
Declares that task_id depends on depends_on_id.
24 25 26 27 28 29 30 31 32 33 34 35 36 |
# File 'lib/rubyn_code/tasks/dag.rb', line 24 def add_dependency(task_id, depends_on_id) raise ArgumentError, 'A task cannot depend on itself' if task_id == depends_on_id raise ArgumentError, 'Cycle detected' if reachable?(depends_on_id, task_id) return if @forward[task_id].include?(depends_on_id) @db.execute( 'INSERT OR IGNORE INTO task_dependencies (task_id, depends_on_id) VALUES (?, ?)', [task_id, depends_on_id] ) @forward[task_id].add(depends_on_id) @reverse[depends_on_id].add(task_id) end |
#blocked?(task_id) ⇒ Boolean
Returns true if task_id has any incomplete dependency.
72 73 74 75 76 77 78 79 80 81 |
# File 'lib/rubyn_code/tasks/dag.rb', line 72 def blocked?(task_id) deps = @forward[task_id] return false if deps.empty? rows = @db.query( "SELECT id FROM tasks WHERE id IN (#{placeholders(deps.size)}) AND status != 'completed'", deps.to_a ).to_a !rows.empty? end |
#dependencies_for(task_id) ⇒ Array<String>
Returns the IDs of tasks that task_id directly depends on.
56 57 58 |
# File 'lib/rubyn_code/tasks/dag.rb', line 56 def dependencies_for(task_id) @forward[task_id].to_a end |
#dependents_of(task_id) ⇒ Array<String>
Returns the IDs of tasks that directly depend on task_id.
64 65 66 |
# File 'lib/rubyn_code/tasks/dag.rb', line 64 def dependents_of(task_id) @reverse[task_id].to_a end |
#remove_dependency(task_id, depends_on_id) ⇒ void
This method returns an undefined value.
Removes a dependency edge.
43 44 45 46 47 48 49 50 |
# File 'lib/rubyn_code/tasks/dag.rb', line 43 def remove_dependency(task_id, depends_on_id) @db.execute( 'DELETE FROM task_dependencies WHERE task_id = ? AND depends_on_id = ?', [task_id, depends_on_id] ) @forward[task_id].delete(depends_on_id) @reverse[depends_on_id].delete(task_id) end |
#topological_sort ⇒ Array<String>
Returns all known task IDs in a valid execution order (dependencies first).
115 116 117 118 119 120 121 122 123 |
# File 'lib/rubyn_code/tasks/dag.rb', line 115 def topological_sort all_nodes = collect_all_nodes in_degree = compute_in_degrees(all_nodes) sorted = kahn_sort(all_nodes, in_degree) raise 'Cycle detected in task dependency graph' if sorted.size != all_nodes.size sorted end |
#unblock_cascade(completed_task_id) ⇒ Array<String>
Called when a task is completed. Removes it as a blocker from every dependent, flipping dependents from ‘blocked’ to ‘pending’ when all their deps are met.
89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 |
# File 'lib/rubyn_code/tasks/dag.rb', line 89 def unblock_cascade(completed_task_id) unblocked = [] dependents_of(completed_task_id).each do |dep_id| next if blocked?(dep_id) rows = @db.query('SELECT status FROM tasks WHERE id = ?', [dep_id]).to_a next if rows.empty? current_status = rows.first['status'] next unless current_status == 'blocked' @db.execute( "UPDATE tasks SET status = 'pending', updated_at = datetime('now') WHERE id = ?", [dep_id] ) unblocked << dep_id end unblocked end |