Class: LlmGateway::Adapters::NormalizedStreamAccumulator
- Inherits: Object
- Defined in: lib/llm_gateway/adapters/normalized_stream_accumulator.rb
Constant Summary
- DEFAULT_USAGE =
  { input: 0, cache_write: 0, cache_read: 0, output: 0, total: 0, raw: {} }.freeze
- BLOCK_EVENT_TRANSITIONS =
  {
    text_start: { block_type: :text, phase: :start },
    text_delta: { block_type: :text, phase: :delta },
    text_end: { block_type: :text, phase: :end },
    tool_start: { block_type: :tool, phase: :start },
    tool_delta: { block_type: :tool, phase: :delta },
    tool_end: { block_type: :tool, phase: :end },
    reasoning_start: { block_type: :reasoning, phase: :start },
    reasoning_delta: { block_type: :reasoning, phase: :delta },
    reasoning_end: { block_type: :reasoning, phase: :end }
  }.freeze
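The transition table is regular: three block types crossed with three phases. As an illustration only, the following sketch rebuilds an equivalent table and looks up one entry. `TRANSITIONS_SKETCH` is a hypothetical name used here so the example does not depend on the gem.

```ruby
# Rebuild a table with the same shape as BLOCK_EVENT_TRANSITIONS.
# TRANSITIONS_SKETCH is a hypothetical stand-in, not a library constant.
TRANSITIONS_SKETCH = %i[text tool reasoning].product(%i[start delta end]).to_h { |block_type, phase|
  [:"#{block_type}_#{phase}", { block_type: block_type, phase: phase }]
}.freeze

# Look up which block type and phase an event type belongs to.
transition = TRANSITIONS_SKETCH.fetch(:tool_delta)
puts "#{transition[:block_type]} / #{transition[:phase]}"

# Message-level events are not block transitions, so they are absent.
puts TRANSITIONS_SKETCH.key?(:message_end)
```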
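Instance Attribute Summary
- #active_block_type ⇒ Object (readonly)
  Returns the value of attribute active_block_type.
- #blocks ⇒ Object
  Returns the value of attribute blocks. The `push` contract is documented under this attribute in Instance Attribute Details.
- #final_message ⇒ Object (readonly)
  Returns the value of attribute final_message.
- #message_hash ⇒ Object
  Returns the value of attribute message_hash.
- #usage_hash ⇒ Object
  Returns the value of attribute usage_hash.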
Instance Method Summary
- #active_tool? ⇒ Boolean
- #final_result ⇒ Object
- #initialize(provider: nil, api: nil) ⇒ NormalizedStreamAccumulator (constructor)
  A new instance of NormalizedStreamAccumulator.
- #push(event_patch, &block) ⇒ Object
- #result ⇒ Object
Constructor Details
#initialize(provider: nil, api: nil) ⇒ NormalizedStreamAccumulator
Returns a new instance of NormalizedStreamAccumulator.
# File 'lib/llm_gateway/adapters/normalized_stream_accumulator.rb', line 76
def initialize(provider: nil, api: nil)
  @provider = provider
  @api = api
  @message_hash = {}
  @usage_hash = default_usage
  @blocks = []
  @next_content_index = 0
  @active_block_type = nil
  @active_content_index = nil
  @timestamp = nil
end
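The constructor seeds `@usage_hash` from a private `default_usage` helper whose body is not shown on this page. Because `DEFAULT_USAGE` is frozen and `freeze` is shallow, each instance needs its own mutable copy with its own `raw` hash. The sketch below shows one way that could work. `fresh_usage` and `DEFAULT_USAGE_SKETCH` are hypothetical names, and the real helper may differ.

```ruby
DEFAULT_USAGE_SKETCH = { input: 0, cache_write: 0, cache_read: 0, output: 0, total: 0, raw: {} }.freeze

# Hypothetical stand-in for `default_usage`: Hash#merge returns a new,
# unfrozen hash, and passing raw: {} gives each copy its own nested hash.
def fresh_usage
  DEFAULT_USAGE_SKETCH.merge(raw: {})
end

usage_a = fresh_usage
usage_b = fresh_usage

# Mutating one copy leaves the other copy and the constant untouched.
usage_a[:output] += 2
usage_a[:raw][:provider_field] = 1
```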
Instance Attribute Details
#active_block_type ⇒ Object (readonly)
Returns the value of attribute active_block_type.
# File 'lib/llm_gateway/adapters/normalized_stream_accumulator.rb', line 53
def active_block_type
  @active_block_type
end
#blocks ⇒ Object
Contract:
`push` accepts a single provider-independent, normalized stream event patch hash. Event patches are never arrays; mappers call `push` once per patch.
Provider wire events such as Anthropic `message_start` / `content_block_start`, OpenAI `response.output_text.delta`, etc. must be translated by the mapper before calling this accumulator. The normalized symbol `:message_start` below is allowed; the raw provider event string is not.
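To make the translation step concrete, here is a minimal mapper sketch. It turns parsed Anthropic-style content block events into normalized patches. `SketchAnthropicMapper` is hypothetical and is not the gem's mapper. It handles only text and tool_use blocks and ignores everything else.

```ruby
# Hypothetical mapper sketch: translates parsed provider wire events
# (string-keyed hashes) into normalized patch hashes for `push`.
class SketchAnthropicMapper
  def initialize
    @block_kinds = {} # provider block index => :text or :tool
  end

  # Returns a normalized patch, or nil for events this sketch ignores.
  def map(raw)
    case raw["type"]
    when "content_block_start" then map_start(raw)
    when "content_block_delta" then map_delta(raw["delta"])
    when "content_block_stop"
      kind = @block_kinds.delete(raw["index"])
      kind && { type: :"#{kind}_end", delta: "" }
    end
  end

  private

  def map_start(raw)
    block = raw["content_block"]
    case block["type"]
    when "text"
      @block_kinds[raw["index"]] = :text
      { type: :text_start, delta: block["text"].to_s }
    when "tool_use"
      @block_kinds[raw["index"]] = :tool
      { type: :tool_start, id: block["id"], name: block["name"], delta: "" }
    end
  end

  def map_delta(delta)
    case delta["type"]
    when "text_delta" then { type: :text_delta, delta: delta["text"].to_s }
    when "input_json_delta" then { type: :tool_delta, delta: delta["partial_json"].to_s }
    end
  end
end
```

Each non-nil return value would then be passed to `push` one at a time, never as an array.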
Accepted event shapes:
{ type: :message_start, delta: { id: "...", model: "...", role: "assistant", timestamp: 1716650000000 } }
{ type: :message_delta, delta: { stop_reason: "stop" }, usage: { output: 2 } }
{ type: :message_end }
{ type: :text_start, delta: "hi" }
{ type: :text_delta, delta: " there" }
{ type: :text_end, delta: "" }
{ type: :reasoning_start, delta: "thinking", signature: "" }
{ type: :reasoning_delta, delta: "...", signature: "" }
{ type: :reasoning_end, delta: "", signature: "" }
{ type: :tool_start, id: "...", name: "tool_name", delta: "" }
{ type: :tool_delta, delta: "{\"a\":" }
{ type: :tool_end, delta: "" }
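The tool shapes above carry string fragments rather than parsed arguments. This small sketch assumes, as the `"{\"a\":"` example suggests, that the fragments concatenate to one JSON document. The id, name, and second fragment are hypothetical values, and how the accumulator stores tool arguments internally is not shown on this page.

```ruby
require "json"

# One tool call expressed as normalized patches (values are illustrative).
patches = [
  { type: :tool_start, id: "call_1", name: "lookup", delta: "" },
  { type: :tool_delta, delta: "{\"a\":" },
  { type: :tool_delta, delta: "1}" },
  { type: :tool_end, delta: "" }
]

# Joining every delta in order yields the complete argument string,
# which can only be parsed once the block has ended.
raw_arguments = patches.map { |patch| patch[:delta] }.join
arguments = JSON.parse(raw_arguments)
```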
Mappers do not provide `content_index`. The accumulator assigns the next public content index when a block starts and reuses the active content index for that block's deltas and end event.
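The index rule can be sketched in isolation. `SketchIndexAssigner` is a hypothetical stand-in that reuses the instance variable names from the constructor. It accepts only block event types and does not reproduce the accumulator's private `commit_block_transition` logic.

```ruby
# Hypothetical sketch of content index assignment for block events.
class SketchIndexAssigner
  def initialize
    @next_content_index = 0
    @active_content_index = nil
  end

  # type must be a block event such as :text_start, :tool_delta, :reasoning_end.
  def index_for(type)
    case type.to_s.split("_").last
    when "start"
      # A new block takes the next public index and becomes active.
      @active_content_index = @next_content_index
      @next_content_index += 1
      @active_content_index
    when "delta"
      @active_content_index
    when "end"
      # The end event still reports the block's index, then clears it.
      index = @active_content_index
      @active_content_index = nil
      index
    end
  end
end

assigner = SketchIndexAssigner.new
types = %i[text_start text_delta text_end tool_start tool_delta tool_end]
indexes = types.map { |type| assigner.index_for(type) }
# indexes is [0, 0, 0, 1, 1, 1]
```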
Without source indexes, the accumulator cannot detect two interleaved blocks of the same type. Providers that can interleave same-type blocks must buffer or serialize them in the mapper before pushing normalized events.
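One way a mapper could serialize interleaved blocks is to pass through only the block that started first and buffer the rest until it ends. The sketch below is a hypothetical helper, not part of the gem. `source_index` stands for whatever per-block index the provider supplies.

```ruby
# Hypothetical mapper-side helper that serializes interleaved blocks.
class SketchBlockSerializer
  BLOCK_END_TYPES = %i[text_end tool_end reasoning_end].freeze

  def initialize
    @active = nil # source index whose patches pass straight through
    @pending = {} # source index => buffered patches, in arrival order
  end

  # Returns the patches that are safe to push now, in order.
  def feed(source_index, patch)
    @active = source_index if @active.nil?
    unless source_index == @active
      (@pending[source_index] ||= []) << patch
      return []
    end

    ready = [patch]
    ready.concat(release_pending) if block_end?(patch)
    ready
  end

  private

  # Flush buffered blocks oldest first; stop at the first unfinished one,
  # which becomes the new active block.
  def release_pending
    @active = nil
    released = []
    until @pending.empty?
      index, patches = @pending.shift
      released.concat(patches)
      next if block_end?(patches.last)

      @active = index
      break
    end
    released
  end

  def block_end?(patch)
    BLOCK_END_TYPES.include?(patch[:type])
  end
end
```

Buffering trades latency on the later block for a stream the accumulator can index correctly.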
The accumulator creates the public Assistant* event structs, updates its accumulated message state, then yields the created event to the callback.
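The order of operations described here (build the event, update state, yield) can be sketched with a text-only stand-in. `SketchTextAccumulator` and `SketchTextEvent` are hypothetical. The real accumulator builds Assistant* structs whose fields are not listed on this page.

```ruby
# Hypothetical event struct; the real Assistant* structs may differ.
SketchTextEvent = Struct.new(:type, :delta, :partial, keyword_init: true)

# Hypothetical text-only accumulator showing "accumulate, then yield".
class SketchTextAccumulator
  attr_reader :text

  def initialize
    @text = +""
  end

  def push(patch, &block)
    raise ArgumentError, "patch must be a Hash" unless patch.is_a?(Hash)

    # Update accumulated state first so the yielded event sees it.
    @text << patch.fetch(:delta, "")
    event = SketchTextEvent.new(type: patch.fetch(:type), delta: patch[:delta], partial: @text.dup)
    block.call(event) if block
    nil
  end
end

accumulator = SketchTextAccumulator.new
partials = []
[
  { type: :text_start, delta: "hi" },
  { type: :text_delta, delta: " there" },
  { type: :text_end, delta: "" }
].each { |patch| accumulator.push(patch) { |event| partials << event.partial } }
# partials is ["hi", "hi there", "hi there"]
```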
# File 'lib/llm_gateway/adapters/normalized_stream_accumulator.rb', line 52
def blocks
  @blocks
end
#final_message ⇒ Object (readonly)
Returns the value of attribute final_message.
# File 'lib/llm_gateway/adapters/normalized_stream_accumulator.rb', line 53
def final_message
  @final_message
end
#message_hash ⇒ Object
# File 'lib/llm_gateway/adapters/normalized_stream_accumulator.rb', line 52
def message_hash
  @message_hash
end
#usage_hash ⇒ Object
# File 'lib/llm_gateway/adapters/normalized_stream_accumulator.rb', line 52
def usage_hash
  @usage_hash
end
Instance Method Details
#active_tool? ⇒ Boolean
# File 'lib/llm_gateway/adapters/normalized_stream_accumulator.rb', line 102
def active_tool?
  active_block_type == :tool
end
#final_result ⇒ Object
# File 'lib/llm_gateway/adapters/normalized_stream_accumulator.rb', line 98
def final_result
  result.merge(provider: @provider, api: @api)
end
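Since `final_result` is a plain `Hash#merge`, it returns a new hash and leaves the value of `result` untouched. A small illustration follows. The keys and values in it are hypothetical, and the types the gem uses for `provider` and `api` are not stated on this page.

```ruby
# Hypothetical result hash; only the merge shape mirrors final_result.
result = { id: "msg_1", content: [], usage: { input: 0, output: 2 } }

# Hash#merge is non-destructive: provider and api appear only on the copy.
final = result.merge(provider: "anthropic", api: "messages")
```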
#push(event_patch, &block) ⇒ Object
# File 'lib/llm_gateway/adapters/normalized_stream_accumulator.rb', line 106
def push(event_patch, &block)
  raise ArgumentError, "Normalized stream event patch must be a Hash" unless event_patch.is_a?(Hash)

  event_patch = symbolize_keys(event_patch)
  type = event_patch.fetch(:type).to_sym
  event_patch = prepare_event_patch(event_patch.merge(type:), type)

  if type == :message_end
    @final_message = AssistantMessage.new(final_result)
    block.call(AssistantStreamMessageEndEvent.new(type:, message: )) if block
    return nil
  end

  event = build_event(event_patch, partial: empty_partial)
  accumulate(event)
  content_index = event.content_index if event.respond_to?(:content_index)
  commit_block_transition(type, content_index)
  event = build_event(event_patch, partial: )
  block.call(event) if block
  nil
end
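The first lines of `push` imply that string keys and string type names are accepted, that a non-Hash argument raises `ArgumentError`, and that a missing `:type` raises `KeyError` through `fetch`. The sketch below reproduces just that normalization. `normalize_patch` is a hypothetical name, and the private `symbolize_keys` helper is assumed here to be a shallow key conversion.

```ruby
# Hypothetical stand-in for the validation and normalization at the top of push.
def normalize_patch(event_patch)
  raise ArgumentError, "Normalized stream event patch must be a Hash" unless event_patch.is_a?(Hash)

  # Assumption: symbolize_keys converts top-level keys only.
  patch = event_patch.transform_keys(&:to_sym)
  # fetch raises KeyError when :type is missing; to_sym accepts strings or symbols.
  patch.merge(type: patch.fetch(:type).to_sym)
end

normalized = normalize_patch({ "type" => "text_delta", "delta" => " there" })
```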
#result ⇒ Object
# File 'lib/llm_gateway/adapters/normalized_stream_accumulator.rb', line 88
def result
  .merge(
    timestamp: @timestamp,
    usage: usage_hash,
    content: serialized_blocks
  )
end