Class: AiStream::Stream

Inherits:
Object
  • Object
show all
Includes:
Enumerable
Defined in:
lib/ai_stream/stream.rb

Overview

Stream is a lazy, Rack-compatible response body that produces UI Message Stream Protocol frames on demand.

You pass a block that receives a Writer; the block runs the first time the body is enumerated (i.e. when Rack/Rails pulls bytes to send to the client), so nothing is buffered eagerly and the very first token can flush immediately. The terminating ‘data: [DONE]` frame is appended automatically unless the block already wrote it.

Rack:

body = AiStream::Stream.new do |w|
  w.start
  w.text("Hello")
  w.finish
end
[200, AiStream::HEADERS.merge("content-type" => "text/event-stream"), body]

Rails (controller):

include ActionController::Live
def chat
  AiStream::HEADERS.each { |k, v| response.headers[k] = v }
  response.headers["Content-Type"] = "text/event-stream"
  AiStream::Stream.new { |w| w.start; w.text("hi"); w.finish }.each { |chunk| response.stream.write(chunk) }
ensure
  response.stream.close
end

Or, even simpler, collect to a String for tests / non-streaming responses:

AiStream::Stream.new { |w| w.start; w.text("hi"); w.finish }.to_s

Defined Under Namespace

Classes: FrameSink

Instance Method Summary collapse

Constructor Details

#initialize {|writer| ... } ⇒ Stream

Returns a new instance of Stream.

Yield Parameters:

Raises:

  • (ArgumentError)


41
42
43
44
45
# File 'lib/ai_stream/stream.rb', line 41

def initialize(&block)
  raise ArgumentError, "AiStream::Stream requires a block" unless block

  @block = block
end

Instance Method Details

#eachObject

Rack body contract. Yields each SSE frame string. Re-enumerable: the block is run fresh on every #each so the same Stream can be rendered twice (handy in tests).



50
51
52
53
54
55
56
57
58
# File 'lib/ai_stream/stream.rb', line 50

def each
  return enum_for(:each) unless block_given?

  sink = FrameSink.new { |frame| yield frame }
  writer = Writer.new(sink)
  @block.call(writer)
  writer.done unless writer.closed?
  self
end

#to_sObject

Materialize the whole stream into one String.



61
62
63
64
65
# File 'lib/ai_stream/stream.rb', line 61

def to_s
  buf = String.new
  each { |frame| buf << frame }
  buf
end