Module: Anthropic::Helpers::Bedrock::EventStream Private
- Defined in: lib/anthropic/helpers/bedrock/event_stream.rb
Overview
This module is part of a private API. You should avoid using this module if possible, as it may be removed or be changed in the future.
Bedrock’s `invoke-with-response-stream` returns `application/vnd.amazon.eventstream` (AWS binary event-stream framing), not SSE. The SDK’s stream consumer parses SSE only, so without this transcoder a Bedrock stream silently yields zero events. Each frame’s payload is `{"bytes":"<base64>"}` wrapping a standard Anthropic event JSON; this module re-emits those as `event:`/`data:` SSE bytes so the existing `Internal::Util.decode_sse` and the streaming helpers work unchanged.
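The payload-to-SSE mapping described above can be sketched with the standard library alone. This is a minimal illustration, not the module's code: the `message_stop` event is a made-up sample, and `pack("m0")` / `unpack1("m")` stand in for `Base64.strict_encode64` / `Base64.decode64` to avoid an extra require.

```ruby
require "json"

# A sample Anthropic event (illustrative value, not taken from a real stream).
event_json = JSON.generate({type: "message_stop"})

# What a Bedrock frame payload looks like: the event JSON, base64-encoded,
# under a "bytes" key.
frame_payload = JSON.generate({bytes: [event_json].pack("m0")})

# Transcoding step: unwrap the base64, read the event type, emit SSE bytes.
outer = JSON.parse(frame_payload, symbolize_names: true)
inner = outer.fetch(:bytes).unpack1("m")
type = JSON.parse(inner, symbolize_names: true).fetch(:type)
sse = "event: #{type}\ndata: #{inner}\n\n"
```

The inner JSON is passed through byte-for-byte as the `data:` field, so an SSE parser downstream sees the same event body the Anthropic API would have sent.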
Constant Summary
- AWS_CONTENT_TYPE =
This constant is part of a private API. You should avoid using this constant if possible, as it may be removed or be changed in the future.
%r{^application/vnd\.amazon\.eventstream}
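As a rough illustration of what the pattern accepts, the sketch below matches it against a few header values. How the SDK actually applies the constant is not shown on this page; matching it against a response `Content-Type` header is an assumption, and `bedrock_event_stream?` is a hypothetical helper name.

```ruby
# Local copy of the pattern documented above.
AWS_CONTENT_TYPE = %r{^application/vnd\.amazon\.eventstream}

# Hypothetical helper: true when a Content-Type value looks like
# AWS binary event-stream framing.
def bedrock_event_stream?(content_type)
  AWS_CONTENT_TYPE.match?(content_type.to_s)
end

plain = bedrock_event_stream?("application/vnd.amazon.eventstream")
with_params = bedrock_event_stream?("application/vnd.amazon.eventstream; charset=utf-8")
sse_type = bedrock_event_stream?("text/event-stream")
missing = bedrock_event_stream?(nil)
```

Because the pattern is anchored only at the start, values with trailing parameters still match, while `text/event-stream` and a missing header do not.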
Class Method Summary
- .drain(decoder, chunk, y) ⇒ void (private)
  Feeds one chunk (or `nil` to flush) into the decoder and emits any complete frames as SSE bytes.
- .emit(msg, y) ⇒ void (private)
  Emit one decoded AWS event-stream message as SSE bytes.
- .to_sse(chunks) ⇒ Enumerable<String> (private)
  Transcodes the raw AWS event-stream byte chunks into SSE-formatted byte chunks.
Class Method Details
.drain(decoder, chunk, y) ⇒ void
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
This method returns an undefined value.
Feeds one chunk (or `nil` to flush) into the decoder and emits any complete frames as SSE bytes.
    # File 'lib/anthropic/helpers/bedrock/event_stream.rb', line 48

    def drain(decoder, chunk, y)
      loop do
        msg, eof = decoder.decode_chunk(chunk)
        chunk = nil
        break if msg.nil?
        emit(msg, y)
        break if eof
      end
    end
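To see why the loop passes the chunk once and then `nil`, it may help to run the same control flow against a stand-in decoder. `ToyDecoder` below is hypothetical: it frames messages by newline rather than AWS binary framing, and only mimics the `[message, eof]` return shape of `decode_chunk`. `drain_sketch` collects messages directly instead of calling `emit`.

```ruby
# Hypothetical stand-in for the real decoder: newline-delimited frames.
class ToyDecoder
  def initialize
    @buffer = +""
  end

  # Returns [message, eof]; message is nil when no complete frame is buffered.
  def decode_chunk(chunk = nil)
    @buffer << chunk if chunk
    idx = @buffer.index("\n")
    return [nil, true] if idx.nil?

    msg = @buffer.slice!(0..idx).chomp
    [msg, @buffer.empty?]
  end
end

# Same loop shape as the method above, appending to `out` instead of emitting.
def drain_sketch(decoder, chunk, out)
  loop do
    msg, eof = decoder.decode_chunk(chunk)
    chunk = nil # feed the bytes once; later iterations only drain the buffer
    break if msg.nil?

    out << msg
    break if eof
  end
end

decoder = ToyDecoder.new
out = []
drain_sketch(decoder, "a\nb\nc", out) # two complete frames, "c" stays buffered
after_first = out.dup
drain_sketch(decoder, "d\n", out)     # completes the partial frame as "cd"
drain_sketch(decoder, nil, out)       # flush: nothing left to emit
```

One chunk can hold several frames, and one frame can span several chunks, which is why the loop keeps asking the decoder for more until it reports either no message or an exhausted buffer.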
.emit(msg, y) ⇒ void
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
This method returns an undefined value.
Emit one decoded AWS event-stream message as SSE bytes.
`:message-type: event` frames carry a JSON payload `{"bytes":"<base64>"}` wrapping the Anthropic event; emit it as an `event:`/`data:` pair. `:message-type: exception` frames carry an error payload and an `:exception-type` header; these are re-emitted as the same `event: error` / `data: {"type":"error",…}` SSE shape the API would have sent, so the stream consumer’s existing error path fires.
    # File 'lib/anthropic/helpers/bedrock/event_stream.rb', line 72

    def emit(msg, y)
      case msg.headers[":message-type"]&.value
      in "event"
        payload = JSON.parse(msg.payload.read, symbolize_names: true)
        inner = Base64.decode64(payload.fetch(:bytes))
        type = JSON.parse(inner, symbolize_names: true).fetch(:type)
        y << "event: #{type}\ndata: #{inner}\n\n"
      in "exception"
        exc_type = msg.headers[":exception-type"]&.value
        body = msg.payload.read
        data = JSON.generate(type: "error", error: {type: exc_type, message: body})
        y << "event: error\ndata: #{data}\n\n"
      else
        # Unknown message-type: drop. AWS may add prelude/metadata frames.
      end
    end
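The exception branch can be exercised without the AWS gem by faking the message object. In this sketch `HeaderValue` and `Message` are hypothetical structs that expose only the `headers[...].value` and `payload.read` calls the method relies on, and the exception type and body text are invented sample values.

```ruby
require "json"
require "stringio"

# Hypothetical stand-ins for the decoded message and its header values.
HeaderValue = Struct.new(:value)
Message = Struct.new(:headers, :payload)

# Mirrors the "exception" branch: wrap the raw body in an Anthropic-style
# error event and serialize it as SSE bytes.
def exception_to_sse(msg)
  exc_type = msg.headers[":exception-type"]&.value
  body = msg.payload.read
  data = JSON.generate({type: "error", error: {type: exc_type, message: body}})
  "event: error\ndata: #{data}\n\n"
end

msg = Message.new(
  {
    ":message-type" => HeaderValue.new("exception"),
    ":exception-type" => HeaderValue.new("throttlingException")
  },
  StringIO.new("Too many requests")
)

sse = exception_to_sse(msg)
data_line = sse.lines[1].delete_prefix("data: ")
parsed = JSON.parse(data_line)
```

Note that the payload body is passed through as the error `message` verbatim; if the body is itself JSON, it ends up as an escaped string inside the outer object.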
.to_sse(chunks) ⇒ Enumerable<String>
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Transcodes the raw AWS event-stream byte chunks into SSE-formatted byte chunks. Incremental: each input chunk is fed to `Aws::EventStream::Decoder` and any complete frames are emitted immediately, so the stream is not buffered end-to-end.
    # File 'lib/anthropic/helpers/bedrock/event_stream.rb', line 28

    def to_sse(chunks)
      # `aws-eventstream` ships with `aws-sdk-core`, which the Bedrock
      # client already lazy-requires before any request can fire.
      require("aws-eventstream")
      decoder = Aws::EventStream::Decoder.new
      Anthropic::Internal::Util.chain_fused(chunks) do |y|
        chunks.each { |chunk| drain(decoder, chunk, y) }
        drain(decoder, nil, y)
      end
    end
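The incremental behaviour can be illustrated with a plain `Enumerator`. This is only a sketch of the idea: `Enumerator.new` stands in for the internal `chain_fused` helper (whose exact semantics are not documented here), newline-delimited frames stand in for AWS binary framing, and `to_sse_sketch` is a hypothetical name.

```ruby
# Lazily transcode newline-delimited frames into SSE `data:` blocks.
def to_sse_sketch(chunks)
  Enumerator.new do |y|
    buffer = +""
    chunks.each do |chunk|
      buffer << chunk
      # Emit every complete frame as soon as it is available.
      while (idx = buffer.index("\n"))
        frame = buffer.slice!(0..idx).chomp
        y << "data: #{frame}\n\n"
      end
    end
  end
end

# Track how many upstream chunks have been pulled so far.
pulled = []
source = Enumerator.new do |y|
  ["{\"a\":1}\n{\"b\"", ":2}\n"].each do |chunk|
    pulled << chunk
    y << chunk
  end
end

sse = to_sse_sketch(source)
first = sse.next
pulled_after_first = pulled.size
second = sse.next
pulled_after_second = pulled.size
```

Here the first SSE block is available after only one upstream chunk has been read, which is the property the description above calls "not buffered end-to-end".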