Class: Rubino::API::Operations::Runs::EventsOperation

Inherits:
Object
Defined in:
lib/rubino/api/operations/runs/events_operation.rb

Overview

GET /v1/runs/:id/events — Server-Sent Events stream.

Replays persisted events (honoring the `Last-Event-ID` header for resume), then polls for new ones at POLL_INTERVAL until the run reaches a terminal status (completed/failed/stopped) or disappears. Puma handles the chunked transfer transparently.
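The replay-then-poll flow described above could look roughly like the sketch below. It is not the operation's actual stream builder: `FakeStore`, `events_after`, `sketch_stream`, and the event hash shape are hypothetical names introduced only for illustration. The status is read before draining so that events written just before the run finished are still flushed.

```ruby
# Hypothetical in-memory stand-in for the event store; the real
# Rubino::Run::EventStore interface is not shown on this page.
class FakeStore
  def initialize(events)
    @events = events
  end

  # Returns the events whose sequence number is greater than after_seq.
  def events_after(after_seq)
    @events.select { |event| event[:seq] > after_seq }
  end
end

# Builds a lazy Enumerator of SSE frames: replay everything after
# after_seq, then poll until status_fn reports a terminal status or nil
# (run disappeared). Nothing runs until the server iterates the body.
def sketch_stream(store, after_seq, status_fn, sleeper,
                  poll_interval: 0.25,
                  terminal: %w[completed failed stopped])
  Enumerator.new do |out|
    cursor = after_seq
    loop do
      status = status_fn.call
      store.events_after(cursor).each do |event|
        out << "id: #{event[:seq]}\ndata: #{event[:data]}\n\n"
        cursor = event[:seq]
      end
      break if status.nil? || terminal.include?(status)

      sleeper.call(poll_interval)
    end
  end
end
```

Passing `after_seq` as 0 replays the whole history; passing the last sequence the client saw resumes right after it.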

Returns:

  • ([Integer, Hash, Enumerable])

    200 + SSE headers + lazy streamer.

Raises:

  • (NotFoundError)

    If no run exists for the given id.

Constant Summary

TERMINAL_STATUSES =
%w[completed failed stopped].freeze
POLL_INTERVAL =
0.25
HEARTBEAT_INTERVAL =

Proxies (nginx, caddy, ALB) close idle connections around 30–60s; 15s leaves margin and also exercises the write path so we notice client disconnects (EPIPE/ECONNRESET) without waiting for a real event.

15.0
HEARTBEAT_FRAME =
": heartbeat\n\n"
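In the SSE wire format a line starting with a colon is a comment, so a conforming client ignores this frame while intermediaries still see traffic. One way to decide when a heartbeat is due is sketched below. `HeartbeatTimer` is a hypothetical helper, not part of this class; it only borrows the constant values listed above and the injectable-clock idea from the constructor.

```ruby
# Hypothetical helper: remembers when the last frame was written and
# reports when the connection has been quiet for a full interval.
class HeartbeatTimer
  INTERVAL = 15.0                # same value as HEARTBEAT_INTERVAL
  FRAME = ": heartbeat\n\n"      # same value as HEARTBEAT_FRAME

  # clock is any callable returning monotonic seconds as a Float.
  def initialize(clock, interval: INTERVAL)
    @clock = clock
    @interval = interval
    @last_write = clock.call
  end

  # Call after every frame (event or heartbeat) that was written.
  def touch
    @last_write = @clock.call
  end

  # True once at least one interval has passed since the last write.
  def due?
    @clock.call - @last_write >= @interval
  end
end
```

A polling loop would check `due?` on each pass, emit `HeartbeatTimer::FRAME` when it returns true, and call `touch` afterwards.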
DEFAULT_IDLE_EVENT_TIMEOUT =

Watchdog: if the run is still “running” but no new event has been written for this many seconds, give up and mark it failed. Covers cases the Executor’s rescue can’t (model in an infinite tool loop, provider stream silently stalled, OS thread killed by a signal we never saw). Generous enough to outlast a slow tool call but well under the SSE consumer’s job timeout. Tunable via config so an op can dial it down for short tasks or up for legit long-running computations.

300.0
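The watchdog condition can be read as a pure predicate. The sketch below is an assumption about its shape, not the class's code: `idle_timed_out?` and its keyword names are hypothetical, and what happens after it returns true (marking the run failed) is left out.

```ruby
# Hypothetical predicate: true when the run is still "running", a
# timeout is configured, and no event has arrived for that long.
# All times are monotonic seconds.
def idle_timed_out?(status:, last_event_at:, now:, timeout:)
  return false if timeout.nil?             # nil disables the watchdog
  return false unless status == "running"  # terminal runs end on their own

  now - last_event_at >= timeout
end
```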
DISCONNECT_ERRORS =

Writes to a closed/aborted socket surface as one of these; we treat them all as “client gone” and stop polling so the thread doesn’t leak until the run reaches a terminal status.

[Errno::EPIPE, Errno::ECONNRESET, IOError].freeze
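A minimal sketch of treating all three errors as "client gone" follows. `SafeWrite` and `GoneClient` are hypothetical names, and a plain `io.write` stands in for however the server actually delivers frames; in a Rack body the same errors would more likely surface from the `yield` to the server.

```ruby
require "stringio"

module SafeWrite
  # Same list as DISCONNECT_ERRORS above.
  DISCONNECT_ERRORS = [Errno::EPIPE, Errno::ECONNRESET, IOError].freeze

  # Writes one frame to io. Returns true on success and false when the
  # write failed because the client has disconnected.
  def self.write_frame(io, frame)
    io.write(frame)
    true
  rescue *DISCONNECT_ERRORS
    false
  end
end

# Hypothetical socket stand-in whose writes always fail with EPIPE.
class GoneClient
  def write(_frame)
    raise Errno::EPIPE
  end
end

out = StringIO.new
SafeWrite.write_frame(out, ": heartbeat\n\n")            # client still there
SafeWrite.write_frame(GoneClient.new, ": heartbeat\n\n") # client gone
```

A polling loop can stop as soon as `write_frame` returns false instead of waiting for the run to reach a terminal status.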

Instance Method Summary

Constructor Details

#initialize(repository: nil, event_store: nil, clock: nil, sleeper: nil, idle_event_timeout: :default) ⇒ EventsOperation

Accepts an alternate run repository and event store for tests. `clock` and `sleeper` are seams so heartbeat/disconnect specs can drive virtual time without sleeping in real wall-clock seconds. `idle_event_timeout` overrides the watchdog window (defaults from config so ops can tune it without code changes; nil disables it).



# File 'lib/rubino/api/operations/runs/events_operation.rb', line 44

def initialize(repository: nil, event_store: nil, clock: nil, sleeper: nil, idle_event_timeout: :default)
  @repository = repository || ::Rubino::Run::Repository.new
  @store = event_store || ::Rubino::Run::EventStore.new
  @clock = clock || -> { Process.clock_gettime(Process::CLOCK_MONOTONIC) }
  @sleeper = sleeper || ->(seconds) { sleep(seconds) }
  @idle_event_timeout = idle_event_timeout == :default ? configured_idle_timeout : idle_event_timeout
end
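For specs, the `clock` and `sleeper` seams can share one virtual timeline so that "sleeping" advances time instantly. `VirtualTime` below is a hypothetical test helper, not something shipped with this class.

```ruby
# Hypothetical spec helper: time only moves when the code under test
# calls the sleeper, so specs never block on real sleep.
class VirtualTime
  attr_reader :now

  def initialize
    @now = 0.0
  end

  # Callable matching the clock seam: returns current virtual seconds.
  def clock
    -> { @now }
  end

  # Callable matching the sleeper seam: advances virtual time instead
  # of sleeping.
  def sleeper
    ->(seconds) { @now += seconds }
  end
end

# Intended wiring in a spec (requires the real class, so shown as a comment):
#   time = VirtualTime.new
#   op = EventsOperation.new(clock: time.clock, sleeper: time.sleeper,
#                            idle_event_timeout: nil)
```

With this wiring, 60 polls at 0.25 s push the virtual clock to 15 s, enough to reach the heartbeat interval without any wall-clock delay.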

Instance Method Details

#call(request) ⇒ Object

Raises:

  • (NotFoundError)

    If no run exists for the given id.



# File 'lib/rubino/api/operations/runs/events_operation.rb', line 52

def call(request)
  id = request.params.fetch("id")
  run = @repository.find(id)
  raise NotFoundError.new("run", id) unless run

  after_seq = parse_last_event_id(request.header("Last-Event-ID"))
  headers = {
    "content-type" => "text/event-stream",
    "cache-control" => "no-cache",
    "x-accel-buffering" => "no"
  }
  [200, headers, build_stream(id, after_seq)]
end
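`parse_last_event_id` is a private helper whose source is not shown here. One plausible shape is sketched below, assuming event ids are integer sequence numbers and that a missing or malformed header means "replay from the start"; both assumptions and the name `parse_last_event_id_sketch` are illustrative only.

```ruby
# Hypothetical stand-in for the private helper: returns the sequence
# number to resume after, or 0 when the header is absent, malformed,
# or negative.
def parse_last_event_id_sketch(value)
  return 0 if value.nil?

  seq = Integer(value.strip, 10)   # raises ArgumentError on non-numeric input
  seq.negative? ? 0 : seq
rescue ArgumentError
  0
end
```

Falling back to 0 rather than raising keeps a client with a corrupted header working, at the cost of replaying events it may already have seen.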