Module: Braintrust::Contrib::OpenAI::Instrumentation::Common

Defined in:
lib/braintrust/contrib/openai/instrumentation/common.rb

Overview

Aggregation utilities for official OpenAI SDK instrumentation. These are specific to the official openai gem’s data structures (symbol keys, SDK objects).

Class Method Summary collapse

Class Method Details

.aggregate_responses_events(events) ⇒ Hash

Aggregate responses streaming events into a single response structure. Specific to official OpenAI SDK which returns typed event objects.

Parameters:

  • events (Array)

    array of event objects from stream

Returns:

  • (Hash)

    aggregated response with output, usage, etc.



112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
# File 'lib/braintrust/contrib/openai/instrumentation/common.rb', line 112

def self.aggregate_responses_events(events)
  return {} if events.empty?

  # Find the response.completed event which has the final response
  completed_event = events.find { |e| e.respond_to?(:type) && e.type == :"response.completed" }

  if completed_event&.respond_to?(:response)
    response = completed_event.response
    return {
      id: response.respond_to?(:id) ? response.id : nil,
      output: response.respond_to?(:output) ? response.output : nil,
      usage: response.respond_to?(:usage) ? response.usage : nil
    }
  end

  # Fallback if no completed event found
  {}
end

.aggregate_streaming_chunks(chunks) ⇒ Hash

Aggregate streaming chunks into a single response structure. Specific to official OpenAI SDK which uses symbol keys and SDK objects.

Parameters:

  • chunks (Array<Hash>)

    array of chunk hashes from stream (symbol keys)

Returns:

  • (Hash)

    aggregated response with choices, usage, etc. (symbol keys)



14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
# File 'lib/braintrust/contrib/openai/instrumentation/common.rb', line 14

def self.aggregate_streaming_chunks(chunks)
  return {} if chunks.empty?

  # Initialize aggregated structure
  aggregated = {
    id: nil,
    created: nil,
    model: nil,
    system_fingerprint: nil,
    choices: [],
    usage: nil
  }

  # Track aggregated content and tool_calls for each choice index
  choice_data = {}

  chunks.each do |chunk|
    # Capture top-level fields from any chunk that has them
    aggregated[:id] ||= chunk[:id]
    aggregated[:created] ||= chunk[:created]
    aggregated[:model] ||= chunk[:model]
    aggregated[:system_fingerprint] ||= chunk[:system_fingerprint]

    # Aggregate usage (usually only in last chunk if stream_options.include_usage is set)
    aggregated[:usage] = chunk[:usage] if chunk[:usage]

    # Process choices
    next unless chunk[:choices].is_a?(Array)
    chunk[:choices].each do |choice|
      index = choice[:index] || 0
      choice_data[index] ||= {
        index: index,
        role: nil,
        content: +"",
        tool_calls: [],
        finish_reason: nil
      }

      delta = choice[:delta] || {}

      # Aggregate role (set once from first delta that has it)
      choice_data[index][:role] ||= delta[:role]

      # Aggregate content
      choice_data[index][:content] << delta[:content] if delta[:content]

      # Aggregate tool_calls
      if delta[:tool_calls].is_a?(Array) && delta[:tool_calls].any?
        delta[:tool_calls].each do |tool_call_delta|
          if tool_call_delta[:id] && !tool_call_delta[:id].empty?
            # New tool call (dup strings to avoid mutating input)
            choice_data[index][:tool_calls] << {
              id: tool_call_delta[:id],
              type: tool_call_delta[:type],
              function: {
                name: +(tool_call_delta.dig(:function, :name) || ""),
                arguments: +(tool_call_delta.dig(:function, :arguments) || "")
              }
            }
          elsif choice_data[index][:tool_calls].any?
            # Continuation - append arguments to last tool call
            last_tool_call = choice_data[index][:tool_calls].last
            if tool_call_delta.dig(:function, :arguments)
              last_tool_call[:function][:arguments] << tool_call_delta[:function][:arguments]
            end
          end
        end
      end

      # Capture finish_reason
      choice_data[index][:finish_reason] = choice[:finish_reason] if choice[:finish_reason]
    end
  end

  # Build final choices array
  aggregated[:choices] = choice_data.values.sort_by { |c| c[:index] }.map do |choice|
    message = {
      role: choice[:role],
      content: choice[:content].empty? ? nil : choice[:content]
    }

    # Add tool_calls to message if any
    message[:tool_calls] = choice[:tool_calls] if choice[:tool_calls].any?

    {
      index: choice[:index],
      message: message,
      finish_reason: choice[:finish_reason]
    }
  end

  aggregated
end