Class: Anthropic::Helpers::Streaming::MessageStream Private
- Inherits:
-
Object
- Object
- Anthropic::Helpers::Streaming::MessageStream
- Includes:
- Internal::Type::BaseStream
- Defined in:
- lib/anthropic/helpers/streaming/message_stream.rb,
sig/anthropic/helpers/streaming/message_stream.rbs
Overview
This class is part of a private API. You should avoid using this class if possible, as it may be removed or be changed in the future.
MessageStream provides a Ruby Enumerable interface over Server-Sent Events from the Anthropic API, yielding a mix of raw streaming events and higher-level typed events while maintaining accumulated message state throughout the stream lifecycle.
Instance Attribute Summary
Attributes included from Internal::Type::BaseStream
Instance Method Summary
-
#accumulate_event(event:, current_snapshot:) ⇒ Anthropic::Models::Message
private
Builds up a complete Message object as streaming events arrive.
-
#accumulated_message ⇒ Anthropic::Models::Message, Anthropic::Models::Beta::BetaMessage
Returns the complete accumulated Message object after stream completion.
-
#accumulated_text ⇒ String
Returns all text content blocks concatenated into a single string.
-
#build_events(event:, message_snapshot:) ⇒ Array<Object>
private
Determines which events to yield for a given raw streaming event.
-
#initialize(raw_stream:, tools: {}, models: {}) ⇒ MessageStream
constructor
private
A new instance of MessageStream.
-
#iterator ⇒ Enumerable<generic<Elem>>
private
Consumes raw stream events and yields a mix of raw and higher-level typed events while maintaining accumulated message state.
-
#text ⇒ Enumerable<String>
Returns an enumerable of text deltas from the streaming response.
-
#until_done ⇒ void
Blocks until the stream has been consumed.
Methods included from Internal::Type::BaseStream
#close, #each, #inspect, #to_enum
Constructor Details
#initialize(raw_stream:, tools: {}, models: {}) ⇒ MessageStream
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Returns a new instance of MessageStream.
# File 'lib/anthropic/helpers/streaming/message_stream.rb', line 332
def initialize(raw_stream:, tools: {}, models: {})
  # The underlying Server-Sent Event stream from the Anthropic API.
  @raw_stream = raw_stream
  # Accumulated message state that builds up as events are processed.
  @accumated_message_snapshot = nil
  # Mapping of tool names to model classes for parsing.
  @tools = tools
  @models = models
  # Lazy enumerable that transforms raw events into consumable events.
  @iterator = iterator
  @status = raw_stream.status
  @headers = raw_stream.headers
  @model = raw_stream.instance_variable_get(:@model)
end
Instance Method Details
#accumulate_event(event:, current_snapshot:) ⇒ Anthropic::Models::Message
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Builds up a complete Message object as streaming events arrive.
# File 'lib/anthropic/helpers/streaming/message_stream.rb', line 90
private def accumulate_event(event:, current_snapshot:)
  case event
  in Anthropic::Models::RawMessageStreamEvent | Anthropic::Models::BetaRawMessageStreamEvent
    nil
  else
    = "Expected event to be a variant of RawMessageStreamEvent, got #{event.class}"
    raise ArgumentError.new()
  end

  if current_snapshot.nil?
    return event. if event.type == :message_start

    = "Unexpected event order, got \"#{event.type}\" before \":message_start\""
    raise RuntimeError.new()
  end

  case event
  in Anthropic::Models::RawMessageStartEvent
    # Use the converter to create a new, isolated copy of the message object.
    # This ensures proper type validation and prevents shared object references
    # that could lead to unintended mutations during streaming accumulation.
    # Matches the Python SDK's approach of explicitly constructing Message objects.
    return Anthropic::Internal::Type::Converter.coerce(Anthropic::Models::Message, event.)
  in Anthropic::Models::BetaRawMessageStartEvent
    return Anthropic::Internal::Type::Converter.coerce(Anthropic::Models::BetaMessage, event.)
  in Anthropic::Models::RawContentBlockStartEvent | Anthropic::Models::BetaRawContentBlockStartEvent
    current_snapshot.content = (current_snapshot.content || []) + [event.content_block]
    # The final hop's fallback block names the model that served the response -
    # keeps the snapshot consistent with the relabeled non-streaming message.
    current_snapshot.model = event.content_block.to.model if event.content_block.type == :fallback
  in Anthropic::Models::RawContentBlockDeltaEvent | Anthropic::Models::BetaRawContentBlockDeltaEvent
    content = current_snapshot.content[event.index]

    case (delta = event.delta)
    in Anthropic::Models::TextDelta | Anthropic::Models::BetaTextDelta if content.type == :text
      content.text += delta.text
    in Anthropic::Models::InputJSONDelta | Anthropic::Models::BetaInputJSONDelta if content.type == :tool_use
      json_buf = content._json_buf.to_s
      json_buf += delta.partial_json
      content.input = json_buf
      content._json_buf = json_buf
    in Anthropic::Models::CitationsDelta | Anthropic::Models::BetaCitationsDelta if content.type == :text
      content.citations ||= []
      content.citations << delta.citation
    in Anthropic::Models::ThinkingDelta | Anthropic::Models::BetaThinkingDelta if content.type == :thinking
      content.thinking += delta.thinking
    in Anthropic::Models::SignatureDelta | Anthropic::Models::BetaSignatureDelta if content.type == :thinking
      content.signature = delta.signature
    in Anthropic::Models::BetaCompactionContentBlockDelta if content.type == :compaction
      content.content = delta.content
      content.encrypted_content = delta.encrypted_content
    else
    end
  in Anthropic::Models::RawMessageDeltaEvent | Anthropic::Models::BetaRawMessageDeltaEvent
    current_snapshot.stop_reason = event.delta.stop_reason
    current_snapshot.stop_sequence = event.delta.stop_sequence
    current_snapshot.stop_details = event.delta.stop_details unless event.delta.stop_details.nil?
    current_snapshot.usage.output_tokens = event.usage.output_tokens
    # The message_delta usage is authoritative for the final counts; carry every
    # field the event reports (only when present, so we never clobber the
    # message_start values with nils).
    unless event.usage.input_tokens.nil?
      current_snapshot.usage.input_tokens = event.usage.input_tokens
    end
    unless event.usage.cache_creation_input_tokens.nil?
      current_snapshot.usage.cache_creation_input_tokens = event.usage.cache_creation_input_tokens
    end
    unless event.usage.cache_read_input_tokens.nil?
      current_snapshot.usage.cache_read_input_tokens = event.usage.cache_read_input_tokens
    end
    unless event.usage.server_tool_use.nil?
      current_snapshot.usage.server_tool_use = event.usage.server_tool_use
    end
    if event.is_a?(Anthropic::Models::BetaRawMessageDeltaEvent) && !event.usage.iterations.nil?
      current_snapshot.usage.iterations = event.usage.iterations
    end
  else
  end

  current_snapshot
end
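The accumulation pattern in the listing above (seed a snapshot from the start event, then fold each later event into it) can be sketched without the SDK. This is a minimal illustration under stated assumptions: `Event`, `Block`, `Snapshot`, and `accumulate` are hypothetical stand-ins, not SDK classes, and only text deltas and a stop reason are handled.

```ruby
# Hypothetical stand-ins for the SDK's event and message models.
Event = Struct.new(:type, :message, :index, :content_block, :delta, keyword_init: true)
Block = Struct.new(:type, :text, keyword_init: true)
Snapshot = Struct.new(:content, :stop_reason, keyword_init: true)

# Fold one event into the running snapshot and return the updated snapshot.
def accumulate(event, snapshot)
  if snapshot.nil?
    unless event.type == :message_start
      raise "Unexpected event order, got #{event.type.inspect} before :message_start"
    end
    # Copy the content array so the snapshot does not alias the event payload.
    return Snapshot.new(content: event.message.content.dup, stop_reason: nil)
  end

  case event.type
  when :content_block_start
    snapshot.content = snapshot.content + [event.content_block]
  when :content_block_delta
    block = snapshot.content[event.index]
    block.text += event.delta if block.type == :text
  when :message_delta
    snapshot.stop_reason = event.delta
  end
  snapshot
end

events = [
  Event.new(type: :message_start, message: Snapshot.new(content: [])),
  Event.new(type: :content_block_start, index: 0, content_block: Block.new(type: :text, text: +"")),
  Event.new(type: :content_block_delta, index: 0, delta: "Hel"),
  Event.new(type: :content_block_delta, index: 0, delta: "lo"),
  Event.new(type: :message_delta, delta: :end_turn)
]

# Thread the snapshot through every event, starting from nil.
final = events.reduce(nil) { |snapshot, event| accumulate(event, snapshot) }
puts final.content.first.text
```

Starting the fold from `nil` mirrors the guard in the listing: any event other than the start event arriving first raises instead of silently producing a partial snapshot.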
#accumulated_message ⇒ Anthropic::Models::Message, Anthropic::Models::Beta::BetaMessage
Returns the complete accumulated Message object after stream completion.
# File 'lib/anthropic/helpers/streaming/message_stream.rb', line 64
def accumulated_message
  until_done
  parse_content_blocks!(@accumated_message_snapshot)
  @accumated_message_snapshot
end
#accumulated_text ⇒ String
Returns all text content blocks concatenated into a single string. NOTE: Currently the API will only respond with a single content block.
Will raise an error if no text content blocks were returned.
# File 'lib/anthropic/helpers/streaming/message_stream.rb', line 77
def accumulated_text = .content.map { _1.type == :text ? _1.text : nil }.join end
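The `map { ... }.join` expression in the listing relies on `Array#join` rendering `nil` as an empty string, so non-text blocks contribute nothing to the result. A small sketch of that behavior, assuming a hypothetical `ContentBlock` struct in place of the SDK's typed content blocks:

```ruby
# Hypothetical content block; the real SDK uses typed model classes.
ContentBlock = Struct.new(:type, :text, keyword_init: true)

content = [
  ContentBlock.new(type: :thinking, text: nil),
  ContentBlock.new(type: :text, text: "Hello, "),
  ContentBlock.new(type: :tool_use, text: nil),
  ContentBlock.new(type: :text, text: "world")
]

# Non-text blocks map to nil, and Array#join renders nil as an empty string.
joined = content.map { _1.type == :text ? _1.text : nil }.join
puts joined
```

In this sketch a list with no text blocks at all joins to an empty string; whether the SDK raises in that case depends on code outside the expression shown.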
#build_events(event:, message_snapshot:) ⇒ Array<Object>
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Determines which events to yield for a given raw streaming event.
May transform events into higher-level types (TextEvent, InputJsonEvent), pass through raw events unchanged, or produce multiple events.
# File 'lib/anthropic/helpers/streaming/message_stream.rb', line 186
private def build_events(event:, message_snapshot:)
  events_to_yield = []

  case event
  in Anthropic::Models::RawMessageStopEvent | Anthropic::Models::BetaRawMessageStopEvent
    events_to_yield << MessageStopEvent.new(
      type: :message_stop,
      message:
    )
  in Anthropic::Models::RawContentBlockDeltaEvent | Anthropic::Models::BetaRawContentBlockDeltaEvent
    events_to_yield << event
    content_block = .content[event.index]

    case (delta = event.delta)
    in Anthropic::Models::TextDelta | Anthropic::Models::BetaTextDelta if content_block.type == :text
      events_to_yield << Anthropic::Streaming::TextEvent.new(
        type: :text,
        text: delta.text,
        snapshot: content_block.text
      )
    in Anthropic::Models::InputJSONDelta | Anthropic::Models::BetaInputJSONDelta if content_block.type == :tool_use
      events_to_yield << Anthropic::Streaming::InputJsonEvent.new(
        type: :input_json,
        partial_json: delta.partial_json,
        snapshot: content_block.input
      )
    in Anthropic::Models::CitationsDelta | Anthropic::Models::BetaCitationsDelta if content_block.type == :text
      events_to_yield << Anthropic::Streaming::CitationEvent.new(
        type: :citation,
        citation: delta.citation,
        snapshot: content_block.citations || []
      )
    in Anthropic::Models::ThinkingDelta | Anthropic::Models::BetaThinkingDelta if content_block.type == :thinking
      events_to_yield << Anthropic::Streaming::ThinkingEvent.new(
        type: :thinking,
        thinking: delta.thinking,
        snapshot: content_block.thinking
      )
    in Anthropic::Models::SignatureDelta | Anthropic::Models::BetaSignatureDelta if content_block.type == :thinking
      events_to_yield << Anthropic::Streaming::SignatureEvent.new(
        type: :signature,
        signature: content_block.signature
      )
    in Anthropic::Models::BetaCompactionContentBlockDelta if content_block.type == :compaction
      events_to_yield << Anthropic::Streaming::CompactionEvent.new(
        type: :compaction,
        content: content_block.content
      )
    else
    end
  in Anthropic::Models::RawContentBlockStopEvent | Anthropic::Models::BetaRawContentBlockStopEvent
    content_block = .content[event.index]
    events_to_yield << Anthropic::Streaming::ContentBlockStopEvent.new(
      type: :content_block_stop,
      index: event.index,
      content_block: content_block
    )
  else
    events_to_yield << event
  end

  events_to_yield
end
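The one-to-many mapping described above (a raw delta is passed through and followed by a higher-level event carrying the running snapshot) can be illustrated with a reduced sketch. `RawDelta`, `TextEvt`, and `expand` are hypothetical names introduced for this example and do not exist in the SDK.

```ruby
# Hypothetical raw and higher-level event types.
RawDelta = Struct.new(:type, :index, :text, keyword_init: true)
TextEvt = Struct.new(:type, :text, :snapshot, keyword_init: true)

# Return the list of events to yield for one raw event.
def expand(raw, snapshot_text)
  case raw.type
  when :content_block_delta
    # Pass the raw event through, then add a typed event with the running text.
    [raw, TextEvt.new(type: :text, text: raw.text, snapshot: snapshot_text)]
  else
    # Anything unrecognised is yielded unchanged.
    [raw]
  end
end

out = expand(RawDelta.new(type: :content_block_delta, index: 0, text: "lo"), "Hello")
puts out.map(&:type).inspect
```

Returning an array for every input keeps the caller uniform: it can always iterate the result, whether the raw event expanded to one event or several.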
#iterator ⇒ Enumerable<generic<Elem>>
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Consumes raw stream events and yields a mix of raw and higher-level typed events while
maintaining accumulated message state. This is what's called when you run each on the
stream.
# File 'lib/anthropic/helpers/streaming/message_stream.rb', line 24
private def iterator
  @iterator ||= Anthropic::Internal::Util.chain_fused(@stream) do |y|
    @raw_stream.each do |raw_event|
      @accumated_message_snapshot = accumulate_event(
        event: raw_event,
        current_snapshot: @accumated_message_snapshot
      )
      events_to_yield = build_events(event: raw_event, message_snapshot: @accumated_message_snapshot)
      events_to_yield.each(&y)
    end
  end
end
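A memoized lazy iterator that updates state as a side effect of iteration can be sketched with Ruby's standard `Enumerator`. This is an assumption-laden stand-in: `TinyStream` is hypothetical, and `Enumerator.new` is used here in place of the internal `chain_fused` helper, whose exact semantics are not reproduced.

```ruby
# Hypothetical stream that tracks a running snapshot while it is iterated.
class TinyStream
  include Enumerable

  attr_reader :snapshot

  def initialize(raw_events)
    @raw_events = raw_events
    @snapshot = +""
  end

  def each(&block)
    return iterator unless block

    iterator.each(&block)
    self
  end

  private

  # Built once and reused; nothing runs until the enumerator is iterated.
  def iterator
    @iterator ||= Enumerator.new do |y|
      @raw_events.each do |raw|
        @snapshot << raw                 # update accumulated state first
        y << raw                         # then yield the raw event
        y << "snapshot=#{@snapshot}"     # and a derived event
      end
    end
  end
end

stream = TinyStream.new(%w[Hel lo])
seen = stream.to_a
puts seen.inspect
puts stream.snapshot
```

Note that a plain `Enumerator` re-runs its block on every full iteration, so iterating this sketch twice would append to the snapshot twice.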
#text ⇒ Enumerable<String>
Returns an enumerable of text deltas from the streaming response.
# File 'lib/anthropic/helpers/streaming/message_stream.rb', line 49
def text
  Anthropic::Internal::Util.chain_fused(@iterator) do |y|
    @iterator.each do |event|
      if event.type == :content_block_delta && event.delta.type == :text_delta
        y << event.delta.text
      end
    end
  end
end
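The filter in the listing keeps only events that are content-block deltas carrying a text delta. A self-contained sketch of the same predicate, assuming hypothetical `Delta` and `StreamEvent` structs and a plain `Enumerator` rather than the SDK's internal helper:

```ruby
# Hypothetical event shapes for illustration only.
Delta = Struct.new(:type, :text, keyword_init: true)
StreamEvent = Struct.new(:type, :delta, keyword_init: true)

events = [
  StreamEvent.new(type: :message_start),
  StreamEvent.new(type: :content_block_delta, delta: Delta.new(type: :text_delta, text: "Hel")),
  StreamEvent.new(type: :content_block_delta, delta: Delta.new(type: :input_json_delta, text: nil)),
  StreamEvent.new(type: :content_block_delta, delta: Delta.new(type: :text_delta, text: "lo")),
  StreamEvent.new(type: :message_stop)
]

# Yield only the text carried by text deltas; every other event is skipped.
text = Enumerator.new do |y|
  events.each do |event|
    y << event.delta.text if event.type == :content_block_delta && event.delta.type == :text_delta
  end
end

chunks = text.to_a
puts chunks.join
```

Checking the event type before touching `event.delta` matters here because events such as the start and stop events carry no delta.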
#until_done ⇒ void
This method returns an undefined value.
Blocks until the stream has been consumed.
# File 'lib/anthropic/helpers/streaming/message_stream.rb', line 42
def until_done = each {} # rubocop:disable Lint/EmptyBlock
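Calling `each` with an empty block is a way to drain an enumerable purely for its side effects. The sketch below shows the idea with a hypothetical `DrainableStream` class that counts how many items were consumed; it is not the SDK's implementation.

```ruby
# Hypothetical stream whose only observable state is a consumption counter.
class DrainableStream
  include Enumerable

  attr_reader :consumed

  def initialize(items)
    @items = items
    @consumed = 0
  end

  def each
    return to_enum(:each) unless block_given?

    @items.each do |item|
      @consumed += 1
      yield item
    end
    self
  end

  # Iterate with an empty block purely to consume the stream.
  def until_done
    each {}
    nil
  end
end

s = DrainableStream.new(%w[a b c])
s.until_done
puts s.consumed
```

After draining, any state built up during iteration (here the counter, in the class documented above the accumulated snapshot) is complete and safe to read.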