Module: Anthropic::Helpers::Bedrock::EventStream Private

Defined in:
lib/anthropic/helpers/bedrock/event_stream.rb

Overview

This module is part of a private API. You should avoid using this module if possible, as it may be removed or be changed in the future.

Bedrock’s ‘invoke-with-response-stream` returns `application/vnd.amazon.eventstream` (AWS binary event-stream framing), not SSE. The SDK’s stream consumer parses SSE only — without this transcoder a Bedrock stream silently yields zero events. Each frame’s payload is ‘“<base64>”` wrapping a standard Anthropic event JSON; this re-emits those as `event:`/`data:` SSE bytes so the existing `Internal::Util.decode_sse` and the streaming helpers work unchanged.

Constant Summary collapse

AWS_CONTENT_TYPE =

This constant is part of a private API. You should avoid using this constant if possible, as it may be removed or be changed in the future.

%r{^application/vnd\.amazon\.eventstream}

Class Method Summary collapse

Class Method Details

.drain(decoder, chunk, y) ⇒ void

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

This method returns an undefined value.

Feeds one chunk (or ‘nil` to flush) into the decoder and emits any complete frames as SSE bytes.

Parameters:

  • decoder (Aws::EventStream::Decoder)
  • chunk (String, nil)
  • y (Enumerator::Yielder)


48
49
50
51
52
53
54
55
56
# File 'lib/anthropic/helpers/bedrock/event_stream.rb', line 48

def drain(decoder, chunk, y)
  loop do
    msg, eof = decoder.decode_chunk(chunk)
    chunk = nil
    break if msg.nil?
    emit(msg, y)
    break if eof
  end
end

.emit(msg, y) ⇒ void

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

This method returns an undefined value.

Emit one decoded AWS event-stream message as SSE bytes.

‘:message-type: event` frames carry a JSON payload `“<base64>”` wrapping the Anthropic event; emit it as an `event:`/`data:` pair. `:message-type: exception` frames carry an error payload and a `:exception-type` header — re-emit as the same `event: error` / `data: “type”:“error”,…` SSE shape the API would have sent, so the stream consumer’s existing error path fires.

Parameters:

  • msg (Aws::EventStream::Message)
  • y (Enumerator::Yielder)


72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
# File 'lib/anthropic/helpers/bedrock/event_stream.rb', line 72

def emit(msg, y)
  case msg.headers[":message-type"]&.value
  in "event"
    payload = JSON.parse(msg.payload.read, symbolize_names: true)
    inner = Base64.decode64(payload.fetch(:bytes))
    type = JSON.parse(inner, symbolize_names: true).fetch(:type)
    y << "event: #{type}\ndata: #{inner}\n\n"
  in "exception"
    exc_type = msg.headers[":exception-type"]&.value
    body = msg.payload.read
    data = JSON.generate(type: "error", error: {type: exc_type, message: body})
    y << "event: error\ndata: #{data}\n\n"
  else
    # Unknown message-type — drop. AWS may add prelude/metadata frames.
  end
end

.to_sse(chunks) ⇒ Enumerable<String>

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Transcodes the raw AWS event-stream byte chunks into SSE-formatted byte chunks. Incremental: each input chunk is fed to ‘Aws::EventStream::Decoder` and any complete frames are emitted immediately, so the stream is not buffered end-to-end.

Parameters:

  • chunks (Enumerable<String>)

    raw response body chunks

Returns:

  • (Enumerable<String>)

    SSE-formatted body chunks



28
29
30
31
32
33
34
35
36
37
# File 'lib/anthropic/helpers/bedrock/event_stream.rb', line 28

def to_sse(chunks)
  # `aws-eventstream` ships with `aws-sdk-core`, which the Bedrock
  # client already lazy-requires before any request can fire.
  require("aws-eventstream")
  decoder = Aws::EventStream::Decoder.new
  Anthropic::Internal::Util.chain_fused(chunks) do |y|
    chunks.each { |chunk| drain(decoder, chunk, y) }
    drain(decoder, nil, y)
  end
end