Class: Feat::StreamingClient
- Inherits: Object
  - Object
  - Feat::StreamingClient
- Includes: InterruptibleSleep
- Defined in: lib/feat/streaming.rb
Overview
Holds a long-lived SSE connection to the datafile stream endpoint and invokes on_put with the parsed datafile for every `put` frame. Runs on its own thread, reconnects with exponential backoff, and stops cleanly.
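The `put` dispatch described above can be sketched roughly as follows. This is an illustration only, not the library's SSEParser: `parse_sse_event` and `dispatch` are hypothetical helpers, and the assumption that the datafile payload is JSON is ours. The field handling follows the standard SSE wire format (`event:` and `data:` fields, comment lines starting with `:`).

```ruby
require "json"

# Hypothetical helper, not part of Feat: splits one SSE event block into
# its event name and data payload, following the SSE wire format.
def parse_sse_event(block)
  event = "message" # SSE default when no `event:` field is present
  data = []
  block.each_line(chomp: true) do |line|
    next if line.empty? || line.start_with?(":") # blank or comment line
    field, value = line.split(":", 2)
    value = (value || "").delete_prefix(" ") # one leading space is dropped
    case field
    when "event" then event = value
    when "data" then data << value
    end
  end
  [event, data.join("\n")]
end

# Hypothetical dispatch: only `put` frames reach the callback.
# Assumes the datafile is sent as JSON.
def dispatch(block, on_put)
  event, data = parse_sse_event(block)
  on_put.call(JSON.parse(data)) if event == "put"
end

received = []
on_put = ->(datafile) { received << datafile }
dispatch("event: put\ndata: {\"flags\": {}}\n", on_put)
dispatch(": keep-alive\n", on_put) # comment frame, ignored
```

After both calls, `received` holds a single parsed datafile; the keep-alive comment never reaches the callback.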
Constant Summary
- PATH =
"/sdk/v1/datafile/stream".freeze
- DEFAULT_INITIAL_BACKOFF =
1.0
- DEFAULT_MAX_BACKOFF =
60.0
- DEFAULT_MIN_UPTIME =
A connection must stay open at least this long before we treat it as healthy and reset backoff. The server seeds a `put` on every (re)connect, so “an event arrived” alone does not prove the connection is stable: a server that seeds then immediately drops would otherwise pin us to a fixed ~1s reconnect cadence forever.
5.0
- TERMINAL_STREAM_CODES =
HTTP statuses that will never succeed with this key/origin, so we stop retrying: 401 (invalid/revoked/expired key), 403 (origin not allowed).
[401, 403].freeze
- JOIN_TIMEOUT_SECONDS =
5
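One way to read how these constants interact is the sketch below. It is not the code from lib/feat/streaming.rb: `next_backoff` is a hypothetical helper, the doubling factor is an assumption (the source only says "exponential"), and jitter is left out. The constant values are copied from the summary above.

```ruby
# Values copied from the constant summary above.
DEFAULT_INITIAL_BACKOFF = 1.0
DEFAULT_MAX_BACKOFF = 60.0
DEFAULT_MIN_UPTIME = 5.0
TERMINAL_STREAM_CODES = [401, 403].freeze

# Hypothetical helper: given the previous delay and how long the connection
# that just ended stayed open, returns the delay before the next reconnect,
# or nil when the HTTP status is terminal and retrying is pointless.
def next_backoff(previous, uptime:, status: nil)
  return nil if TERMINAL_STREAM_CODES.include?(status)

  # Only a connection that stayed up long enough counts as healthy;
  # a seeded `put` followed by an immediate drop does not reset backoff.
  return DEFAULT_INITIAL_BACKOFF if uptime >= DEFAULT_MIN_UPTIME

  # Assumed growth factor of 2, capped at the maximum.
  [previous * 2, DEFAULT_MAX_BACKOFF].min
end

next_backoff(1.0, uptime: 0.2)               # => 2.0
next_backoff(32.0, uptime: 0.2)              # => 60.0 (capped)
next_backoff(32.0, uptime: 12.0)             # => 1.0 (healthy, reset)
next_backoff(4.0, uptime: 0.0, status: 401)  # => nil (stop retrying)
```

Without the uptime check, the second call would reset to 1.0 as soon as the seeded event arrived, which is the fixed-cadence reconnect loop the DEFAULT_MIN_UPTIME note warns about.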
Constants included from InterruptibleSleep
InterruptibleSleep::SLEEP_GRANULARITY
Instance Method Summary
- #initialize(url:, api_key:, transport:, on_put:, on_error: nil, initial_backoff: DEFAULT_INITIAL_BACKOFF, max_backoff: DEFAULT_MAX_BACKOFF, min_uptime: DEFAULT_MIN_UPTIME, max_event_bytes: SSEParser::MAX_EVENT_BYTES) ⇒ StreamingClient
constructor
A new instance of StreamingClient.
- #start ⇒ Object
- #stop ⇒ Object
Signal shutdown, abort any in-flight read, and join the thread.
Constructor Details
#initialize(url:, api_key:, transport:, on_put:, on_error: nil, initial_backoff: DEFAULT_INITIAL_BACKOFF, max_backoff: DEFAULT_MAX_BACKOFF, min_uptime: DEFAULT_MIN_UPTIME, max_event_bytes: SSEParser::MAX_EVENT_BYTES) ⇒ StreamingClient
Returns a new instance of StreamingClient.
# File 'lib/feat/streaming.rb', line 109
def initialize(url:, api_key:, transport:, on_put:, on_error: nil,
               initial_backoff: DEFAULT_INITIAL_BACKOFF,
               max_backoff: DEFAULT_MAX_BACKOFF,
               min_uptime: DEFAULT_MIN_UPTIME,
               max_event_bytes: SSEParser::MAX_EVENT_BYTES)
  @url = url.chomp("/")
  @api_key = api_key
  @transport = transport
  @on_put = on_put
  @on_error = on_error
  @initial_backoff = initial_backoff
  @max_backoff = max_backoff
  @min_uptime = min_uptime
  @max_event_bytes = max_event_bytes
  @mutex = Mutex.new
  @conn = nil
  @thread = nil
  @stop = false
end
Instance Method Details
#start ⇒ Object
# File 'lib/feat/streaming.rb', line 129
def start
  @thread ||= Thread.new { run_loop }
  self
end
#stop ⇒ Object
Signal shutdown, abort any in-flight read, and join the thread.
# File 'lib/feat/streaming.rb', line 135
def stop
  @stop = true
  current = @mutex.synchronize { @conn }
  begin
    current&.close
  rescue StandardError
    nil
  end
  @thread&.join(JOIN_TIMEOUT_SECONDS)
  @thread = nil
  self
end
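The shutdown sequence in #stop (set a flag, close the connection to abort a blocked read, then join with a timeout) can be tried in isolation with the sketch below. `PipeReader` is a hypothetical stand-in that reads from an `IO.pipe` rather than an SSE connection; its `run_loop` is our assumption, since the real one is not shown on this page.

```ruby
require "timeout"

# Hypothetical stand-in for StreamingClient: same start/stop shape,
# but the "connection" is the read end of a pipe.
class PipeReader
  JOIN_TIMEOUT_SECONDS = 5

  attr_reader :lines

  def initialize(io)
    @mutex = Mutex.new
    @conn = io
    @thread = nil
    @stop = false
    @lines = []
  end

  def start
    @thread ||= Thread.new { run_loop } # idempotent: one thread at most
    self
  end

  def stop
    @stop = true
    current = @mutex.synchronize { @conn }
    begin
      current&.close # wakes the thread blocked in gets
    rescue StandardError
      nil
    end
    @thread&.join(JOIN_TIMEOUT_SECONDS)
    @thread = nil
    self
  end

  private

  def run_loop
    until @stop
      line = @conn.gets # blocks until data, EOF, or close
      break if line.nil?
      @lines << line.chomp
    end
  rescue IOError
    # Raised when the IO is closed while (or before) we read: normal shutdown.
  end
end

reader, writer = IO.pipe
worker = PipeReader.new(reader).start
writer.puts "hello"
Timeout.timeout(2) { sleep 0.01 until worker.lines.any? }

started = Process.clock_gettime(Process::CLOCK_MONOTONIC)
worker.stop # returns promptly even though the thread was blocked in gets
elapsed = Process.clock_gettime(Process::CLOCK_MONOTONIC) - started
```

Closing the IO from the calling thread is what keeps `stop` from waiting out the full join timeout: the blocked read raises IOError in the worker thread, the loop exits, and the join returns.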