Module: Ocak::ParallelExecution
- Included in:
- PipelineExecutor
- Defined in:
- lib/ocak/parallel_execution.rb
Overview
Parallel group execution logic extracted from PipelineExecutor. Includers must provide run_single_step method and symbolize helper.
Instance Method Summary collapse
- #collect_parallel_group(steps, start_idx) ⇒ Object
- #run_parallel_group(group, issue_number, state, logger:, claude:, chdir:) ⇒ Object
Instance Method Details
#collect_parallel_group(steps, start_idx) ⇒ Object
7 8 9 10 11 12 13 14 15 16 17 18 |
# File 'lib/ocak/parallel_execution.rb', line 7 def collect_parallel_group(steps, start_idx) group = [] idx = start_idx while idx < steps.size step = symbolize(steps[idx]) break unless step[:parallel] group << [step, idx] idx += 1 end group end |
#run_parallel_group(group, issue_number, state, logger:, claude:, chdir:) ⇒ Object
20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 |
# File 'lib/ocak/parallel_execution.rb', line 20 def run_parallel_group(group, issue_number, state, logger:, claude:, chdir:) mutex = Mutex.new threads = group.map do |step, idx| Thread.new do run_single_step(step, idx, issue_number, state, logger: logger, claude: claude, chdir: chdir, mutex: mutex) rescue StandardError => e logger.error("#{step[:role]} thread failed: #{e.}") { success: false, phase: step[:role].to_s, output: "Thread error: #{e.}" } end end results = threads.map(&:value) results.compact.find { |r| r.is_a?(Hash) && !r[:success] } end |