Module: PlatformSdk::Observability::Langfuse::BedrockClaudeAdapter

Defined in:
lib/platform_sdk/observability/langfuse/bedrock_claude_adapter.rb

Overview

Fires ‘llm_call.platform_sdk` notifications for Anthropic Claude calls made directly through the AWS Bedrock SDK (rather than via RubyLLM). Two entry points cover the two Bedrock invocation modes:

# Non-streaming (`bedrock_client.invoke_model(payload)`)
PlatformSdk::Observability::Langfuse::BedrockClaudeAdapter.with_observability(
  payload: { model_id: 'us.anthropic.claude-sonnet-4-6',
             body: JSON.dump(anthropic_messages_payload) },
  context: 'generate_outline'
) { bedrock_client.invoke_model(payload) }

# Streaming (`bedrock_client.invoke_model_with_response_stream`)
PlatformSdk::Observability::Langfuse::BedrockClaudeAdapter.with_streaming_observability(
  payload: { model_id: ..., body: JSON.dump(...) },
  context: 'chat_stream'
) do |collector|
  bedrock_client.invoke_model_with_response_stream(
    payload.merge(event_stream_handler: proc do |stream|
      stream.on_chunk_event do |event|
        collector.observe(event)
        # ...the caller's own chunk handling
      end
    end)
  )
end

The streaming collector accumulates text from ‘content_block_delta` events and pulls the final `input_tokens`/`output_tokens` from the `message_stop` event’s ‘amazon-bedrock-invocationMetrics`. The notification fires once after the block returns.

No hard dependency on the AWS SDK — this adapter only reads the documented JSON shape of Anthropic’s Messages API on Bedrock.

Defined Under Namespace

Classes: StreamCollector

Class Method Summary collapse

Class Method Details

.fire(payload:, response:, context:, error: nil) ⇒ Object

After-the-fact emission for non-streaming calls. Caller can use this if they want to invoke Bedrock outside of the block helper and emit observability separately.



66
67
68
69
70
71
72
73
74
# File 'lib/platform_sdk/observability/langfuse/bedrock_claude_adapter.rb', line 66

def fire(payload:, response:, context:, error: nil)
  attrs = decode_response(response)
  notify(payload:, output: attrs[:output], usage: attrs[:usage], context:, error:)
rescue StandardError => e
  OpenTelemetry.handle_error(
    message: "BedrockClaudeAdapter.fire failed: #{e.class}: #{e.message[0, 200]}"
  )
  nil
end

.with_observability(payload:, context:) ⇒ Object



44
45
46
47
48
49
50
51
# File 'lib/platform_sdk/observability/langfuse/bedrock_claude_adapter.rb', line 44

def with_observability(payload:, context:)
  response = yield
  fire(payload:, response:, context:)
  response
rescue StandardError => e
  fire(payload:, response: nil, context:, error: e)
  raise
end

.with_streaming_observability(payload:, context:) ⇒ Object



53
54
55
56
57
58
59
60
61
# File 'lib/platform_sdk/observability/langfuse/bedrock_claude_adapter.rb', line 53

def with_streaming_observability(payload:, context:)
  collector = StreamCollector.new
  result = yield collector
  fire_from_collector(payload:, collector:, context:)
  result
rescue StandardError => e
  fire_from_collector(payload:, collector:, context:, error: e)
  raise
end