Class: Vivlio::Starter::CLI::Metrics::ParallelRunner
- Inherits:
-
Object
- Object
- Vivlio::Starter::CLI::Metrics::ParallelRunner
- Defined in:
- lib/vivlio/starter/cli/metrics/parallel_runner.rb
Overview
並列実行を管理する
Constant Summary collapse
- CONCURRENCY_ENV =
'VIVLIO_METRICS_CONCURRENCY'- MAX_CONCURRENCY =
4
Instance Attribute Summary collapse
-
#concurrency ⇒ Object
readonly
Returns the value of attribute concurrency.
Instance Method Summary collapse
-
#determine_concurrency ⇒ Object
並列度を決定する(build コマンドと同じロジック).
-
#initialize(concurrency: nil) ⇒ ParallelRunner
constructor
A new instance of ParallelRunner.
-
#parallel_each_with_progress(items, on_complete: nil, &block) ⇒ Object
アイテムを並列処理し、完了時にコールバックを呼ぶ.
-
#parallel_map(items, &block) ⇒ Object
アイテムを並列処理する.
Constructor Details
#initialize(concurrency: nil) ⇒ ParallelRunner
Returns a new instance of ParallelRunner.
30 31 32 |
# File 'lib/vivlio/starter/cli/metrics/parallel_runner.rb', line 30 def initialize(concurrency: nil) @concurrency = concurrency || determine_concurrency end |
Instance Attribute Details
#concurrency ⇒ Object (readonly)
Returns the value of attribute concurrency.
34 35 36 |
# File 'lib/vivlio/starter/cli/metrics/parallel_runner.rb', line 34 def concurrency @concurrency end |
Instance Method Details
#determine_concurrency ⇒ Object
並列度を決定する(build コマンドと同じロジック)
37 38 39 40 41 42 43 |
# File 'lib/vivlio/starter/cli/metrics/parallel_runner.rb', line 37 def determine_concurrency env_value = ENV[CONCURRENCY_ENV].to_i return env_value if env_value.positive? n_cores = Etc.respond_to?(:nprocessors) ? Etc.nprocessors : 2 n_cores.clamp(1, MAX_CONCURRENCY) end |
#parallel_each_with_progress(items, on_complete: nil, &block) ⇒ Object
アイテムを並列処理し、完了時にコールバックを呼ぶ
77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 |
# File 'lib/vivlio/starter/cli/metrics/parallel_runner.rb', line 77 def parallel_each_with_progress(items, on_complete: nil, &block) list = Array(items) return if list.empty? if concurrency <= 1 list.each do |item| result = block.call(item) on_complete&.call(item, result) end return end mutex = Mutex.new queue = Queue.new list.each { queue << it } sentinel = Object.new concurrency.times { queue << sentinel } workers = Array.new(concurrency) do Thread.new do loop do item = queue.pop break if item.equal?(sentinel) result = block.call(item) mutex.synchronize { on_complete&.call(item, result) } end end end workers.each(&:join) end |
#parallel_map(items, &block) ⇒ Object
アイテムを並列処理する
46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 |
# File 'lib/vivlio/starter/cli/metrics/parallel_runner.rb', line 46 def parallel_map(items, &block) list = Array(items) return [] if list.empty? return list.map(&block) if concurrency <= 1 results = Array.new(list.size) mutex = Mutex.new queue = Queue.new list.each_with_index { |item, idx| queue << [item, idx] } sentinel = Object.new concurrency.times { queue << sentinel } workers = Array.new(concurrency) do Thread.new do loop do entry = queue.pop break if entry.equal?(sentinel) item, idx = entry result = block.call(item) mutex.synchronize { results[idx] = result } end end end workers.each(&:join) results end |