Module: Commiti::DiffSummarizer::BatchRunner

Included in:
Commiti::DiffSummarizer
Defined in:
lib/services/diff_summarization/batch_runner.rb

Instance Method Summary

Instance Method Details

#build_batch_jobs(jobs) ⇒ Object



Source lines 65–89
# File 'lib/services/diff_summarization/batch_runner.rb', line 65

# Packs summarization jobs into batches for a single LLM request each.
#
# A new batch is started whenever the open batch already holds
# MAX_BATCH_FILES jobs, or adding the next job's diff would push the
# batch's total diff size past MAX_BATCH_BYTES. A job whose diff alone
# exceeds MAX_BATCH_BYTES still lands in a batch of its own.
#
# @param jobs [Array<Hash>] entries shaped like { index:, chunk: { diff:, ... } }
# @return [Array<Hash>] batches shaped like { items: [job, ...] }, in input order
def build_batch_jobs(jobs)
  groups = [[]]
  group_bytes = 0

  jobs.each do |job|
    size = job[:chunk][:diff].bytesize
    open_group = groups.last

    unless open_group.empty?
      over_count = open_group.length >= MAX_BATCH_FILES
      over_bytes = group_bytes + size > MAX_BATCH_BYTES
      if over_count || over_bytes
        groups << []
        group_bytes = 0
      end
    end

    groups.last << job
    group_bytes += size
  end

  # Only the trailing group can be empty (when there were no jobs at all).
  groups.reject(&:empty?).map { |items| { items: items } }
end

#format_chunk_summary(path:, summary:) ⇒ Object



Source lines 143–145
# File 'lib/services/diff_summarization/batch_runner.rb', line 143

# Renders one file's summary as a "### <path>" heading followed by the
# summary text with surrounding whitespace removed (nil becomes empty).
#
# @param path [#to_s] file path used as the heading
# @param summary [#to_s, nil] summary text
# @return [String]
def format_chunk_summary(path:, summary:)
  heading = "### #{path}"
  body = summary.to_s.strip
  [heading, body].join("\n")
end

#parse_batched_summary_output(output, expected_paths:) ⇒ Object



Source lines 120–134
# File 'lib/services/diff_summarization/batch_runner.rb', line 120

# Splits a batched LLM response into per-file summaries.
#
# The response is expected to contain one "### <path>" heading per file,
# each followed by that file's summary. Only headings that name one of
# +expected_paths+ start a new section. A model-emitted level-3 sub-heading
# such as "### Details" therefore stays inside the current file's summary
# instead of truncating it. Paths wrapped in backticks ("### `a.rb`") are
# also recognized.
#
# Text before the first recognized heading is ignored. Sections whose
# summary is blank are omitted. If a path appears twice, the later
# non-blank summary wins.
#
# @param output [#to_s, nil] raw model output (nil is treated as empty)
# @param expected_paths [Array<String>] paths that were sent in the batch
# @return [Hash{String => String}] path => stripped summary
def parse_batched_summary_output(output, expected_paths:)
  sections = []

  output.to_s.each_line do |line|
    heading = line.start_with?('### ') && line.delete_prefix('### ').strip.sub(/\A`(.+)`\z/, '\1')

    if heading && expected_paths.include?(heading)
      sections << [heading, []]
    elsif (current = sections.last)
      # Any other line, including unrecognized "### " headings, belongs to
      # the file section currently open.
      current.last << line
    end
  end

  sections.each_with_object({}) do |(path, body), parsed|
    summary = body.join.strip
    parsed[path] = summary unless summary.empty?
  end
end

#process_batch_job(job, results:, client:, model:) ⇒ Object



Source lines 48–63
# File 'lib/services/diff_summarization/batch_runner.rb', line 48

# Summarizes every item of one batch and stores the formatted result in
# +results+ at each item's :index.
#
# A one-item batch goes straight to summarize_single_chunk. Larger batches
# are sent through summarize_chunk_batch. Any item whose path is missing
# from the batched reply is retried individually with
# summarize_single_chunk.
#
# @param job [Hash] { items: [{ index:, chunk: { path:, diff: } }, ...] }
# @param results [Array] output slots, written by index
def process_batch_job(job, results:, client:, model:)
  batch = job[:items]
  store = lambda do |entry, text|
    results[entry[:index]] = format_chunk_summary(path: entry[:chunk][:path], summary: text)
  end

  if batch.size == 1
    only = batch.first
    store.call(only, summarize_single_chunk(only[:chunk], client: client, model: model))
    return
  end

  by_path = summarize_chunk_batch(batch, client: client, model: model)
  batch.each do |entry|
    chunk = entry[:chunk]
    text = by_path[chunk[:path].to_s] || summarize_single_chunk(chunk, client: client, model: model)
    store.call(entry, text)
  end
end

#run_async_summary_jobs(jobs, results:, client:, model:) ⇒ Object



Source lines 23–46
# File 'lib/services/diff_summarization/batch_runner.rb', line 23

# Runs batch jobs on a pool of threads and waits for all of them.
#
# The pool size comes from summary_worker_count. Each worker pulls jobs
# from a shared queue and calls process_batch_job, which writes into
# +results+ by item index.
#
# If any job raises a StandardError, workers stop taking new jobs and the
# first captured error is re-raised here after every thread has finished.
# Errors raised by a job (including ThreadError) are never mistaken for
# "queue drained".
#
# @param jobs [Array<Hash>] batch jobs as produced by build_batch_jobs
# @param results [Array] output slots shared by all workers
# @return [nil]
# @raise [StandardError] the first error raised by any job
def run_async_summary_jobs(jobs, results:, client:, model:)
  return if jobs.empty?

  queue = Queue.new
  jobs.each { |job| queue << job }
  # Once closed, a blocking pop returns nil after the last job is taken, so
  # no exception is needed to detect the end of the work.
  queue.close

  captured_errors = Queue.new

  workers = Array.new(summary_worker_count(jobs.length)) do
    Thread.new do
      # Stop picking up work as soon as any worker has failed, to avoid
      # issuing further client requests whose results would be discarded.
      while captured_errors.empty? && (job = queue.pop)
        process_batch_job(job, results: results, client: client, model: model)
      end
    rescue StandardError => e
      captured_errors << e
    end
  end

  workers.each(&:join)
  raise captured_errors.pop unless captured_errors.empty?
end

#summarize_chunk_batch(items, client:, model:) ⇒ Object



Source lines 101–118
# File 'lib/services/diff_summarization/batch_runner.rb', line 101

# Sends several chunk diffs to the client in one request and splits the
# reply back into per-path summaries.
#
# Each chunk is rendered as a "### <path>" heading followed by its diff in
# a ```diff fence. The reply is parsed by parse_batched_summary_output
# against the stringified paths of +items+.
#
# @param items [Array<Hash>] batch entries, each with :chunk => { path:, diff: }
# @return [Hash{String => String}] the result of parse_batched_summary_output
def summarize_chunk_batch(items, client:, model:)
  sections = items.map do |item|
    chunk = item[:chunk]
    "### #{chunk[:path]}\n```diff\n#{chunk[:diff]}\n```\n\n"
  end
  prompt = "Summarize the following file diffs:\n\n#{sections.join}".rstrip

  reply = client.generate(
    system: BATCH_SYSTEM,
    user: prompt,
    model: model,
    timeout_seconds: 120,
    open_timeout_seconds: 10
  )

  paths = items.map { |item| item[:chunk][:path].to_s }
  parse_batched_summary_output(reply, expected_paths: paths)
end

#summarize_chunks(chunks, client:, model:) ⇒ Object



Source lines 6–21
# File 'lib/services/diff_summarization/batch_runner.rb', line 6

# Produces one formatted summary per chunk, index-aligned with +chunks+.
#
# Chunks whose diff is at most CHUNK_THRESHOLD bytes are summarized locally
# with mechanical_summary. Larger ones are grouped by build_batch_jobs and
# summarized through +client+ by run_async_summary_jobs, which fills the
# remaining slots of the result array.
#
# @param chunks [Array<Hash>] entries with :path and :diff
# @return [Array] formatted summaries in the same order as +chunks+
def summarize_chunks(chunks, client:, model:)
  results = Array.new(chunks.length)

  pending = chunks.each_with_index.each_with_object([]) do |(chunk, index), large|
    if chunk[:diff].bytesize <= CHUNK_THRESHOLD
      results[index] = format_chunk_summary(path: chunk[:path], summary: mechanical_summary(chunk[:diff]))
    else
      large << { index: index, chunk: chunk }
    end
  end

  batches = build_batch_jobs(pending)
  return results if batches.empty?

  run_async_summary_jobs(batches, results: results, client: client, model: model)
  results
end

#summarize_single_chunk(chunk, client:, model:) ⇒ Object



Source lines 91–99
# File 'lib/services/diff_summarization/batch_runner.rb', line 91

# Asks the client for a summary of a single chunk's diff.
#
# The diff is wrapped in a ```diff fenced code block, the same fencing
# summarize_chunk_batch uses. A Markdown code fence needs at least three
# backticks; a two-backtick run does not open one.
#
# @param chunk [Hash] chunk with a :diff entry
# @return whatever client.generate returns (used as the summary text)
def summarize_single_chunk(chunk, client:, model:)
  client.generate(
    system: CHUNK_SYSTEM,
    user: "Summarize these changes:\n\n```diff\n#{chunk[:diff]}\n```",
    model: model,
    timeout_seconds: 120,
    open_timeout_seconds: 10
  )
end

#summary_worker_count(job_count) ⇒ Object



Source lines 136–141
# File 'lib/services/diff_summarization/batch_runner.rb', line 136

# Number of worker threads to use for +job_count+ jobs.
#
# Reads DIFF_SUMMARY_WORKERS, falling back to DEFAULT_SUMMARY_WORKERS when
# the variable is unset or is not a base-10 integer. The value is clamped
# to 1..job_count.
#
# At least one worker is returned even when +job_count+ is zero or
# negative. Comparable#clamp raises ArgumentError when min > max, so the
# upper bound is never allowed below 1.
#
# @param job_count [Integer]
# @return [Integer]
def summary_worker_count(job_count)
  raw = ENV.fetch('DIFF_SUMMARY_WORKERS', DEFAULT_SUMMARY_WORKERS)
  # Explicit base 10 so values such as "08" are not parsed as octal.
  configured = Integer(raw.to_s, 10, exception: false) || DEFAULT_SUMMARY_WORKERS
  configured.clamp(1, [job_count, 1].max)
end