Class: Rwm::TaskRunner

Inherits:
Object
  • Object
show all
Defined in:
lib/rwm/task_runner.rb

Defined Under Namespace

Classes: Result

Constant Summary collapse

NO_TASK_PATTERN =
/Don't know how to build task/

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(graph, packages: nil, buffered: false, concurrency: Etc.nprocessors) ⇒ TaskRunner

Returns a new instance of TaskRunner.



14
15
16
17
18
19
20
21
# File 'lib/rwm/task_runner.rb', line 14

def initialize(graph, packages: nil, buffered: false, concurrency: Etc.nprocessors)
  @graph = graph
  @packages = packages || graph.packages.values
  @buffered = buffered
  @concurrency = concurrency
  @results = []
  @mutex = Mutex.new
end

Instance Attribute Details

#resultsObject (readonly)

Returns the value of attribute results.



12
13
14
# File 'lib/rwm/task_runner.rb', line 12

def results
  @results
end

Instance Method Details

#failed_resultsObject



113
114
115
# File 'lib/rwm/task_runner.rb', line 113

def failed_results
  @results.select { |r| !r.success }
end

#run_command(&command_proc) ⇒ Object

Run a shell command in each package using DAG scheduling. Starts each package as soon as its dependencies complete. The command_proc receives a Package and returns [command, args] array.



26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
# File 'lib/rwm/task_runner.rb', line 26

def run_command(&command_proc)
  package_names = @packages.map(&:name).to_set

  pending = @packages.dup
  completed = Set.new
  skipped = Set.new
  running = {}
  @interrupted = false

  mutex = Mutex.new
  condition = ConditionVariable.new

  previous_trap = Signal.trap("INT") do
    @interrupted = true
    # Cannot use mutex inside trap context — just kill threads directly.
    # Thread#kill is safe to call from trap context.
    running.each_value { |t| t.kill rescue nil }
  end

  done = false
  until done
    break if @interrupted

    mutex.synchronize do
      if pending.empty? && running.empty?
        done = true
        next
      end

      ready = pending.select { |pkg| ready?(pkg, package_names, completed) }

      ready.each do |pkg|
        break if running.size >= @concurrency
        break if @interrupted

        pending.delete(pkg)
        running[pkg.name] = Thread.new do
          result = run_single(pkg, &command_proc)
          mutex.synchronize do
            @results << result
            running.delete(pkg.name)
            if result.success
              completed << pkg.name
            else
              skip_names = @graph.transitive_dependents(pkg.name)
                                 .select { |n| package_names.include?(n) }
              skip_names.each do |name|
                skip_pkg = pending.find { |p| p.name == name }
                if skip_pkg
                  pending.delete(skip_pkg)
                  skipped << name
                  @results << Result.new(
                    package_name: name, task: "skipped",
                    success: false, output: "Skipped due to failed dependency: #{pkg.name}"
                  )
                end
              end
            end
            condition.broadcast
          end
        end
      end

      if running.any? && ready.empty?
        condition.wait(mutex)
      end
    end
  end

  raise Interrupt, "Interrupted by Ctrl+C" if @interrupted

  @results
ensure
  Signal.trap("INT", previous_trap || "DEFAULT")
end

#run_task(task) ⇒ Object

Run a rake task in each package



103
104
105
106
107
# File 'lib/rwm/task_runner.rb', line 103

def run_task(task)
  run_command do |pkg|
    ["bundle", "exec", "rake", task]
  end
end

#success?Boolean

Returns:

  • (Boolean)


109
110
111
# File 'lib/rwm/task_runner.rb', line 109

def success?
  @results.all?(&:success)
end