Class: PlatformSdk::Observability::Langfuse::BedrockClaudeAdapter::StreamCollector

Inherits:
Object
  • Object
show all
Defined in:
lib/platform_sdk/observability/langfuse/bedrock_claude_adapter.rb

Overview

Accumulates streaming-response state so the adapter can emit a single observation after the stream ends.

Bedrock’s invoke_model_with_response_stream yields events whose ‘bytes` is JSON. For Anthropic on Bedrock the relevant event types are:

  • ‘content_block_delta` — incremental text in `delta.text`

  • ‘message_delta` — top-level `usage.output_tokens` (final count)

  • ‘message_stop` — Bedrock adds `amazon-bedrock-invocationMetrics` here with `inputTokenCount` and `outputTokenCount`

Both message_delta and message_stop are emitted on success; we prefer message_stop’s metrics when both are present.

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initializeStreamCollector

Returns a new instance of StreamCollector.



189
190
191
192
193
# File 'lib/platform_sdk/observability/langfuse/bedrock_claude_adapter.rb', line 189

def initialize
  @text = +''
  @input_tokens = nil
  @output_tokens = nil
end

Instance Attribute Details

#textObject (readonly)

Returns the value of attribute text.



187
188
189
# File 'lib/platform_sdk/observability/langfuse/bedrock_claude_adapter.rb', line 187

def text
  @text
end

Instance Method Details

#observe(event) ⇒ Object



199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
# File 'lib/platform_sdk/observability/langfuse/bedrock_claude_adapter.rb', line 199

def observe(event)
  parsed = parse_event(event)
  return if parsed.nil?

  case parsed['type']
  when 'message_start'
    # Anthropic emits initial input_tokens here. Used as a
    # fallback when Bedrock's message_stop invocationMetrics
    # are absent (future models, guardrail truncations, etc.).
    start_usage = parsed.dig('message', 'usage') || {}
    @input_tokens ||= start_usage['input_tokens']
  when 'content_block_delta'
    delta_text = parsed.dig('delta', 'text')
    @text << delta_text if delta_text
  when 'message_delta'
    # message_delta usage is cumulative across the (possibly many)
    # message_delta events Anthropic emits, so the latest value
    # wins. See https://docs.anthropic.com/en/api/messages-streaming
    usage = parsed['usage'] || {}
    @output_tokens = usage['output_tokens'] if usage['output_tokens']
  when 'message_stop'
    metrics = parsed['amazon-bedrock-invocationMetrics'] || {}
    @input_tokens = metrics['inputTokenCount'] if metrics['inputTokenCount']
    @output_tokens = metrics['outputTokenCount'] if metrics['outputTokenCount']
  end
rescue StandardError => e
  OpenTelemetry.handle_error(
    message: "BedrockClaudeAdapter::StreamCollector#observe failed: #{e.class}: #{e.message[0, 200]}"
  )
end

#usageObject



195
196
197
# File 'lib/platform_sdk/observability/langfuse/bedrock_claude_adapter.rb', line 195

def usage
  { input_tokens: @input_tokens, output_tokens: @output_tokens }
end