Class: Rubino::API::Operations::Runs::EventsOperation

Inherits:
Object
  • Object
show all
Defined in:
lib/rubino/api/operations/runs/events_operation.rb

Overview

GET /v1/runs/:id/events — Server-Sent Events stream.

Replays persisted events (honoring the ‘Last-Event-ID` header for resume), then polls for new ones at POLL_INTERVAL until the run reaches a terminal status (completed/failed/stopped) or disappears. Puma handles the chunked transfer transparently.

Returns:

  • ([Integer, Hash, Enumerable])

    200 + SSE headers + lazy streamer.

Raises:

Constant Summary collapse

TERMINAL_STATUSES =
%w[completed failed stopped].freeze
POLL_INTERVAL =
0.25
HEARTBEAT_INTERVAL =

Proxies (nginx, caddy, ALB) close idle connections around 30–60s; 15s leaves margin and also exercises the write path so we notice client disconnects (EPIPE/ECONNRESET) without waiting for a real event.

15.0
HEARTBEAT_FRAME =
": heartbeat\n\n"
DEFAULT_IDLE_EVENT_TIMEOUT =

Watchdog: if the run is still “running” but no new event has been written for this many seconds, give up and mark it failed. Covers cases the Executor’s rescue can’t (model in an infinite tool loop, provider stream silently stalled, OS thread killed by a signal we never saw). Generous enough to outlast a slow tool call but well under the SSE consumer’s job timeout. Tunable via config so an op can dial it down for short tasks or up for legit long-running computations.

300.0
DISCONNECT_ERRORS =

Writes to a closed/aborted socket surface as one of these; we treat them all as “client gone” and stop polling so the thread doesn’t leak until the run reaches a terminal status.

[Errno::EPIPE, Errno::ECONNRESET, IOError].freeze

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(repository: nil, event_store: nil, clock: nil, sleeper: nil, idle_event_timeout: :default) ⇒ EventsOperation

Accepts an alternate run repository and event store for tests. ‘clock` and `sleeper` are seams so heartbeat/disconnect specs can drive virtual time without sleeping in real wall-clock seconds. `idle_event_timeout` overrides the watchdog window (defaults from config so ops can dial without code changes; nil disables it).



48
49
50
51
52
53
54
# File 'lib/rubino/api/operations/runs/events_operation.rb', line 48

def initialize(repository: nil, event_store: nil, clock: nil, sleeper: nil, idle_event_timeout: :default)
  @repository = repository || ::Rubino::Run::Repository.new
  @store = event_store || ::Rubino::Run::EventStore.new
  @clock = clock || -> { Process.clock_gettime(Process::CLOCK_MONOTONIC) }
  @sleeper = sleeper || ->(seconds) { sleep(seconds) }
  @idle_event_timeout = idle_event_timeout == :default ? configured_idle_timeout : idle_event_timeout
end

Class Method Details

.call(request) ⇒ Object



39
40
41
# File 'lib/rubino/api/operations/runs/events_operation.rb', line 39

def self.call(request)
  new.call(request)
end

Instance Method Details

#call(request) ⇒ Object

Raises:



56
57
58
59
60
61
62
63
64
65
66
67
68
# File 'lib/rubino/api/operations/runs/events_operation.rb', line 56

def call(request)
  id = request.params.fetch("id")
  run = @repository.find(id)
  raise NotFoundError.new("run", id) unless run

  after_seq = parse_last_event_id(request.header("Last-Event-ID"))
  headers = {
    "content-type" => "text/event-stream",
    "cache-control" => "no-cache",
    "x-accel-buffering" => "no"
  }
  [200, headers, build_stream(id, after_seq)]
end