Class: PlatformSdk::Observability::Langfuse::BedrockClaudeAdapter::StreamCollector
- Inherits:
-
Object
- Object
- PlatformSdk::Observability::Langfuse::BedrockClaudeAdapter::StreamCollector
- Defined in:
- lib/platform_sdk/observability/langfuse/bedrock_claude_adapter.rb
Overview
Accumulates streaming-response state so the adapter can emit a single observation after the stream ends.
Bedrock’s invoke_model_with_response_stream yields events whose ‘bytes` is JSON. For Anthropic on Bedrock the relevant event types are:
-
‘content_block_delta` — incremental text in `delta.text`
-
‘message_delta` — top-level `usage.output_tokens` (final count)
-
‘message_stop` — Bedrock adds `amazon-bedrock-invocationMetrics` here with `inputTokenCount` and `outputTokenCount`
Both message_delta and message_stop are emitted on success; we prefer message_stop’s metrics when both are present.
Instance Attribute Summary collapse
-
#text ⇒ Object
readonly
Returns the value of attribute text.
Instance Method Summary collapse
-
#initialize ⇒ StreamCollector
constructor
A new instance of StreamCollector.
- #observe(event) ⇒ Object
- #usage ⇒ Object
Constructor Details
#initialize ⇒ StreamCollector
Returns a new instance of StreamCollector.
189 190 191 192 193 |
# File 'lib/platform_sdk/observability/langfuse/bedrock_claude_adapter.rb', line 189 def initialize @text = +'' @input_tokens = nil @output_tokens = nil end |
Instance Attribute Details
#text ⇒ Object (readonly)
Returns the value of attribute text.
187 188 189 |
# File 'lib/platform_sdk/observability/langfuse/bedrock_claude_adapter.rb', line 187 def text @text end |
Instance Method Details
#observe(event) ⇒ Object
199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 |
# File 'lib/platform_sdk/observability/langfuse/bedrock_claude_adapter.rb', line 199 def observe(event) parsed = parse_event(event) return if parsed.nil? case parsed['type'] when 'message_start' # Anthropic emits initial input_tokens here. Used as a # fallback when Bedrock's message_stop invocationMetrics # are absent (future models, guardrail truncations, etc.). start_usage = parsed.dig('message', 'usage') || {} @input_tokens ||= start_usage['input_tokens'] when 'content_block_delta' delta_text = parsed.dig('delta', 'text') @text << delta_text if delta_text when 'message_delta' # message_delta usage is cumulative across the (possibly many) # message_delta events Anthropic emits, so the latest value # wins. See https://docs.anthropic.com/en/api/messages-streaming usage = parsed['usage'] || {} @output_tokens = usage['output_tokens'] if usage['output_tokens'] when 'message_stop' metrics = parsed['amazon-bedrock-invocationMetrics'] || {} @input_tokens = metrics['inputTokenCount'] if metrics['inputTokenCount'] @output_tokens = metrics['outputTokenCount'] if metrics['outputTokenCount'] end rescue StandardError => e OpenTelemetry.handle_error( message: "BedrockClaudeAdapter::StreamCollector#observe failed: #{e.class}: #{e.[0, 200]}" ) end |
#usage ⇒ Object
195 196 197 |
# File 'lib/platform_sdk/observability/langfuse/bedrock_claude_adapter.rb', line 195 def usage { input_tokens: @input_tokens, output_tokens: @output_tokens } end |