Class: Rwm::TaskRunner
- Inherits:
-
Object
- Object
- Rwm::TaskRunner
- Defined in:
- lib/rwm/task_runner.rb
Defined Under Namespace
Classes: Result
Constant Summary collapse
- NO_TASK_PATTERN =
/Don't know how to build task/
Instance Attribute Summary collapse
-
#results ⇒ Object
readonly
Returns the value of attribute results.
Instance Method Summary collapse
- #failed_results ⇒ Object
-
#initialize(graph, packages: nil, buffered: false, concurrency: Etc.nprocessors) ⇒ TaskRunner
constructor
A new instance of TaskRunner.
-
#run_command(&command_proc) ⇒ Object
Run a shell command in each package using DAG scheduling.
-
#run_task(task) ⇒ Object
Run a rake task in each package.
- #success? ⇒ Boolean
Constructor Details
#initialize(graph, packages: nil, buffered: false, concurrency: Etc.nprocessors) ⇒ TaskRunner
Returns a new instance of TaskRunner.
14 15 16 17 18 19 20 21 |
# File 'lib/rwm/task_runner.rb', line 14

# Sets up a runner over the packages of a dependency graph.
#
# @param graph [Object] dependency graph; when +packages+ is omitted it must
#   respond to +packages+ with a Hash-like object whose values are the packages
# @param packages [Array, nil] explicit list of packages to run; defaults to
#   every package in +graph+
# @param buffered [Boolean] stored in +@buffered+
#   (NOTE(review): presumably consumed by run_single to buffer output — confirm)
# @param concurrency [Integer, nil] upper bound on packages run at the same
#   time; +nil+ falls back to Etc.nprocessors and values below 1 are raised to 1
def initialize(graph, packages: nil, buffered: false, concurrency: Etc.nprocessors)
  @graph = graph
  @packages = packages || graph.packages.values
  @buffered = buffered
  # run_command stops launching packages once running.size >= @concurrency,
  # so a limit below 1 would keep it from ever starting anything.
  @concurrency = [(concurrency || Etc.nprocessors).to_i, 1].max
  @results = []
  @mutex = Mutex.new
end
Instance Attribute Details
#results ⇒ Object (readonly)
Returns the value of attribute results.
12 13 14 |
# File 'lib/rwm/task_runner.rb', line 12

# Reader for the results recorded by this runner.
#
# @return [Object] the current value of the +@results+ instance variable
def results
  @results
end
Instance Method Details
#failed_results ⇒ Object
113 114 115 |
# File 'lib/rwm/task_runner.rb', line 113

# Selects the recorded results that did not succeed.
#
# @return [Array] every entry of +@results+ whose +success+ is false or nil,
#   in their original order
def failed_results
  @results.reject(&:success)
end
#run_command(&command_proc) ⇒ Object
Run a shell command in each package using DAG scheduling. Each package starts as soon as its dependencies complete. The command_proc receives a Package and returns the command as an array of arguments, e.g. ["bundle", "exec", "rake", task].
26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 |
# File 'lib/rwm/task_runner.rb', line 26

# Runs a command in each package, one thread per package, starting a package
# once +ready?+ reports it as ready and a concurrency slot is free.
#
# When a package's result is unsuccessful, every still-pending transitive
# dependent (as reported by +@graph.transitive_dependents+) is removed from the
# queue and recorded as a failed "skipped" result.
#
# @yield [pkg] forwarded to +run_single+ for each package
# @return [Array] the accumulated +@results+
# @raise [Interrupt] when SIGINT was received while the run was in progress
def run_command(&command_proc)
  package_names = @packages.map(&:name).to_set
  pending = @packages.dup
  completed = Set.new
  running = {}
  @interrupted = false
  mutex = Mutex.new
  condition = ConditionVariable.new

  previous_trap = Signal.trap("INT") do
    @interrupted = true
    # Cannot use mutex inside trap context — just kill threads directly.
    # Thread#kill is safe to call from trap context.
    running.each_value { |t| t.kill rescue nil }
  end

  done = false
  until done
    break if @interrupted

    mutex.synchronize do
      if pending.empty? && running.empty?
        done = true
        next
      end

      ready = pending.select { |pkg| ready?(pkg, package_names, completed) }
      started = 0

      ready.each do |pkg|
        break if running.size >= @concurrency
        break if @interrupted

        pending.delete(pkg)
        started += 1
        running[pkg.name] = Thread.new do
          result = run_single(pkg, &command_proc)

          mutex.synchronize do
            @results << result
            running.delete(pkg.name)

            if result.success
              completed << pkg.name
            else
              skip_names = @graph.transitive_dependents(pkg.name)
                                 .select { |n| package_names.include?(n) }
              skip_names.each do |name|
                skip_pkg = pending.find { |p| p.name == name }
                next unless skip_pkg

                pending.delete(skip_pkg)
                @results << Result.new(
                  package_name: name,
                  task: "skipped",
                  success: false,
                  output: "Skipped due to failed dependency: #{pkg.name}"
                )
              end
            end

            condition.broadcast
          end
        end
      end

      if running.any?
        # Nothing could be started this pass (no ready package or no free
        # slot): sleep until a worker finishes instead of rescanning in a
        # tight loop. Killed workers never broadcast, so do not wait once an
        # interrupt has been seen.
        condition.wait(mutex) if started.zero? && !@interrupted
      elsif !@interrupted
        # Nothing is running and nothing could be started, so no future event
        # can unblock the remaining packages. Record them as failures rather
        # than looping forever.
        pending.each do |pkg|
          @results << Result.new(
            package_name: pkg.name,
            task: "skipped",
            success: false,
            output: "Skipped: package could not be scheduled (unmet dependencies or no free concurrency slot)"
          )
        end
        pending.clear
      end
    end
  end

  raise Interrupt, "Interrupted by Ctrl+C" if @interrupted

  @results
ensure
  Signal.trap("INT", previous_trap || "DEFAULT")
end
#run_task(task) ⇒ Object
Run a rake task in each package.
103 104 105 106 107 |
# File 'lib/rwm/task_runner.rb', line 103

# Runs a rake task in each package by delegating to +run_command+ with the
# command <tt>bundle exec rake TASK</tt>.
#
# @param task [String] name of the rake task, passed as the last argument
# @return [Object] whatever +run_command+ returns
def run_task(task)
  # The command is the same for every package, so the yielded package is unused.
  run_command do |_pkg|
    ["bundle", "exec", "rake", task]
  end
end
#success? ⇒ Boolean
109 110 111 |
# File 'lib/rwm/task_runner.rb', line 109

# Reports whether every recorded result succeeded.
#
# @return [Boolean] true when no entry of +@results+ has a false or nil
#   +success+ (including when +@results+ is empty)
def success?
  @results.none? { |result| !result.success }
end