Class: AiStream::Writer
- Inherits:
-
Object
- Object
- AiStream::Writer
- Defined in:
- lib/ai_stream/writer.rb
Overview
Writer encodes the Vercel AI SDK “Data Stream Protocol” (a.k.a. UI Message Stream Protocol) onto an arbitrary sink.
The protocol is a sequence of Server-Sent Events. Every event is a single JSON object framed as:
data: {"type":"text-delta","id":"...","delta":"Hi"}\n\n
and the stream is terminated with the sentinel:
data: [DONE]\n\n
A consuming frontend (Vercel AI SDK’s useChat / useCompletion / useObject) expects the HTTP response header ‘x-vercel-ai-ui-message-stream: v1` (see AiStream::HEADERS).
The sink is anything that responds to ‘<<` (a String, an IO, a Rack stream, an Array buffer, …). The Writer never performs IO itself beyond `sink <<`, which keeps it trivially unit-testable: feed it a String and assert on bytes.
Example:
buf = +""
w = AiStream::Writer.new(buf)
w.start
id = w.text_start
w.text_delta("Hello", id: id)
w.text_delta(" world", id: id)
w.text_end(id: id)
w.finish
w.done
Instance Attribute Summary collapse
-
#sink ⇒ Object
readonly
Returns the value of attribute sink.
Class Method Summary collapse
-
.generate_id ⇒ Object
Returns a freshly generated id when callers don’t supply one.
Instance Method Summary collapse
-
#abort(reason: nil) ⇒ Object
Cooperative cancellation frame.
- #closed? ⇒ Boolean
-
#data(name, payload) ⇒ Object
Emits a ‘data-<name>` part.
-
#done ⇒ Object
Write the SSE terminator.
-
#emit(part) ⇒ Object
Emit a raw, pre-shaped part hash.
-
#error(text) ⇒ Object
Surface an error to the client.
- #file(url:, media_type:) ⇒ Object
-
#finish ⇒ Object
Terminal message frame.
- #finish_step ⇒ Object
-
#initialize(sink) ⇒ Writer
constructor
A new instance of Writer.
- #reasoning_delta(delta, id:) ⇒ Object
- #reasoning_end(id:) ⇒ Object
-
#reasoning_start(id: nil) ⇒ Object
— Reasoning ———————————————————–.
- #source_document(media_type:, title:, source_id: nil) ⇒ Object
-
#source_url(url, source_id: nil, title: nil) ⇒ Object
— Sources & files —————————————————–.
-
#start(message_id: nil) ⇒ Object
Emit the message-start frame.
-
#start_step ⇒ Object
Multi-step runs (tool call -> tool result -> more text) are bracketed by start-step / finish-step pairs.
-
#text(content, id: nil) ⇒ Object
Convenience: emit a whole text block (start + single delta + end) at once.
- #text_delta(delta, id:) ⇒ Object
- #text_end(id:) ⇒ Object
-
#text_start(id: nil) ⇒ Object
Begin a text block.
-
#tool_call(tool_name:, input:, output:, tool_call_id: nil) ⇒ Object
Convenience: emit a complete non-streamed tool call (input known up front plus its output) inside its own step.
-
#tool_input_available(tool_call_id:, tool_name:, input:) ⇒ Object
Final, parsed tool input.
- #tool_input_delta(tool_call_id:, delta:) ⇒ Object
-
#tool_input_start(tool_call_id:, tool_name:) ⇒ Object
Streaming tool-input lifecycle (when arguments are produced incrementally): tool_input_start -> tool_input_delta* -> tool_input_available.
-
#tool_output_available(tool_call_id:, output:) ⇒ Object
The result of executing the tool.
Constructor Details
#initialize(sink) ⇒ Writer
Returns a new instance of Writer.
50 51 52 53 |
# File 'lib/ai_stream/writer.rb', line 50 def initialize(sink) @sink = sink @closed = false end |
Instance Attribute Details
#sink ⇒ Object (readonly)
Returns the value of attribute sink.
47 48 49 |
# File 'lib/ai_stream/writer.rb', line 47 def sink @sink end |
Class Method Details
.generate_id ⇒ Object
Returns a freshly generated id when callers don’t supply one. Exposed so text/reasoning/tool parts can share an id across their start/delta/end lifecycle.
43 44 45 |
# File 'lib/ai_stream/writer.rb', line 43 def self.generate_id SecureRandom.uuid end |
Instance Method Details
#abort(reason: nil) ⇒ Object
Cooperative cancellation frame.
83 84 85 86 87 |
# File 'lib/ai_stream/writer.rb', line 83 def abort(reason: nil) part = { type: "abort" } part[:reason] = reason unless reason.nil? emit(part) end |
#closed? ⇒ Boolean
220 221 222 |
# File 'lib/ai_stream/writer.rb', line 220 def closed? @closed end |
#data(name, payload) ⇒ Object
Emits a ‘data-<name>` part. The frontend matches on the full type string, so a name of “weather” produces “type”:“data-weather”,“data”:{…}.
161 162 163 |
# File 'lib/ai_stream/writer.rb', line 161 def data(name, payload) emit(type: "data-#{name}", data: payload) end |
#done ⇒ Object
Write the SSE terminator. After this, further emits raise ClosedError.
212 213 214 215 216 217 218 |
# File 'lib/ai_stream/writer.rb', line 212 def done return self if @closed write_frame("[DONE]") @closed = true self end |
#emit(part) ⇒ Object
Emit a raw, pre-shaped part hash. Validates that :type is present, JSON encodes it, and writes one SSE event. Useful for protocol part types added after this gem’s release.
201 202 203 204 205 206 207 208 209 |
# File 'lib/ai_stream/writer.rb', line 201 def emit(part) raise ClosedError, "stream already terminated with [DONE]" if @closed hash = part.is_a?(Hash) ? part : part.to_h raise ArgumentError, "part must include a :type" unless hash[:type] || hash["type"] write_frame(JSON.generate(hash)) self end |
#error(text) ⇒ Object
Surface an error to the client. The frontend renders ‘errorText`.
90 91 92 |
# File 'lib/ai_stream/writer.rb', line 90 def error(text) emit(type: "error", errorText: text.to_s) end |
#file(url:, media_type:) ⇒ Object
153 154 155 |
# File 'lib/ai_stream/writer.rb', line 153 def file(url:, media_type:) emit(type: "file", url: url, mediaType: media_type) end |
#finish ⇒ Object
Terminal message frame. Does NOT write the SSE [DONE] sentinel; call #done for that (kept separate so callers can finish a message but keep the HTTP connection open, which some multi-message flows want).
78 79 80 |
# File 'lib/ai_stream/writer.rb', line 78 def finish emit(type: "finish") end |
#finish_step ⇒ Object
71 72 73 |
# File 'lib/ai_stream/writer.rb', line 71 def finish_step emit(type: "finish-step") end |
#reasoning_delta(delta, id:) ⇒ Object
128 129 130 |
# File 'lib/ai_stream/writer.rb', line 128 def reasoning_delta(delta, id:) emit(type: "reasoning-delta", id: id, delta: delta.to_s) end |
#reasoning_end(id:) ⇒ Object
132 133 134 |
# File 'lib/ai_stream/writer.rb', line 132 def reasoning_end(id:) emit(type: "reasoning-end", id: id) end |
#reasoning_start(id: nil) ⇒ Object
— Reasoning ———————————————————–
122 123 124 125 126 |
# File 'lib/ai_stream/writer.rb', line 122 def reasoning_start(id: nil) rid = id || self.class.generate_id emit(type: "reasoning-start", id: rid) rid end |
#source_document(media_type:, title:, source_id: nil) ⇒ Object
144 145 146 147 148 149 150 151 |
# File 'lib/ai_stream/writer.rb', line 144 def source_document(media_type:, title:, source_id: nil) emit( type: "source-document", sourceId: source_id || self.class.generate_id, mediaType: media_type, title: title ) end |
#source_url(url, source_id: nil, title: nil) ⇒ Object
— Sources & files —————————————————–
138 139 140 141 142 |
# File 'lib/ai_stream/writer.rb', line 138 def source_url(url, source_id: nil, title: nil) part = { type: "source-url", sourceId: source_id || self.class.generate_id, url: url } part[:title] = title unless title.nil? emit(part) end |
#start(message_id: nil) ⇒ Object
Emit the message-start frame. messageId is optional; one is generated when omitted so the frontend always has a stable id to key the message on.
59 60 61 62 63 |
# File 'lib/ai_stream/writer.rb', line 59 def start(message_id: nil) mid = || self.class.generate_id emit(type: "start", messageId: mid) mid end |
#start_step ⇒ Object
Multi-step runs (tool call -> tool result -> more text) are bracketed by start-step / finish-step pairs.
67 68 69 |
# File 'lib/ai_stream/writer.rb', line 67 def start_step emit(type: "start-step") end |
#text(content, id: nil) ⇒ Object
Convenience: emit a whole text block (start + single delta + end) at once. Returns the block id.
113 114 115 116 117 118 |
# File 'lib/ai_stream/writer.rb', line 113 def text(content, id: nil) tid = text_start(id: id) text_delta(content, id: tid) text_end(id: tid) tid end |
#text_delta(delta, id:) ⇒ Object
103 104 105 |
# File 'lib/ai_stream/writer.rb', line 103 def text_delta(delta, id:) emit(type: "text-delta", id: id, delta: delta.to_s) end |
#text_end(id:) ⇒ Object
107 108 109 |
# File 'lib/ai_stream/writer.rb', line 107 def text_end(id:) emit(type: "text-end", id: id) end |
#text_start(id: nil) ⇒ Object
Begin a text block. Returns the block id to thread through deltas/end.
97 98 99 100 101 |
# File 'lib/ai_stream/writer.rb', line 97 def text_start(id: nil) tid = id || self.class.generate_id emit(type: "text-start", id: tid) tid end |
#tool_call(tool_name:, input:, output:, tool_call_id: nil) ⇒ Object
Convenience: emit a complete non-streamed tool call (input known up front plus its output) inside its own step.
189 190 191 192 193 194 |
# File 'lib/ai_stream/writer.rb', line 189 def tool_call(tool_name:, input:, output:, tool_call_id: nil) id = tool_call_id || self.class.generate_id tool_input_available(tool_call_id: id, tool_name: tool_name, input: input) tool_output_available(tool_call_id: id, output: output) id end |
#tool_input_available(tool_call_id:, tool_name:, input:) ⇒ Object
Final, parsed tool input. ‘input` is any JSON-serializable object.
178 179 180 |
# File 'lib/ai_stream/writer.rb', line 178 def tool_input_available(tool_call_id:, tool_name:, input:) emit(type: "tool-input-available", toolCallId: tool_call_id, toolName: tool_name, input: input) end |
#tool_input_delta(tool_call_id:, delta:) ⇒ Object
173 174 175 |
# File 'lib/ai_stream/writer.rb', line 173 def tool_input_delta(tool_call_id:, delta:) emit(type: "tool-input-delta", toolCallId: tool_call_id, inputTextDelta: delta.to_s) end |
#tool_input_start(tool_call_id:, tool_name:) ⇒ Object
Streaming tool-input lifecycle (when arguments are produced incrementally):
tool_input_start -> tool_input_delta* -> tool_input_available
169 170 171 |
# File 'lib/ai_stream/writer.rb', line 169 def tool_input_start(tool_call_id:, tool_name:) emit(type: "tool-input-start", toolCallId: tool_call_id, toolName: tool_name) end |
#tool_output_available(tool_call_id:, output:) ⇒ Object
The result of executing the tool. ‘output` is any JSON-serializable object.
183 184 185 |
# File 'lib/ai_stream/writer.rb', line 183 def tool_output_available(tool_call_id:, output:) emit(type: "tool-output-available", toolCallId: tool_call_id, output: output) end |