Class: Rwm::TaskRunner

Inherits:
Object
  • Object
show all
Defined in:
lib/rwm/task_runner.rb

Defined Under Namespace

Classes: Result

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(graph, packages: nil, buffered: false, concurrency: Etc.nprocessors) ⇒ TaskRunner

Returns a new instance of TaskRunner.



12
13
14
15
16
17
18
19
# File 'lib/rwm/task_runner.rb', line 12

# Builds a runner over the packages of a dependency graph.
#
# @param graph [Object] dependency graph; must respond to +packages+
#   (returning something that responds to +values+) when +packages+ is omitted
# @param packages [Array, nil] packages to run; falls back to every package
#   in the graph when nil
# @param buffered [Boolean] stored for later use (not read by the constructor)
# @param concurrency [Integer] stored as the limit on simultaneously
#   running packages; defaults to the number of processors
def initialize(graph, packages: nil, buffered: false, concurrency: Etc.nprocessors)
  # Fresh per-runner state.
  @results = []
  @mutex = Mutex.new

  # Caller-supplied configuration.
  @graph = graph
  @buffered = buffered
  @concurrency = concurrency

  # Only consult the graph when no explicit package list was given.
  @packages = packages
  @packages ||= graph.packages.values
end

Instance Attribute Details

#results ⇒ Object (readonly)

Returns the value of attribute results.



10
11
12
# File 'lib/rwm/task_runner.rb', line 10

# Results recorded so far, in the order they were appended.
#
# This is the live array the runner appends to, not a copy.
#
# @return [Array] the recorded results
def results
  @results
end

Instance Method Details

#failed_results ⇒ Object



90
91
92
# File 'lib/rwm/task_runner.rb', line 90

# Recorded results whose +success+ is falsy.
#
# @return [Array] the unsuccessful results, in recorded order
def failed_results
  @results.reject(&:success)
end

#run_command(&command_proc) ⇒ Object

Runs a shell command in each package using DAG scheduling: each package starts as soon as its dependencies have completed. The command_proc block receives a Package and returns the command as an array of the form [command, *args].



24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
# File 'lib/rwm/task_runner.rb', line 24

# Run a shell command in each package using DAG scheduling.
#
# A package is started as soon as #ready? reports it runnable, with at most
# +@concurrency+ packages (never fewer than one) running at the same time.
# When a package fails, its still-pending transitive dependents are not run
# and are recorded as "skipped" failures instead. Errors raised while running
# a package are recorded as a failed result for that package rather than
# stalling the scheduler, and packages that can never become ready (for
# example because of a dependency cycle) are recorded as skipped.
#
# The command_proc receives a Package and returns a [command, *args] array;
# it is forwarded unchanged to #run_single.
#
# @return [Array<Result>] every result recorded so far, including skipped ones
def run_command(&command_proc)
  package_names = @packages.map(&:name).to_set

  pending = @packages.dup
  completed = Set.new
  running = {}
  # A limit below one could never start anything and would loop forever.
  limit = [@concurrency, 1].max

  mutex = Mutex.new
  condition = ConditionVariable.new

  # The scheduler owns the mutex except while parked in condition.wait, so a
  # worker can only publish its completion while the scheduler is waiting and
  # no wake-up can be lost.
  mutex.synchronize do
    until pending.empty? && running.empty?
      ready = pending.select { |pkg| ready?(pkg, package_names, completed) }

      ready.each do |pkg|
        break if running.size >= limit

        pending.delete(pkg)
        running[pkg.name] = Thread.new do
          result = begin
            run_single(pkg, &command_proc)
          rescue StandardError => e
            # An unrescued error would kill this worker before it could
            # unregister itself, leaving the scheduler waiting forever.
            Result.new(
              package_name: pkg.name, task: "error",
              success: false, output: "#{e.class}: #{e.message}"
            )
          end

          mutex.synchronize do
            begin
              @results << result
              running.delete(pkg.name)
              if result.success
                completed << pkg.name
              else
                blocked = @graph.transitive_dependents(pkg.name)
                                .select { |name| package_names.include?(name) }
                blocked.each do |name|
                  skip_pkg = pending.find { |candidate| candidate.name == name }
                  next unless skip_pkg

                  pending.delete(skip_pkg)
                  @results << Result.new(
                    package_name: name, task: "skipped",
                    success: false, output: "Skipped due to failed dependency: #{pkg.name}"
                  )
                end
              end
            ensure
              # Always wake the scheduler, even if the bookkeeping above raised.
              condition.broadcast
            end
          end
        end
      end

      if running.any?
        # Nothing can change until a worker finishes; sleep instead of
        # spinning (this also covers the "limit reached" case).
        condition.wait(mutex)
      elsif pending.any?
        # Nothing is running and nothing is ready: the remaining packages can
        # never start, so record them and stop rather than loop forever.
        pending.each do |pkg|
          @results << Result.new(
            package_name: pkg.name, task: "skipped",
            success: false, output: "Skipped due to unresolved dependencies"
          )
        end
        pending.clear
      end
    end
  end

  @results
end

#run_task(task) ⇒ Object

Runs `bundle exec rake <task>` in each package.



80
81
82
83
84
# File 'lib/rwm/task_runner.rb', line 80

# Runs the given rake task in each package through #run_command, using the
# command +bundle exec rake <task>+ for every package.
#
# @param task [String] name of the rake task
# @return [Object] whatever #run_command returns
def run_task(task)
  run_command { |_pkg| ["bundle", "exec", "rake", task] }
end

#success? ⇒ Boolean

Returns:

  • (Boolean)


86
87
88
# File 'lib/rwm/task_runner.rb', line 86

# Whether every recorded result succeeded.
#
# @return [Boolean] true when no recorded result has a falsy +success+
#   (and therefore also true when nothing has been recorded)
def success?
  @results.none? { |result| !result.success }
end