Module: Pgbus::Streams::Cursor

Defined in:
lib/pgbus/streams/cursor.rb

Overview

Parses an SSE replay cursor from a Rack request. The cursor is the highest PGMQ msg_id the client has already seen; the streamer replays everything strictly greater on (re)connect.

Two sources, by precedence:

1. Last-Event-ID header (sent automatically by EventSource on reconnect)
2. ?since= query param (sent by the custom element on first connect,
   because EventSource cannot send custom headers on the initial request)

Returns 0 when no cursor is present (i.e. “deliver everything from now”).

Defined Under Namespace

Classes: InvalidCursor

Constant Summary collapse

MAX_MSG_ID =

PGMQ msg_id is BIGINT — strictly within signed 64-bit range.

(2**63) - 1
INTEGER_PATTERN =

Strict integer pattern: optional leading minus, then digits. No plus prefix, no decimal, no whitespace. Negatives are caught by validate! with a clearer error message than “must be numeric”.

/\A-?\d+\z/

Class Method Summary collapse

Class Method Details

.parse(query_since:, last_event_id:) ⇒ Object



27
28
29
30
31
32
33
34
# File 'lib/pgbus/streams/cursor.rb', line 27

def self.parse(query_since:, last_event_id:)
  raw = pick(last_event_id, query_since)
  return 0 if raw.nil?

  value = coerce(raw)
  validate!(value)
  value
end