Class: Tracekit::LLM::AnthropicInstrumentation::AnthropicStreamAccumulator

Inherits:
Object
  • Object
show all
Defined in:
lib/tracekit/llm/anthropic_instrumentation.rb

Overview

Accumulates streaming event data for span attributes

Instance Method Summary collapse

Constructor Details

#initialize(span, capture_content) ⇒ AnthropicStreamAccumulator

Returns a new instance of AnthropicStreamAccumulator.



127
128
129
130
131
132
133
134
135
136
137
138
139
140
# File 'lib/tracekit/llm/anthropic_instrumentation.rb', line 127

# Creates an accumulator with empty state, bound to the given span.
#
# @param span [Object] span that the accumulated data is later written to
# @param capture_content [Boolean] stored as the content-capture flag
def initialize(span, capture_content)
  @span, @capture = span, capture_content

  # Response metadata; nil until a value has been recorded.
  @model = @id = @stop_reason = nil

  # Token usage counters; nil until a value has been recorded.
  @input_tokens = @output_tokens = nil
  @cache_creation_tokens = @cache_read_tokens = nil

  # Collected text chunks, and tool calls keyed by content-block index.
  @output_chunks = []
  @tool_calls = {}
  @current_block_index = 0
end

Instance Method Details

#finalize ⇒ Object



190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
# File 'lib/tracekit/llm/anthropic_instrumentation.rb', line 190

# Writes everything gathered so far onto the span, then finishes the span.
#
# In order: the response attributes (model, id, finish reasons, input and
# output token counts), the two cache token counters when they are set, one
# record per accumulated tool call, and, if content capture is enabled and
# text chunks were collected, the joined text as a single text message.
#
# A StandardError raised while recording is swallowed; the span is finished
# whether or not recording succeeded.
#
# @return [Object] unspecified
def finalize
  finish_reasons = [@stop_reason] if @stop_reason

  Common.set_response_attributes(
    @span,
    model: @model,
    id: @id,
    finish_reasons: finish_reasons,
    input_tokens: @input_tokens,
    output_tokens: @output_tokens
  )

  # Cache counters are optional; only set the ones that were recorded.
  {
    "gen_ai.usage.cache_creation.input_tokens" => @cache_creation_tokens,
    "gen_ai.usage.cache_read.input_tokens" => @cache_read_tokens
  }.each do |attribute, count|
    @span.set_attribute(attribute, count) if count
  end

  @tool_calls.each_value { |call| Common.record_tool_call(@span, **call) }

  return unless @capture && @output_chunks.any?

  Common.capture_output_messages(@span, [{ "type" => "text", "text" => @output_chunks.join }])
rescue StandardError
  # Never break user code
ensure
  @span.finish
end

#process_event(event) ⇒ Object



142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
# File 'lib/tracekit/llm/anthropic_instrumentation.rb', line 142

# Folds one streaming event into the accumulator's state.
#
# Every lookup tries the String key first and then the Symbol key, so events
# decoded either way are accepted. Event types other than those listed below
# are ignored.
#
# - "message_start": records model, message id, input token count and the
#   cache creation / cache read token counts.
# - "content_block_start": remembers the block index and, for "tool_use"
#   blocks, registers a tool call (name defaults to "unknown") with an empty
#   arguments buffer.
# - "content_block_delta": keeps "text_delta" text when content capture is
#   enabled; appends "input_json_delta" fragments to the tool call registered
#   at the event's index (falling back to the last started block index).
# - "message_delta": records the stop reason and output token count when the
#   event carries them; values already recorded are otherwise left untouched.
#
# Any StandardError raised while processing is swallowed.
#
# @param event [Hash] decoded streaming event (String or Symbol keys)
# @return [Object] unspecified; callers should not rely on it
def process_event(event)
  event_type = event["type"] || event[:type]

  case event_type
  when "message_start"
    message = event["message"] || event[:message] || {}
    @model = message["model"] || message[:model]
    @id = message["id"] || message[:id]
    usage = message["usage"] || message[:usage] || {}
    @input_tokens = usage["input_tokens"] || usage[:input_tokens]
    @cache_creation_tokens = usage["cache_creation_input_tokens"] || usage[:cache_creation_input_tokens]
    @cache_read_tokens = usage["cache_read_input_tokens"] || usage[:cache_read_input_tokens]

  when "content_block_start"
    @current_block_index = event["index"] || event[:index] || @current_block_index
    cb = event["content_block"] || event[:content_block] || {}
    if (cb["type"] || cb[:type]) == "tool_use"
      @tool_calls[@current_block_index] = {
        name: cb["name"] || cb[:name] || "unknown",
        id: cb["id"] || cb[:id],
        # Unary plus guarantees a fresh, mutable buffer even when string
        # literals are frozen, so fragments can be appended in place.
        arguments: +""
      }
    end

  when "content_block_delta"
    delta = event["delta"] || event[:delta] || {}
    delta_type = delta["type"] || delta[:type]
    if delta_type == "text_delta" && @capture
      text = delta["text"] || delta[:text]
      @output_chunks << text if text
    elsif delta_type == "input_json_delta"
      partial = delta["partial_json"] || delta[:partial_json]
      idx = event["index"] || event[:index] || @current_block_index
      tool_call = @tool_calls[idx]
      # Append in place: `+=` would re-copy the whole accumulated string for
      # every streamed fragment.
      tool_call[:arguments] << partial if partial && tool_call
    end

  when "message_delta"
    delta = event["delta"] || event[:delta] || {}
    stop_reason = delta["stop_reason"] || delta[:stop_reason]
    @stop_reason = stop_reason if stop_reason

    usage = event["usage"] || event[:usage] || {}
    output_tokens = usage["output_tokens"] || usage[:output_tokens]
    @output_tokens = output_tokens if output_tokens
  end
rescue StandardError
  # Never fail on event processing
end