Class: Rwm::TaskRunner
- Inherits:
-
Object
- Object
- Rwm::TaskRunner
- Defined in:
- lib/rwm/task_runner.rb
Defined Under Namespace
Classes: Result
Instance Attribute Summary collapse
-
#results ⇒ Object
readonly
Returns the value of attribute results.
Instance Method Summary collapse
- #failed_results ⇒ Object
-
#initialize(graph, packages: nil, buffered: false, concurrency: Etc.nprocessors) ⇒ TaskRunner
constructor
A new instance of TaskRunner.
-
#run_command(&command_proc) ⇒ Object
Run a shell command in each package using DAG scheduling.
-
#run_task(task) ⇒ Object
Run a rake task in each package.
- #success? ⇒ Boolean
Constructor Details
#initialize(graph, packages: nil, buffered: false, concurrency: Etc.nprocessors) ⇒ TaskRunner
Returns a new instance of TaskRunner.
12 13 14 15 16 17 18 19 |
# File 'lib/rwm/task_runner.rb', line 12

# Build a runner over a set of packages.
#
# @param graph [Object] dependency graph; when +packages+ is not given it must
#   respond to +packages+ with something whose +values+ are the packages
# @param packages [Array, nil] packages to run; falls back to
#   +graph.packages.values+ when nil (or false)
# @param buffered [Boolean] stored as-is; not consumed by the code visible here
# @param concurrency [Integer] maximum number of packages run at once
#   (defaults to the number of processors); values below 1 are raised to 1
def initialize(graph, packages: nil, buffered: false, concurrency: Etc.nprocessors)
  @graph = graph
  @packages = packages || graph.packages.values
  @buffered = buffered
  # run_command only starts a package while fewer than @concurrency are
  # running, so a limit below 1 would never start anything and never finish.
  @concurrency = [concurrency, 1].max
  @results = []
  @mutex = Mutex.new
end
Instance Attribute Details
#results ⇒ Object (readonly)
Returns the value of attribute results.
10 11 12 |
# File 'lib/rwm/task_runner.rb', line 10

# Reader for the accumulated results.
#
# @return [Array] the same array object held in @results (not a copy)
def results
  @results
end
Instance Method Details
#failed_results ⇒ Object
90 91 92 |
# File 'lib/rwm/task_runner.rb', line 90

# Results whose +success+ value is falsy (false or nil).
#
# @return [Array] a new array containing only the unsuccessful results
def failed_results
  @results.reject(&:success)
end
#run_command(&command_proc) ⇒ Object
Run a shell command in each package using DAG scheduling. Starts each package as soon as its dependencies complete. The command_proc receives a Package and returns the command line as an array of the form [command, *args].
24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 |
# File 'lib/rwm/task_runner.rb', line 24

# Run a command in each package, starting a package as soon as +ready?+
# reports it runnable and a concurrency slot is free.
#
# Each package runs in its own thread via +run_single+. When a package fails,
# every transitive dependent that is part of this run and still pending is
# removed from the queue and recorded as a failed "skipped" result.
#
# The scheduler sleeps on a condition variable while work is in flight
# instead of polling, and it terminates (recording the leftovers as skipped)
# when nothing is running and no pending package can ever become ready.
#
# @yieldparam pkg [Object] the package about to be run
# @yieldreturn [Array] the command line, forwarded to +run_single+
# @return [Array] the accumulated @results
def run_command(&command_proc)
  package_names = @packages.map(&:name).to_set
  pending = @packages.dup
  completed = Set.new
  running = {}
  mutex = Mutex.new
  condition = ConditionVariable.new
  # A limit below 1 would never start anything; always allow one worker.
  slots = [@concurrency, 1].max

  # The lock is held for the whole scheduling loop and released only inside
  # condition.wait, so every read of pending/running is consistent and a
  # worker cannot signal completion before the scheduler is waiting for it.
  mutex.synchronize do
    until pending.empty? && running.empty?
      ready = pending.select { |pkg| ready?(pkg, package_names, completed) }

      ready.each do |pkg|
        break if running.size >= slots

        pending.delete(pkg)
        # NOTE(review): if run_single raises, the worker dies without
        # removing itself from `running` — confirm run_single never raises.
        running[pkg.name] = Thread.new do
          result = run_single(pkg, &command_proc)

          mutex.synchronize do
            @results << result
            running.delete(pkg.name)

            if result.success
              completed << pkg.name
            else
              skip_names = @graph.transitive_dependents(pkg.name)
                                 .select { |n| package_names.include?(n) }

              skip_names.each do |name|
                skip_pkg = pending.find { |p| p.name == name }
                next unless skip_pkg

                pending.delete(skip_pkg)
                @results << Result.new(
                  package_name: name,
                  task: "skipped",
                  success: false,
                  output: "Skipped due to failed dependency: #{pkg.name}"
                )
              end
            end

            condition.broadcast
          end
        end
      end

      if running.any?
        # Nothing more can start until a worker finishes (either the slots
        # are full or no further package is ready), so sleep until signalled.
        condition.wait(mutex)
      elsif pending.any?
        # Nothing is running and nothing could be started: the remaining
        # packages can never become ready. Record them instead of spinning.
        pending.each do |pkg|
          @results << Result.new(
            package_name: pkg.name,
            task: "skipped",
            success: false,
            output: "Skipped: package never became ready to run"
          )
        end
        pending.clear
      end
    end
  end

  @results
end
#run_task(task) ⇒ Object
Run a rake task in each package (via `bundle exec rake <task>`).
80 81 82 83 84 |
# File 'lib/rwm/task_runner.rb', line 80

# Run a rake task in every package by delegating to run_command.
#
# @param task [Object] the rake task, placed as the last element of the
#   command array
# @return [Object] whatever run_command returns
def run_task(task)
  # The package is not needed to build the command; a fresh array is
  # produced on every invocation of the block.
  run_command { |_pkg| ["bundle", "exec", "rake", task] }
end
#success? ⇒ Boolean
86 87 88 |
# File 'lib/rwm/task_runner.rb', line 86

# Whether every recorded result succeeded.
#
# @return [Boolean] true when no result has a falsy +success+ value
#   (including when there are no results at all)
def success?
  @results.none? { |result| !result.success }
end