Class: LlmGateway::Adapters::NormalizedStreamAccumulator

Inherits:
Object
Defined in:
lib/llm_gateway/adapters/normalized_stream_accumulator.rb

Constant Summary

BLOCK_EVENT_TRANSITIONS =
{
  text_start: { block_type: :text, phase: :start },
  text_delta: { block_type: :text, phase: :delta },
  text_end: { block_type: :text, phase: :end },
  tool_start: { block_type: :tool, phase: :start },
  tool_delta: { block_type: :tool, phase: :delta },
  tool_end: { block_type: :tool, phase: :end },
  reasoning_start: { block_type: :reasoning, phase: :start },
  reasoning_delta: { block_type: :reasoning, phase: :delta },
  reasoning_end: { block_type: :reasoning, phase: :end }
}.freeze
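
As a rough illustration of how a table like this can be consulted, the sketch below looks up an event type and returns its block type and phase. The constant is copied from the listing above; `describe_transition` is a hypothetical helper and not part of the class.

```ruby
# Copy of the constant documented above.
BLOCK_EVENT_TRANSITIONS = {
  text_start: { block_type: :text, phase: :start },
  text_delta: { block_type: :text, phase: :delta },
  text_end: { block_type: :text, phase: :end },
  tool_start: { block_type: :tool, phase: :start },
  tool_delta: { block_type: :tool, phase: :delta },
  tool_end: { block_type: :tool, phase: :end },
  reasoning_start: { block_type: :reasoning, phase: :start },
  reasoning_delta: { block_type: :reasoning, phase: :delta },
  reasoning_end: { block_type: :reasoning, phase: :end }
}.freeze

# Hypothetical helper: returns [block_type, phase] for block events,
# or nil for message-level events such as :message_start.
def describe_transition(type)
  transition = BLOCK_EVENT_TRANSITIONS[type.to_sym]
  return nil unless transition

  transition.values_at(:block_type, :phase)
end

p describe_transition(:tool_delta)    # => [:tool, :delta]
p describe_transition(:message_start) # => nil
```

Message-level events have no entry in the table, so a lookup that returns nil is one way to tell them apart from block events.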

Constructor Details

#initialize ⇒ NormalizedStreamAccumulator

Returns a new instance of NormalizedStreamAccumulator.



# File 'lib/llm_gateway/adapters/normalized_stream_accumulator.rb', line 67

def initialize
  @message_hash = {}
  @usage_hash = {
    input_tokens: 0,
    cache_creation_input_tokens: 0,
    cache_read_input_tokens: 0,
    output_tokens: 0,
    reasoning_tokens: 0
  }
  @blocks = []
  @next_content_index = 0
  @active_block_type = nil
  @active_content_index = nil
end

Instance Attribute Details

#active_block_type ⇒ Object (readonly)

Returns the value of attribute active_block_type.



# File 'lib/llm_gateway/adapters/normalized_stream_accumulator.rb', line 53

def active_block_type
  @active_block_type
end

#blocks ⇒ Object

Contract:

`push` accepts a single provider-independent, normalized stream event patch hash. Event patches are never arrays; mappers call `push` once per patch.

Provider wire events, such as Anthropic `message_start` / `content_block_start` or OpenAI `response.output_text.delta`, must be translated by the mapper before they reach this accumulator. The normalized symbol `:message_start` below is allowed; the raw provider event string is not.

Accepted event shapes:

{ type: :message_start, delta: { id: "...", model: "...", role: "assistant" }, usage_increment: { ... } }
{ type: :message_delta, delta: { stop_reason: "stop" }, usage_increment: { ... } }
{ type: :message_end }

{ type: :text_start, delta: "hi" }
{ type: :text_delta, delta: " there" }
{ type: :text_end, delta: "" }

{ type: :reasoning_start, delta: "thinking", signature: "" }
{ type: :reasoning_delta, delta: "...", signature: "" }
{ type: :reasoning_end, delta: "", signature: "" }

{ type: :tool_start, id: "...", name: "tool_name", delta: "" }
{ type: :tool_delta, delta: "{\"a\":" }
{ type: :tool_end, delta: "" }
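
To make the mapper's role concrete, here is a minimal sketch of translating an Anthropic-style wire event into one of the shapes above. `normalize_anthropic_event` is a hypothetical name, the gem's real mappers are not shown in this page, and the sketch handles text blocks only. Closing a block (`content_block_stop`) would require the mapper to remember which block type is open, so it is left out.

```ruby
# Hypothetical mapper function: takes a wire event parsed from JSON (string keys)
# and returns a normalized patch hash, or nil when there is nothing to push.
def normalize_anthropic_event(raw)
  case raw["type"]
  when "content_block_start"
    block = raw.fetch("content_block")
    { type: :text_start, delta: block.fetch("text", "") } if block["type"] == "text"
  when "content_block_delta"
    delta = raw.fetch("delta")
    { type: :text_delta, delta: delta.fetch("text") } if delta["type"] == "text_delta"
  when "message_stop"
    { type: :message_end }
  end
end

raw = {
  "type" => "content_block_delta",
  "index" => 0,
  "delta" => { "type" => "text_delta", "text" => " there" }
}

patch = normalize_anthropic_event(raw)
p patch[:type]  # => :text_delta
p patch[:delta] # => " there"
```

Note that the provider's own `index` field is dropped: the resulting patch carries only a normalized `type` symbol and a `delta`.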

Mappers do not provide `content_index`. The accumulator assigns the next public content index when a block starts and reuses the active content index for that block's deltas and end event.

Without source indexes, the accumulator cannot detect two interleaved blocks of the same type. Providers that can interleave same-type blocks must buffer or serialize them in the mapper before pushing normalized events.

The accumulator creates the public Assistant* event structs, updates its accumulated message state, then yields the created event to the callback.
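
The index bookkeeping described above can be sketched as follows. `IndexAssignerSketch` is a hypothetical stand-in that mimics only the counter logic; the accumulator's private methods are not shown in this page, so the real implementation may differ.

```ruby
# Hypothetical stand-in that mimics only the content-index bookkeeping.
class IndexAssignerSketch
  def initialize
    @next_content_index = 0
    @active_content_index = nil
  end

  # Returns the content index that would accompany the given event type,
  # or nil for message-level events.
  def index_for(type)
    block_type, phase = type.to_s.split("_", 2)
    return nil if block_type == "message"

    case phase
    when "start"
      # A new block takes the next free index and becomes the active block.
      @active_content_index = @next_content_index
      @next_content_index += 1
      @active_content_index
    when "delta"
      # Deltas reuse the index of the active block.
      @active_content_index
    when "end"
      # The end event reuses the index, then the block is no longer active.
      index = @active_content_index
      @active_content_index = nil
      index
    end
  end
end

sketch = IndexAssignerSketch.new
types = %i[message_start text_start text_delta text_end tool_start tool_delta tool_end message_end]
p types.map { |type| sketch.index_for(type) }
# => [nil, 0, 0, 0, 1, 1, 1, nil]
```

Because only one active index is tracked, two interleaved blocks of the same type would be merged into a single index, which is why the contract asks mappers to serialize them first.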



# File 'lib/llm_gateway/adapters/normalized_stream_accumulator.rb', line 52

def blocks
  @blocks
end

#message_hash ⇒ Object

# File 'lib/llm_gateway/adapters/normalized_stream_accumulator.rb', line 52

def message_hash
  @message_hash
end

#usage_hash ⇒ Object

# File 'lib/llm_gateway/adapters/normalized_stream_accumulator.rb', line 52

def usage_hash
  @usage_hash
end
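
The page does not show how a `usage_increment` is folded into these counters. One plausible reading of the name, offered here as an assumption only, is that each increment is added to the running total per key. `apply_usage_increment` is a hypothetical helper.

```ruby
# Same starting counters as in #initialize.
usage = {
  input_tokens: 0,
  cache_creation_input_tokens: 0,
  cache_read_input_tokens: 0,
  output_tokens: 0,
  reasoning_tokens: 0
}

# Hypothetical helper: returns a new hash with each increment added to its counter.
# Assumes additive semantics, which the source does not confirm.
def apply_usage_increment(usage, increment)
  usage.merge(increment) { |_key, total, extra| total + extra }
end

usage = apply_usage_increment(usage, { input_tokens: 12 })
usage = apply_usage_increment(usage, { output_tokens: 5 })
usage = apply_usage_increment(usage, { output_tokens: 3 })

p usage[:input_tokens]  # => 12
p usage[:output_tokens] # => 8
```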

Instance Method Details

#active_tool? ⇒ Boolean

Returns:

  • (Boolean)


# File 'lib/llm_gateway/adapters/normalized_stream_accumulator.rb', line 89

def active_tool?
  active_block_type == :tool
end

#push(event_patch, &block) ⇒ Object

Raises:

  • (ArgumentError)


# File 'lib/llm_gateway/adapters/normalized_stream_accumulator.rb', line 93

def push(event_patch, &block)
  raise ArgumentError, "Normalized stream event patch must be a Hash" unless event_patch.is_a?(Hash)

  event_patch = symbolize_keys(event_patch)
  type = event_patch.fetch(:type).to_sym
  event_patch = prepare_event_patch(event_patch.merge(type:), type)

  event = build_event(event_patch)
  accumulate(event)
  content_index = event.content_index if event.respond_to?(:content_index)
  commit_block_transition(type, content_index)
  block.call(event) if block

  nil
end
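
The first three lines of `push` validate and normalize the patch before any event is built. The sketch below reproduces just that step in isolation. `normalize_patch` is a hypothetical name, and the shallow `transform_keys` call is an assumption standing in for the private `symbolize_keys` helper, whose implementation is not shown.

```ruby
# Hypothetical stand-in for the validation and normalization at the top of #push.
def normalize_patch(event_patch)
  raise ArgumentError, "Normalized stream event patch must be a Hash" unless event_patch.is_a?(Hash)

  patch = event_patch.transform_keys(&:to_sym) # assumed shallow symbolize_keys
  patch.merge(type: patch.fetch(:type).to_sym) # KeyError if :type is missing
end

# String keys and a string type are both coerced to symbols.
patch = normalize_patch({ "type" => "text_delta", "delta" => "hi" })
p patch[:type]  # => :text_delta
p patch[:delta] # => "hi"

# Arrays of patches are rejected; push one patch at a time instead.
begin
  normalize_patch([{ type: :text_delta, delta: "hi" }])
rescue ArgumentError => e
  puts e.message
end
```

A patch without a `:type` key fails at `fetch` with a `KeyError` rather than the `ArgumentError` listed under Raises.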

#result ⇒ Object



# File 'lib/llm_gateway/adapters/normalized_stream_accumulator.rb', line 82

def result
  message_hash.merge(
    usage: usage_hash,
    content: serialized_blocks
  )
end
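
To show the shape this merge produces, here is a small standalone sketch. `build_result` is a hypothetical function mirroring the method body, and all sample values, including the shape of the serialized blocks, are made up for illustration; the private `serialized_blocks` method is not shown in this page.

```ruby
# Hypothetical mirror of #result that takes its inputs as arguments.
def build_result(message_hash, usage_hash, serialized_blocks)
  message_hash.merge(
    usage: usage_hash,
    content: serialized_blocks
  )
end

message_hash = { id: "msg_1", model: "example-model", role: "assistant", stop_reason: "stop" }
usage_hash = { input_tokens: 12, output_tokens: 8 }
serialized_blocks = [{ type: "text", text: "hi there" }] # assumed block shape

result = build_result(message_hash, usage_hash, serialized_blocks)
p result.keys # => [:id, :model, :role, :stop_reason, :usage, :content]
```

Since `Hash#merge` returns a new hash, the stored `message_hash` is not mutated. Any `:usage` or `:content` key already present in `message_hash` would be overwritten in the returned hash.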