Class: LlmGateway::Adapters::NormalizedStreamAccumulator
- Inherits: Object
  - Object
  - LlmGateway::Adapters::NormalizedStreamAccumulator
- Defined in: lib/llm_gateway/adapters/normalized_stream_accumulator.rb
Constant Summary
- BLOCK_EVENT_TRANSITIONS =
{
  text_start: { block_type: :text, phase: :start },
  text_delta: { block_type: :text, phase: :delta },
  text_end: { block_type: :text, phase: :end },
  tool_start: { block_type: :tool, phase: :start },
  tool_delta: { block_type: :tool, phase: :delta },
  tool_end: { block_type: :tool, phase: :end },
  reasoning_start: { block_type: :reasoning, phase: :start },
  reasoning_delta: { block_type: :reasoning, phase: :delta },
  reasoning_end: { block_type: :reasoning, phase: :end }
}.freeze
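The table maps each normalized block event type to the block it belongs to and the phase it represents. The following is a minimal sketch of a lookup against a local copy of the table. `TRANSITIONS` and `describe_transition` are hypothetical names used for illustration only; how the accumulator itself consults the table is not shown on this page.

```ruby
# Local copy of the table above, built programmatically for brevity.
TRANSITIONS = %i[text tool reasoning].product(%i[start delta end]).to_h { |block_type, phase|
  [:"#{block_type}_#{phase}", { block_type: block_type, phase: phase }]
}.freeze

# Hypothetical helper: returns the transition for a block event, or nil for
# message-level events such as :message_end, which are not in the table.
def describe_transition(type)
  TRANSITIONS[type.to_sym]
end

p describe_transition(:tool_delta)
p describe_transition(:message_end)
```

Message-level events have no entry, so a `nil` lookup is one way to tell them apart from block events.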
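Instance Attribute Summary
- #active_block_type ⇒ Object (readonly)
  Returns the value of attribute active_block_type.
- #blocks ⇒ Object
  Contract: see #blocks under Instance Attribute Details.
- #message_hash ⇒ Object
  Contract: see #blocks under Instance Attribute Details.
- #usage_hash ⇒ Object
  Contract: see #blocks under Instance Attribute Details.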
Instance Method Summary
- #active_tool? ⇒ Boolean
- #initialize ⇒ NormalizedStreamAccumulator (constructor)
  A new instance of NormalizedStreamAccumulator.
- #push(event_patch, &block) ⇒ Object
- #result ⇒ Object
Constructor Details
#initialize ⇒ NormalizedStreamAccumulator
Returns a new instance of NormalizedStreamAccumulator.
# File 'lib/llm_gateway/adapters/normalized_stream_accumulator.rb', line 67
def initialize
  @message_hash = {}
  @usage_hash = {
    input_tokens: 0,
    cache_creation_input_tokens: 0,
    cache_read_input_tokens: 0,
    output_tokens: 0,
    reasoning_tokens: 0
  }
  @blocks = []
  @next_content_index = 0
  @active_block_type = nil
  @active_content_index = nil
end
Instance Attribute Details
#active_block_type ⇒ Object (readonly)
Returns the value of attribute active_block_type.
# File 'lib/llm_gateway/adapters/normalized_stream_accumulator.rb', line 53
def active_block_type
  @active_block_type
end
#blocks ⇒ Object
Contract:
`push` accepts a single provider-independent, normalized stream event patch hash. Event patches are never arrays; mappers call `push` once per patch.
Provider wire events such as Anthropic `message_start` / `content_block_start`, OpenAI `response.output_text.delta`, etc. must be translated by the mapper before calling this accumulator. The normalized symbol `:message_start` below is allowed; the raw provider event string is not.
Accepted event shapes:
{ type: :message_start, delta: { id: "...", model: "...", role: "assistant" }, usage_increment: { ... } }
{ type: :message_delta, delta: { stop_reason: "stop" }, usage_increment: { ... } }
{ type: :message_end }
{ type: :text_start, delta: "hi" }
{ type: :text_delta, delta: " there" }
{ type: :text_end, delta: "" }
{ type: :reasoning_start, delta: "thinking", signature: "" }
{ type: :reasoning_delta, delta: "...", signature: "" }
{ type: :reasoning_end, delta: "", signature: "" }
{ type: :tool_start, id: "...", name: "tool_name", delta: "" }
{ type: :tool_delta, delta: "{\"a\":" }
{ type: :tool_end, delta: "" }
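To illustrate the translation step, here is a minimal sketch of a mapper for Anthropic-style text events. `translate_anthropic_event` is a hypothetical helper, not part of the library. It assumes the wire events are already parsed into hashes with string keys and that every content block in the stream is a text block, so a real mapper would need to track block types for the stop event.

```ruby
# Hypothetical mapper sketch: turns one parsed Anthropic-style wire event
# (string keys) into a normalized event patch, or nil when nothing maps.
def translate_anthropic_event(wire)
  case wire["type"]
  when "content_block_start"
    return nil unless wire.dig("content_block", "type") == "text"

    { type: :text_start, delta: wire.dig("content_block", "text").to_s }
  when "content_block_delta"
    return nil unless wire.dig("delta", "type") == "text_delta"

    { type: :text_delta, delta: wire.dig("delta", "text").to_s }
  when "content_block_stop"
    # Assumption: only text blocks occur, so every stop closes a text block.
    { type: :text_end, delta: "" }
  when "message_stop"
    { type: :message_end }
  end
end

wire_events = [
  { "type" => "content_block_start", "index" => 0, "content_block" => { "type" => "text", "text" => "" } },
  { "type" => "ping" },
  { "type" => "content_block_delta", "index" => 0, "delta" => { "type" => "text_delta", "text" => "hi" } },
  { "type" => "content_block_stop", "index" => 0 },
  { "type" => "message_stop" }
]

# Unmapped events (here: ping) are dropped; each remaining patch is a single Hash.
patches = wire_events.filter_map { |wire| translate_anthropic_event(wire) }
patches.each { |patch| p patch }
```

Each resulting patch would then be handed to the accumulator one at a time, in keeping with the rule that `push` never receives an array.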
Mappers do not provide `content_index`. The accumulator assigns the next public content index when a block starts and reuses the active content index for that block’s deltas and end event.
Without source indexes, the accumulator cannot detect two interleaved blocks of the same type. Providers that can interleave same-type blocks must buffer or serialize them in the mapper before pushing normalized events.
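One way a mapper could serialize interleaved blocks is to pass the first open block through and hold back events for any other block until the open one ends. The sketch below shows that idea under stated assumptions: `BlockSerializer` and `feed` are hypothetical names, the provider is assumed to supply a source index per block, and only block events (not message-level events) are fed in.

```ruby
# Hypothetical mapper-side helper: serializes interleaved blocks by source index.
class BlockSerializer
  def initialize
    @active_index = nil # source index currently being passed through
    @buffers = {}       # source index => buffered patches, in arrival order
  end

  # Takes a provider source index and a normalized block patch; returns the
  # patches that are now safe to push, in order.
  def feed(source_index, patch)
    @active_index ||= source_index
    if source_index != @active_index
      (@buffers[source_index] ||= []) << patch
      return []
    end

    out = [patch]
    out.concat(drain) if end_patch?(patch)
    out
  end

  private

  def end_patch?(patch)
    patch[:type].to_s.end_with?("_end")
  end

  # Releases buffered blocks in arrival order. Stops at the first block that
  # has not ended yet and makes it the active one.
  def drain
    @active_index = nil
    out = []
    until @buffers.empty?
      index, patches = @buffers.shift
      out.concat(patches)
      unless end_patch?(patches.last)
        @active_index = index
        break
      end
    end
    out
  end
end

serializer = BlockSerializer.new
arrivals = [
  [0, { type: :text_start, delta: "a" }],
  [1, { type: :text_start, delta: "x" }],
  [0, { type: :text_delta, delta: "b" }],
  [1, { type: :text_end, delta: "" }],
  [0, { type: :text_end, delta: "" }]
]
ordered = arrivals.flat_map { |index, patch| serializer.feed(index, patch) }
p ordered.map { |patch| patch[:delta] } # => ["a", "b", "", "x", ""]
```

The trade-off is latency: events for the second block are not visible to consumers until the first block has closed.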
The accumulator creates the public Assistant* event structs, updates its accumulated message state, then yields the created event to the callback.
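The index rule described in this contract can be sketched with a small stand-in. `IndexSketch` and `index_for` are hypothetical and only mimic the documented behavior (assign on start, reuse for deltas and end); the instance variable names mirror those set in the constructor, but the real bookkeeping, including what happens to the active index after an end event, is an assumption here.

```ruby
# Stand-in sketch, not the library class: mimics the documented index rule.
class IndexSketch
  def initialize
    @next_content_index = 0
    @active_content_index = nil
  end

  # Returns the content index that would be attached to a block event.
  def index_for(type)
    phase = type.to_s.split("_").last.to_sym
    if phase == :start
      # A new block takes the next public index.
      @active_content_index = @next_content_index
      @next_content_index += 1
    end
    index = @active_content_index
    # Assumption: the active index is cleared once the block ends.
    @active_content_index = nil if phase == :end
    index
  end
end

sketch = IndexSketch.new
types = %i[text_start text_delta text_end tool_start tool_delta tool_end]
indexes = types.map { |type| sketch.index_for(type) }
p indexes # => [0, 0, 0, 1, 1, 1]
```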
# File 'lib/llm_gateway/adapters/normalized_stream_accumulator.rb', line 52
def blocks
  @blocks
end
#message_hash ⇒ Object
# File 'lib/llm_gateway/adapters/normalized_stream_accumulator.rb', line 52
def message_hash
  @message_hash
end
#usage_hash ⇒ Object
# File 'lib/llm_gateway/adapters/normalized_stream_accumulator.rb', line 52
def usage_hash
  @usage_hash
end
Instance Method Details
#active_tool? ⇒ Boolean
# File 'lib/llm_gateway/adapters/normalized_stream_accumulator.rb', line 89
def active_tool?
  active_block_type == :tool
end
#push(event_patch, &block) ⇒ Object
# File 'lib/llm_gateway/adapters/normalized_stream_accumulator.rb', line 93
def push(event_patch, &block)
  raise ArgumentError, "Normalized stream event patch must be a Hash" unless event_patch.is_a?(Hash)

  event_patch = symbolize_keys(event_patch)
  type = event_patch.fetch(:type).to_sym
  event_patch = prepare_event_patch(event_patch.merge(type:), type)
  event = build_event(event_patch)

  accumulate(event)
  content_index = event.content_index if event.respond_to?(:content_index)
  commit_block_transition(type, content_index)

  block.call(event) if block
  nil
end
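The opening lines of `push` determine which inputs are rejected before any event is built. The following stand-in reproduces just that part so the failure modes can be seen in isolation. `normalized_type` is a hypothetical name, and the private `symbolize_keys` helper is assumed to behave like `Hash#transform_keys(&:to_sym)`, which this page does not confirm.

```ruby
# Stand-in for the first lines of #push: validates the patch and extracts
# its type. symbolize_keys is assumed to be a shallow key conversion.
def normalized_type(event_patch)
  raise ArgumentError, "Normalized stream event patch must be a Hash" unless event_patch.is_a?(Hash)

  event_patch.transform_keys(&:to_sym).fetch(:type).to_sym
end

# String keys and string types are accepted and converted to symbols.
p normalized_type({ "type" => "text_delta", "delta" => "hi" }) # => :text_delta

# An array of patches is rejected outright.
begin
  normalized_type([{ type: :text_delta, delta: "hi" }])
rescue ArgumentError => e
  puts e.message
end

# A patch without :type fails in Hash#fetch.
begin
  normalized_type({ delta: "hi" })
rescue KeyError => e
  puts e.class
end
```

Because the real method returns `nil`, callers that need the created event should read it from the block passed to `push` rather than from the return value.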
#result ⇒ Object
# File 'lib/llm_gateway/adapters/normalized_stream_accumulator.rb', line 82
def result
  message_hash.merge(
    usage: usage_hash,
    content: serialized_blocks
  )
end
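As a rough illustration of the shape `result` produces, the sketch below performs the same kind of merge on hand-written data. It assumes the merge receiver is the accumulated message hash. The message values and the serialized block shape are invented for the example, since `serialized_blocks` is a private helper whose output format is not shown on this page.

```ruby
# Illustrative data only: values and the block shape are assumptions.
message_hash = { id: "msg_1", model: "example-model", role: "assistant", stop_reason: "stop" }
usage_hash = {
  input_tokens: 12,
  cache_creation_input_tokens: 0,
  cache_read_input_tokens: 0,
  output_tokens: 5,
  reasoning_tokens: 0
}
serialized_blocks = [{ type: "text", text: "hi there" }]

# Hash#merge returns a new hash; the accumulated message hash is not mutated.
result = message_hash.merge(usage: usage_hash, content: serialized_blocks)
p result.keys # => [:id, :model, :role, :stop_reason, :usage, :content]
```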