Class: LlmGateway::Adapters::NormalizedStreamAccumulator

Inherits:
Object
Defined in:
lib/llm_gateway/adapters/normalized_stream_accumulator.rb

Constant Summary

DEFAULT_USAGE =
{
  input: 0,
  cache_write: 0,
  cache_read: 0,
  output: 0,
  total: 0,
  raw: {}
}.freeze
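
Because the constant is frozen, each stream needs its own mutable copy before usage counters are updated. The sketch below shows one way to take such a copy. `fresh_usage` is a hypothetical helper, not the library's private `default_usage` method, whose body is not shown on this page.

```ruby
# Copy of the constant above, repeated so the sketch runs on its own.
DEFAULT_USAGE = {
  input: 0,
  cache_write: 0,
  cache_read: 0,
  output: 0,
  total: 0,
  raw: {}
}.freeze

# Hypothetical helper (an assumption, not the library's code): returns a new,
# unfrozen hash. `freeze` is shallow, so the nested :raw hash is replaced
# with a fresh one to avoid sharing it between streams.
def fresh_usage
  DEFAULT_USAGE.merge(raw: {})
end

usage = fresh_usage
usage[:output] += 2
usage[:total] = usage[:input] + usage[:output]

usage[:output]          # => 2
DEFAULT_USAGE[:output]  # => 0
```
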
BLOCK_EVENT_TRANSITIONS =
{
  text_start: { block_type: :text, phase: :start },
  text_delta: { block_type: :text, phase: :delta },
  text_end: { block_type: :text, phase: :end },
  tool_start: { block_type: :tool, phase: :start },
  tool_delta: { block_type: :tool, phase: :delta },
  tool_end: { block_type: :tool, phase: :end },
  reasoning_start: { block_type: :reasoning, phase: :start },
  reasoning_delta: { block_type: :reasoning, phase: :delta },
  reasoning_end: { block_type: :reasoning, phase: :end }
}.freeze
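
As an illustration of how a table shaped like this can be consumed, the sketch below resolves an event type to its block type and phase. The table is copied from the constant above. `describe_transition` is a hypothetical helper and is not part of the library.

```ruby
# Copy of the transition table above, repeated so the sketch runs on its own.
BLOCK_EVENT_TRANSITIONS = {
  text_start: { block_type: :text, phase: :start },
  text_delta: { block_type: :text, phase: :delta },
  text_end: { block_type: :text, phase: :end },
  tool_start: { block_type: :tool, phase: :start },
  tool_delta: { block_type: :tool, phase: :delta },
  tool_end: { block_type: :tool, phase: :end },
  reasoning_start: { block_type: :reasoning, phase: :start },
  reasoning_delta: { block_type: :reasoning, phase: :delta },
  reasoning_end: { block_type: :reasoning, phase: :end }
}.freeze

# Hypothetical helper (not part of LlmGateway): returns the block type and
# phase for a block event, or nil for message-level events that have no
# entry in the table.
def describe_transition(type)
  BLOCK_EVENT_TRANSITIONS[type.to_sym]
end

describe_transition(:tool_delta)   # => { block_type: :tool, phase: :delta }
describe_transition("text_start")  # => { block_type: :text, phase: :start }
describe_transition(:message_end)  # => nil

# All event types that belong to tool blocks, in table order.
tool_events = BLOCK_EVENT_TRANSITIONS.select { |_, t| t[:block_type] == :tool }.keys
# => [:tool_start, :tool_delta, :tool_end]
```
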

Constructor Details

#initialize(provider: nil, api: nil) ⇒ NormalizedStreamAccumulator

Returns a new instance of NormalizedStreamAccumulator.



# File 'lib/llm_gateway/adapters/normalized_stream_accumulator.rb', line 76

def initialize(provider: nil, api: nil)
  @provider = provider
  @api = api
  @message_hash = {}
  @usage_hash = default_usage
  @blocks = []
  @next_content_index = 0
  @active_block_type = nil
  @active_content_index = nil
  @timestamp = nil
end

Instance Attribute Details

#active_block_type ⇒ Object (readonly)

Returns the value of attribute active_block_type.



# File 'lib/llm_gateway/adapters/normalized_stream_accumulator.rb', line 53

def active_block_type
  @active_block_type
end

#blocks ⇒ Object

Contract:

`push` accepts a single provider-independent, normalized stream event patch hash. Event patches are never arrays; mappers call `push` once per patch.

Provider wire events such as Anthropic `message_start` / `content_block_start`, OpenAI `response.output_text.delta`, etc. must be translated by the mapper before calling this accumulator. The normalized symbol `:message_start` below is allowed; the raw provider event string is not.
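
A minimal sketch of the translation step described above. `map_wire_event` is a hypothetical mapper function, and the wire payload shapes are simplified assumptions loosely modelled on Anthropic-style events. They are not a reference for any provider's actual format.

```ruby
# Hypothetical mapper (not part of LlmGateway): translates one simplified,
# assumed provider wire event into one normalized event patch. Returns nil
# when the wire event has no normalized counterpart in this sketch.
def map_wire_event(wire)
  case wire["type"]
  when "message_start"
    message = wire.fetch("message")
    {
      type: :message_start,
      delta: { id: message["id"], model: message["model"], role: message["role"] }
    }
  when "content_block_delta"
    delta = wire.fetch("delta")
    { type: :text_delta, delta: delta["text"] } if delta["type"] == "text_delta"
  when "message_stop"
    { type: :message_end }
  end
end

patch = map_wire_event(
  "type" => "content_block_delta",
  "delta" => { "type" => "text_delta", "text" => " there" }
)
# patch => { type: :text_delta, delta: " there" }
```

The mapper would then call `push` once with each non-nil patch, so the accumulator only ever sees normalized symbols.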

Accepted event shapes:

{ type: :message_start, delta: { id: "...", model: "...", role: "assistant", timestamp: 1716650000000 } }
{ type: :message_delta, delta: { stop_reason: "stop" }, usage: { output: 2 } }
{ type: :message_end }

{ type: :text_start, delta: "hi" }
{ type: :text_delta, delta: " there" }
{ type: :text_end, delta: "" }

{ type: :reasoning_start, delta: "thinking", signature: "" }
{ type: :reasoning_delta, delta: "...", signature: "" }
{ type: :reasoning_end, delta: "", signature: "" }

{ type: :tool_start, id: "...", name: "tool_name", delta: "" }
{ type: :tool_delta, delta: "{\"a\":" }
{ type: :tool_end, delta: "" }

Mappers do not provide `content_index`. The accumulator assigns the next public content index when a block starts and reuses the active content index for that block's deltas and end event.
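
The index assignment rule can be illustrated with a toy class. `ToyIndexAssigner` is hypothetical and only mirrors the rule as stated in this contract. It is not the accumulator's implementation.

```ruby
# Toy illustration only: NOT the LlmGateway implementation.
class ToyIndexAssigner
  BLOCK_EVENT = /\A(?:text|tool|reasoning)_(start|delta|end)\z/

  def initialize
    @next_content_index = 0
    @active_content_index = nil
  end

  # Returns the content index for a normalized block event type, or nil for
  # message-level events such as :message_start.
  def index_for(type)
    match = BLOCK_EVENT.match(type.to_s)
    return nil unless match

    case match[1]
    when "start"
      # A new block takes the next public index and becomes the active block.
      @active_content_index = @next_content_index
      @next_content_index += 1
      @active_content_index
    when "delta"
      # Deltas reuse the index of the active block.
      @active_content_index
    when "end"
      # The end event reuses the active index, then closes the block.
      index = @active_content_index
      @active_content_index = nil
      index
    end
  end
end

assigner = ToyIndexAssigner.new
types = %i[text_start text_delta text_end tool_start tool_delta tool_end]
indexes = types.map { |type| assigner.index_for(type) }
# indexes => [0, 0, 0, 1, 1, 1]
```
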

Without source indexes, the accumulator cannot detect two interleaved blocks of the same type. Providers that can interleave same-type blocks must buffer or serialize them in the mapper before pushing normalized events.
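
One way a mapper could serialize interleaved blocks is to hold back events for any block other than the one currently open. The sketch below assumes the provider supplies a per-block source index. `BlockSerializer` and its `accept` method are hypothetical names, not part of the library.

```ruby
# Hypothetical mapper-side serializer (not part of LlmGateway). It releases
# patches for one block at a time and buffers patches of other blocks until
# the open block has ended.
class BlockSerializer
  def initialize
    @active_index = nil
    @buffers = {} # source index => buffered patches, in arrival order
  end

  # Takes the provider's source block index and a normalized patch. Returns
  # the patches that are safe to push now, in order.
  def accept(source_index, patch)
    @active_index ||= source_index

    if source_index == @active_index
      ready = [patch]
      if end_patch?(patch)
        @active_index = nil
        ready.concat(flush_buffered)
      end
      ready
    else
      (@buffers[source_index] ||= []) << patch
      []
    end
  end

  private

  def end_patch?(patch)
    patch[:type].to_s.end_with?("_end")
  end

  # Releases buffered blocks in arrival order. A block that has not ended
  # yet becomes the new active block and stops the flush.
  def flush_buffered
    ready = []
    until @buffers.empty?
      index, patches = @buffers.shift
      ready.concat(patches)
      unless end_patch?(patches.last)
        @active_index = index
        break
      end
    end
    ready
  end
end

serializer = BlockSerializer.new
wire = [
  [0, { type: :text_start, delta: "a" }],
  [1, { type: :text_start, delta: "x" }],
  [0, { type: :text_delta, delta: "b" }],
  [1, { type: :text_delta, delta: "y" }],
  [0, { type: :text_end, delta: "" }],
  [1, { type: :text_end, delta: "" }]
]
ordered = wire.flat_map { |index, patch| serializer.accept(index, patch) }
ordered.map { |patch| patch[:delta] } # => ["a", "b", "", "x", "y", ""]
```

The trade-off is latency: the second block's text is not streamed until the first block ends.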

The accumulator creates the public Assistant* event structs, updates its accumulated message state, then yields the created event to the callback.



# File 'lib/llm_gateway/adapters/normalized_stream_accumulator.rb', line 52

def blocks
  @blocks
end

#final_message ⇒ Object (readonly)

Returns the value of attribute final_message.



# File 'lib/llm_gateway/adapters/normalized_stream_accumulator.rb', line 53

def final_message
  @final_message
end

#message_hash ⇒ Object

# File 'lib/llm_gateway/adapters/normalized_stream_accumulator.rb', line 52

def message_hash
  @message_hash
end

#usage_hash ⇒ Object

# File 'lib/llm_gateway/adapters/normalized_stream_accumulator.rb', line 52

def usage_hash
  @usage_hash
end

Instance Method Details

#active_tool? ⇒ Boolean

Returns:

  • (Boolean)


# File 'lib/llm_gateway/adapters/normalized_stream_accumulator.rb', line 102

def active_tool?
  active_block_type == :tool
end

#final_result ⇒ Object



# File 'lib/llm_gateway/adapters/normalized_stream_accumulator.rb', line 98

def final_result
  result.merge(provider: @provider, api: @api)
end

#push(event_patch, &block) ⇒ Object

Raises:

  • (ArgumentError)


# File 'lib/llm_gateway/adapters/normalized_stream_accumulator.rb', line 106

def push(event_patch, &block)
  raise ArgumentError, "Normalized stream event patch must be a Hash" unless event_patch.is_a?(Hash)

  event_patch = symbolize_keys(event_patch)
  type = event_patch.fetch(:type).to_sym
  event_patch = prepare_event_patch(event_patch.merge(type:), type)
  ensure_timestamp!

  if type == :message_end
    @final_message = AssistantMessage.new(final_result)
    block.call(AssistantStreamMessageEndEvent.new(type:, message: final_message)) if block
    return nil
  end

  event = build_event(event_patch, partial: empty_partial)
  accumulate(event)
  content_index = event.content_index if event.respond_to?(:content_index)
  commit_block_transition(type, content_index)
  event = build_event(event_patch, partial: partial_message)
  block.call(event) if block

  nil
end
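
The first lines of `push` validate and normalize the incoming patch. The sketch below reproduces that step in isolation. `normalize_patch` is a hypothetical name, and `transform_keys(&:to_sym)` is an assumed stand-in for the private `symbolize_keys` helper, whose body is not shown on this page.

```ruby
# Hypothetical stand-in for the validation at the top of `push`.
def normalize_patch(event_patch)
  unless event_patch.is_a?(Hash)
    raise ArgumentError, "Normalized stream event patch must be a Hash"
  end

  # Assumption: symbolize_keys converts top-level keys to symbols.
  patch = event_patch.transform_keys(&:to_sym)

  # `fetch` raises KeyError when :type is missing; the type is also coerced
  # to a symbol so "text_delta" and :text_delta are treated alike.
  patch.merge(type: patch.fetch(:type).to_sym)
end

normalized = normalize_patch("type" => "text_delta", "delta" => " there")
# normalized => { type: :text_delta, delta: " there" }
```

Passing an array raises `ArgumentError`, which matches the contract that event patches are never arrays.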

#result ⇒ Object



# File 'lib/llm_gateway/adapters/normalized_stream_accumulator.rb', line 88

def result
  ensure_timestamp!

  message_hash.merge(
    timestamp: @timestamp,
    usage: usage_hash,
    content: serialized_blocks
  )
end