Class: Igniter::Store::HTTPAdapter::SseEventsHandler

Inherits:
Object
  • Object
show all
Includes:
ResponseHelper
Defined in:
lib/igniter/store/http_adapter.rb

Overview

GET /v1/events — Server-Sent Events transport over ChangefeedBuffer.

Protocol:

  1. Replay retained events (catch-up).

  2. Subscribe for live events.

Cursor input (both optional):

Last-Event-ID: N  →  replay after sequence N (browser auto-reconnect)
?cursor=N         →  same, for simple clients / tests

Store filtering (optional):

?store=tasks             →  single store
?stores=tasks,reminders  →  multiple stores
(none)                   →  all stores (wildcard)

Error: when the requested cursor is too old (gap due to ring overflow), returns 409 JSON instead of starting the stream.

Instance Method Summary collapse

Constructor Details

#initialize(changefeed_provider:) ⇒ SseEventsHandler

Returns a new instance of SseEventsHandler.



213
214
215
# File 'lib/igniter/store/http_adapter.rb', line 213

def initialize(changefeed_provider:)
  @changefeed_provider = changefeed_provider
end

Instance Method Details

#call(env) ⇒ Object



217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
# File 'lib/igniter/store/http_adapter.rb', line 217

def call(env)
  return method_not_allowed unless env["REQUEST_METHOD"] == "GET"

  buf = @changefeed_provider&.call
  return json_response(503, { error: "SSE events endpoint not configured" }) unless buf

  cursor = parse_sse_cursor(env)
  stores = parse_sse_stores(env)

  replay_result = buf.replay(
    cursor: cursor,
    stores: stores.empty? ? nil : stores
  )

  if replay_result[:status] == :cursor_too_old
    return json_response(409, {
      status:        "cursor_too_old",
      oldest_cursor: replay_result[:oldest_cursor],
      newest_cursor: replay_result[:newest_cursor],
      dropped_total: replay_result[:dropped_total]
    })
  end

  body = SseBody.new(buf, replay_result[:events], stores)
  [200, sse_headers, body]
end