Class: Anthropic::Helpers::Streaming::MessageStream Private

Inherits:
Object
  • Object
show all
Includes:
Internal::Type::BaseStream
Defined in:
lib/anthropic/helpers/streaming/message_stream.rb,
sig/anthropic/helpers/streaming/message_stream.rbs

Overview

This class is part of a private API. You should avoid using this class if possible, as it may be removed or be changed in the future.

MessageStream provides a Ruby Enumerable interface over Server-Sent Events from the Anthropic API, yielding a mix of raw streaming events and higher-level typed events while maintaining accumulated message state throughout the stream lifecycle.

Instance Attribute Summary

Attributes included from Internal::Type::BaseStream

#headers, #status

Instance Method Summary collapse

Methods included from Internal::Type::BaseStream

#close, #each, #inspect, #to_enum

Constructor Details

#initialize(raw_stream:, tools: {}, models: {}) ⇒ MessageStream

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Returns a new instance of MessageStream.

Parameters:

  • raw_stream (Anthropic::Internal::Type::BaseStream)
  • tools (Hash{String=>Class}) (defaults to: {})

    Mapping of tool names to model classes

  • models (Hash{String=>Class}) (defaults to: {})

    Mapping of tool names to model classes



332
333
334
335
336
337
338
339
340
341
342
343
344
345
# File 'lib/anthropic/helpers/streaming/message_stream.rb', line 332

# Wraps a raw event stream and prepares the state used while iterating it.
#
# @api private
#
# @param raw_stream [Anthropic::Internal::Type::BaseStream] stream whose
#   events are consumed and whose status/headers are mirrored on this object
# @param tools [Hash{String=>Class}] stored as given in @tools
# @param models [Hash{String=>Class}] stored as given in @models
def initialize(raw_stream:, tools: {}, models: {})
  # Source of raw events; the iterator built below reads from it.
  @raw_stream = raw_stream
  # Caller-supplied lookup tables, kept untouched.
  @tools = tools
  @models = models
  # Nothing has been accumulated yet; the first processed event seeds this.
  @accumated_message_snapshot = nil
  # Response metadata copied from the wrapped stream.
  @status = raw_stream.status
  @headers = raw_stream.headers
  # NOTE(review): reads the wrapped stream's @model ivar directly rather than
  # through a public reader — confirm that no accessor exists for it.
  @model = raw_stream.instance_variable_get(:@model)
  # Memoized enumerator that turns raw events into the yielded events.
  @iterator = iterator
end

Instance Method Details

#accumulate_event(event:, current_snapshot:) ⇒ Anthropic::Models::Message

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Builds up a complete Message object as streaming events arrive.

Parameters:

Returns:



90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
# File 'lib/anthropic/helpers/streaming/message_stream.rb', line 90

# Folds a single raw streaming event into the accumulated message snapshot.
#
# @api private
#
# @param event [Object] must match RawMessageStreamEvent or
#   BetaRawMessageStreamEvent, otherwise an ArgumentError is raised
# @param current_snapshot [Anthropic::Models::Message, Anthropic::Models::BetaMessage, nil]
#   the snapshot built so far, or nil when no event has been processed yet
# @return [Anthropic::Models::Message, Anthropic::Models::BetaMessage] the
#   snapshot to carry forward: the event's message for a message_start event,
#   otherwise current_snapshot after being updated in place
# @raise [ArgumentError] when event matches neither raw event union
# @raise [RuntimeError] when current_snapshot is nil and event is not a
#   message_start event
private def accumulate_event(event:, current_snapshot:)
  # Type guard only: a matching event falls through, anything else is rejected.
  case event
  in Anthropic::Models::RawMessageStreamEvent | Anthropic::Models::BetaRawMessageStreamEvent
    nil
  else
    message = "Expected event to be a variant of RawMessageStreamEvent, got #{event.class}"
    raise ArgumentError.new(message)
  end

  # Without a snapshot the only acceptable event is message_start, which seeds it.
  # NOTE(review): this path returns event.message as-is, while the
  # message_start branches further down go through Converter.coerce — confirm
  # that the difference is intentional.
  if current_snapshot.nil?
    return event.message if event.type == :message_start

    message = "Unexpected event order, got \"#{event.type}\" before \":message_start\""
    raise RuntimeError.new(message)
  end

  case event
  in Anthropic::Models::RawMessageStartEvent
    # Use the converter to create a new, isolated copy of the message object.
    # This ensures proper type validation and prevents shared object references
    # that could lead to unintended mutations during streaming accumulation.
    # Matches the Python SDK's approach of explicitly constructing Message objects.
    return Anthropic::Internal::Type::Converter.coerce(Anthropic::Models::Message, event.message)
  in Anthropic::Models::BetaRawMessageStartEvent
    # Same as above, coercing to the beta message type instead.
    return Anthropic::Internal::Type::Converter.coerce(Anthropic::Models::BetaMessage, event.message)
  in Anthropic::Models::RawContentBlockStartEvent | Anthropic::Models::BetaRawContentBlockStartEvent
    # Append the new block; a nil content list is treated as empty.
    current_snapshot.content = (current_snapshot.content || []) + [event.content_block]

    # The final hop's fallback block names the model that served the response —
    # keeps the snapshot consistent with the relabeled non-streaming message.
    current_snapshot.model = event.content_block.to.model if event.content_block.type == :fallback
  in Anthropic::Models::RawContentBlockDeltaEvent | Anthropic::Models::BetaRawContentBlockDeltaEvent
    # The delta applies to the block at the index the event names.
    content = current_snapshot.content[event.index]

    # Each delta kind is applied only when the target block has the matching type.
    case (delta = event.delta)
    in Anthropic::Models::TextDelta | Anthropic::Models::BetaTextDelta if content.type == :text
      content.text += delta.text
    in Anthropic::Models::InputJSONDelta | Anthropic::Models::BetaInputJSONDelta if content.type == :tool_use
      # Append the fragment to the raw JSON text gathered so far (nil.to_s is "").
      json_buf = content._json_buf.to_s
      json_buf += delta.partial_json

      # input holds the raw JSON string at this point; presumably it is parsed
      # later (parse_content_blocks! is called from accumulated_message) —
      # TODO confirm.
      content.input = json_buf
      content._json_buf = json_buf
    in Anthropic::Models::CitationsDelta | Anthropic::Models::BetaCitationsDelta if content.type == :text
      content.citations ||= []
      content.citations << delta.citation
    in Anthropic::Models::ThinkingDelta | Anthropic::Models::BetaThinkingDelta if content.type == :thinking
      content.thinking += delta.thinking
    in Anthropic::Models::SignatureDelta | Anthropic::Models::BetaSignatureDelta if content.type == :thinking
      # The signature is replaced, not appended.
      content.signature = delta.signature
    in Anthropic::Models::BetaCompactionContentBlockDelta if content.type == :compaction
      # Both compaction fields are overwritten with the delta's values.
      content.content = delta.content
      content.encrypted_content = delta.encrypted_content
    else
      # Any other delta/block combination is ignored; the empty else keeps
      # case/in from raising NoMatchingPatternError.
    end
  in Anthropic::Models::RawMessageDeltaEvent | Anthropic::Models::BetaRawMessageDeltaEvent
    # stop_reason, stop_sequence and output_tokens are always copied over;
    # stop_details only when the event carries one.
    current_snapshot.stop_reason = event.delta.stop_reason
    current_snapshot.stop_sequence = event.delta.stop_sequence
    current_snapshot.stop_details = event.delta.stop_details unless event.delta.stop_details.nil?
    current_snapshot.usage.output_tokens = event.usage.output_tokens

    # The message_delta usage is authoritative for the final counts; carry every
    # field the event reports (only when present, so we never clobber the
    # message_start values with nils).
    unless event.usage.input_tokens.nil?
      current_snapshot.usage.input_tokens = event.usage.input_tokens
    end
    unless event.usage.cache_creation_input_tokens.nil?
      current_snapshot.usage.cache_creation_input_tokens = event.usage.cache_creation_input_tokens
    end
    unless event.usage.cache_read_input_tokens.nil?
      current_snapshot.usage.cache_read_input_tokens = event.usage.cache_read_input_tokens
    end
    unless event.usage.server_tool_use.nil?
      current_snapshot.usage.server_tool_use = event.usage.server_tool_use
    end

    # iterations is read only for the beta event variant.
    if event.is_a?(Anthropic::Models::BetaRawMessageDeltaEvent) && !event.usage.iterations.nil?
      current_snapshot.usage.iterations = event.usage.iterations
    end
  else
    # Every other event type leaves the snapshot unchanged.
  end

  current_snapshot
end

#accumulated_message ⇒ Anthropic::Models::Message

Returns the complete accumulated Message object after stream completion.

Returns:

  • (Anthropic::Models::Message)

    the accumulated message snapshot



64
65
66
67
68
# File 'lib/anthropic/helpers/streaming/message_stream.rb', line 64

# Consumes the rest of the stream and returns the accumulated message snapshot.
#
# @return [Anthropic::Models::Message] the snapshot held in
#   @accumated_message_snapshot once the stream has been drained
def accumulated_message
  # Drain the stream first so that every event has been folded into the snapshot.
  until_done
  # Hand the snapshot to parse_content_blocks!; presumably this post-processes
  # its content blocks in place (helper not visible here — TODO confirm).
  parse_content_blocks!(@accumated_message_snapshot)
  @accumated_message_snapshot
end

#accumulated_text ⇒ String

Returns all text content blocks concatenated into a single string. NOTE: Currently the API will only respond with a single content block.

Returns an empty string if no text content blocks were returned.

Returns:

  • (String)


77
78
79
80
# File 'lib/anthropic/helpers/streaming/message_stream.rb', line 77

# Joins the text of every :text content block of the accumulated message.
#
# Blocks of any other type contribute nothing; with no text blocks at all the
# result is an empty string.
#
# @return [String]
def accumulated_text
  text_blocks = accumulated_message.content.select { |block| block.type == :text }
  text_blocks.map(&:text).join
end

#build_events(event:, message_snapshot:) ⇒ Array<Object>

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Determines which events to yield for a given raw streaming event.

May transform events into higher-level types (TextEvent, InputJsonEvent), pass through raw events unchanged, or produce multiple events.

Parameters:

Returns:

  • (Array<Object>)

    events to yield (mix of raw and typed events)



186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
# File 'lib/anthropic/helpers/streaming/message_stream.rb', line 186

# Maps one raw streaming event to the list of events handed to the consumer.
#
# - message stop events become a MessageStopEvent carrying the snapshot
# - content block deltas are passed through, followed by a typed event when
#   the delta kind matches the type of the block it targets
# - content block stop events become a ContentBlockStopEvent with the block
# - every other event is passed through unchanged
#
# @api private
#
# @param event [Object] the raw event being processed
# @param message_snapshot [Object] accumulated message; its content list is
#   indexed with event.index for delta and block-stop events
# @return [Array<Object>] events to yield (mix of raw and typed events)
private def build_events(event:, message_snapshot:)
  case event
  in Anthropic::Models::RawMessageStopEvent | Anthropic::Models::BetaRawMessageStopEvent
    [MessageStopEvent.new(type: :message_stop, message: message_snapshot)]
  in Anthropic::Models::RawContentBlockDeltaEvent | Anthropic::Models::BetaRawContentBlockDeltaEvent
    target = message_snapshot.content[event.index]
    delta = event.delta

    # Typed companion event, or nil when the delta does not fit the block type.
    typed =
      case delta
      in Anthropic::Models::TextDelta | Anthropic::Models::BetaTextDelta if target.type == :text
        Anthropic::Streaming::TextEvent.new(type: :text, text: delta.text, snapshot: target.text)
      in Anthropic::Models::InputJSONDelta | Anthropic::Models::BetaInputJSONDelta if target.type == :tool_use
        Anthropic::Streaming::InputJsonEvent.new(
          type: :input_json,
          partial_json: delta.partial_json,
          snapshot: target.input
        )
      in Anthropic::Models::CitationsDelta | Anthropic::Models::BetaCitationsDelta if target.type == :text
        Anthropic::Streaming::CitationEvent.new(
          type: :citation,
          citation: delta.citation,
          snapshot: target.citations || []
        )
      in Anthropic::Models::ThinkingDelta | Anthropic::Models::BetaThinkingDelta if target.type == :thinking
        Anthropic::Streaming::ThinkingEvent.new(
          type: :thinking,
          thinking: delta.thinking,
          snapshot: target.thinking
        )
      in Anthropic::Models::SignatureDelta | Anthropic::Models::BetaSignatureDelta if target.type == :thinking
        # The signature is taken from the block, not from the delta.
        Anthropic::Streaming::SignatureEvent.new(type: :signature, signature: target.signature)
      in Anthropic::Models::BetaCompactionContentBlockDelta if target.type == :compaction
        Anthropic::Streaming::CompactionEvent.new(type: :compaction, content: target.content)
      else
        nil
      end

    # The raw delta always comes first; the typed event follows when present.
    typed.nil? ? [event] : [event, typed]
  in Anthropic::Models::RawContentBlockStopEvent | Anthropic::Models::BetaRawContentBlockStopEvent
    finished = message_snapshot.content[event.index]

    [
      Anthropic::Streaming::ContentBlockStopEvent.new(
        type: :content_block_stop,
        index: event.index,
        content_block: finished
      )
    ]
  else
    [event]
  end
end

#iterator ⇒ Enumerable<generic<Elem>>

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Consumes raw stream events and yields a mix of raw and higher-level typed events while maintaining accumulated message state. This is what's called when you run each on the stream.

Returns:

  • (Enumerable<generic<Elem>>)


24
25
26
27
28
29
30
31
32
33
34
35
# File 'lib/anthropic/helpers/streaming/message_stream.rb', line 24

# Memoized enumerator over the events derived from the raw stream.
#
# For each raw event the accumulated snapshot is updated first, then the
# events produced by build_events for that raw event are yielded in order.
#
# @api private
#
# @return [Enumerable<generic<Elem>>]
private def iterator
  # NOTE(review): @stream is not assigned anywhere in the visible code, so
  # chain_fused presumably receives nil here — confirm that this is intended.
  @iterator ||= Anthropic::Internal::Util.chain_fused(@stream) do |yielder|
    @raw_stream.each do |raw_event|
      snapshot = accumulate_event(event: raw_event, current_snapshot: @accumated_message_snapshot)
      @accumated_message_snapshot = snapshot

      build_events(event: raw_event, message_snapshot: snapshot).each { |evt| yielder << evt }
    end
  end
end

#text ⇒ Enumerable<String>

Returns an enumerable of text deltas from the streaming response.

Returns:

  • (Enumerable<String>)


49
50
51
52
53
54
55
56
57
# File 'lib/anthropic/helpers/streaming/message_stream.rb', line 49

# Enumerates the text fragments carried by text_delta content block deltas.
#
# Events of any other type, and deltas of any other kind, are skipped.
#
# @return [Enumerable<String>]
def text
  Anthropic::Internal::Util.chain_fused(@iterator) do |yielder|
    @iterator.each do |event|
      next unless event.type == :content_block_delta
      next unless event.delta.type == :text_delta

      yielder << event.delta.text
    end
  end
end

#until_done ⇒ void

This method returns an undefined value.

Blocks until the stream has been fully consumed.



42
# File 'lib/anthropic/helpers/streaming/message_stream.rb', line 42

# Iterates over the whole stream with an empty block, discarding every event,
# so the call only returns once iteration has finished.
#
# @return [void]
def until_done
  each {} # rubocop:disable Lint/EmptyBlock
end