Class: Feat::StreamingClient

Inherits:
Object
Includes:
InterruptibleSleep
Defined in:
lib/feat/streaming.rb

Overview

Holds a long-lived SSE connection to the datafile stream endpoint. Invokes `on_put` with the parsed datafile for every `put` frame and `on_patch` with the parsed delta for every `patch` frame. Runs on its own thread, reconnects with exponential backoff, and stops cleanly.
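
The frame-to-callback routing described above could look roughly like the following. This is only a sketch: `DispatchSketch` and `dispatch` are hypothetical names, and the assumptions that frame payloads are JSON and that unknown event names are ignored are not confirmed by this page.

```ruby
require "json"

# Hypothetical illustration, not the library's implementation.
module DispatchSketch
  module_function

  # Routes one SSE frame to the matching callback.
  # Assumes `data` is a JSON document; other event names are ignored.
  def dispatch(event, data, on_put:, on_patch: nil)
    payload = JSON.parse(data)
    case event
    when "put"   then on_put.call(payload)     # full datafile
    when "patch" then on_patch&.call(payload)  # delta; callback is optional
    end
  end
end
```

Because `on_patch` defaults to nil in the constructor, the sketch uses safe navigation so that a client configured with only `on_put` silently skips `patch` frames.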

Constant Summary

PATH =
"/sdk/v1/datafile/stream".freeze
DEFAULT_INITIAL_BACKOFF =
1.0
DEFAULT_MAX_BACKOFF =
60.0
DEFAULT_MIN_UPTIME =

A connection must stay open at least this long before we treat it as healthy and reset backoff. The server seeds a `put` on every (re)connect, so “an event arrived” alone does not prove the connection is stable: a server that seeds then immediately drops would otherwise pin us to a fixed ~1s reconnect cadence forever.

5.0
TERMINAL_STREAM_CODES =

HTTP statuses that will never succeed with this key/origin, so we stop retrying: 401 (invalid/revoked/expired key), 403 (origin not allowed).

[401, 403].freeze
JOIN_TIMEOUT_SECONDS =
5
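
How these constants might combine into a reconnect decision is sketched below. The run loop itself is not shown on this page, so `BackoffSketch` and `next_reconnect_delay` are hypothetical, and the doubling factor is an assumption (the source only says "exponential backoff").

```ruby
# Hypothetical illustration; values mirror the documented defaults.
module BackoffSketch
  TERMINAL_STREAM_CODES   = [401, 403].freeze
  DEFAULT_INITIAL_BACKOFF = 1.0
  DEFAULT_MAX_BACKOFF     = 60.0
  DEFAULT_MIN_UPTIME      = 5.0

  module_function

  # Returns :stop for a terminal HTTP status, otherwise the number of
  # seconds to wait before the next connection attempt.
  def next_reconnect_delay(status:, uptime:, backoff:)
    # 401/403 will never succeed with this key/origin, so give up.
    return :stop if TERMINAL_STREAM_CODES.include?(status)
    # Only a connection that survived min_uptime resets the backoff.
    return DEFAULT_INITIAL_BACKOFF if uptime >= DEFAULT_MIN_UPTIME
    # Otherwise grow the delay (assumed factor of 2), capped at the max.
    [backoff * 2, DEFAULT_MAX_BACKOFF].min
  end
end
```

Under this sketch, a server that seeds a `put` and drops the connection after 200 ms keeps pushing the delay toward 60 s, rather than resetting it to 1 s on every attempt.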

Constants included from InterruptibleSleep

InterruptibleSleep::SLEEP_GRANULARITY
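
The InterruptibleSleep module is not documented on this page. A common way to make a backoff wait interruptible is to sleep in short slices and check a stop condition between them. The sketch below illustrates that idea only: `SleepSketch`, `interruptible_sleep`, and the 0.05 s slice length are assumptions, not the module's actual API or value.

```ruby
# Hypothetical illustration of sliced sleeping; not the real module.
module SleepSketch
  module_function

  # Sleeps up to `seconds`, waking every `granularity` seconds to call
  # the block. Returns false if the block reported a stop request,
  # true if the full duration elapsed.
  def interruptible_sleep(seconds, granularity: 0.05)
    deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + seconds
    loop do
      return false if yield
      remaining = deadline - Process.clock_gettime(Process::CLOCK_MONOTONIC)
      return true if remaining <= 0
      sleep([remaining, granularity].min)
    end
  end
end
```

With this pattern a 60 s backoff wait ends within roughly one slice of a stop request, which is what lets a bounded join on the worker thread succeed.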

Instance Method Summary

Constructor Details

#initialize(url:, api_key:, transport:, on_put:, on_patch: nil, on_error: nil, initial_backoff: DEFAULT_INITIAL_BACKOFF, max_backoff: DEFAULT_MAX_BACKOFF, min_uptime: DEFAULT_MIN_UPTIME, max_event_bytes: SSEParser::MAX_EVENT_BYTES) ⇒ StreamingClient

Returns a new instance of StreamingClient.



# File 'lib/feat/streaming.rb', line 110

def initialize(url:, api_key:, transport:, on_put:, on_patch: nil, on_error: nil,
               initial_backoff: DEFAULT_INITIAL_BACKOFF,
               max_backoff: DEFAULT_MAX_BACKOFF,
               min_uptime: DEFAULT_MIN_UPTIME,
               max_event_bytes: SSEParser::MAX_EVENT_BYTES)
  @url             = url.chomp("/")
  @api_key         = api_key
  @transport       = transport
  @on_put          = on_put
  @on_patch        = on_patch
  @on_error        = on_error
  @initial_backoff = initial_backoff
  @max_backoff     = max_backoff
  @min_uptime      = min_uptime
  @max_event_bytes = max_event_bytes
  @mutex           = Mutex.new
  @conn            = nil
  @thread          = nil
  @stop            = false
end
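
The constructor strips one trailing slash from `url`, presumably so the base URL can be joined with PATH without producing a double slash. The join itself is not shown on this page, so the sketch below is an assumption; `UrlSketch` and `stream_url` are hypothetical names.

```ruby
# Hypothetical illustration of how the normalized base URL and PATH
# might be combined; the real client may build the URL differently.
module UrlSketch
  PATH = "/sdk/v1/datafile/stream".freeze

  module_function

  def stream_url(base)
    # String#chomp("/") removes at most one trailing slash.
    base.chomp("/") + PATH
  end
end
```

Both `https://feat.example.com` and `https://feat.example.com/` (a placeholder host) yield the same stream URL under this sketch.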

Instance Method Details

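#start ⇒ Object

Start the background thread that runs the stream loop. Calling it again while a thread already exists is a no-op. Returns self.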



# File 'lib/feat/streaming.rb', line 131

def start
  @thread ||= Thread.new { run_loop }
  self
end

#stop ⇒ Object

Signal shutdown, abort any in-flight read, and join the thread.



# File 'lib/feat/streaming.rb', line 137

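def stop
  # Signal shutdown so the run loop can exit.
  @stop = true
  # Read the current connection under the lock, then close it outside the
  # lock to abort any in-flight read.
  current = @mutex.synchronize { @conn }
  begin
    current&.close
  rescue StandardError
    nil # best-effort close; errors here are ignored
  end
  # Wait a bounded time so a stuck thread cannot block the caller forever.
  @thread&.join(JOIN_TIMEOUT_SECONDS)
  @thread = nil
  self
end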
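
The start/stop lifecycle above (lazy thread creation, a stop flag, and a bounded join) can be exercised in isolation with a stand-in class. `TinyWorker` is hypothetical and replaces the stream loop with a short polling sleep; it illustrates the pattern, not the client's internals.

```ruby
# Hypothetical stand-in that mirrors the start/stop shape shown above.
class TinyWorker
  JOIN_TIMEOUT_SECONDS = 5

  def initialize
    @stop   = false
    @thread = nil
  end

  # Idempotent: a second call reuses the existing thread.
  def start
    @thread ||= Thread.new { sleep 0.01 until @stop }
    self
  end

  # Sets the flag, then waits at most JOIN_TIMEOUT_SECONDS for the thread.
  def stop
    @stop = true
    @thread&.join(JOIN_TIMEOUT_SECONDS)
    @thread = nil
    self
  end

  def running?
    !@thread.nil? && @thread.alive?
  end
end
```

Since both methods return self, calls can be chained, for example `TinyWorker.new.start`. In application code, calling stop from an `ensure` block or an `at_exit` hook is one way to make sure the thread is always joined.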