Class: A2A::SSE::EventParser

Inherits:
Object
  • Object
show all
Defined in:
lib/a2a/sse/event_parser.rb

Overview

Parses raw SSE text chunks into typed StreamResponse schema objects.

Faraday’s ‘on_data` callback delivers raw text in arbitrary-sized chunks. This parser buffers them, extracts complete SSE events (delimited by blank lines), parses the JSON from `data:` lines, and yields A2A::Schema[“Stream Response”] instances.

For the JSON-RPC binding, the parser unwraps the JSON-RPC 2.0 envelope (‘“jsonrpc”:“2.0”,“id”:N,“result”:{…}`) to extract the StreamResponse payload from `result`.

Usage:

parser = A2A::SSE::EventParser.new(binding: :rest)

parser.feed("data: {\"task\":{\"id\":\"t1\"}}\n\n") do |event|
  event.task.id  #=> "t1"
end

Instance Method Summary collapse

Constructor Details

#initialize(binding:) ⇒ EventParser

Returns a new instance of EventParser.



27
28
29
30
# File 'lib/a2a/sse/event_parser.rb', line 27

def initialize(binding:)
  @binding = binding
  @buffer  = String.new
end

Instance Method Details

#feed(chunk) {|event| ... } ⇒ Object

Feed a raw chunk of SSE text. Yields a StreamResponse for each complete event found in the buffer.

Parameters:

  • chunk (String)

    raw SSE text from the wire

Yield Parameters:



38
39
40
41
42
43
44
45
46
47
48
49
# File 'lib/a2a/sse/event_parser.rb', line 38

def feed(chunk)
  @buffer << chunk

  # SSE events are terminated by a blank line (\n\n).
  # We split on that boundary, keeping any incomplete trailing data
  # in the buffer for the next call.
  while (idx = @buffer.index("\n\n"))
    raw_event = @buffer.slice!(0, idx + 2)
    payload = parse_event(raw_event)
    yield A2A::Schema["Stream Response"].new(payload) if payload
  end
end