Class: AiStream::Writer

Inherits:
Object
Defined in:
lib/ai_stream/writer.rb

Overview

Writer encodes the Vercel AI SDK “Data Stream Protocol” (a.k.a. UI Message Stream Protocol) onto an arbitrary sink.

The protocol is a sequence of Server-Sent Events. Every event is a single JSON object framed as:

data: {"type":"text-delta","id":"...","delta":"Hi"}\n\n

and the stream is terminated with the sentinel:

data: [DONE]\n\n
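
The framing above is small enough to sketch directly. The following is a minimal illustration rather than the gem's own code: `sse_frame` is a hypothetical helper that wraps a payload string in the `data: …\n\n` envelope described above, and the id "t1" is a made-up value.

```ruby
require "json"

# Hypothetical helper (not part of AiStream): wrap one payload in an SSE frame.
def sse_frame(payload)
  "data: #{payload}\n\n"
end

part = { type: "text-delta", id: "t1", delta: "Hi" }

stream = +""
stream << sse_frame(JSON.generate(part)) # one JSON object per event
stream << sse_frame("[DONE]")            # terminating sentinel

print stream
```

Each event ends with a blank line, so a reader can split the stream on "\n\n" to recover the individual frames.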

A consuming frontend (Vercel AI SDK’s useChat / useCompletion / useObject) expects the HTTP response header `x-vercel-ai-ui-message-stream: v1` (see AiStream::HEADERS).

The sink is anything that responds to `<<` (a String, an IO, a Rack stream, an Array buffer, …). The Writer never performs IO itself beyond `sink <<`, which keeps it trivially unit-testable: feed it a String and assert on bytes.

Example:

buf = +""
w = AiStream::Writer.new(buf)
w.start
id = w.text_start
w.text_delta("Hello", id: id)
w.text_delta(" world", id: id)
w.text_end(id: id)
w.finish
w.done
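
When asserting on the bytes collected in a String sink, it can help to decode the frames back into hashes. A minimal sketch, assuming the framing described above: `parse_frames` is a hypothetical test helper, not part of the gem, and the sample string is hand-written rather than captured from a real Writer.

```ruby
require "json"

# Hypothetical test helper (not part of AiStream): decode SSE frames back
# into part hashes, keeping the "[DONE]" sentinel as a plain string.
def parse_frames(bytes)
  bytes.split("\n\n").map do |frame|
    payload = frame.delete_prefix("data: ")
    payload == "[DONE]" ? payload : JSON.parse(payload)
  end
end

# Hand-written sample in the framing described above.
sample = %(data: {"type":"text-start","id":"t1"}\n\n) +
         %(data: {"type":"text-delta","id":"t1","delta":"Hello"}\n\n) +
         %(data: [DONE]\n\n)

frames = parse_frames(sample)
puts frames.map { |f| f.is_a?(Hash) ? f["type"] : f }.inspect
```

Splitting on a blank line is safe here because JSON.generate escapes newlines inside string values, so a raw "\n\n" only ever appears between frames.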


Constructor Details

#initialize(sink) ⇒ Writer

Returns a new instance of Writer.

Parameters:

  • sink (#<<)

    anything that accepts string chunks (IO, String, Rack body, …)



# File 'lib/ai_stream/writer.rb', line 50

def initialize(sink)
  @sink = sink
  @closed = false
end
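
The only contract on the sink is that it responds to `<<`. The sketch below illustrates that duck type with three in-memory sinks; `push_chunk` is a hypothetical stand-in for the single `sink << chunk` call the Writer makes and is not part of the gem.

```ruby
require "stringio"

# Hypothetical helper: stands in for the single `sink << chunk` call the Writer makes.
def push_chunk(sink, chunk)
  sink << chunk
  sink
end

frame = "data: [DONE]\n\n"

as_string = push_chunk(+"", frame)          # String buffer: bytes are appended
as_array  = push_chunk([], frame)           # Array buffer: one element per chunk
as_io     = push_chunk(StringIO.new, frame) # in-memory IO

puts as_string == as_io.string
puts as_array.length
```

An Array sink keeps chunk boundaries visible, which can be handy when a test cares about how many frames were written rather than their concatenated bytes.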

Instance Attribute Details

#sink ⇒ Object (readonly)

Returns the value of attribute sink.



# File 'lib/ai_stream/writer.rb', line 47

def sink
  @sink
end

Class Method Details

.generate_id ⇒ Object

Returns a freshly generated id when callers don’t supply one. Exposed so text/reasoning/tool parts can share an id across their start/delta/end lifecycle.



# File 'lib/ai_stream/writer.rb', line 43

def self.generate_id
  SecureRandom.uuid
end
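
To illustrate what sharing an id across a lifecycle means, the sketch below builds the three parts of one text block around a single generated id. `lifecycle_parts` is a hypothetical helper for illustration only; the part shapes are copied from the text_start / text_delta / text_end listings on this page.

```ruby
require "securerandom"

# Hypothetical helper: the three parts of one text block, all keyed by one id.
def lifecycle_parts(id, delta)
  [
    { type: "text-start", id: id },
    { type: "text-delta", id: id, delta: delta },
    { type: "text-end",   id: id }
  ]
end

id = SecureRandom.uuid # same call as generate_id in the listing above
parts = lifecycle_parts(id, "Hi")
puts parts.map { |p| p[:id] }.uniq.length
```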

Instance Method Details

#abort(reason: nil) ⇒ Object

Cooperative cancellation frame.



# File 'lib/ai_stream/writer.rb', line 83

def abort(reason: nil)
  part = { type: "abort" }
  part[:reason] = reason unless reason.nil?
  emit(part)
end

#closed? ⇒ Boolean

Returns:

  • (Boolean)


# File 'lib/ai_stream/writer.rb', line 220

def closed?
  @closed
end

#data(name, payload) ⇒ Object

Emits a `data-<name>` part. The frontend matches on the full type string, so a name of “weather” produces {"type":"data-weather","data":{…}}.



# File 'lib/ai_stream/writer.rb', line 161

def data(name, payload)
  emit(type: "data-#{name}", data: payload)
end
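
As a concrete illustration of the type string, the sketch below builds the same hash the listing passes to emit and serializes it. `data_part` is a hypothetical helper, and the "weather" payload is made-up sample data.

```ruby
require "json"

# Hypothetical helper mirroring the listing above: build the part hash for #data.
def data_part(name, payload)
  { type: "data-#{name}", data: payload }
end

puts JSON.generate(data_part("weather", { city: "Oslo", tempC: 4 }))
# prints {"type":"data-weather","data":{"city":"Oslo","tempC":4}}
```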

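#done ⇒ Object

Writes the SSE terminator and marks the writer closed. After this, further emits raise ClosedError. Calling #done again is a no-op that returns self without writing a second sentinel.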



# File 'lib/ai_stream/writer.rb', line 212

def done
  return self if @closed

  write_frame("[DONE]")
  @closed = true
  self
end
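
The closing behaviour can be exercised in isolation. The sketch below uses `MiniWriter`, a hypothetical stand-in reduced from the listings on this page, not the gem's class. Its `write_frame` is an assumption based on the framing in the Overview, and its nested `ClosedError` stands in for the error class the real Writer raises.

```ruby
require "json"

# Stand-in reduced from the listings on this page; NOT the gem's class.
# The "data: ...\n\n" framing in write_frame is assumed from the Overview.
class MiniWriter
  class ClosedError < StandardError; end

  def initialize(sink)
    @sink = sink
    @closed = false
  end

  def emit(part)
    raise ClosedError, "stream already terminated with [DONE]" if @closed

    write_frame(JSON.generate(part))
    self
  end

  def done
    return self if @closed

    write_frame("[DONE]")
    @closed = true
    self
  end

  def closed?
    @closed
  end

  private

  def write_frame(payload)
    @sink << "data: #{payload}\n\n"
  end
end

buf = +""
w = MiniWriter.new(buf)
w.emit(type: "finish")
w.done
w.done # second call is a no-op: no extra sentinel is written

begin
  w.emit(type: "text-delta", id: "t1", delta: "late")
rescue MiniWriter::ClosedError => e
  puts "rejected: #{e.message}"
end
```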

#emit(part) ⇒ Object

Emit a raw, pre-shaped part hash. Validates that :type is present, JSON encodes it, and writes one SSE event. Useful for protocol part types added after this gem’s release.

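Raises:

  • (ClosedError)

    if the stream has already been terminated with [DONE]

  • (ArgumentError)

    if the part has neither a :type nor a "type" key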



# File 'lib/ai_stream/writer.rb', line 201

def emit(part)
  raise ClosedError, "stream already terminated with [DONE]" if @closed

  hash = part.is_a?(Hash) ? part : part.to_h
  raise ArgumentError, "part must include a :type" unless hash[:type] || hash["type"]

  write_frame(JSON.generate(hash))
  self
end
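
The checks in the listing accept more than plain symbol-keyed hashes. The sketch below isolates them for illustration: `normalize_part` and `CustomPart` are hypothetical names, and "custom-part" is a made-up part type, not one defined by the protocol.

```ruby
# Hypothetical extraction of the checks in the listing above, for illustration.
def normalize_part(part)
  hash = part.is_a?(Hash) ? part : part.to_h
  raise ArgumentError, "part must include a :type" unless hash[:type] || hash["type"]

  hash
end

CustomPart = Struct.new(:type, :value, keyword_init: true)

normalize_part({ "type" => "custom-part" })                   # string keys pass
normalize_part(CustomPart.new(type: "custom-part", value: 1)) # Struct#to_h gives symbol keys

begin
  normalize_part({ id: "t1" }) # no type under either key
rescue ArgumentError => e
  puts e.message
end
```

Anything that converts cleanly with to_h can be passed, which is what lets callers emit part types the Writer has no dedicated method for.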

#error(text) ⇒ Object

Surface an error to the client. The frontend renders `errorText`.



# File 'lib/ai_stream/writer.rb', line 90

def error(text)
  emit(type: "error", errorText: text.to_s)
end

#file(url:, media_type:) ⇒ Object



# File 'lib/ai_stream/writer.rb', line 153

def file(url:, media_type:)
  emit(type: "file", url: url, mediaType: media_type)
end

#finish ⇒ Object

Terminal message frame. Does NOT write the SSE [DONE] sentinel; call #done for that (kept separate so callers can finish a message but keep the HTTP connection open, which some multi-message flows want).



# File 'lib/ai_stream/writer.rb', line 78

def finish
  emit(type: "finish")
end

#finish_step ⇒ Object



# File 'lib/ai_stream/writer.rb', line 71

def finish_step
  emit(type: "finish-step")
end

#reasoning_delta(delta, id:) ⇒ Object



# File 'lib/ai_stream/writer.rb', line 128

def reasoning_delta(delta, id:)
  emit(type: "reasoning-delta", id: id, delta: delta.to_s)
end

#reasoning_end(id:) ⇒ Object



# File 'lib/ai_stream/writer.rb', line 132

def reasoning_end(id:)
  emit(type: "reasoning-end", id: id)
end

#reasoning_start(id: nil) ⇒ Object

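Begin a reasoning block. Returns the block id to thread through #reasoning_delta and #reasoning_end; an id is generated when none is given.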



# File 'lib/ai_stream/writer.rb', line 122

def reasoning_start(id: nil)
  rid = id || self.class.generate_id
  emit(type: "reasoning-start", id: rid)
  rid
end

#source_document(media_type:, title:, source_id: nil) ⇒ Object



# File 'lib/ai_stream/writer.rb', line 144

def source_document(media_type:, title:, source_id: nil)
  emit(
    type: "source-document",
    sourceId: source_id || self.class.generate_id,
    mediaType: media_type,
    title: title
  )
end

#source_url(url, source_id: nil, title: nil) ⇒ Object

Emit a source-url part. A sourceId is generated when source_id is omitted, and title is included only when given.



# File 'lib/ai_stream/writer.rb', line 138

def source_url(url, source_id: nil, title: nil)
  part = { type: "source-url", sourceId: source_id || self.class.generate_id, url: url }
  part[:title] = title unless title.nil?
  emit(part)
end

#start(message_id: nil) ⇒ Object

Emit the message-start frame. messageId is optional; one is generated when omitted so the frontend always has a stable id to key the message on.



# File 'lib/ai_stream/writer.rb', line 59

def start(message_id: nil)
  mid = message_id || self.class.generate_id
  emit(type: "start", messageId: mid)
  mid
end

#start_step ⇒ Object

Multi-step runs (tool call -> tool result -> more text) are bracketed by start-step / finish-step pairs.



# File 'lib/ai_stream/writer.rb', line 67

def start_step
  emit(type: "start-step")
end
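
The bracketing rule can be checked mechanically over a list of part types. The sketch below is illustrative only: `steps_balanced?` is a hypothetical helper, and the sample ordering (tool call and result in one step, follow-up text in the next) is an assumption about a typical run, not something this page specifies.

```ruby
# Hypothetical check: every start-step is closed by a finish-step before
# the next one opens, and no step is left open at the end.
def steps_balanced?(types)
  in_step = false
  types.each do |type|
    case type
    when "start-step"
      return false if in_step

      in_step = true
    when "finish-step"
      return false unless in_step

      in_step = false
    end
  end
  !in_step
end

# Assumed ordering for a two-step run: tool call + result, then more text.
run = %w[
  start
  start-step tool-input-available tool-output-available finish-step
  start-step text-start text-delta text-end finish-step
  finish
]

puts steps_balanced?(run)
```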

#text(content, id: nil) ⇒ Object

Convenience: emit a whole text block (start + single delta + end) at once. Returns the block id.



# File 'lib/ai_stream/writer.rb', line 113

def text(content, id: nil)
  tid = text_start(id: id)
  text_delta(content, id: tid)
  text_end(id: tid)
  tid
end

#text_delta(delta, id:) ⇒ Object



# File 'lib/ai_stream/writer.rb', line 103

def text_delta(delta, id:)
  emit(type: "text-delta", id: id, delta: delta.to_s)
end

#text_end(id:) ⇒ Object



# File 'lib/ai_stream/writer.rb', line 107

def text_end(id:)
  emit(type: "text-end", id: id)
end

#text_start(id: nil) ⇒ Object

Begin a text block. Returns the block id to thread through deltas/end.



# File 'lib/ai_stream/writer.rb', line 97

def text_start(id: nil)
  tid = id || self.class.generate_id
  emit(type: "text-start", id: tid)
  tid
end

#tool_call(tool_name:, input:, output:, tool_call_id: nil) ⇒ Object

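Convenience: emit a complete non-streamed tool call (input known up front, plus its output) as a tool-input-available part followed by a tool-output-available part. Returns the tool call id. As listed, the method does not emit start-step / finish-step itself; bracket the call with #start_step and #finish_step when the tool call should sit inside its own step.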



# File 'lib/ai_stream/writer.rb', line 189

def tool_call(tool_name:, input:, output:, tool_call_id: nil)
  id = tool_call_id || self.class.generate_id
  tool_input_available(tool_call_id: id, tool_name: tool_name, input: input)
  tool_output_available(tool_call_id: id, output: output)
  id
end

#tool_input_available(tool_call_id:, tool_name:, input:) ⇒ Object

Final, parsed tool input. `input` is any JSON-serializable object.



# File 'lib/ai_stream/writer.rb', line 178

def tool_input_available(tool_call_id:, tool_name:, input:)
  emit(type: "tool-input-available", toolCallId: tool_call_id, toolName: tool_name, input: input)
end

#tool_input_delta(tool_call_id:, delta:) ⇒ Object



# File 'lib/ai_stream/writer.rb', line 173

def tool_input_delta(tool_call_id:, delta:)
  emit(type: "tool-input-delta", toolCallId: tool_call_id, inputTextDelta: delta.to_s)
end

#tool_input_start(tool_call_id:, tool_name:) ⇒ Object

Streaming tool-input lifecycle (when arguments are produced incrementally):

tool_input_start -> tool_input_delta* -> tool_input_available


# File 'lib/ai_stream/writer.rb', line 169

def tool_input_start(tool_call_id:, tool_name:)
  emit(type: "tool-input-start", toolCallId: tool_call_id, toolName: tool_name)
end
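
The lifecycle above can be sketched as a sequence of part hashes. This is an illustration under assumptions: `streamed_tool_input_parts` is a hypothetical helper, the tool name and id are made up, and the deltas are assumed to be fragments of the JSON text of the arguments, so that the joined chunks parse into the final input. Field names are taken from the listings on this page.

```ruby
require "json"

# Hypothetical helper: the part sequence for one streamed tool input.
def streamed_tool_input_parts(tool_call_id:, tool_name:, chunks:)
  parts = [{ type: "tool-input-start", toolCallId: tool_call_id, toolName: tool_name }]
  chunks.each do |chunk|
    parts << { type: "tool-input-delta", toolCallId: tool_call_id, inputTextDelta: chunk }
  end
  # Assumption: once every chunk has arrived, the joined text is valid JSON.
  parts << {
    type: "tool-input-available",
    toolCallId: tool_call_id,
    toolName: tool_name,
    input: JSON.parse(chunks.join)
  }
end

parts = streamed_tool_input_parts(
  tool_call_id: "call_1",
  tool_name: "get_weather",
  chunks: ['{"city":', '"Oslo"}']
)
puts parts.map { |p| p[:type] }.inspect
```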

#tool_output_available(tool_call_id:, output:) ⇒ Object

The result of executing the tool. `output` is any JSON-serializable object.



# File 'lib/ai_stream/writer.rb', line 183

def tool_output_available(tool_call_id:, output:)
  emit(type: "tool-output-available", toolCallId: tool_call_id, output: output)
end