Class: A2A::Middleware::StreamBuilder
- Inherits:
-
Object
- Object
- A2A::Middleware::StreamBuilder
- Defined in:
- lib/a2a/middleware/sse_stream.rb
Overview
Factory that creates the correct SSE stream subclass based on the protocol binding, then runs the caller’s block inside Async with automatic finish on exit.
Created by SSEStream middleware — not intended for direct use.
Instance Method Summary collapse
-
#initialize(env) ⇒ StreamBuilder
constructor
A new instance of StreamBuilder.
-
#open(task_id:, context_id:) {|stream| ... } ⇒ nil
Create and open an SSE stream for the current request.
Constructor Details
#initialize(env) ⇒ StreamBuilder
Returns a new instance of StreamBuilder.
59 60 61 |
# File 'lib/a2a/middleware/sse_stream.rb', line 59 def initialize(env) @env = env end |
Instance Method Details
#open(task_id:, context_id:) {|stream| ... } ⇒ nil
Create and open an SSE stream for the current request.
Detects REST vs JSON-RPC from the env, constructs the correct stream subclass, and yields it to the block. The block runs inside an Async fiber. The stream is automatically finished when the block exits, even if an exception is raised.
75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 |
# File 'lib/a2a/middleware/sse_stream.rb', line 75 def open(task_id:, context_id:, &block) stream = if @env["a2a.json_rpc_id"] A2A::SSE::JsonRpcStream.new( task_id: task_id, context_id: context_id, json_rpc_id: @env["a2a.json_rpc_id"] ) else A2A::SSE::RestStream.new( task_id: task_id, context_id: context_id ) end @env["a2a.stream"] = stream Async do block.call(stream) ensure stream.finish end nil end |