Class: Rubino::API::Operations::Runs::EventsOperation
- Inherits: Object
  - Object
  - Rubino::API::Operations::Runs::EventsOperation
- Defined in: lib/rubino/api/operations/runs/events_operation.rb
Overview
GET /v1/runs/:id/events — Server-Sent Events stream.
Replays persisted events (honoring the `Last-Event-ID` header for resume), then polls for new ones at POLL_INTERVAL until the run reaches a terminal status (completed/failed/stopped) or disappears. Puma handles the chunked transfer transparently.
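The replay-and-resume behavior described above can be illustrated with a minimal sketch. The `Event` struct, `sse_frame`, and `replay` are hypothetical names introduced for this example; the record shape used by the real event store and the exact frame fields the operation emits are not shown on this page, so treat both as assumptions.

```ruby
require "json"

# Hypothetical event shape; the real EventStore record is not documented here.
Event = Struct.new(:seq, :type, :payload)

# Render one event as a Server-Sent Events frame. The `id:` field is the value
# a reconnecting client sends back in the Last-Event-ID header.
def sse_frame(event)
  "id: #{event.seq}\nevent: #{event.type}\ndata: #{JSON.generate(event.payload)}\n\n"
end

# Replay only the events newer than the sequence number the client last saw.
def replay(events, after_seq)
  events.select { |e| e.seq > after_seq }.map { |e| sse_frame(e) }
end

events = [
  Event.new(1, "started",  { "status" => "running" }),
  Event.new(2, "token",    { "text" => "hi" }),
  Event.new(3, "finished", { "status" => "completed" })
]

# A client that reconnects with Last-Event-ID: 1 receives events 2 and 3 only.
frames = replay(events, 1)
puts frames.join
```

Using the sequence number as the frame `id` is what makes a resume cheap: the server only needs a "greater than" filter on reconnect.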
Constant Summary
- TERMINAL_STATUSES = %w[completed failed stopped].freeze
- POLL_INTERVAL = 0.25
- HEARTBEAT_INTERVAL = 15.0
  Proxies (nginx, caddy, ALB) close idle connections around 30–60s; 15s leaves margin and also exercises the write path so we notice client disconnects (EPIPE/ECONNRESET) without waiting for a real event.
- HEARTBEAT_FRAME = ": heartbeat\n\n"
- DEFAULT_IDLE_EVENT_TIMEOUT = 300.0
  Watchdog: if the run is still “running” but no new event has been written for this many seconds, give up and mark it failed. Covers cases the Executor’s rescue can’t (model in an infinite tool loop, provider stream silently stalled, OS thread killed by a signal we never saw). Generous enough to outlast a slow tool call but well under the SSE consumer’s job timeout. Tunable via config so an op can dial it down for short tasks or up for legit long-running computations.
- DISCONNECT_ERRORS = [Errno::EPIPE, Errno::ECONNRESET, IOError].freeze
  Writes to a closed/aborted socket surface as one of these; we treat them all as “client gone” and stop polling so the thread doesn’t leak until the run reaches a terminal status.
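To show how these constants might interact, here is a rough sketch of a poll loop with heartbeats, a watchdog, and disconnect handling. It is not the operation's actual `build_stream` implementation, which this page does not show. The module name `StreamLoopSketch`, the `out` and `status_of` parameters, and the returned symbols are all assumptions made for the example.

```ruby
module StreamLoopSketch
  TERMINAL_STATUSES  = %w[completed failed stopped].freeze
  POLL_INTERVAL      = 0.25
  HEARTBEAT_INTERVAL = 15.0
  HEARTBEAT_FRAME    = ": heartbeat\n\n"
  DISCONNECT_ERRORS  = [Errno::EPIPE, Errno::ECONNRESET, IOError].freeze

  # `out`, `status_of`, and the return symbols are hypothetical stand-ins.
  # This sketch emits no real events, so the idle timer starts when the
  # stream opens and never resets.
  def self.run(out:, status_of:, clock:, sleeper:, idle_timeout: 300.0)
    opened_at = clock.call
    last_write = opened_at
    loop do
      status = status_of.call
      return :finished if status.nil? || TERMINAL_STATUSES.include?(status)

      now = clock.call
      # Watchdog: the real operation marks the run failed; here we only report it.
      return :idle_timeout if idle_timeout && now - opened_at >= idle_timeout

      if now - last_write >= HEARTBEAT_INTERVAL
        out << HEARTBEAT_FRAME # raises if the socket is closed
        last_write = now
      end
      sleeper.call(POLL_INTERVAL)
    end
  rescue *DISCONNECT_ERRORS
    :client_gone
  end
end

# Virtual time: sleeping advances the fake clock instead of blocking.
virtual_now = 0.0
clock = -> { virtual_now }
sleeper = ->(seconds) { virtual_now += seconds }

# 1. The run completes after 31 virtual seconds.
out = []
finished = StreamLoopSketch.run(
  out: out, clock: clock, sleeper: sleeper,
  status_of: -> { virtual_now < 31.0 ? "running" : "completed" }
)

# 2. The client disconnects: the first heartbeat write raises EPIPE.
broken = Object.new
def broken.<<(_frame)
  raise Errno::EPIPE
end
virtual_now = 0.0
gone = StreamLoopSketch.run(
  out: broken, clock: clock, sleeper: sleeper,
  status_of: -> { "running" }, idle_timeout: nil
)

# 3. The run stalls: the watchdog fires at the idle timeout.
virtual_now = 0.0
stalled = StreamLoopSketch.run(
  out: [], clock: clock, sleeper: sleeper, status_of: -> { "running" }
)

puts [finished, gone, stalled].inspect
```

In the second scenario the heartbeat is the only write on an otherwise quiet stream, which is why it doubles as a disconnect probe.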
Class Method Summary
- .call(request) ⇒ Object
Instance Method Summary
- #call(request) ⇒ Object
- #initialize(repository: nil, event_store: nil, clock: nil, sleeper: nil, idle_event_timeout: :default) ⇒ EventsOperation (constructor)
  Accepts an alternate run repository and event store for tests.
Constructor Details
#initialize(repository: nil, event_store: nil, clock: nil, sleeper: nil, idle_event_timeout: :default) ⇒ EventsOperation
Accepts an alternate run repository and event store for tests. `clock` and `sleeper` are seams that let heartbeat/disconnect specs drive virtual time instead of sleeping for real wall-clock seconds. `idle_event_timeout` overrides the watchdog window: the default comes from config, so operators can tune it without code changes, and `nil` disables the watchdog.
# File 'lib/rubino/api/operations/runs/events_operation.rb', line 48

def initialize(repository: nil, event_store: nil, clock: nil, sleeper: nil, idle_event_timeout: :default)
  @repository = repository || ::Rubino::Run::Repository.new
  @store = event_store || ::Rubino::Run::EventStore.new
  @clock = clock || -> { Process.clock_gettime(Process::CLOCK_MONOTONIC) }
  @sleeper = sleeper || ->(seconds) { sleep(seconds) }
  @idle_event_timeout = idle_event_timeout == :default ? configured_idle_timeout : idle_event_timeout
end
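The two ideas in this constructor, injectable time seams and a `:default` sentinel that keeps an explicit `nil` meaningful, can be tried in isolation. The sketch below uses a hypothetical `SeamSketch` class and a hard-coded `CONFIGURED_IDLE_TIMEOUT` in place of the real `configured_idle_timeout` lookup, whose implementation is not shown here.

```ruby
# Hypothetical stand-in for the operation's constructor seams; the class and
# constant names are illustrative and not part of Rubino.
class SeamSketch
  CONFIGURED_IDLE_TIMEOUT = 300.0 # assumed to be read from config in real code

  attr_reader :idle_event_timeout

  def initialize(clock: nil, sleeper: nil, idle_event_timeout: :default)
    @clock = clock || -> { Process.clock_gettime(Process::CLOCK_MONOTONIC) }
    @sleeper = sleeper || ->(seconds) { sleep(seconds) }
    # :default is a sentinel, so an explicit nil can still mean "watchdog off".
    @idle_event_timeout =
      idle_event_timeout == :default ? CONFIGURED_IDLE_TIMEOUT : idle_event_timeout
  end

  # Waits `seconds` through the injected seams and returns the elapsed time.
  def wait(seconds)
    started = @clock.call
    @sleeper.call(seconds)
    @clock.call - started
  end
end

# Virtual time: the fake sleeper advances the fake clock instead of blocking.
virtual_now = 0.0
fake_clock = -> { virtual_now }
fake_sleeper = ->(seconds) { virtual_now += seconds }

op = SeamSketch.new(clock: fake_clock, sleeper: fake_sleeper, idle_event_timeout: nil)
elapsed = op.wait(15.0) # no real sleeping takes place
default_op = SeamSketch.new

puts elapsed
puts op.idle_event_timeout.inspect
puts default_op.idle_event_timeout
```

A plain `nil` default could not distinguish "use the configured value" from "disable the watchdog", which is presumably why the constructor reserves `:default` for the former.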
Class Method Details
.call(request) ⇒ Object
# File 'lib/rubino/api/operations/runs/events_operation.rb', line 39

def self.call(request)
  new.call(request)
end
Instance Method Details
#call(request) ⇒ Object
# File 'lib/rubino/api/operations/runs/events_operation.rb', line 56

def call(request)
  id = request.params.fetch("id")
  run = @repository.find(id)
  raise NotFoundError.new("run", id) unless run

  after_seq = parse_last_event_id(request.header("Last-Event-ID"))
  headers = {
    "content-type" => "text/event-stream",
    "cache-control" => "no-cache",
    "x-accel-buffering" => "no"
  }
  [200, headers, build_stream(id, after_seq)]
end
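The body of `parse_last_event_id` is not shown on this page. One plausible shape, offered purely as an assumption, is a lenient parser that treats a missing, malformed, or negative header as "replay from the beginning". The name `parse_last_event_id_sketch` is hypothetical, and the real method may handle these cases differently.

```ruby
# Hypothetical sketch: turn a Last-Event-ID header value into a sequence
# number, falling back to 0 (replay everything) when the value is unusable.
def parse_last_event_id_sketch(header_value)
  seq = Integer(header_value.to_s.strip, 10, exception: false)
  (seq.nil? || seq.negative?) ? 0 : seq
end

puts parse_last_event_id_sketch("42")
puts parse_last_event_id_sketch(nil)
puts parse_last_event_id_sketch("not-a-number")
```

Falling back to 0 instead of raising keeps a bad header from turning a reconnect into an error response, at the cost of re-sending events the client may already have.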