Class: A2A::SSE::Stream
- Inherits: Protocol::HTTP::Body::Writable
  - Object
  - Protocol::HTTP::Body::Writable
  - A2A::SSE::Stream
- Defined in: lib/a2a/sse/stream.rb
Overview
Async-native SSE body built on Protocol::HTTP::Body::Writable.
Falcon’s protocol-rack passes Protocol::HTTP::Body::Readable subclasses through untouched — no Enumerable wrapping, no intermediate buffering. This gives us true async streaming with backpressure for free.
The gospel (protocol-http) teaches:
- Writable is a producer/consumer queue body
- write() pushes chunks; read() pops them (blocks when empty)
- close_write signals EOF (reader gets nil)
- Client disconnect raises on next write()
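The queue semantics listed above can be sketched with Ruby's standard-library Queue. The QueueBody class below is a hypothetical stand-in written for illustration, not protocol-http's implementation: it only mirrors the write/read/close_write contract and does not model backpressure or client disconnects.

```ruby
# Hypothetical stand-in for the producer/consumer contract described above.
# Built on the stdlib Queue; NOT the real Protocol::HTTP::Body::Writable.
class QueueBody
  def initialize
    @queue = Queue.new
  end

  # Producer side: push one chunk onto the queue.
  def write(chunk)
    @queue.push(chunk)
  end

  # Consumer side: pop one chunk. Blocks while the queue is open and empty;
  # returns nil once the queue has been closed and drained.
  def read
    @queue.pop
  end

  # Signal EOF: after the remaining chunks are read, read returns nil.
  def close_write
    @queue.close
  end
end

body = QueueBody.new

producer = Thread.new do
  body.write("data: one\n\n")
  body.write("data: two\n\n")
  body.close_write
end

# Drain the body the way a server would: read until nil.
chunks = []
while (chunk = body.read)
  chunks << chunk
end
producer.join
```

The read loop ends only because the producer calls close_write; without it the consumer would block on an empty queue indefinitely.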
Usage:
stream = A2A::SSE::RestStream.new(task_id: "t1", context_id: "c1")
Async do
  stream.task(status: { state: "TASK_STATE_WORKING", timestamp: "..." })
  stream.status_update(status: { state: "TASK_STATE_COMPLETED", timestamp: "..." })
  stream.finish
end
# Return as Rack body — Falcon streams it natively
[200, { "content-type" => "text/event-stream" }, stream]
Direct Known Subclasses
Constant Summary
- SSE_HEADERS =
  {
    "content-type" => "text/event-stream",
    "cache-control" => "no-cache, no-transform",
    "x-accel-buffering" => "no",
    "connection" => "keep-alive",
  }.freeze
Instance Attribute Summary
- #context_id ⇒ Object (readonly)
  Returns the value of attribute context_id.
- #task_id ⇒ Object (readonly)
  Returns the value of attribute task_id.
Class Method Summary
- .headers ⇒ Object
  Convenience: the SSE headers to return in the Rack response.
Instance Method Summary
- #event(data, type: nil, id: nil) ⇒ Object
  Emit an SSE event.
- #finish ⇒ Object
  Signal end-of-stream.
- #initialize(task_id:, context_id:, **options) ⇒ Stream (constructor)
  A new instance of Stream.
Constructor Details
#initialize(task_id:, context_id:, **options) ⇒ Stream
Returns a new instance of Stream.
# File 'lib/a2a/sse/stream.rb', line 43
def initialize(task_id:, context_id:, **)
  @task_id = task_id
  @context_id = context_id
  super(**)
end
Instance Attribute Details
#context_id ⇒ Object (readonly)
Returns the value of attribute context_id.
# File 'lib/a2a/sse/stream.rb', line 41
def context_id
  @context_id
end
#task_id ⇒ Object (readonly)
Returns the value of attribute task_id.
# File 'lib/a2a/sse/stream.rb', line 41
def task_id
  @task_id
end
Class Method Details
.headers ⇒ Object
Convenience: the SSE headers to return in the Rack response.
# File 'lib/a2a/sse/stream.rb', line 79
def self.headers
  SSE_HEADERS
end
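As a rough illustration of how these headers might be used, the sketch below builds a Rack response triple from a local copy of the constant. The "x-example" header and the array body are hypothetical placeholders; in a real application the body would be the stream instance and the headers would come from A2A::SSE::Stream.headers.

```ruby
# Local copy of the constant documented above, so the sketch runs on its own.
SSE_HEADERS = {
  "content-type" => "text/event-stream",
  "cache-control" => "no-cache, no-transform",
  "x-accel-buffering" => "no",
  "connection" => "keep-alive",
}.freeze

# The constant is frozen, so add per-response headers with merge, which
# returns a new hash and leaves the shared constant untouched.
# "x-example" is a hypothetical header used only for illustration.
headers = SSE_HEADERS.merge("x-example" => "1")

# Placeholder body: any object responding to each is a valid Rack body.
body = ["data: hello\n\n"]

status, response_headers, response_body = [200, headers, body]
```

Merging rather than mutating keeps the shared constant safe to reuse across requests.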
Instance Method Details
#event(data, type: nil, id: nil) ⇒ Object
Emit an SSE event.
# File 'lib/a2a/sse/stream.rb', line 55
def event(data, type: nil, id: nil)
  payload = data.is_a?(String) ? data : JSON.generate(data)

  buf = String.new
  buf << "event: #{type}\n" if type
  buf << "id: #{id}\n" if id

  # SSE spec: each line of data gets its own `data:` prefix.
  # For single-line JSON this is one line; multi-line is handled correctly.
  payload.each_line do |line|
    buf << "data: #{line.chomp}\n"
  end

  buf << "\n" # blank line terminates the event
  write(buf)
end
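To make the wire format concrete, the sketch below copies the framing logic from #event into a standalone helper that returns the frame instead of writing it. The helper name format_sse_event and the sample type and id values are hypothetical and exist only for this illustration.

```ruby
require "json"

# Standalone copy of the framing logic shown above. It returns the frame
# as a string rather than calling write, so the output can be inspected.
def format_sse_event(data, type: nil, id: nil)
  payload = data.is_a?(String) ? data : JSON.generate(data)

  buf = String.new
  buf << "event: #{type}\n" if type
  buf << "id: #{id}\n" if id
  payload.each_line { |line| buf << "data: #{line.chomp}\n" }
  buf << "\n" # blank line terminates the event
  buf
end

# A Hash is serialized to single-line JSON, so it yields one data: line.
json_frame = format_sse_event({ state: "TASK_STATE_WORKING" }, type: "status", id: 1)
# => "event: status\nid: 1\ndata: {\"state\":\"TASK_STATE_WORKING\"}\n\n"

# A multi-line String yields one data: line per input line.
text_frame = format_sse_event("first line\nsecond line")
# => "data: first line\ndata: second line\n\n"

# An empty String has no lines, so the frame is only the terminating newline.
empty_frame = format_sse_event("")
# => "\n"
```

Note that a String payload is sent as-is, without JSON encoding, so callers who want JSON on the wire should pass a Hash or Array rather than a pre-built string containing raw newlines.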
#finish ⇒ Object
Signal end-of-stream. The reader will receive nil on next read(), closing the SSE connection.
# File 'lib/a2a/sse/stream.rb', line 74
def finish
  close_write
end