Module: Braintrust::Contrib::OpenAI::Instrumentation::Common
Defined in: lib/braintrust/contrib/openai/instrumentation/common.rb
Overview
Aggregation utilities for official OpenAI SDK instrumentation. These are specific to the official openai gem’s data structures (symbol keys, SDK objects).
Class Method Summary
- .aggregate_responses_events(events) ⇒ Hash
  Aggregate responses streaming events into a single response structure.
- .aggregate_streaming_chunks(chunks) ⇒ Hash
  Aggregate streaming chunks into a single response structure.
Class Method Details
.aggregate_responses_events(events) ⇒ Hash
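Aggregate Responses API streaming events into a single response structure. Specific to the official OpenAI SDK, which returns typed event objects. Only the response.completed event is consulted; if no such event is present, an empty Hash is returned.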
    # File 'lib/braintrust/contrib/openai/instrumentation/common.rb', line 112

    def self.aggregate_responses_events(events)
      return {} if events.empty?

      # Find the response.completed event which has the final response
      completed_event = events.find { |e| e.respond_to?(:type) && e.type == :"response.completed" }

      if completed_event&.respond_to?(:response)
        response = completed_event.response
        return {
          id: response.respond_to?(:id) ? response.id : nil,
          output: response.respond_to?(:output) ? response.output : nil,
          usage: response.respond_to?(:usage) ? response.usage : nil
        }
      end

      # Fallback if no completed event found
      {}
    end
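The lookup above can be tried without the SDK. The following is a minimal sketch, not the library's own code: `completed_summary` is a hypothetical, condensed restatement of the method, and the two Structs are stand-ins for the SDK's typed event objects, with made-up field values.

```ruby
# Hypothetical stand-ins for the SDK's typed event and response objects.
response_struct = Struct.new(:id, :output, :usage)
event_struct = Struct.new(:type, :response)

# Condensed restatement of the lookup, for illustration only.
def completed_summary(events)
  completed = events.find { |e| e.respond_to?(:type) && e.type == :"response.completed" }
  return {} unless completed&.respond_to?(:response)

  response = completed.response
  {id: response.id, output: response.output, usage: response.usage}
end

events = [
  event_struct.new(:"response.created", nil),
  event_struct.new(:"response.output_text.delta", nil),
  event_struct.new(:"response.completed", response_struct.new("resp_1", ["hello"], {total_tokens: 7}))
]

# Full stream: the completed event supplies id, output and usage.
summary = completed_summary(events)

# Stream cut off before completion: nothing to summarize, so the result is {}.
partial = completed_summary(events.first(2))
```

Intermediate events are ignored entirely in this sketch, as in the method above, so a stream that ends before `response.completed` arrives yields an empty Hash rather than a partial result.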
.aggregate_streaming_chunks(chunks) ⇒ Hash
Aggregate streaming chunks into a single response structure. Specific to the official OpenAI SDK, which uses symbol keys and SDK objects.
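As a rough illustration of the input shape, the sketch below models chunks as plain Hashes with symbol keys and merges a single choice by hand. The chunk values are invented for the example, and the loop is a simplified stand-in for the method, not a call into it.

```ruby
# Made-up chunks with symbol keys, roughly in the order a stream delivers them.
chunks = [
  {id: "chatcmpl-1", model: "example-model",
   choices: [{index: 0, delta: {role: "assistant", content: "Hel"}}]},
  {id: "chatcmpl-1", choices: [{index: 0, delta: {content: "lo"}}]},
  {choices: [{index: 0, delta: {}, finish_reason: "stop"}]},
  {choices: [], usage: {total_tokens: 5}}
]

id = nil
usage = nil
finish_reason = nil
text = +"" # mutable buffer for the concatenated content deltas

chunks.each do |chunk|
  id ||= chunk[:id]                      # first non-nil value wins
  usage = chunk[:usage] if chunk[:usage] # last non-nil value wins
  next unless chunk[:choices].is_a?(Array)

  chunk[:choices].each do |choice|
    delta = choice[:delta] || {}
    text << delta[:content] if delta[:content]
    finish_reason = choice[:finish_reason] if choice[:finish_reason]
  end
end
```

After the loop, `text` holds the joined content fragments, while `id`, `usage` and `finish_reason` hold the values picked up from whichever chunk carried them.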
    # File 'lib/braintrust/contrib/openai/instrumentation/common.rb', line 14

    def self.aggregate_streaming_chunks(chunks)
      return {} if chunks.empty?

      # Initialize aggregated structure
      aggregated = {
        id: nil,
        created: nil,
        model: nil,
        system_fingerprint: nil,
        choices: [],
        usage: nil
      }

      # Track aggregated content and tool_calls for each choice index
      choice_data = {}

      chunks.each do |chunk|
        # Capture top-level fields from any chunk that has them
        aggregated[:id] ||= chunk[:id]
        aggregated[:created] ||= chunk[:created]
        aggregated[:model] ||= chunk[:model]
        aggregated[:system_fingerprint] ||= chunk[:system_fingerprint]

        # Aggregate usage (usually only in last chunk if stream_options.include_usage is set)
        aggregated[:usage] = chunk[:usage] if chunk[:usage]

        # Process choices
        next unless chunk[:choices].is_a?(Array)

        chunk[:choices].each do |choice|
          index = choice[:index] || 0
          choice_data[index] ||= {
            index: index,
            role: nil,
            content: +"",
            tool_calls: [],
            finish_reason: nil
          }

          delta = choice[:delta] || {}

          # Aggregate role (set once from first delta that has it)
          choice_data[index][:role] ||= delta[:role]

          # Aggregate content
          choice_data[index][:content] << delta[:content] if delta[:content]

          # Aggregate tool_calls
          if delta[:tool_calls].is_a?(Array) && delta[:tool_calls].any?
            delta[:tool_calls].each do |tool_call_delta|
              if tool_call_delta[:id] && !tool_call_delta[:id].empty?
                # New tool call (dup strings to avoid mutating input)
                choice_data[index][:tool_calls] << {
                  id: tool_call_delta[:id],
                  type: tool_call_delta[:type],
                  function: {
                    name: +(tool_call_delta.dig(:function, :name) || ""),
                    arguments: +(tool_call_delta.dig(:function, :arguments) || "")
                  }
                }
              elsif choice_data[index][:tool_calls].any?
                # Continuation - append arguments to last tool call
                last_tool_call = choice_data[index][:tool_calls].last
                if tool_call_delta.dig(:function, :arguments)
                  last_tool_call[:function][:arguments] << tool_call_delta[:function][:arguments]
                end
              end
            end
          end

          # Capture finish_reason
          choice_data[index][:finish_reason] = choice[:finish_reason] if choice[:finish_reason]
        end
      end

      # Build final choices array
      aggregated[:choices] = choice_data.values.sort_by { |c| c[:index] }.map do |choice|
        message = {
          role: choice[:role],
          content: choice[:content].empty? ? nil : choice[:content]
        }

        # Add tool_calls to message if any
        message[:tool_calls] = choice[:tool_calls] if choice[:tool_calls].any?

        {
          index: choice[:index],
          message: message,
          finish_reason: choice[:finish_reason]
        }
      end

      aggregated
    end
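The tool-call branch is the least obvious part of the method, so here it is in isolation. This is a sketch under assumptions: `merge_tool_call_deltas` is a hypothetical helper written for this example, and the tool name and arguments are invented.

```ruby
# Merge the tool-call deltas of one choice. A delta with a non-empty :id starts
# a new call; a delta without one appends its argument fragment to the most
# recent call. Fragments that arrive before any call exists are dropped.
def merge_tool_call_deltas(deltas)
  deltas.each_with_object([]) do |delta, calls|
    fragment = delta.dig(:function, :arguments)

    if delta[:id] && !delta[:id].empty?
      calls << {
        id: delta[:id],
        type: delta[:type],
        function: {
          # dup so that later appends never touch the caller's strings
          name: (delta.dig(:function, :name) || "").dup,
          arguments: (fragment || "").dup
        }
      }
    elsif calls.any? && fragment
      calls.last[:function][:arguments] << fragment
    end
  end
end

deltas = [
  {id: "call_1", type: "function", function: {name: "get_weather", arguments: ""}},
  {function: {arguments: "{\"city\":"}},
  {function: {arguments: "\"Oslo\"}"}}
]

calls = merge_tool_call_deltas(deltas)
merged_arguments = calls.first[:function][:arguments]
```

One design note: the sketch copies with `dup`, whereas the method above uses unary `+`. `String#+@` returns the receiver itself when it is not frozen, so it only produces a copy for frozen inputs; `dup` copies unconditionally, which is why the first delta's `arguments` string is left untouched here.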