Module: Commiti::DiffSummarizer::BatchRunner
- Included in:
- Commiti::DiffSummarizer
- Defined in:
- lib/services/diff_summarization/batch_runner.rb
Instance Method Summary
- #build_batch_jobs(jobs) ⇒ Object
- #format_chunk_summary(path:, summary:) ⇒ Object
- #parse_batched_summary_output(output, expected_paths:) ⇒ Object
- #process_batch_job(job, results:, client:, model:) ⇒ Object
- #run_async_summary_jobs(jobs, results:, client:, model:) ⇒ Object
- #summarize_chunk_batch(items, client:, model:) ⇒ Object
- #summarize_chunks(chunks, client:, model:) ⇒ Object
- #summarize_single_chunk(chunk, client:, model:) ⇒ Object
- #summary_worker_count(job_count) ⇒ Object
Instance Method Details
#build_batch_jobs(jobs) ⇒ Object
Source lines 65–89
# File 'lib/services/diff_summarization/batch_runner.rb', line 65
# Greedily packs summary jobs, in order, into batches.
#
# A job joins the most recent batch only while that batch holds fewer than
# MAX_BATCH_FILES jobs and its accumulated diff size plus the job's diff size
# stays within MAX_BATCH_BYTES; otherwise a new batch is started. A single job
# larger than MAX_BATCH_BYTES still gets a batch of its own.
#
# @param jobs [Array<Hash>] each with a :chunk hash holding a :diff string
# @return [Array<Hash>] batches shaped like { items: [job, ...] }
def build_batch_jobs(jobs)
  groups = jobs.each_with_object([]) do |job, acc|
    size = job[:chunk][:diff].bytesize
    last = acc.last
    room = last &&
           last[:items].length < MAX_BATCH_FILES &&
           last[:bytes] + size <= MAX_BATCH_BYTES

    if room
      last[:items] << job
      last[:bytes] += size
    else
      acc << { items: [job], bytes: size }
    end
  end

  # Drop the internal byte counter so callers see only { items: [...] }.
  groups.map { |group| { items: group[:items] } }
end
#format_chunk_summary(path:, summary:) ⇒ Object
Source lines 143–145
# File 'lib/services/diff_summarization/batch_runner.rb', line 143
# Renders one file's summary as a Markdown section: a "### <path>" heading
# followed by the summary text with surrounding whitespace removed.
#
# @param path [#to_s] file path used as the heading
# @param summary [#to_s, nil] summary text (nil renders as an empty body)
# @return [String]
def format_chunk_summary(path:, summary:)
  heading = "### #{path}"
  body = summary.to_s.strip
  [heading, body].join("\n")
end
#parse_batched_summary_output(output, expected_paths:) ⇒ Object
Source lines 120–134
# File 'lib/services/diff_summarization/batch_runner.rb', line 120
# Splits a batched model response into per-file summaries.
#
# A section starts at a line "### <path>" whose <path> (whitespace-stripped)
# is one of +expected_paths+; every following line up to the next such header
# belongs to that section. A "### " line naming anything else (for example a
# subheading the model wrote inside a summary) stays part of the current
# summary instead of cutting it off. Text before the first recognised header
# is ignored. If a path appears more than once, the last non-empty summary wins.
#
# @param output [#to_s, nil] raw model response
# @param expected_paths [Array<String>] paths that were sent in the batch
# @return [Hash{String => String}] path => stripped summary; paths whose
#   summary is missing or blank are omitted so the caller can fall back
def parse_batched_summary_output(output, expected_paths:)
  sections = []

  output.to_s.each_line do |line|
    heading = line.start_with?('### ') && line.delete_prefix('### ').strip
    if heading && expected_paths.include?(heading)
      sections << [heading, +'']
    elsif (current = sections.last)
      current.last << line
    end
  end

  sections.each_with_object({}) do |(path, text), parsed|
    summary = text.strip
    parsed[path] = summary unless summary.empty?
  end
end
#process_batch_job(job, results:, client:, model:) ⇒ Object
Source lines 48–63
# File 'lib/services/diff_summarization/batch_runner.rb', line 48
# Summarizes every chunk in one batch job and stores the formatted result in
# +results+ at each item's :index.
#
# A single-item job goes straight to summarize_single_chunk. Otherwise the
# whole batch is sent in one summarize_chunk_batch call, and any item the
# batched response did not cover is re-summarized on its own.
#
# @param job [Hash] { items: [{ index:, chunk: { path:, diff: } }, ...] }
# @param results [Array] shared output array, written by index
# @param client [#generate] model client passed through to the summarizers
# @param model [Object] model identifier passed through to the summarizers
def process_batch_job(job, results:, client:, model:)
  items = job[:items]
  record = lambda do |item, summary|
    results[item[:index]] = format_chunk_summary(path: item[:chunk][:path], summary: summary)
  end

  if items.size == 1
    sole = items.first
    record.call(sole, summarize_single_chunk(sole[:chunk], client: client, model: model))
    return nil
  end

  by_path = summarize_chunk_batch(items, client: client, model: model)
  items.each do |item|
    summary = by_path[item[:chunk][:path].to_s]
    # Fall back to a dedicated request when the batch output skipped this file.
    record.call(item, summary || summarize_single_chunk(item[:chunk], client: client, model: model))
  end
end
#run_async_summary_jobs(jobs, results:, client:, model:) ⇒ Object
Source lines 23–46
# File 'lib/services/diff_summarization/batch_runner.rb', line 23
# Processes batch jobs concurrently on a small pool of threads, writing each
# formatted summary into +results+ (via process_batch_job).
#
# The job queue is filled and then closed, so a blocking Queue#pop returns
# nil once it is drained; this ends each worker without rescuing ThreadError.
# A ThreadError raised by a job is therefore reported like any other failure
# instead of being mistaken for "queue empty".
#
# On the first failure the remaining queued jobs are discarded, because the
# error is re-raised after all workers finish and their results would be
# thrown away anyway. Jobs already in flight on other workers still complete.
#
# @param jobs [Array<Hash>] batch jobs as produced by build_batch_jobs
# @param results [Array] shared output array, written by index
# @param client [#generate] model client
# @param model [Object] model identifier
# @return [nil]
# @raise [StandardError] the first error captured from any worker
def run_async_summary_jobs(jobs, results:, client:, model:)
  return if jobs.empty?

  queue = Queue.new
  jobs.each { |job| queue << job }
  queue.close
  captured_errors = Queue.new

  workers = Array.new(summary_worker_count(jobs.length)) do
    Thread.new do
      while (job = queue.pop)
        begin
          process_batch_job(job, results: results, client: client, model: model)
        rescue StandardError => e
          captured_errors << e
          # Fail fast: stop other workers from starting work that will be discarded.
          queue.clear
          break
        end
      end
    end
  end

  workers.each(&:join)
  raise captured_errors.pop unless captured_errors.empty?
end
#summarize_chunk_batch(items, client:, model:) ⇒ Object
Source lines 101–118
# File 'lib/services/diff_summarization/batch_runner.rb', line 101
# Asks the model to summarize several file diffs in a single request.
#
# Each diff is sent as a "### <path>" heading followed by a fenced diff block.
# The response is handed to parse_batched_summary_output, restricted to the
# paths that were sent.
#
# @param items [Array<Hash>] each with a :chunk hash holding :path and :diff
# @param client [#generate] model client
# @param model [Object] model identifier
# @return [Hash{String => String}] path => summary for the files the response covered
def summarize_chunk_batch(items, client:, model:)
  paths = items.map { |item| item[:chunk][:path].to_s }
  sections = items.map do |item|
    chunk = item[:chunk]
    "### #{chunk[:path]}\n```diff\n#{chunk[:diff]}\n```\n\n"
  end
  prompt = ("Summarize the following file diffs:\n\n" + sections.join).rstrip

  response = client.generate(
    system: BATCH_SYSTEM,
    user: prompt,
    model: model,
    timeout_seconds: 120,
    open_timeout_seconds: 10
  )
  parse_batched_summary_output(response, expected_paths: paths)
end
#summarize_chunks(chunks, client:, model:) ⇒ Object
Source lines 6–21
# File 'lib/services/diff_summarization/batch_runner.rb', line 6
# Produces one formatted summary per chunk, preserving input order.
#
# Chunks whose diff is at most CHUNK_THRESHOLD bytes are summarized locally
# with mechanical_summary. Larger chunks are grouped by build_batch_jobs and
# summarized by the model through run_async_summary_jobs, which fills their
# slots in the result array.
#
# @param chunks [Array<Hash>] each with :path and :diff
# @param client [#generate] model client
# @param model [Object] model identifier
# @return [Array<String>] formatted summaries, index-aligned with +chunks+
def summarize_chunks(chunks, client:, model:)
  summaries = Array.new(chunks.length)
  pending = []

  chunks.each.with_index do |chunk, position|
    diff = chunk[:diff]
    if diff.bytesize <= CHUNK_THRESHOLD
      summaries[position] = format_chunk_summary(path: chunk[:path], summary: mechanical_summary(diff))
    else
      pending << { index: position, chunk: chunk }
    end
  end

  batches = build_batch_jobs(pending)
  unless batches.empty?
    run_async_summary_jobs(batches, results: summaries, client: client, model: model)
  end
  summaries
end
#summarize_single_chunk(chunk, client:, model:) ⇒ Object
Source lines 91–99
# File 'lib/services/diff_summarization/batch_runner.rb', line 91
# Asks the model to summarize a single file diff.
#
# The diff is wrapped in a ```diff fenced block, matching the fence used by
# summarize_chunk_batch. (The previous two-backtick fence is not a valid
# Markdown code fence.)
#
# @param chunk [Hash] with a :diff string
# @param client [#generate] model client
# @param model [Object] model identifier
# @return [Object] whatever client.generate returns (the summary text)
def summarize_single_chunk(chunk, client:, model:)
  client.generate(
    system: CHUNK_SYSTEM,
    user: "Summarize these changes:\n\n```diff\n#{chunk[:diff]}\n```",
    model: model,
    timeout_seconds: 120,
    open_timeout_seconds: 10
  )
end
#summary_worker_count(job_count) ⇒ Object
Source lines 136–141
# File 'lib/services/diff_summarization/batch_runner.rb', line 136
# Number of worker threads to use for +job_count+ jobs.
#
# Reads DIFF_SUMMARY_WORKERS as a base-10 integer (so "010" means 10, not
# octal 8). It falls back to DEFAULT_SUMMARY_WORKERS, which is expected to be
# an Integer, when the variable is unset or not a valid integer. The result is
# clamped to at least 1 and at most +job_count+. A +job_count+ of zero yields 1
# rather than raising, as the old clamp(1, 0) did.
#
# @param job_count [Integer] number of jobs to be processed
# @return [Integer] worker count, >= 1
def summary_worker_count(job_count)
  upper = [job_count, 1].max
  raw = ENV['DIFF_SUMMARY_WORKERS']
  configured = raw.nil? ? DEFAULT_SUMMARY_WORKERS : Integer(raw, 10, exception: false)
  (configured || DEFAULT_SUMMARY_WORKERS).clamp(1, upper)
end