Class: Rufio::ParallelScanner

Inherits:
Object
  • Object
show all
Defined in:
lib/rufio/parallel_scanner.rb

Overview

並列スキャン最適化クラス

複数のディレクトリを並列にスキャンし、結果をマージします。スレッドプールを使用して効率的に並列処理を行います。

使用例:

parallel_scanner = ParallelScanner.new(max_workers: 4)
results = parallel_scanner.scan_all(['/path1', '/path2', '/path3'])
all_entries = parallel_scanner.scan_all_merged(['/path1', '/path2'])

Constant Summary collapse

DEFAULT_MAX_WORKERS =
4

Instance Method Summary collapse

Constructor Details

#initialize(max_workers: DEFAULT_MAX_WORKERS, backend: :ruby) ⇒ ParallelScanner

Returns a new instance of ParallelScanner.

Parameters:

  • max_workers (Integer) (defaults to: DEFAULT_MAX_WORKERS)

    最大ワーカー数

  • backend (Symbol) (defaults to: :ruby)

    バックエンド (:ruby or :zig)



21
22
23
24
# File 'lib/rufio/parallel_scanner.rb', line 21

def initialize(max_workers: DEFAULT_MAX_WORKERS, backend: :ruby)
  @max_workers = max_workers
  @backend = backend
end

Instance Method Details

#scan_all(paths) ⇒ Array<Hash>

複数のディレクトリを並列スキャン

Parameters:

  • paths (Array<String>)

    スキャンするディレクトリパスのリスト

Returns:

  • (Array<Hash>)

    各ディレクトリのスキャン結果

    entries:, success:, error:, …


31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
# File 'lib/rufio/parallel_scanner.rb', line 31

def scan_all(paths)
  return [] if paths.empty?

  results = []
  mutex = Mutex.new
  queue = Queue.new

  # キューにパスを追加
  paths.each { |path| queue << path }

  # ワーカースレッドを作成
  workers = []
  worker_count = [@max_workers, paths.length].min

  worker_count.times do
    workers << Thread.new do
      loop do
        path = queue.pop(true) rescue nil
        break if path.nil?

        result = scan_single_directory(path)
        mutex.synchronize { results << result }
      end
    end
  end

  # 全ワーカーの完了を待つ
  workers.each(&:join)

  results
end

#scan_all_merged(paths) {|entry| ... } ⇒ Array<Hash>

複数のディレクトリを並列スキャンし、結果をマージ

Parameters:

  • paths (Array<String>)

    スキャンするディレクトリパスのリスト

Yields:

  • (entry)

    各エントリをフィルタリングするブロック(オプション)

Returns:

  • (Array<Hash>)

    全エントリのマージされた配列



68
69
70
71
72
73
74
75
76
77
78
79
80
# File 'lib/rufio/parallel_scanner.rb', line 68

def scan_all_merged(paths, &filter)
  results = scan_all(paths)

  # 成功した結果のみを取得
  all_entries = results
    .select { |r| r[:success] }
    .flat_map { |r| r[:entries] }

  # フィルタが指定されている場合は適用
  all_entries = all_entries.select(&filter) if block_given?

  all_entries
end

#scan_all_with_progress(paths) {|completed, total| ... } ⇒ Array<Hash>

進捗報告付き並列スキャン

Parameters:

  • paths (Array<String>)

    スキャンするディレクトリパスのリスト

Yields:

  • (completed, total)

    進捗情報を受け取るブロック

Returns:

  • (Array<Hash>)

    各ディレクトリのスキャン結果



87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
# File 'lib/rufio/parallel_scanner.rb', line 87

def scan_all_with_progress(paths, &block)
  return [] if paths.empty?

  results = []
  mutex = Mutex.new
  queue = Queue.new
  completed = 0
  total = paths.length

  # キューにパスを追加
  paths.each { |path| queue << path }

  # ワーカースレッドを作成
  workers = []
  worker_count = [@max_workers, paths.length].min

  worker_count.times do
    workers << Thread.new do
      loop do
        path = queue.pop(true) rescue nil
        break if path.nil?

        result = scan_single_directory(path)

        mutex.synchronize do
          results << result
          completed += 1
          yield(completed, total) if block_given?
        end
      end
    end
  end

  # 全ワーカーの完了を待つ
  workers.each(&:join)

  results
end