Class: Exa::Services::AgentRunStream

Inherits:
Object
  • Object
show all
Defined in:
lib/exa/services/agent_run_stream.rb

Constant Summary collapse

TERMINAL_EVENTS =
%w[agent_run.completed agent_run.failed agent_run.cancelled].freeze

Instance Method Summary collapse

Constructor Details

#initialize(connection, **params) ⇒ AgentRunStream

Returns a new instance of AgentRunStream.



12
13
14
15
# File 'lib/exa/services/agent_run_stream.rb', line 12

def initialize(connection, **params)
  @connection = connection
  @params = params
end

Instance Method Details

#call(&block) ⇒ Object

Streams SSE frames to the block as (event_type, data). Returns the final AgentRun built from the terminal event's payload (completed/failed/ cancelled), or nil if the stream ended before a terminal event arrived.

Raises:

  • (ArgumentError)


20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
# File 'lib/exa/services/agent_run_stream.rb', line 20

def call(&block)
  raise ArgumentError, "block required for streaming" unless block_given?

  @buffer = ""
  @final_payload = nil

  # Capture the terminal payload as it streams, then forward to the caller.
  interceptor = proc do |event_type, data|
    @final_payload = data if TERMINAL_EVENTS.include?(event_type)
    block.call(event_type, data)
  end

  body = ParameterConverter.convert(@params)

  @connection.post("/agent/runs", body) do |req|
    req.headers["Accept"] = "text/event-stream"
    req.options.on_data = proc do |chunk|
      @buffer += chunk
      process_sse_buffer(&interceptor)
    end
  end

  process_remaining_buffer(&interceptor) if @buffer.length.positive?

  @final_payload && Resources::AgentRun.from_response(@final_payload)
end