Class: A2A::Middleware::SSEStream

Inherits:
Object
Defined in:
lib/a2a/middleware/sse_stream.rb

Overview

Sets up an SSE stream builder on `env`, under the `"a2a.stream"` key.

The builder detects the protocol binding (REST vs JSON-RPC) from `env` and creates the matching stream subclass when `open` is called. The `open` block runs inside an Async fiber, and the stream is finished automatically when the block exits, even if the block raises.

If the handler never calls `open`, the builder is removed from `env` so the binding layer doesn’t mistake it for a real stream.

Usage:

on "SendStreamingMessage" do
  use A2A::Middleware::SSEStream
  use A2A::Middleware::ExtractMessage
  respond_with -> (env) {
    env["a2a.stream"].open(task_id: "t1", context_id: "c1") do |s|
      s.task(status: { state: "TASK_STATE_WORKING" })
      s.status_update(status: { state: "TASK_STATE_COMPLETED" })
    end
  }
end
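
The "finished when the block exits (even on exception)" guarantee described in the overview can be illustrated with a plain `ensure` clause. The following is only a minimal sketch under stated assumptions: `SketchStream` and `open_sketch_stream` are hypothetical names invented for this example, no Async fiber is involved, and nothing here is taken from the library's actual implementation.

```ruby
# Hypothetical stand-in for a stream object; not the library's class.
class SketchStream
  attr_reader :events

  def initialize
    @events = []
    @finished = false
  end

  def finished?
    @finished
  end

  def emit(event)
    raise "stream already finished" if @finished

    @events << event
  end

  def finish
    @finished = true
  end
end

# Yields a stream and finishes it when the block exits, even if the block raises.
def open_sketch_stream
  stream = SketchStream.new
  begin
    yield stream
  ensure
    stream.finish
  end
  stream
end

# Normal exit: the stream is finished after the block returns.
ok = open_sketch_stream { |s| s.emit(:working) }

# Exceptional exit: the stream is still finished, and the error propagates.
failed = nil
begin
  open_sketch_stream do |s|
    failed = s
    s.emit(:working)
    raise ArgumentError, "handler blew up"
  end
rescue ArgumentError
  # The exception reaches the caller only after `ensure` has run.
end
```

In both cases `finished?` is true afterwards, which is the property a binding layer would rely on to close the response.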

Instance Method Summary

Constructor Details

#initialize(app) ⇒ SSEStream

Returns a new instance of SSEStream.



# File 'lib/a2a/middleware/sse_stream.rb', line 34

def initialize(app)
  @app = app
end

Instance Method Details

#call(env) ⇒ Object



# File 'lib/a2a/middleware/sse_stream.rb', line 38

def call(env)
  builder = StreamBuilder.new(env)
  env["a2a.stream"] = builder

  result = @app.call(env)

  # If open was never called, clear the builder so the binding
  # layer doesn't mistake it for a real stream.
  env.delete("a2a.stream") if env["a2a.stream"].equal?(builder)

  result
end
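
The identity check in `#call` only makes sense if something replaces the `"a2a.stream"` entry once a stream is opened. The sketch below assumes that `open` overwrites the entry with the real stream; this is an inference from the `equal?(builder)` test, not something the source states. `SketchBuilder` and `SketchMiddleware` are hypothetical stand-ins used purely to show the two outcomes.

```ruby
# Hypothetical builder: on open, it replaces itself in env with a "real" stream.
# ASSUMPTION: the actual StreamBuilder behaves similarly; the source does not show it.
class SketchBuilder
  def initialize(env)
    @env = env
  end

  def open
    @env["a2a.stream"] = :real_stream # stand-in for an actual stream object
  end
end

# Same shape as SSEStream#call, but using the stand-in builder.
class SketchMiddleware
  def initialize(app)
    @app = app
  end

  def call(env)
    builder = SketchBuilder.new(env)
    env["a2a.stream"] = builder

    result = @app.call(env)

    # Remove the entry only if it is still the untouched builder object.
    env.delete("a2a.stream") if env["a2a.stream"].equal?(builder)

    result
  end
end

# Handler that never opens a stream: the key is removed afterwards.
env_unopened = {}
unopened_result = SketchMiddleware.new(->(_env) { :no_stream }).call(env_unopened)

# Handler that opens a stream: the replaced entry survives.
env_opened = {}
opened_app = lambda do |env|
  env["a2a.stream"].open
  :streamed
end
opened_result = SketchMiddleware.new(opened_app).call(env_opened)
```

Using `equal?` (object identity) rather than `==` means the entry is cleared only when it is the exact builder this middleware installed, so a stream placed there by the handler is never deleted by mistake.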