Class: A2A::Middleware::StreamBuilder

Inherits:
Object
Defined in:
lib/a2a/middleware/sse_stream.rb

Overview

Factory that creates the correct SSE stream subclass for the request's protocol binding (REST or JSON-RPC), then runs the caller’s block inside an Async task and finishes the stream automatically when the block exits.

Created by the SSEStream middleware; not intended for direct use.

Instance Method Summary

Constructor Details

#initialize(env) ⇒ StreamBuilder

Returns a new instance of StreamBuilder.



# File 'lib/a2a/middleware/sse_stream.rb', line 59

def initialize(env)
  @env = env
end

Instance Method Details

#open(task_id:, context_id:) {|stream| ... } ⇒ nil

Create and open an SSE stream for the current request.

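Detects the protocol binding from the env: when env["a2a.json_rpc_id"] is set, it constructs an A2A::SSE::JsonRpcStream with that id; otherwise it constructs an A2A::SSE::RestStream. The stream is stored in env["a2a.stream"] and yielded to the block, which runs inside an Async task. The stream is automatically finished when the block exits, even if an exception is raised.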
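Examples:

The control flow described above can be sketched without the a2a or async gems. Everything prefixed with Sketch below is a hypothetical stand-in written for this illustration, and the emit method is an assumed name, since the real stream API is not shown on this page. The sketch runs the block synchronously rather than in an Async task, so it only illustrates the binding selection, the env["a2a.stream"] assignment, the finish-on-exit guarantee, and the nil return value.

```ruby
# Stand-in stream classes (hypothetical; the real ones live under A2A::SSE).
class SketchStream
  attr_reader :task_id, :context_id, :events

  def initialize(task_id:, context_id:)
    @task_id = task_id
    @context_id = context_id
    @events = []
    @finished = false
  end

  # Assumed write method, for illustration only.
  def emit(event)
    @events << event
  end

  def finish
    @finished = true
  end

  def finished?
    @finished
  end
end

class SketchRestStream < SketchStream; end

class SketchJsonRpcStream < SketchStream
  attr_reader :json_rpc_id

  def initialize(task_id:, context_id:, json_rpc_id:)
    super(task_id: task_id, context_id: context_id)
    @json_rpc_id = json_rpc_id
  end
end

# Mirrors the shape of StreamBuilder#open, minus the Async task.
class SketchBuilder
  def initialize(env)
    @env = env
  end

  def open(task_id:, context_id:)
    stream = if @env["a2a.json_rpc_id"]
      SketchJsonRpcStream.new(
        task_id: task_id, context_id: context_id,
        json_rpc_id: @env["a2a.json_rpc_id"]
      )
    else
      SketchRestStream.new(task_id: task_id, context_id: context_id)
    end

    @env["a2a.stream"] = stream

    begin
      yield stream
    ensure
      # Runs whether the block returns normally or raises.
      stream.finish
    end

    nil
  end
end

# REST request: no JSON-RPC id in the env.
rest_env = {}
result = SketchBuilder.new(rest_env).open(task_id: "task-1", context_id: "ctx-1") do |stream|
  stream.emit("working")
end

# JSON-RPC request whose handler fails part-way through.
rpc_env = { "a2a.json_rpc_id" => 7 }
begin
  SketchBuilder.new(rpc_env).open(task_id: "task-2", context_id: "ctx-2") do |_stream|
    raise "handler failed"
  end
rescue RuntimeError
  # In this synchronous sketch the error propagates to the caller;
  # the stream has already been finished by the ensure clause.
end
```

After both calls, each env holds its stream under "a2a.stream" and both streams report finished?, including the one whose block raised. How an error inside the block surfaces in the real method depends on the Async task it runs in, which this sketch does not model.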

Parameters:

  • task_id (String)

    the task identifier for this stream

  • context_id (String)

    the context identifier for this stream

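Yield Parameters:

  • stream (A2A::SSE::JsonRpcStream, A2A::SSE::RestStream)

    the newly created stream for the current request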

Returns:

  • (nil)


# File 'lib/a2a/middleware/sse_stream.rb', line 75

def open(task_id:, context_id:, &block)
  stream = if @env["a2a.json_rpc_id"]
    A2A::SSE::JsonRpcStream.new(
      task_id: task_id, context_id: context_id,
      json_rpc_id: @env["a2a.json_rpc_id"]
    )
  else
    A2A::SSE::RestStream.new(
      task_id: task_id, context_id: context_id
    )
  end

  @env["a2a.stream"] = stream

  Async do
    block.call(stream)
  ensure
    stream.finish
  end

  nil
end