Class: SimpleFlow::DependencyGraph

Inherits:
Object
Includes:
TSort
Defined in:
lib/simple_flow/dependency_graph.rb

Overview

DependencyGraph manages dependencies between pipeline steps and determines which steps can be executed in parallel. This is adapted from the dagwood gem (github.com/rewindio/dagwood) to work with SimpleFlow pipelines.

Example:

graph = SimpleFlow::DependencyGraph.new(
  fetch_user: [],
  fetch_orders: [:fetch_user],
  fetch_products: [:fetch_user],
  calculate_total: [:fetch_orders, :fetch_products]
)

graph.parallel_order
# => [[:fetch_user], [:fetch_orders, :fetch_products], [:calculate_total]]

Constructor Details

#initialize(dependencies) ⇒ DependencyGraph

Returns a new instance of DependencyGraph.

Parameters:

  • dependencies (Hash)

    A hash of the form { step1: [:step2, :step3], step2: [:step3], step3: [] } means that step1 depends on step2 and step3, step2 depends on step3, and step3 has no dependencies. Nil values are converted to [], and looking up a step that is missing from the hash also yields []. Each dependency list is stored sorted.



# File 'lib/simple_flow/dependency_graph.rb', line 31

def initialize(dependencies)
  @dependencies = Hash.new([]).merge(dependencies.transform_values { |v| v.nil? ? [] : Array(v).sort })
end
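
To make the normalization concrete, here is a minimal stand-alone sketch that applies the same expression as the constructor to a plain hash, without loading SimpleFlow. The input hash and its step names are made up for illustration.

```ruby
# Hypothetical input: an unsorted list, a bare symbol, and a nil value.
raw = { step1: [:step3, :step2], step2: :step3, step3: nil }

# Same expression as in #initialize, applied outside the class.
normalized = Hash.new([]).merge(
  raw.transform_values { |v| v.nil? ? [] : Array(v).sort }
)

p normalized[:step1]         # => [:step2, :step3]  (list is sorted)
p normalized[:step2]         # => [:step3]          (bare value wrapped in an array)
p normalized[:step3]         # => []                (nil converted)
p normalized[:unknown]       # => []                (default for a missing key)
p normalized.key?(:unknown)  # => false             (the default does not add the key)
```

Note that Hash.new([]) hands out the same array object for every missing key, so the returned default is best treated as read-only.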

Instance Attribute Details

#dependencies ⇒ Object (readonly)

Returns the value of attribute dependencies.



# File 'lib/simple_flow/dependency_graph.rb', line 25

def dependencies
  @dependencies
end

Instance Method Details

#merge(other) ⇒ DependencyGraph

Returns a new graph containing all dependencies from this graph and the given graph. If both graphs depend on the same item, but that item’s sub-dependencies differ, the resulting graph will depend on the union of both.

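Parameters:

  • other (DependencyGraph)

    the graph to merge with this one

Returns:

  • (DependencyGraph)

    a new graph containing the combined dependencies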



# File 'lib/simple_flow/dependency_graph.rb', line 100

def merge(other)
  all_dependencies = {}

  (dependencies.keys | other.dependencies.keys).each do |key|
    all_dependencies[key] = dependencies[key] | other.dependencies[key]
  end

  self.class.new all_dependencies
end
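
As a worked illustration of the union behaviour, the sketch below runs the same key-by-key union on two plain hashes instead of DependencyGraph instances. The step names (build, compile, lint) are hypothetical.

```ruby
# Two hypothetical dependency hashes with [] as the default for missing keys,
# mirroring what the constructor produces.
a = Hash.new([]).merge(build: [:compile], compile: [])
b = Hash.new([]).merge(build: [:lint], lint: [])

merged = {}
(a.keys | b.keys).each do |key|
  # Array#| is a set union: duplicates are dropped, first-seen order is kept.
  merged[key] = a[key] | b[key]
end

p merged
# => {:build=>[:compile, :lint], :compile=>[], :lint=>[]}
# (newer Rubies print the same hash as {build: [:compile, :lint], compile: [], lint: []})
```

Both graphs declare build, but with different dependencies, so the merged entry lists both.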

#order ⇒ Array

Returns the steps in topological order, so that every step appears after all of its dependencies. The result is memoized.

Returns:

  • (Array)

    ordered list of step names



# File 'lib/simple_flow/dependency_graph.rb', line 37

def order
  @order ||= tsort
end
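
This page does not list the tsort_each_node and tsort_each_child hooks that TSort needs, so the following is only a sketch of how such an ordering can be produced with the standard library's TSort.tsort. It assumes the graph is a plain hash of step => dependencies; the lambdas are illustrative and not taken from SimpleFlow.

```ruby
require 'tsort'

deps = {
  fetch_user: [],
  fetch_orders: [:fetch_user],
  fetch_products: [:fetch_user],
  calculate_total: [:fetch_orders, :fetch_products]
}

# TSort.tsort takes two callables: one yields every node,
# the other yields the children (here: the dependencies) of a node.
each_node  = ->(&block) { deps.each_key(&block) }
each_child = ->(node, &block) { deps.fetch(node, []).each(&block) }

order = TSort.tsort(each_node, each_child)
p order
# => [:fetch_user, :fetch_orders, :fetch_products, :calculate_total]

# A cyclic graph cannot be ordered; TSort raises TSort::Cyclic.
cyclic = { a: [:b], b: [:a] }
cycle_detected = false
begin
  TSort.tsort(->(&b) { cyclic.each_key(&b) }, ->(n, &b) { cyclic[n].each(&b) })
rescue TSort::Cyclic
  cycle_detected = true
end
p cycle_detected # => true
```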

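#parallel_order ⇒ Array<Array>

Groups steps that can be executed in parallel. Each group starts with the first step in topological order that has not been grouped yet; any other ungrouped step joins that group if all of its dependencies were resolved in previous groups. As a consequence:

1) Steps with exactly the same dependencies always end up in the same group
2) A step never shares a group with one of its own dependencies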

Returns:

  • (Array<Array>)

    array of groups, where each group can run in parallel



# File 'lib/simple_flow/dependency_graph.rb', line 53

def parallel_order
  groups = []
  ungrouped_dependencies = order.dup

  until ungrouped_dependencies.empty?
    # Start this group with the first dependency we haven't grouped yet
    group_starter = ungrouped_dependencies.delete_at(0)
    group = [group_starter]

    ungrouped_dependencies.each do |ungrouped_dependency|
      same_priority = @dependencies[ungrouped_dependency].all? do |sub_dependency|
        groups.reduce(false) { |found, g| found || g.include?(sub_dependency) }
      end

      group << ungrouped_dependency if same_priority
    end

    # Remove dependencies we managed to group
    ungrouped_dependencies -= group

    groups << group.sort
  end

  groups
end
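
The grouping rule can be tried out in isolation. The sketch below re-implements the loop as a free-standing method over a topological order and a plain dependency hash; parallel_groups is a hypothetical helper name, not part of SimpleFlow, and it uses any? where the method above uses reduce.

```ruby
# Hypothetical stand-alone version of the grouping loop.
# order: steps in topological order; deps: step => array of dependencies.
def parallel_groups(order, deps)
  groups = []
  remaining = order.dup

  until remaining.empty?
    # The first ungrouped step always starts a new group.
    group = [remaining.shift]

    remaining.each do |step|
      # A step is ready when every dependency sits in an earlier group.
      ready = deps.fetch(step, []).all? do |dep|
        groups.any? { |g| g.include?(dep) }
      end
      group << step if ready
    end

    remaining -= group
    groups << group.sort
  end

  groups
end

deps = {
  fetch_user: [],
  fetch_orders: [:fetch_user],
  fetch_products: [:fetch_user],
  calculate_total: [:fetch_orders, :fetch_products]
}
order = [:fetch_user, :fetch_orders, :fetch_products, :calculate_total]

p parallel_groups(order, deps)
# => [[:fetch_user], [:fetch_orders, :fetch_products], [:calculate_total]]

# A step without dependencies joins the first group even if it comes late in the order.
p parallel_groups([:a, :b, :c], { a: [], b: [:a], c: [] })
# => [[:a, :c], [:b]]
```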

#reverse_order ⇒ Array

Returns the steps in reverse topological order (dependents first). The result is memoized.

Returns:

  • (Array)

    reverse ordered list of step names



# File 'lib/simple_flow/dependency_graph.rb', line 43

def reverse_order
  @reverse_order ||= order.reverse
end

#subgraph(node) ⇒ DependencyGraph

Generates a subgraph starting at the given node. If the node is not a key in this graph, the result is an empty graph.

Parameters:

  • node (Symbol)

    the starting node

Returns:

  • (DependencyGraph)

    a new graph containing only the node and its direct and transitive dependencies



# File 'lib/simple_flow/dependency_graph.rb', line 82

def subgraph(node)
  return self.class.new({}) unless @dependencies.key? node

  # Add the given node and its dependencies to our hash
  hash = {}
  hash[node] = @dependencies[node]

  # For every dependency of the given node, recursively create a subgraph and merge it into our result
  @dependencies[node].each { |dep| hash.merge! subgraph(dep).dependencies }

  self.class.new hash
end
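
As an illustration of the recursion, the following sketch performs the same traversal on a plain hash and returns a hash rather than a DependencyGraph. subgraph_hash is a hypothetical helper written for this example only.

```ruby
# Hypothetical stand-alone version of the recursive walk.
# deps: step => array of dependencies.
def subgraph_hash(deps, node)
  # Unknown nodes contribute nothing.
  return {} unless deps.key?(node)

  result = { node => deps[node] }
  # Pull in each dependency together with its own dependencies.
  deps[node].each { |dep| result.merge!(subgraph_hash(deps, dep)) }
  result
end

deps = {
  fetch_user: [],
  fetch_orders: [:fetch_user],
  fetch_products: [:fetch_user],
  calculate_total: [:fetch_orders, :fetch_products]
}

p subgraph_hash(deps, :fetch_orders)
# => {:fetch_orders=>[:fetch_user], :fetch_user=>[]}

p subgraph_hash(deps, :missing)
# => {}
```

Steps that nothing in the walk depends on, such as fetch_products and calculate_total in the first call, are left out of the result.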