Class: LlmGateway::Adapters::NormalizedStreamAccumulator

Inherits:
Object
Defined in:
lib/llm_gateway/adapters/normalized_stream_accumulator.rb

Constant Summary

DEFAULT_USAGE =
{
  input: 0,
  cache_write: 0,
  cache_read: 0,
  output: 0,
  total: 0,
  raw: {}
}.freeze
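
Because `DEFAULT_USAGE` is frozen, each accumulator needs its own mutable copy before it can add token counts. The private `default_usage` helper called from the constructor is not shown on this page, so the following is only a sketch of one way to produce such a copy; `fresh_usage` is a hypothetical name. `freeze` is shallow, so the nested `raw` hash is replaced rather than shared.

```ruby
# Local stand-in with the same shape as the documented constant.
DEFAULT_USAGE = {
  input: 0,
  cache_write: 0,
  cache_read: 0,
  output: 0,
  total: 0,
  raw: {}
}.freeze

# Hypothetical helper: returns an unfrozen copy that owns its `raw` hash.
def fresh_usage
  DEFAULT_USAGE.merge(raw: {})
end

usage = fresh_usage
usage[:output] += 2
usage[:raw][:provider_field] = 1
```

With this approach the constant keeps its zero counts and its empty `raw` hash no matter how many copies are mutated.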
BLOCK_EVENT_TRANSITIONS =
{
  text_start: { block_type: :text, phase: :start },
  text_delta: { block_type: :text, phase: :delta },
  text_end: { block_type: :text, phase: :end },
  tool_start: { block_type: :tool, phase: :start },
  tool_delta: { block_type: :tool, phase: :delta },
  tool_end: { block_type: :tool, phase: :end },
  tool_result_start: { block_type: :tool_result, phase: :start },
  tool_result_delta: { block_type: :tool_result, phase: :delta },
  tool_result_end: { block_type: :tool_result, phase: :end },
  reasoning_start: { block_type: :reasoning, phase: :start },
  reasoning_delta: { block_type: :reasoning, phase: :delta },
  reasoning_end: { block_type: :reasoning, phase: :end }
}.freeze
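
The table is regular: every key is a block type joined to a phase, and the value repeats the two parts. The sketch below rebuilds the same mapping from the two lists and looks up one event type. `build_transitions`, `BLOCK_TYPES`, `PHASES`, and `TRANSITIONS` are hypothetical names used only for illustration; the library defines the table as a literal.

```ruby
BLOCK_TYPES = %i[text tool tool_result reasoning].freeze
PHASES = %i[start delta end].freeze

# Hypothetical helper: derives { text_start: { block_type: :text, phase: :start }, ... }.
def build_transitions
  BLOCK_TYPES.product(PHASES).to_h do |block_type, phase|
    [:"#{block_type}_#{phase}", { block_type: block_type, phase: phase }]
  end.freeze
end

TRANSITIONS = build_transitions

transition = TRANSITIONS.fetch(:tool_result_delta)

# Message-level events are not block events, so they have no entry.
message_level = TRANSITIONS[:message_start]
```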


Constructor Details

#initialize(provider: nil, api: nil) ⇒ NormalizedStreamAccumulator

Returns a new instance of NormalizedStreamAccumulator.



# File 'lib/llm_gateway/adapters/normalized_stream_accumulator.rb', line 83

def initialize(provider: nil, api: nil)
  @provider = provider
  @api = api
  @message_hash = {}
  @usage_hash = default_usage
  @blocks = []
  @next_content_index = 0
  @active_block_type = nil
  @active_content_index = nil
  @timestamp = nil
end

Instance Attribute Details

#active_block_type ⇒ Object (readonly)

Returns the value of attribute active_block_type.



# File 'lib/llm_gateway/adapters/normalized_stream_accumulator.rb', line 57

def active_block_type
  @active_block_type
end

#blocks ⇒ Object

Contract:

`push` accepts a single provider-independent, normalized stream event patch hash. Event patches are never arrays; mappers call `push` once per patch.

Provider wire events, such as Anthropic `message_start` / `content_block_start` or OpenAI `response.output_text.delta`, must be translated by the mapper before they reach this accumulator. The normalized symbol `:message_start` below is allowed; the raw provider event string is not.
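
As an illustration of that translation step, the sketch below maps a few Anthropic-style wire events for a single text block onto normalized patches. `TextOnlyMapper` and its `normalize` method are hypothetical, the wire payloads are simplified, and only message start/stop and text blocks are handled; a real mapper would also cover tool use, reasoning, and usage.

```ruby
# Hypothetical mapper: turns simplified Anthropic-style wire events into
# normalized event patches. Unknown wire events map to nil.
class TextOnlyMapper
  def normalize(wire)
    case wire[:type]
    when "message_start"
      { type: :message_start, delta: wire.fetch(:message).slice(:id, :model, :role) }
    when "content_block_start"
      { type: :text_start, delta: wire.dig(:content_block, :text).to_s }
    when "content_block_delta"
      { type: :text_delta, delta: wire.dig(:delta, :text).to_s }
    when "content_block_stop"
      { type: :text_end, delta: "" }
    when "message_stop"
      { type: :message_end }
    end
  end
end

wire_events = [
  { type: "message_start", message: { id: "msg_1", model: "example-model", role: "assistant" } },
  { type: "content_block_start", index: 0, content_block: { type: "text", text: "" } },
  { type: "content_block_delta", index: 0, delta: { type: "text_delta", text: "hi" } },
  { type: "ping" },
  { type: "content_block_stop", index: 0 },
  { type: "message_stop" }
]

mapper = TextOnlyMapper.new
patches = wire_events.map { |wire| mapper.normalize(wire) }.compact
# Each patch would then be handed to the accumulator with its own `push` call.
```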

Accepted event shapes:

{ type: :message_start, delta: { id: "...", model: "...", role: "assistant", timestamp: 1716650000000 } }
{ type: :message_delta, delta: { stop_reason: "stop" }, usage: { output: 2 } }
{ type: :message_end }

{ type: :text_start, delta: "hi" }
{ type: :text_delta, delta: " there" }
{ type: :text_end, delta: "" }

{ type: :reasoning_start, delta: "thinking", signature: "" }
{ type: :reasoning_delta, delta: "...", signature: "" }
{ type: :reasoning_end, delta: "", signature: "" }

{ type: :tool_start, id: "...", name: "tool_name", tool_type: "tool_use", delta: "" }
{ type: :tool_delta, delta: "{\"a\":" }
{ type: :tool_end, delta: "" }

{ type: :tool_result_start, tool_use_id: "...", name: "server_tool_result", delta: "..." }
{ type: :tool_result_delta, delta: "..." }
{ type: :tool_result_end, delta: "" }
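
To show how these shapes compose, the sketch below folds a text sequence and a tool sequence into their final strings by concatenating each `delta`. It is a standalone illustration, not the accumulator's implementation; `join_deltas` is a hypothetical helper. The tool deltas in the shapes above look like JSON fragments, so the sketch assumes the arguments can only be parsed once the block has ended.

```ruby
require "json"

# Hypothetical helper: concatenates the string deltas of one block.
def join_deltas(patches)
  patches.map { |patch| patch.fetch(:delta) }.join
end

text_patches = [
  { type: :text_start, delta: "hi" },
  { type: :text_delta, delta: " there" },
  { type: :text_end, delta: "" }
]

tool_patches = [
  { type: :tool_start, id: "call_1", name: "tool_name", tool_type: "tool_use", delta: "" },
  { type: :tool_delta, delta: "{\"a\":" },
  { type: :tool_delta, delta: "1}" },
  { type: :tool_end, delta: "" }
]

text = join_deltas(text_patches)
arguments = JSON.parse(join_deltas(tool_patches))
```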

Mappers do not provide `content_index`. The accumulator assigns the next public content index when a block starts and reuses the active content index for that block’s deltas and end event.
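
The indexing rule can be modelled in isolation. The toy class below is only a sketch of that rule, not the library's code: `IndexAssigner` and `index_for` are hypothetical names, and clearing the active index when a block ends is an assumption.

```ruby
# Toy model of the content-index rule described above.
class IndexAssigner
  def initialize
    @next_content_index = 0
    @active_content_index = nil
  end

  # Returns the content index for a block event in the given phase.
  def index_for(phase)
    case phase
    when :start
      # A new block takes the next free index.
      @active_content_index = @next_content_index
      @next_content_index += 1
      @active_content_index
    when :delta
      # Deltas reuse the index of the block that is currently open.
      @active_content_index
    when :end
      # Assumption: the active index is cleared once the block ends.
      index = @active_content_index
      @active_content_index = nil
      index
    else
      raise ArgumentError, "unknown phase: #{phase.inspect}"
    end
  end
end

assigner = IndexAssigner.new
phases = %i[start delta end start delta delta end]
indexes = phases.map { |phase| assigner.index_for(phase) }
```

Here the first block's three events share index 0 and the second block's four events share index 1.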

Without source indexes, the accumulator cannot detect two interleaved blocks of the same type. Providers that can interleave same-type blocks must buffer or serialize them in the mapper before pushing normalized events.
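
One way a mapper could serialize interleaved blocks is to buffer patches per provider block index and release a block only when its end event arrives. The sketch below assumes the provider supplies such an index; `BlockSerializer` and `add` are hypothetical names, not part of the library.

```ruby
# Hypothetical mapper-side buffer keyed by the provider's own block index.
class BlockSerializer
  def initialize
    @pending = Hash.new { |hash, key| hash[key] = [] }
  end

  # Buffers the patch under its source block. Returns the whole block once its
  # end event arrives, otherwise an empty array.
  def add(source_index, patch)
    @pending[source_index] << patch
    return [] unless patch[:type].to_s.end_with?("_end")

    @pending.delete(source_index)
  end
end

serializer = BlockSerializer.new
interleaved = [
  [0, { type: :text_start, delta: "a" }],
  [1, { type: :text_start, delta: "x" }],
  [0, { type: :text_delta, delta: "b" }],
  [1, { type: :text_end, delta: "" }],
  [0, { type: :text_end, delta: "" }]
]

serialized = interleaved.flat_map { |source_index, patch| serializer.add(source_index, patch) }
```

The trade-off of this design is latency: nothing from a block is released until that block ends, and blocks are emitted in completion order rather than start order.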

The accumulator creates the public Assistant* event structs, updates its accumulated message state, then yields the created event to the callback.



# File 'lib/llm_gateway/adapters/normalized_stream_accumulator.rb', line 56

def blocks
  @blocks
end

#final_message ⇒ Object (readonly)

Returns the value of attribute final_message.



# File 'lib/llm_gateway/adapters/normalized_stream_accumulator.rb', line 57

def final_message
  @final_message
end

#message_hash ⇒ Object

# File 'lib/llm_gateway/adapters/normalized_stream_accumulator.rb', line 56

def message_hash
  @message_hash
end

#usage_hash ⇒ Object

# File 'lib/llm_gateway/adapters/normalized_stream_accumulator.rb', line 56

def usage_hash
  @usage_hash
end

Instance Method Details

#active_tool? ⇒ Boolean

Returns:

  • (Boolean)


# File 'lib/llm_gateway/adapters/normalized_stream_accumulator.rb', line 109

def active_tool?
  active_block_type == :tool
end

#final_result ⇒ Object



# File 'lib/llm_gateway/adapters/normalized_stream_accumulator.rb', line 105

def final_result
  result.merge(provider: @provider, api: @api)
end

#push(event_patch, &block) ⇒ Object

Raises:

  • (ArgumentError)


# File 'lib/llm_gateway/adapters/normalized_stream_accumulator.rb', line 113

def push(event_patch, &block)
  raise ArgumentError, "Normalized stream event patch must be a Hash" unless event_patch.is_a?(Hash)

  # Hash#symbolize_keys is provided by ActiveSupport.
  event_patch = event_patch.symbolize_keys
  type = event_patch.fetch(:type).to_sym
  event_patch = prepare_event_patch(event_patch.merge(type:), type)
  ensure_timestamp!

  # :message_end builds the final message and yields the end event;
  # no further state is accumulated.
  if type == :message_end
    @final_message = AssistantMessage.new(final_result)
    block.call(AssistantStreamMessageEndEvent.new(type:, message: final_message)) if block
    return nil
  end

  # Build the event once to update the accumulated state, then rebuild it
  # with the updated partial message before yielding it.
  event = build_event(event_patch, partial: empty_partial)
  accumulate(event)
  content_index = event.content_index if event.respond_to?(:content_index)
  commit_block_transition(type, content_index)
  event = build_event(event_patch, partial: partial_message)
  block.call(event) if block

  nil
end
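
The input handling at the top of `push` can be tried in isolation. The sketch below copies only those first steps; `normalize_patch` is a hypothetical name, and `transform_keys(&:to_sym)` stands in for ActiveSupport's `symbolize_keys` so the example runs on plain Ruby. Although the Raises list above names only `ArgumentError`, a hash without a `:type` key fails at `fetch(:type)` with a `KeyError`.

```ruby
# Standalone sketch of the validation and type normalization in `push`.
def normalize_patch(event_patch)
  raise ArgumentError, "Normalized stream event patch must be a Hash" unless event_patch.is_a?(Hash)

  # Stand-in for ActiveSupport's Hash#symbolize_keys.
  patch = event_patch.transform_keys(&:to_sym)
  patch.merge(type: patch.fetch(:type).to_sym)
end

# String keys and a string type are accepted and converted to symbols.
normalized = normalize_patch({ "type" => "text_delta", "delta" => " there" })

# An array of patches and a hash without :type are both rejected.
bad_patches = [[{ type: :text_delta }], { delta: "no type" }]
errors = bad_patches.map do |bad_patch|
  normalize_patch(bad_patch)
rescue ArgumentError, KeyError => e
  e.class
end
```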

#result ⇒ Object



# File 'lib/llm_gateway/adapters/normalized_stream_accumulator.rb', line 95

def result
  ensure_timestamp!

  message_hash.merge(
    timestamp: @timestamp,
    usage: usage_hash,
    content: serialized_blocks
  )
end