Class: Feat::StreamingClient
- Inherits:
-
Object
- Object
- Feat::StreamingClient
- Includes:
- InterruptibleSleep
- Defined in:
- lib/feat/streaming.rb
Overview
Holds a long-lived SSE connection to the datafile stream endpoint and invokes on_put with the parsed datafile for every ‘put` frame and on_patch with the parsed delta for every `patch` frame. Runs on its own thread, reconnects with exponential backoff, and stops cleanly.
Constant Summary collapse
- PATH =
"/sdk/v1/datafile/stream".freeze
- DEFAULT_INITIAL_BACKOFF =
1.0- DEFAULT_MAX_BACKOFF =
60.0- DEFAULT_MIN_UPTIME =
A connection must stay open at least this long before we treat it as healthy and reset backoff. The server seeds a ‘put` on every (re)connect, so “an event arrived” alone does not prove the connection is stable: a server that seeds then immediately drops would otherwise pin us to a fixed ~1s reconnect cadence forever.
5.0- TERMINAL_STREAM_CODES =
HTTP statuses that will never succeed with this key/origin, so we stop retrying: 401 (invalid/revoked/expired key), 403 (origin not allowed).
[401, 403].freeze
- JOIN_TIMEOUT_SECONDS =
5
Constants included from InterruptibleSleep
InterruptibleSleep::SLEEP_GRANULARITY
Instance Method Summary collapse
-
#initialize(url:, api_key:, transport:, on_put:, on_patch: nil, on_error: nil, initial_backoff: DEFAULT_INITIAL_BACKOFF, max_backoff: DEFAULT_MAX_BACKOFF, min_uptime: DEFAULT_MIN_UPTIME, max_event_bytes: SSEParser::MAX_EVENT_BYTES) ⇒ StreamingClient
constructor
A new instance of StreamingClient.
- #start ⇒ Object
-
#stop ⇒ Object
Signal shutdown, abort any in-flight read, and join the thread.
Constructor Details
#initialize(url:, api_key:, transport:, on_put:, on_patch: nil, on_error: nil, initial_backoff: DEFAULT_INITIAL_BACKOFF, max_backoff: DEFAULT_MAX_BACKOFF, min_uptime: DEFAULT_MIN_UPTIME, max_event_bytes: SSEParser::MAX_EVENT_BYTES) ⇒ StreamingClient
Returns a new instance of StreamingClient.
110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 |
# File 'lib/feat/streaming.rb', line 110 def initialize(url:, api_key:, transport:, on_put:, on_patch: nil, on_error: nil, initial_backoff: DEFAULT_INITIAL_BACKOFF, max_backoff: DEFAULT_MAX_BACKOFF, min_uptime: DEFAULT_MIN_UPTIME, max_event_bytes: SSEParser::MAX_EVENT_BYTES) @url = url.chomp("/") @api_key = api_key @transport = transport @on_put = on_put @on_patch = on_patch @on_error = on_error @initial_backoff = initial_backoff @max_backoff = max_backoff @min_uptime = min_uptime @max_event_bytes = max_event_bytes @mutex = Mutex.new @conn = nil @thread = nil @stop = false end |
Instance Method Details
#start ⇒ Object
131 132 133 134 |
# File 'lib/feat/streaming.rb', line 131 def start @thread ||= Thread.new { run_loop } self end |
#stop ⇒ Object
Signal shutdown, abort any in-flight read, and join the thread.
137 138 139 140 141 142 143 144 145 146 147 148 |
# File 'lib/feat/streaming.rb', line 137 def stop @stop = true current = @mutex.synchronize { @conn } begin current&.close rescue StandardError nil end @thread&.join(JOIN_TIMEOUT_SECONDS) @thread = nil self end |