Class: SimpleFlow::DependencyGraph
- Inherits:
-
Object
- Object
- SimpleFlow::DependencyGraph
- Includes:
- TSort
- Defined in:
- lib/simple_flow/dependency_graph.rb
Overview
DependencyGraph manages dependencies between pipeline steps and determines which steps can be executed in parallel. This is adapted from the dagwood gem (github.com/rewindio/dagwood) to work with SimpleFlow pipelines.
Example:
graph = SimpleFlow::DependencyGraph.new(
fetch_user: [],
fetch_orders: [:fetch_user],
fetch_products: [:fetch_user],
calculate_total: [:fetch_orders, :fetch_products]
)
graph.parallel_order
# => [[:fetch_user], [:fetch_orders, :fetch_products], [:calculate_total]]
Instance Attribute Summary collapse
-
#dependencies ⇒ Object
readonly
Returns the value of attribute dependencies.
Instance Method Summary collapse
-
#initialize(dependencies) ⇒ DependencyGraph
constructor
A new instance of DependencyGraph.
-
#merge(other) ⇒ DependencyGraph
Returns a new graph containing all dependencies from this graph and the given graph.
-
#order ⇒ Array
Returns steps in topological order (dependencies first).
-
#parallel_order ⇒ Array<Array>
Groups steps that can be executed in parallel.
-
#reverse_order ⇒ Array
Returns steps in reverse topological order.
-
#subgraph(node) ⇒ DependencyGraph
Generate a subgraph starting at the given node.
Constructor Details
#initialize(dependencies) ⇒ DependencyGraph
Returns a new instance of DependencyGraph.
31 32 33 |
# File 'lib/simple_flow/dependency_graph.rb', line 31 def initialize(dependencies) @dependencies = Hash.new([]).merge(dependencies.transform_values { |v| v.nil? ? [] : Array(v).sort }) end |
Instance Attribute Details
#dependencies ⇒ Object (readonly)
Returns the value of attribute dependencies.
25 26 27 |
# File 'lib/simple_flow/dependency_graph.rb', line 25 def dependencies @dependencies end |
Instance Method Details
#merge(other) ⇒ DependencyGraph
Returns a new graph containing all dependencies from this graph and the given graph. If both graphs depend on the same item, but that item’s sub-dependencies differ, the resulting graph will depend on the union of both.
100 101 102 103 104 105 106 107 108 |
# File 'lib/simple_flow/dependency_graph.rb', line 100 def merge(other) all_dependencies = {} (dependencies.keys | other.dependencies.keys).each do |key| all_dependencies[key] = dependencies[key] | other.dependencies[key] end self.class.new all_dependencies end |
#order ⇒ Array
Returns steps in topological order (dependencies first)
37 38 39 |
# File 'lib/simple_flow/dependency_graph.rb', line 37 def order @order ||= tsort end |
#parallel_order ⇒ Array<Array>
Groups steps that can be executed in parallel. Steps can run in parallel if:
1) They have the exact same dependencies OR
2) All of a step's dependencies have been resolved in previous groups
53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 |
# File 'lib/simple_flow/dependency_graph.rb', line 53 def parallel_order groups = [] ungrouped_dependencies = order.dup until ungrouped_dependencies.empty? # Start this group with the first dependency we haven't grouped yet group_starter = ungrouped_dependencies.delete_at(0) group = [group_starter] ungrouped_dependencies.each do |ungrouped_dependency| same_priority = @dependencies[ungrouped_dependency].all? do |sub_dependency| groups.reduce(false) { |found, g| found || g.include?(sub_dependency) } end group << ungrouped_dependency if same_priority end # Remove dependencies we managed to group ungrouped_dependencies -= group groups << group.sort end groups end |
#reverse_order ⇒ Array
Returns steps in reverse topological order
43 44 45 |
# File 'lib/simple_flow/dependency_graph.rb', line 43 def reverse_order @reverse_order ||= order.reverse end |
#subgraph(node) ⇒ DependencyGraph
Generate a subgraph starting at the given node
82 83 84 85 86 87 88 89 90 91 92 93 |
# File 'lib/simple_flow/dependency_graph.rb', line 82 def subgraph(node) return self.class.new({}) unless @dependencies.key? node # Add the given node and its dependencies to our hash hash = {} hash[node] = @dependencies[node] # For every dependency of the given node, recursively create a subgraph and merge it into our result @dependencies[node].each { |dep| hash.merge! subgraph(dep).dependencies } self.class.new hash end |