Class: Exa::Services::AgentRunStream
- Inherits: Object
  - Object
  - Exa::Services::AgentRunStream
- Defined in: lib/exa/services/agent_run_stream.rb
Constant Summary
- TERMINAL_EVENTS = %w[agent_run.completed agent_run.failed agent_run.cancelled].freeze
Instance Method Summary
- #call(&block) ⇒ Object
  Streams SSE frames to the block as (event_type, data).
- #initialize(connection, **params) ⇒ AgentRunStream (constructor)
  A new instance of AgentRunStream.
Constructor Details
#initialize(connection, **params) ⇒ AgentRunStream
Returns a new instance of AgentRunStream.
# File 'lib/exa/services/agent_run_stream.rb', line 12

def initialize(connection, **params)
  @connection = connection
  @params = params
end
Instance Method Details
#call(&block) ⇒ Object
Streams SSE frames to the block as (event_type, data). Returns the final AgentRun built from the terminal event's payload (completed, failed, or cancelled), or nil if the stream ended before a terminal event arrived. Raises ArgumentError if no block is given.
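The calling contract can be illustrated with a minimal sketch. It assumes a hypothetical stand-in method, `replay_stream`, that replays canned events instead of posting to /agent/runs. The event name `agent_run.started` and the payload fields are made up for illustration, and the sketch returns the raw payload hash where the real method would wrap it in an AgentRun.

```ruby
# Hypothetical stand-in for AgentRunStream#call: replays canned
# (event_type, data) pairs rather than reading from a live connection.
def replay_stream(events)
  raise ArgumentError, "block required for streaming" unless block_given?

  terminal = %w[agent_run.completed agent_run.failed agent_run.cancelled]
  final_payload = nil
  events.each do |event_type, data|
    # Remember the terminal payload, then forward every event to the caller.
    final_payload = data if terminal.include?(event_type)
    yield event_type, data
  end
  final_payload # nil when no terminal event was seen
end

# Illustrative events only; "agent_run.started" is an assumed event name.
events = [
  ["agent_run.started", { "id" => "run_1" }],
  ["agent_run.completed", { "id" => "run_1", "status" => "completed" }]
]

seen = []
result = replay_stream(events) { |event_type, _data| seen << event_type }

# A stream that ends before any terminal event yields nil.
truncated = replay_stream(events.first(1)) { |_event_type, _data| }
```

Every event reaches the block, including the terminal one, so callers that only want the final result can pass a no-op block and use the return value.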
# File 'lib/exa/services/agent_run_stream.rb', line 20

def call(&block)
  raise ArgumentError, "block required for streaming" unless block_given?

  @buffer = ""
  @final_payload = nil

  # Capture the terminal payload as it streams, then forward to the caller.
  interceptor = proc do |event_type, data|
    @final_payload = data if TERMINAL_EVENTS.include?(event_type)
    block.call(event_type, data)
  end

  body = ParameterConverter.convert(@params)

  @connection.post("/agent/runs", body) do |req|
    req.headers["Accept"] = "text/event-stream"
    req.options.on_data = proc do |chunk|
      @buffer += chunk
      process_sse_buffer(&interceptor)
    end
  end

  process_remaining_buffer(&interceptor) if @buffer.length.positive?

  @final_payload && Resources::AgentRun.from_response(@final_payload)
end
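The listing does not show `process_sse_buffer`, so the following is only a sketch of how buffered chunks might be split into frames. It makes three assumptions: frames are separated by a blank line ("\n\n"), each frame carries `event:` and `data:` fields, and the data is JSON. The method name `drain_sse_frames` is hypothetical and not part of the library.

```ruby
require "json"

# Hypothetical frame parser: consumes every complete frame in the buffer,
# yields (event_type, parsed_data) for each, and returns the unconsumed tail.
def drain_sse_frames(buffer)
  while (boundary = buffer.index("\n\n"))
    frame = buffer[0...boundary]
    buffer = buffer[(boundary + 2)..]

    event_type = nil
    data_lines = []
    frame.each_line(chomp: true) do |line|
      case line
      when /\Aevent: ?(.*)\z/ then event_type = Regexp.last_match(1)
      when /\Adata: ?(.*)\z/ then data_lines << Regexp.last_match(1)
      end
    end

    next if data_lines.empty? # comment-only or keep-alive frame
    yield event_type, JSON.parse(data_lines.join("\n"))
  end
  buffer
end

# Chunks may split a frame anywhere, so the tail is carried between calls.
chunks = [
  "event: agent_run.started\ndata: {\"id\":",
  "\"run_1\"}\n\nevent: agent_run.comp",
  "leted\ndata: {\"id\":\"run_1\"}\n\n"
]

buffer = ""
parsed = []
chunks.each do |chunk|
  buffer += chunk
  buffer = drain_sse_frames(buffer) { |type, data| parsed << [type, data] }
end
```

Carrying the unconsumed tail forward is what makes a final flush necessary: if the server closes the connection without a trailing blank line, the last frame is still sitting in the buffer, which is presumably why `call` invokes `process_remaining_buffer` when the buffer is non-empty.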