Class: A2A::SSE::Stream
- Inherits:
-
Protocol::HTTP::Body::Writable
- Object
- Protocol::HTTP::Body::Writable
- A2A::SSE::Stream
- Defined in:
- lib/a2a/sse/stream.rb
Overview
Async-native SSE body built on Protocol::HTTP::Body::Writable.
Falcon’s protocol-rack passes Protocol::HTTP::Body::Readable subclasses through untouched — no Enumerable wrapping, no intermediate buffering. This gives us true async streaming with backpressure for free.
The gospel (protocol-http) teaches:
- Writable is a producer/consumer queue body
- write() pushes chunks; read() pops them (blocks when empty)
- close_write signals EOF (reader gets nil)
- Client disconnect raises on next write()
Usage:
stream = A2A::SSE::Stream.new
Async do
stream.event({ "task" => { ... } })
stream.event({ "statusUpdate" => { ... } })
stream.finish
end
# Return as Rack body — Falcon streams it natively
[200, { "content-type" => "text/event-stream" }, stream]
Direct Known Subclasses
Constant Summary collapse
- SSE_HEADERS =
{ "content-type" => "text/event-stream", "cache-control" => "no-cache, no-transform", "x-accel-buffering" => "no", "connection" => "keep-alive", }.freeze
Class Method Summary collapse
-
.headers ⇒ Object
Convenience: the SSE headers to return in the Rack response.
Instance Method Summary collapse
-
#event(data, type: nil, id: nil) ⇒ Object
Emit an SSE event.
-
#finish ⇒ Object
Signal end-of-stream.
Class Method Details
.headers ⇒ Object
Convenience: the SSE headers to return in the Rack response.
71 72 73 |
# File 'lib/a2a/sse/stream.rb', line 71 def self.headers SSE_HEADERS end |
Instance Method Details
#event(data, type: nil, id: nil) ⇒ Object
Emit an SSE event.
47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 |
# File 'lib/a2a/sse/stream.rb', line 47 def event(data, type: nil, id: nil) payload = data.is_a?(String) ? data : JSON.generate(data) buf = String.new buf << "event: #{type}\n" if type buf << "id: #{id}\n" if id # SSE spec: each line of data gets its own `data:` prefix. # For single-line JSON this is one line; multi-line is handled correctly. payload.each_line do |line| buf << "data: #{line.chomp}\n" end buf << "\n" # blank line terminates the event write(buf) end |
#finish ⇒ Object
Signal end-of-stream. The reader will receive nil on next read(), closing the SSE connection.
66 67 68 |
# File 'lib/a2a/sse/stream.rb', line 66 def finish close_write end |