Class: A2A::SSE::Stream

Inherits:
Protocol::HTTP::Body::Writable
  • Object
show all
Defined in:
lib/a2a/sse/stream.rb

Overview

Async-native SSE body built on Protocol::HTTP::Body::Writable.

Falcon’s protocol-rack passes Protocol::HTTP::Body::Readable subclasses through untouched — no Enumerable wrapping, no intermediate buffering. This gives us true async streaming with backpressure for free.

The gospel (protocol-http) teaches:

- Writable is a producer/consumer queue body
- write() pushes chunks; read() pops them (blocks when empty)
- close_write signals EOF (reader gets nil)
- Client disconnect raises on next write()

Usage:

stream = A2A::SSE::Stream.new

Async do
  stream.event({ "task" => { ... } })
  stream.event({ "statusUpdate" => { ... } })
  stream.finish
end

# Return as Rack body — Falcon streams it natively
[200, { "content-type" => "text/event-stream" }, stream]

Direct Known Subclasses

JsonRpcStream, RestStream

Constant Summary collapse

SSE_HEADERS =
{
  "content-type"      => "text/event-stream",
  "cache-control"     => "no-cache, no-transform",
  "x-accel-buffering" => "no",
  "connection"        => "keep-alive",
}.freeze

Class Method Summary collapse

Instance Method Summary collapse

Class Method Details

.headersObject

Convenience: the SSE headers to return in the Rack response.



71
72
73
# File 'lib/a2a/sse/stream.rb', line 71

def self.headers
  SSE_HEADERS
end

Instance Method Details

#event(data, type: nil, id: nil) ⇒ Object

Emit an SSE event.

Parameters:

  • data (Hash, String)

    the event payload (Hashes are JSON-encoded)

  • type (String, nil) (defaults to: nil)

    optional SSE event type field

  • id (String, nil) (defaults to: nil)

    optional SSE event id field



47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
# File 'lib/a2a/sse/stream.rb', line 47

def event(data, type: nil, id: nil)
  payload = data.is_a?(String) ? data : JSON.generate(data)

  buf = String.new
  buf << "event: #{type}\n" if type
  buf << "id: #{id}\n" if id

  # SSE spec: each line of data gets its own `data:` prefix.
  # For single-line JSON this is one line; multi-line is handled correctly.
  payload.each_line do |line|
    buf << "data: #{line.chomp}\n"
  end
  buf << "\n" # blank line terminates the event

  write(buf)
end

#finishObject

Signal end-of-stream. The reader will receive nil on next read(), closing the SSE connection.



66
67
68
# File 'lib/a2a/sse/stream.rb', line 66

def finish
  close_write
end