Class: Vivlio::Starter::CLI::Metrics::ParallelRunner

Inherits:
Object
  • Object
show all
Defined in:
lib/vivlio/starter/cli/metrics/parallel_runner.rb

Overview

並列実行を管理する

Constant Summary collapse

CONCURRENCY_ENV =
'VIVLIO_METRICS_CONCURRENCY'
MAX_CONCURRENCY =
4

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(concurrency: nil) ⇒ ParallelRunner

Returns a new instance of ParallelRunner.



30
31
32
# File 'lib/vivlio/starter/cli/metrics/parallel_runner.rb', line 30

def initialize(concurrency: nil)
  @concurrency = concurrency || determine_concurrency
end

Instance Attribute Details

#concurrencyObject (readonly)

Returns the value of attribute concurrency.



34
35
36
# File 'lib/vivlio/starter/cli/metrics/parallel_runner.rb', line 34

def concurrency
  @concurrency
end

Instance Method Details

#determine_concurrencyObject

並列度を決定する(build コマンドと同じロジック)



37
38
39
40
41
42
43
# File 'lib/vivlio/starter/cli/metrics/parallel_runner.rb', line 37

def determine_concurrency
  env_value = ENV[CONCURRENCY_ENV].to_i
  return env_value if env_value.positive?

  n_cores = Etc.respond_to?(:nprocessors) ? Etc.nprocessors : 2
  n_cores.clamp(1, MAX_CONCURRENCY)
end

#parallel_each_with_progress(items, on_complete: nil, &block) ⇒ Object

アイテムを並列処理し、完了時にコールバックを呼ぶ



77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
# File 'lib/vivlio/starter/cli/metrics/parallel_runner.rb', line 77

def parallel_each_with_progress(items, on_complete: nil, &block)
  list = Array(items)
  return if list.empty?

  if concurrency <= 1
    list.each do |item|
      result = block.call(item)
      on_complete&.call(item, result)
    end
    return
  end

  mutex = Mutex.new
  queue = Queue.new

  list.each { queue << it }
  sentinel = Object.new
  concurrency.times { queue << sentinel }

  workers = Array.new(concurrency) do
    Thread.new do
      loop do
        item = queue.pop
        break if item.equal?(sentinel)

        result = block.call(item)
        mutex.synchronize { on_complete&.call(item, result) }
      end
    end
  end

  workers.each(&:join)
end

#parallel_map(items, &block) ⇒ Object

アイテムを並列処理する



46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
# File 'lib/vivlio/starter/cli/metrics/parallel_runner.rb', line 46

def parallel_map(items, &block)
  list = Array(items)
  return [] if list.empty?
  return list.map(&block) if concurrency <= 1

  results = Array.new(list.size)
  mutex = Mutex.new
  queue = Queue.new

  list.each_with_index { |item, idx| queue << [item, idx] }
  sentinel = Object.new
  concurrency.times { queue << sentinel }

  workers = Array.new(concurrency) do
    Thread.new do
      loop do
        entry = queue.pop
        break if entry.equal?(sentinel)

        item, idx = entry
        result = block.call(item)
        mutex.synchronize { results[idx] = result }
      end
    end
  end

  workers.each(&:join)
  results
end