Module: Braintrust::Contrib::RubyOpenAI::Instrumentation::Common

Defined in:
lib/braintrust/contrib/ruby_openai/instrumentation/common.rb

Overview

Aggregation utilities for ruby-openai gem instrumentation. These are specific to the ruby-openai gem’s data structures (string keys, plain hashes).

Class Method Summary

Class Method Details

.aggregate_responses_chunks(chunks) ⇒ Hash

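Aggregate Responses API streaming chunks into a single response structure, taken from the payload of the `response.completed` event. Returns an empty hash when no such event is present. Specific to the ruby-openai gem, which uses string keys and plain hashes.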

Parameters:

  • chunks (Array<Hash>)

    array of chunk hashes from stream (string keys)

Returns:

  • (Hash)

    aggregated response with output, usage, id (string keys)



116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
# File 'lib/braintrust/contrib/ruby_openai/instrumentation/common.rb', line 116

# Collapse a stream of Responses API events into one response hash.
#
# The first event whose "type" is "response.completed" is treated as the
# source of truth; its "response" payload is projected down to the three
# fields reported here. Every other event is ignored.
#
# @param chunks [Array<Hash>] streamed event hashes (string keys)
# @return [Hash] hash with "id", "output" and "usage" keys, or an empty hash
#   when the stream is empty or has no usable "response.completed" event
def self.aggregate_responses_chunks(chunks)
  return {} if chunks.empty?

  final_event = chunks.find { |event| event["type"] == "response.completed" }
  payload = final_event && final_event["response"]

  # No terminal event (or one without a payload): nothing to report.
  return {} unless payload

  %w[id output usage].each_with_object({}) do |field, summary|
    summary[field] = payload[field]
  end
end

.aggregate_streaming_chunks(chunks) ⇒ Hash

Aggregate streaming chunks into a single response structure. Specific to ruby-openai gem which uses string keys and plain hashes.

Parameters:

  • chunks (Array<Hash>)

    array of chunk hashes from stream (string keys)

Returns:

  • (Hash)

    aggregated response with choices, usage, etc. (string keys)



14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
# File 'lib/braintrust/contrib/ruby_openai/instrumentation/common.rb', line 14

# Aggregate chat-completion streaming chunks into a single response structure.
# Specific to the ruby-openai gem, which yields plain hashes with string keys.
#
# Top-level fields ("id", "created", "model", "system_fingerprint") are taken
# from the first chunk that carries them; "usage" is taken from the last chunk
# that carries it. For each choice index the deltas are folded into one
# message: role (first seen), concatenated content, tool calls and the last
# non-nil finish_reason.
#
# Tool-call fragments are matched to their tool call by the fragment's
# "index" when one is present, so interleaved parallel tool calls and streams
# that repeat the "id" on every fragment are reassembled correctly. Fragments
# without an "index" use the id-based heuristic: a non-empty "id" starts a
# new tool call, anything else continues the most recent one.
#
# @param chunks [Array<Hash>, nil] chunk hashes from the stream (string keys)
# @return [Hash] aggregated response with "id", "created", "model",
#   "system_fingerprint", "usage" and "choices" (string keys); an empty hash
#   when chunks is nil or empty
def self.aggregate_streaming_chunks(chunks)
  return {} if chunks.nil? || chunks.empty?

  # Initialize aggregated structure
  aggregated = {
    "id" => nil,
    "created" => nil,
    "model" => nil,
    "system_fingerprint" => nil,
    "usage" => nil,
    "choices" => []
  }

  # Accumulated role/content/tool_calls/finish_reason per choice index
  choice_data = {}

  # choice index => { tool call index => tool call hash }. Kept outside
  # choice_data so this bookkeeping never leaks into the returned structure.
  tool_call_slots = Hash.new { |slots, key| slots[key] = {} }

  chunks.each do |chunk|
    # Capture top-level fields from the first chunk that has them
    aggregated["id"] ||= chunk["id"]
    aggregated["created"] ||= chunk["created"]
    aggregated["model"] ||= chunk["model"]
    aggregated["system_fingerprint"] ||= chunk["system_fingerprint"]

    # Usage usually arrives only in the final chunk; the last one seen wins
    aggregated["usage"] = chunk["usage"] if chunk["usage"]

    choices = chunk["choices"]
    next unless choices.is_a?(Array)

    choices.each do |choice|
      index = choice["index"] || 0
      data = (choice_data[index] ||= {
        "index" => index,
        "role" => nil,
        "content" => +"",
        "tool_calls" => [],
        "finish_reason" => nil
      })

      delta = choice["delta"] || {}

      # Role is set once, from the first delta that has it
      data["role"] ||= delta["role"]

      data["content"] << delta["content"] if delta["content"]

      tool_call_deltas = delta["tool_calls"]
      if tool_call_deltas.is_a?(Array)
        slots = tool_call_slots[index]

        tool_call_deltas.each do |tc_delta|
          tc_id = tc_delta["id"]
          has_id = tc_id && !tc_id.empty?
          tc_index = tc_delta["index"]
          name_part = tc_delta.dig("function", "name")
          args_part = tc_delta.dig("function", "arguments")

          known = tc_index.nil? ? nil : slots[tc_index]

          target =
            if known && (!has_id || known["id"] == tc_id)
              # Same index, and no id or the same id: continuation of a known call
              known
            elsif has_id
              # A fresh id (possibly reusing an index): start a new tool call
              nil
            else
              # No usable index or id: continue the most recent tool call
              data["tool_calls"].last
            end

          if target
            function = target["function"]
            function["arguments"] << args_part if args_part
            # Fill in, never append: streams that resend the name on every
            # fragment must not produce a doubled name.
            function["name"] << name_part if name_part && function["name"].empty?
            target["type"] ||= tc_delta["type"]
          elsif has_id
            created = {
              "id" => tc_id,
              "type" => tc_delta["type"],
              "function" => {
                "name" => +(name_part || ""),
                "arguments" => +(args_part || "")
              }
            }
            data["tool_calls"] << created
            slots[tc_index] = created unless tc_index.nil?
          end
        end
      end

      # Last non-nil finish_reason wins
      data["finish_reason"] = choice["finish_reason"] if choice["finish_reason"]
    end
  end

  # Build final choices array, ordered by choice index
  aggregated["choices"] = choice_data.values.sort_by { |c| c["index"] }.map do |choice|
    message = {
      "role" => choice["role"],
      "content" => choice["content"].empty? ? nil : choice["content"]
    }

    # tool_calls is only present on the message when at least one was streamed
    message["tool_calls"] = choice["tool_calls"] if choice["tool_calls"].any?

    {
      "index" => choice["index"],
      "message" => message,
      "finish_reason" => choice["finish_reason"]
    }
  end

  aggregated
end