ai_stream
Speak the Vercel AI SDK's streaming protocol from Ruby.
The Vercel AI SDK is the de-facto frontend toolkit for AI apps — its
useChat, useCompletion, and useObject hooks power a huge share of the AI UIs shipping
today. Those hooks consume a specific Data Stream Protocol
(a.k.a. the UI Message Stream Protocol): a Server-Sent-Events wire format that is
language-agnostic by design — Vercel documents Python/FastAPI backends speaking it.
Ruby had nothing. If you wanted the polished Vercel useChat frontend in front of a Rails
app, you had to hand-roll the SSE framing and the exact text-start / text-delta /
tool-input-available / finish part encoding by reading the TypeScript source.
ai_stream is a faithful, pure-Ruby, zero-dependency encoder for that protocol. It's
provider-agnostic: it sits downstream of whatever produced the tokens
(ruby_llm, ruby-openai,
a raw HTTP stream, or canned text), so it composes with the existing Ruby AI stack instead
of competing with it.
Installation
# Gemfile
gem "ai_stream"
bundle install
Or:
gem install ai_stream
Quick start
require "ai_stream"
body = AiStream::Stream.new do |w|
w.start # {"type":"start","messageId":"..."}
id = w.text_start # {"type":"text-start","id":"..."}
w.text_delta("Hello", id: id)
w.text_delta(" world", id: id)
w.text_end(id: id)
w.finish # {"type":"finish"}
end
puts body.to_s
# data: {"type":"start","messageId":"..."}
#
# data: {"type":"text-start","id":"..."}
#
# data: {"type":"text-delta","id":"...","delta":"Hello"}
#
# data: {"type":"text-delta","id":"...","delta":" world"}
#
# data: {"type":"text-end","id":"..."}
#
# data: {"type":"finish"}
#
# data: [DONE]
In Rails (streaming to useChat)
class ChatController < ApplicationController
include ActionController::Live
def create
# Required header so the AI SDK treats this as a UI message stream:
AiStream::HEADERS.each { |k, v| response.headers[k] = v }
response.headers["Content-Type"] = "text/event-stream"
AiStream::Stream.new do |w|
w.start
id = w.text_start
# Pipe tokens from any source. Example with ruby_llm:
RubyLLM.chat.ask(params[:prompt]) do |chunk|
w.text_delta(chunk.content, id: id)
end
w.text_end(id: id)
w.finish
end.each { |frame| response.stream.write(frame) }
ensure
response.stream.close
end
end
Frontend, unchanged from any Vercel AI SDK app:
const { messages, sendMessage } = useChat({ api: "/chat" });
In plain Rack
AiStream::Stream is a valid Rack response body (it responds to #each and yields complete
SSE frames):
run lambda { |env|
body = AiStream::Stream.new do |w|
w.start
w.text("Hi from Rack")
w.finish
end
headers = AiStream::HEADERS.merge("content-type" => "text/event-stream")
[200, headers, body]
}
Tool calls
The protocol streams tool calls as a lifecycle. For incrementally-produced arguments:
AiStream::Stream.new do |w|
w.start
w.start_step
w.tool_input_start(tool_call_id: "t1", tool_name: "get_weather")
w.tool_input_delta(tool_call_id: "t1", delta: '{"city":')
w.tool_input_delta(tool_call_id: "t1", delta: '"SF"}')
w.tool_input_available(tool_call_id: "t1", tool_name: "get_weather", input: { city: "SF" })
w.tool_output_available(tool_call_id: "t1", output: { temp: 64 })
w.finish_step
w.start_step
w.text("It's 64°F in San Francisco.")
w.finish_step
w.finish
end
When the input is already known, #tool_call collapses the two parts (and shares a
toolCallId):
w.tool_call(tool_name: "search", input: { q: "ruby" }, output: { hits: 3 })
Supported parts
Every part type from the AI SDK UI Stream Protocol:
| Category | Writer methods |
|---|---|
| Lifecycle | start, start_step, finish_step, finish, abort, error |
| Text | text_start, text_delta, text_end, text |
| Reasoning | reasoning_start, reasoning_delta, reasoning_end |
| Tools | tool_input_start, tool_input_delta, tool_input_available, tool_output_available, tool_call |
| Sources / files | source_url, source_document, file |
| Custom data | data(name, payload) → data-<name> |
| Forward-compat | emit(type:, ...) for any part type added after this release |
The stream is terminated with the SSE data: [DONE] sentinel automatically by
AiStream::Stream; if you drive a Writer directly, call #done yourself.
Why a Writer and a Stream?
AiStream::Writeris the low-level encoder. Give it any sink that responds to<<(aString, anIO, a Rack stream). It does no IO of its own, which makes it trivial to unit-test — feed it aStringand assert on the bytes. (That's exactly how this gem's own tests work, with no API key required.)AiStream::Streamwraps aWriterin a lazy, re-enumerable, Rack-compatible body and handles the[DONE]terminator for you.
Development
bin/setup # or: bundle install
bundle exec rake test