Module: PlatformSdk::Observability::Langfuse::BedrockClaudeAdapter
- Defined in:
- lib/platform_sdk/observability/langfuse/bedrock_claude_adapter.rb
Overview
Fires `llm_call.platform_sdk` notifications for Anthropic Claude calls made directly through the AWS Bedrock SDK (rather than via RubyLLM). Two entry points cover the two Bedrock invocation modes:
  # Non-streaming (`bedrock_client.invoke_model(payload)`)
  PlatformSdk::Observability::Langfuse::BedrockClaudeAdapter.with_observability(
    payload: { model_id: 'us.anthropic.claude-sonnet-4-6',
               body: JSON.dump(anthropic_messages_payload) },
    context: 'generate_outline'
  ) { bedrock_client.invoke_model(payload) }

  # Streaming (`bedrock_client.invoke_model_with_response_stream`)
  PlatformSdk::Observability::Langfuse::BedrockClaudeAdapter.with_streaming_observability(
    payload: { model_id: ..., body: JSON.dump(...) },
    context: 'chat_stream'
  ) do |collector|
    bedrock_client.invoke_model_with_response_stream(
      payload.merge(event_stream_handler: proc do |stream|
        stream.on_chunk_event do |event|
          collector.observe(event)
          # ...the caller's own chunk handling
        end
      end)
    )
  end
The streaming collector accumulates text from `content_block_delta` events and pulls the final `input_tokens`/`output_tokens` from the `message_stop` event’s `amazon-bedrock-invocationMetrics`. The notification fires once, after the block returns or raises.
No hard dependency on the AWS SDK — this adapter only reads the documented JSON shape of Anthropic’s Messages API on Bedrock.
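To make the accumulation described above concrete, here is a minimal sketch of a collector. `SketchStreamCollector` and `FakeChunk` are hypothetical names and not the adapter's actual `StreamCollector`, whose internals are not shown on this page. The sketch assumes each chunk event exposes its JSON through `bytes`, and that the invocation metrics use the `inputTokenCount` and `outputTokenCount` keys.

```ruby
require 'json'

# Hypothetical stand-in for StreamCollector; the real class may differ.
class SketchStreamCollector
  attr_reader :text, :usage

  def initialize
    @text = +''
    @usage = {}
  end

  # Accepts a raw JSON string or an object that exposes the chunk JSON
  # through #bytes (assumed shape of a Bedrock chunk event).
  def observe(event)
    raw = event.is_a?(String) ? event : event.bytes
    chunk = JSON.parse(raw)

    case chunk['type']
    when 'content_block_delta'
      delta = chunk['delta'] || {}
      @text << delta['text'].to_s if delta['type'] == 'text_delta'
    when 'message_stop'
      # Assumed metric key names; adjust if the payload differs.
      metrics = chunk['amazon-bedrock-invocationMetrics'] || {}
      @usage = {
        input_tokens: metrics['inputTokenCount'],
        output_tokens: metrics['outputTokenCount']
      }
    end
    self
  rescue JSON::ParserError, TypeError
    self # a malformed chunk should not break the caller's stream
  end
end

# Fake chunk event used only to exercise the sketch.
FakeChunk = Struct.new(:bytes)

collector = SketchStreamCollector.new
[
  { type: 'content_block_delta', delta: { type: 'text_delta', text: 'Hello, ' } },
  { type: 'content_block_delta', delta: { type: 'text_delta', text: 'world' } },
  { type: 'message_stop',
    'amazon-bedrock-invocationMetrics' => { inputTokenCount: 12, outputTokenCount: 5 } }
].each { |chunk| collector.observe(FakeChunk.new(JSON.dump(chunk))) }

puts collector.text # prints "Hello, world"
```

Swallowing parse errors inside `observe` is a design choice of this sketch: observability code runs inside the caller's chunk handler, so it arguably should not be able to interrupt the stream.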
Defined Under Namespace
Classes: StreamCollector
Class Method Summary
- .fire(payload:, response:, context:, error: nil) ⇒ Object
  After-the-fact emission for non-streaming calls.
- .with_observability(payload:, context:) ⇒ Object
- .with_streaming_observability(payload:, context:) ⇒ Object
Class Method Details
.fire(payload:, response:, context:, error: nil) ⇒ Object
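After-the-fact emission for non-streaming calls. Callers can use this to invoke Bedrock outside the block helper and emit observability separately. Failures while decoding the response or notifying are reported through `OpenTelemetry.handle_error` and are not raised to the caller; in that case the method returns `nil`.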
  # File 'lib/platform_sdk/observability/langfuse/bedrock_claude_adapter.rb', line 66
  def fire(payload:, response:, context:, error: nil)
    attrs = decode_response(response)
    notify(payload:, output: attrs[:output], usage: attrs[:usage], context:, error:)
  rescue StandardError => e
    OpenTelemetry.handle_error(
      message: "BedrockClaudeAdapter.fire failed: #{e.class}: #{e.message[0, 200]}"
    )
    nil
  end
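The `decode_response` helper that `fire` relies on is not shown on this page. As a rough sketch of what such a step might look like, the hypothetical `sketch_decode_response` below reads the Anthropic Messages response shape (`content` blocks plus a `usage` object) from a response whose `body` is an IO-like object or a string. The helper name, the `FakeResponse` struct, and the `nil` handling are assumptions made for illustration, not the adapter's actual implementation.

```ruby
require 'json'
require 'stringio'

# Hypothetical helper; not the adapter's actual decode_response.
def sketch_decode_response(response)
  # with_observability passes response: nil on the error path.
  return { output: nil, usage: {} } if response.nil?

  body = response.respond_to?(:body) ? response.body : response
  raw = body.respond_to?(:read) ? body.read : body.to_s
  body.rewind if body.respond_to?(:rewind) # leave the IO readable for the caller

  parsed = JSON.parse(raw)

  # Join only the text blocks; other block types (e.g. tool_use) are skipped.
  text = Array(parsed['content'])
         .select { |block| block['type'] == 'text' }
         .map { |block| block['text'] }
         .join

  usage = parsed['usage'] || {}
  {
    output: text,
    usage: {
      input_tokens: usage['input_tokens'],
      output_tokens: usage['output_tokens']
    }
  }
end

# Fake response object used only to exercise the sketch.
FakeResponse = Struct.new(:body)

response = FakeResponse.new(
  StringIO.new(JSON.dump(
    'content' => [{ 'type' => 'text', 'text' => 'An outline.' }],
    'usage' => { 'input_tokens' => 20, 'output_tokens' => 4 }
  ))
)

decoded = sketch_decode_response(response)
puts decoded[:output] # prints "An outline."
```

Rewinding the body after reading is a deliberate choice in this sketch: the caller receives the same response object from the block helper and will usually want to read the body itself.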
.with_observability(payload:, context:) ⇒ Object
  # File 'lib/platform_sdk/observability/langfuse/bedrock_claude_adapter.rb', line 44
  def with_observability(payload:, context:)
    response = yield
    fire(payload:, response:, context:)
    response
  rescue StandardError => e
    fire(payload:, response: nil, context:, error: e)
    raise
  end
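The contract of the block helper can be seen in isolation with a small stand-in: the block's return value is passed through unchanged, and when the block raises, the failure is recorded and the original exception is re-raised. `SketchAdapter`, its `EVENTS` array, and `record` are hypothetical; they replace the real notification with an in-memory list so the behavior can be run without Bedrock or the SDK.

```ruby
# Hypothetical stand-in that records events in memory instead of
# firing `llm_call.platform_sdk` notifications.
module SketchAdapter
  EVENTS = []

  def self.with_observability(payload:, context:)
    response = yield
    record(payload: payload, response: response, context: context)
    response
  rescue StandardError => e
    record(payload: payload, response: nil, context: context, error: e)
    raise # the caller still sees the original exception
  end

  def self.record(payload:, response:, context:, error: nil)
    EVENTS << {
      context: context,
      model_id: payload[:model_id],
      response: response,
      error: error&.message
    }
  end
end

payload = { model_id: 'us.anthropic.claude-sonnet-4-6', body: '{}' }

# Success path: the block's value is returned as-is.
result = SketchAdapter.with_observability(payload: payload, context: 'generate_outline') do
  :fake_response
end

# Failure path: one event is recorded, then the error propagates.
begin
  SketchAdapter.with_observability(payload: payload, context: 'generate_outline') do
    raise IOError, 'throttled'
  end
rescue IOError => e
  rescued_message = e.message
end

puts SketchAdapter::EVENTS.length # prints 2
```

Because the helper re-raises, existing retry or error-handling logic around the Bedrock call keeps working when the call is wrapped.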
.with_streaming_observability(payload:, context:) ⇒ Object
  # File 'lib/platform_sdk/observability/langfuse/bedrock_claude_adapter.rb', line 53
  def with_streaming_observability(payload:, context:)
    collector = StreamCollector.new
    result = yield collector
    fire_from_collector(payload:, collector:, context:)
    result
  rescue StandardError => e
    fire_from_collector(payload:, collector:, context:, error: e)
    raise
  end