Class: A2A::SSE::Stream

Inherits:
Protocol::HTTP::Body::Writable
  • Object
show all
Defined in:
lib/a2a/sse/stream.rb

Overview

Async-native SSE body built on Protocol::HTTP::Body::Writable.

Falcon’s protocol-rack passes Protocol::HTTP::Body::Readable subclasses through untouched — no Enumerable wrapping, no intermediate buffering. This gives us true async streaming with backpressure for free.

The gospel (protocol-http) teaches:

- Writable is a producer/consumer queue body
- write() pushes chunks; read() pops them (blocks when empty)
- close_write signals EOF (reader gets nil)
- Client disconnect raises on next write()

Usage:

stream = A2A::SSE::RestStream.new(task_id: "t1", context_id: "c1")

Async do
  stream.task(status: { state: "TASK_STATE_WORKING", timestamp: "..." })
  stream.status_update(status: { state: "TASK_STATE_COMPLETED", timestamp: "..." })
  stream.finish
end

# Return as Rack body — Falcon streams it natively
[200, { "content-type" => "text/event-stream" }, stream]

Direct Known Subclasses

JsonRpcStream, RestStream

Constant Summary collapse

SSE_HEADERS =
{
  "content-type"      => "text/event-stream",
  "cache-control"     => "no-cache, no-transform",
  "x-accel-buffering" => "no",
  "connection"        => "keep-alive",
}.freeze

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(task_id:, context_id:, **options) ⇒ Stream

Returns a new instance of Stream.



43
44
45
46
47
# File 'lib/a2a/sse/stream.rb', line 43

def initialize(task_id:, context_id:, **options)
  @task_id    = task_id
  @context_id = context_id
  super(**options)
end

Instance Attribute Details

#context_idObject (readonly)

Returns the value of attribute context_id.



41
42
43
# File 'lib/a2a/sse/stream.rb', line 41

def context_id
  @context_id
end

#task_idObject (readonly)

Returns the value of attribute task_id.



41
42
43
# File 'lib/a2a/sse/stream.rb', line 41

def task_id
  @task_id
end

Class Method Details

.headersObject

Convenience: the SSE headers to return in the Rack response.



79
80
81
# File 'lib/a2a/sse/stream.rb', line 79

def self.headers
  SSE_HEADERS
end

Instance Method Details

#event(data, type: nil, id: nil) ⇒ Object

Emit an SSE event.

Parameters:

  • data (Hash, String)

    the event payload (Hashes are JSON-encoded)

  • type (String, nil) (defaults to: nil)

    optional SSE event type field

  • id (String, nil) (defaults to: nil)

    optional SSE event id field



55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
# File 'lib/a2a/sse/stream.rb', line 55

def event(data, type: nil, id: nil)
  payload = data.is_a?(String) ? data : JSON.generate(data)

  buf = String.new
  buf << "event: #{type}\n" if type
  buf << "id: #{id}\n" if id

  # SSE spec: each line of data gets its own `data:` prefix.
  # For single-line JSON this is one line; multi-line is handled correctly.
  payload.each_line do |line|
    buf << "data: #{line.chomp}\n"
  end
  buf << "\n" # blank line terminates the event

  write(buf)
end

#finishObject

Signal end-of-stream. The reader will receive nil on next read(), closing the SSE connection.



74
75
76
# File 'lib/a2a/sse/stream.rb', line 74

def finish
  close_write
end