Module: Ocak::ParallelExecution

Included in:
PipelineExecutor
Defined in:
lib/ocak/parallel_execution.rb

Overview

Parallel group execution logic extracted from PipelineExecutor. Includers must provide run_single_step method and symbolize helper.

Instance Method Summary collapse

Instance Method Details

#collect_parallel_group(steps, start_idx) ⇒ Object



7
8
9
10
11
12
13
14
15
16
17
18
# File 'lib/ocak/parallel_execution.rb', line 7

def collect_parallel_group(steps, start_idx)
  group = []
  idx = start_idx
  while idx < steps.size
    step = symbolize(steps[idx])
    break unless step[:parallel]

    group << [step, idx]
    idx += 1
  end
  group
end

#run_parallel_group(group, issue_number, state, logger:, claude:, chdir:) ⇒ Object



20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
# File 'lib/ocak/parallel_execution.rb', line 20

def run_parallel_group(group, issue_number, state, logger:, claude:, chdir:)
  mutex = Mutex.new
  threads = group.map do |step, idx|
    Thread.new do
      run_single_step(step, idx, issue_number, state, logger: logger, claude: claude,
                                                      chdir: chdir, mutex: mutex)
    rescue StandardError => e
      logger.error("#{step[:role]} thread failed: #{e.message}")
      { success: false, phase: step[:role].to_s, output: "Thread error: #{e.message}" }
    end
  end

  results = threads.map(&:value)
  results.compact.find { |r| r.is_a?(Hash) && !r[:success] }
end