Module: Braintrust::Contrib::RubyOpenAI::Instrumentation::Common
Defined in: lib/braintrust/contrib/ruby_openai/instrumentation/common.rb
Overview
Aggregation utilities for ruby-openai gem instrumentation. These are specific to the ruby-openai gem’s data structures (string keys, plain hashes).
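To make the "string keys, plain hashes" point concrete, here is a minimal sketch of what one streamed chunk looks like once parsed. The payload and the constant names RAW_CHUNK and SAMPLE_CHUNK are hypothetical and used for illustration only; the sketch assumes chunks arrive as parsed JSON.

```ruby
require "json"

# Hypothetical payload shaped like one streamed chat-completion chunk.
RAW_CHUNK = '{"id":"chatcmpl-1","choices":[{"index":0,"delta":{"content":"Hi"}}]}'

# JSON.parse returns a plain Hash with String keys by default.
SAMPLE_CHUNK = JSON.parse(RAW_CHUNK)

p SAMPLE_CHUNK["id"]                                  # string key matches
p SAMPLE_CHUNK[:id]                                   # symbol key does not match, so nil
p SAMPLE_CHUNK.dig("choices", 0, "delta", "content")  # nested lookup with string keys
```

This is why the helpers in this module index with "id" and "choices" rather than symbols or accessor methods.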
Class Method Summary
- .aggregate_responses_chunks(chunks) ⇒ Hash
  Aggregate responses streaming chunks into a single response structure.
- .aggregate_streaming_chunks(chunks) ⇒ Hash
  Aggregate streaming chunks into a single response structure.
Class Method Details
.aggregate_responses_chunks(chunks) ⇒ Hash
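Aggregates streaming events from the Responses API into a single response hash. It looks for the response.completed event and returns that event's id, output, and usage; if chunks is empty or contains no such event, it returns an empty hash. Specific to the ruby-openai gem, which uses string keys and plain hashes.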
# File 'lib/braintrust/contrib/ruby_openai/instrumentation/common.rb', line 116

def self.aggregate_responses_chunks(chunks)
  return {} if chunks.empty?

  # Find the response.completed event which has the final response
  completed_chunk = chunks.find { |c| c["type"] == "response.completed" }

  if completed_chunk && completed_chunk["response"]
    response = completed_chunk["response"]
    return {
      "id" => response["id"],
      "output" => response["output"],
      "usage" => response["usage"]
    }
  end

  # Fallback if no completed event found
  {}
end
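The behavior above can be sketched on sample data. So that it runs without the gem installed, the block below uses a standalone mirror of the listed logic; aggregate_responses_sketch and SAMPLE_EVENTS are hypothetical names, and the events are trimmed-down illustrations rather than captured output.

```ruby
# Standalone mirror of the listing above, for illustration only.
def aggregate_responses_sketch(chunks)
  completed = chunks.find { |c| c["type"] == "response.completed" }
  response = completed && completed["response"]
  return {} unless response

  { "id" => response["id"], "output" => response["output"], "usage" => response["usage"] }
end

# Hypothetical events; real streams carry many more fields.
SAMPLE_EVENTS = [
  { "type" => "response.created", "response" => { "id" => "resp_1" } },
  { "type" => "response.output_text.delta", "delta" => "Hel" },
  { "type" => "response.completed",
    "response" => {
      "id" => "resp_1",
      "output" => [{ "type" => "message" }],
      "usage" => { "total_tokens" => 7 }
    } }
]

p aggregate_responses_sketch(SAMPLE_EVENTS)           # id, output, usage from the completed event
p aggregate_responses_sketch(SAMPLE_EVENTS.first(2))  # no completed event, so {}
```

Note that intermediate delta events are ignored entirely: only the final response.completed payload contributes to the result, so a stream that is cut off before completion aggregates to an empty hash.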
.aggregate_streaming_chunks(chunks) ⇒ Hash
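Aggregates chat completion streaming chunks into a single response hash. Top-level fields (id, created, model, system_fingerprint) are taken from the first chunk that carries them, usage from the last chunk that carries it, and each choice's delta content and tool-call arguments are concatenated in arrival order. Specific to the ruby-openai gem, which uses string keys and plain hashes.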
# File 'lib/braintrust/contrib/ruby_openai/instrumentation/common.rb', line 14

def self.aggregate_streaming_chunks(chunks)
  return {} if chunks.empty?

  # Initialize aggregated structure
  aggregated = {
    "id" => nil,
    "created" => nil,
    "model" => nil,
    "system_fingerprint" => nil,
    "usage" => nil,
    "choices" => []
  }

  # Track aggregated content and tool_calls for each choice index
  choice_data = {}

  chunks.each do |chunk|
    # Capture top-level fields from any chunk that has them
    aggregated["id"] ||= chunk["id"]
    aggregated["created"] ||= chunk["created"]
    aggregated["model"] ||= chunk["model"]
    aggregated["system_fingerprint"] ||= chunk["system_fingerprint"]

    # Aggregate usage (usually only in last chunk if stream_options.include_usage is set)
    aggregated["usage"] = chunk["usage"] if chunk["usage"]

    # Process choices
    choices = chunk["choices"]
    next unless choices.is_a?(Array)

    choices.each do |choice|
      index = choice["index"] || 0
      choice_data[index] ||= {
        "index" => index,
        "role" => nil,
        "content" => +"",
        "tool_calls" => [],
        "finish_reason" => nil
      }

      delta = choice["delta"] || {}

      # Aggregate role (set once from first delta that has it)
      choice_data[index]["role"] ||= delta["role"]

      # Aggregate content
      choice_data[index]["content"] << delta["content"] if delta["content"]

      # Aggregate tool_calls
      tool_calls = delta["tool_calls"]
      if tool_calls.is_a?(Array) && tool_calls.any?
        tool_calls.each do |tool_call_delta|
          tc_id = tool_call_delta["id"]
          if tc_id && !tc_id.empty?
            # New tool call
            choice_data[index]["tool_calls"] << {
              "id" => tc_id,
              "type" => tool_call_delta["type"],
              "function" => {
                "name" => +(tool_call_delta.dig("function", "name") || ""),
                "arguments" => +(tool_call_delta.dig("function", "arguments") || "")
              }
            }
          elsif choice_data[index]["tool_calls"].any?
            # Continuation - append arguments to last tool call
            last_tool_call = choice_data[index]["tool_calls"].last
            if tool_call_delta.dig("function", "arguments")
              last_tool_call["function"]["arguments"] << tool_call_delta["function"]["arguments"]
            end
          end
        end
      end

      # Capture finish_reason
      choice_data[index]["finish_reason"] = choice["finish_reason"] if choice["finish_reason"]
    end
  end

  # Build final choices array
  aggregated["choices"] = choice_data.values.sort_by { |c| c["index"] }.map do |choice|
    message = {
      "role" => choice["role"],
      "content" => choice["content"].empty? ? nil : choice["content"]
    }

    # Add tool_calls to message if any
    message["tool_calls"] = choice["tool_calls"] if choice["tool_calls"].any?

    {
      "index" => choice["index"],
      "message" => message,
      "finish_reason" => choice["finish_reason"]
    }
  end

  aggregated
end
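The per-choice merge is the least obvious part of the listing, so here is a condensed sketch of it on sample data. It is a standalone approximation, not the module's code: merge_deltas_sketch and SAMPLE_CHUNKS are hypothetical names, the chunks are hand-written, and the sketch omits the top-level fields and the final "message" wrapping. Unlike the listing, it copies strings with dup before appending, purely so the sample input stays untouched.

```ruby
# Condensed, standalone sketch of the per-choice delta merge.
def merge_deltas_sketch(chunks)
  merged = {}
  chunks.each do |chunk|
    choices = chunk["choices"]
    next unless choices.is_a?(Array)

    choices.each do |choice|
      index = choice["index"] || 0
      data = (merged[index] ||= { "role" => nil, "content" => +"", "tool_calls" => [], "finish_reason" => nil })
      delta = choice["delta"] || {}

      data["role"] ||= delta["role"]
      data["content"] << delta["content"] if delta["content"]

      (delta["tool_calls"] || []).each do |tc|
        args = tc.dig("function", "arguments")
        if tc["id"] && !tc["id"].empty?
          # A delta carrying an id starts a new tool call.
          data["tool_calls"] << {
            "id" => tc["id"],
            "type" => tc["type"],
            "function" => { "name" => (tc.dig("function", "name") || "").dup, "arguments" => (args || "").dup }
          }
        elsif data["tool_calls"].any? && args
          # A delta without an id continues the most recent tool call.
          data["tool_calls"].last["function"]["arguments"] << args
        end
      end

      data["finish_reason"] = choice["finish_reason"] if choice["finish_reason"]
    end
  end
  merged
end

# Hypothetical chunks: two content deltas, then a tool call split across two deltas.
SAMPLE_CHUNKS = [
  { "choices" => [{ "index" => 0, "delta" => { "role" => "assistant", "content" => "Hel" } }] },
  { "choices" => [{ "index" => 0, "delta" => { "content" => "lo" } }] },
  { "choices" => [{ "index" => 0, "delta" => { "tool_calls" => [
    { "id" => "call_1", "type" => "function",
      "function" => { "name" => "get_weather", "arguments" => "{\"city\":" } }
  ] } }] },
  { "choices" => [{ "index" => 0, "finish_reason" => "tool_calls", "delta" => { "tool_calls" => [
    { "function" => { "arguments" => "\"Oslo\"}" } }
  ] } }] }
]

choice = merge_deltas_sketch(SAMPLE_CHUNKS)[0]
p choice["content"]
p choice["tool_calls"][0]["function"]["arguments"]
```

One consequence of the "continue the most recent tool call" rule is worth keeping in mind: an id-less delta is always appended to the last tool call seen for that choice, so the merge relies on argument fragments arriving directly after the delta that opened their tool call.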