Class: Quonfig::SSEConfigClient
- Inherits:
-
Object
- Object
- Quonfig::SSEConfigClient
- Defined in:
- lib/quonfig/sse_config_client.rb
Defined Under Namespace
Classes: Options, ReconnectCountingLogger
Constant Summary collapse
- LOG =
Quonfig::InternalLogger.new(self)
Instance Method Summary collapse
- #close ⇒ Object
- #connect(&load_configs) ⇒ Object
-
#count_restart! ⇒ Object
Bump the Layer 1 reconnect counter.
-
#current_cursor ⇒ Object
Compute a Last-Event-ID to resume the stream from.
- #headers ⇒ Object
-
#initialize(prefab_options, config_loader, options = nil, logger = nil, on_error: nil) ⇒ SSEConfigClient
constructor
on_error: optional callable invoked on every SSE error edge. -
#restart_total ⇒ Object
qfg-ll6r / qfg-ie49: Layer 1 (SSE) restart counter — counts every reconnect, from two sources: ld-eventsource's own internal reconnects and SDK-driven reconnects in @retry_thread.
- #source ⇒ Object
- #start(&load_configs) ⇒ Object
Constructor Details
#initialize(prefab_options, config_loader, options = nil, logger = nil, on_error: nil) ⇒ SSEConfigClient
on_error: optional callable invoked on every SSE error edge. Parent Quonfig::Client wires this to drive @sse_state -> :error so that connection_state reflects the disconnect (qfg-47c2.27). Without it the SDK’s public health primitive would lie about its own state during a mid-run socket drop.
113 114 115 116 117 118 119 120 121 122 |
# File 'lib/quonfig/sse_config_client.rb', line 113

# Build an SSE config client. No connection is opened here; see #start.
#
# @param prefab_options [Object] client-level options; other methods in this
#   class read +sse_api_urls+ and +sdk_key+ from it
# @param config_loader [Object] loader consulted by #current_cursor
# @param options [Options, nil] SSE tuning knobs; defaults to +Options.new+
# @param logger [Object, nil] logger to use; defaults to +LOG+
# @param on_error [#call, nil] optional callable invoked on every SSE error edge
def initialize(prefab_options, config_loader, options = nil, logger = nil, on_error: nil)
  @prefab_options = prefab_options
  @options = options || Options.new
  @config_loader = config_loader
  @connected = false
  @logger = logger || LOG
  @on_error = on_error
  @restart_total = 0
  # Guards @restart_total; see #count_restart! and #restart_total.
  @restart_mutex = Mutex.new
end
Instance Method Details
#close ⇒ Object
150 151 152 153 |
# File 'lib/quonfig/sse_config_client.rb', line 150

# Shut the client down: kill the reconnect watcher thread first, then close the
# current SSE client. Both steps are skipped when the corresponding instance
# variable is nil (e.g. #start was never called).
#
# @return [Object, nil] whatever the SSE client's +close+ returns, or nil when
#   there is no client
def close
  @retry_thread&.kill
  @client&.close
end
#connect(&load_configs) ⇒ Object
187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 |
# File 'lib/quonfig/sse_config_client.rb', line 187

# Open one SSE stream against the next configured source URL.
#
# Each event's JSON payload is parsed into a Quonfig::ConfigEnvelope and handed
# to +load_configs+ together with the raw event and the symbol +:sse+. Events
# with empty or unparseable data are logged and close the client.
#
# @yieldparam envelope [Quonfig::ConfigEnvelope] parsed configs and meta
# @yieldparam event [Object] the raw SSE event
# @yieldparam source [Symbol] always +:sse+
# @return [SSE::Client] the newly created client
def connect(&load_configs)
  url = "#{source}/api/v2/sse/config"
  cursor = current_cursor
  @logger.debug "SSE Streaming Connect to #{url} start_at #{cursor.inspect}"

  # Wrap the ld-eventsource logger so internal reconnects (clean FIN,
  # read-timeout, transient errors) bump restart_total — they never reach
  # on_error (qfg-ie49).
  sse_logger = ReconnectCountingLogger.new(
    Quonfig::InternalLogger.new(SSE::Client)
  ) { count_restart! }

  SSE::Client.new(url,
                  headers: headers,
                  read_timeout: @options.sse_read_timeout,
                  reconnect_time: @options.sse_default_reconnect_time,
                  reconnect_reset_interval: @options.sse_reconnect_reset_interval,
                  last_event_id: cursor,
                  logger: sse_logger) do |client|
    client.on_event do |event|
      if event.data.nil? || event.data.empty?
        @logger.error "SSE Streaming Error: Received empty data for url #{url}"
        client.close
        next
      end

      begin
        parsed = JSON.parse(event.data)
      rescue JSON::ParserError => e
        @logger.error "SSE Streaming Error: Failed to parse JSON for url #{url}: #{e.message}"
        client.close
        next
      end

      envelope = Quonfig::ConfigEnvelope.new(
        configs: parsed['configs'] || [],
        meta: parsed['meta'] || {}
      )
      load_configs.call(envelope, event, :sse)
    end

    client.on_error do |error|
      # SSL "unexpected eof" is expected when SSE sessions timeout normally
      if error.is_a?(OpenSSL::SSL::SSLError) && error.message.include?('unexpected eof')
        @logger.debug "SSE Streaming: Connection closed (expected timeout) for url #{url}"
      else
        @logger.error "SSE Streaming Error: #{error.inspect} for url #{url}"
      end

      # qfg-ie49: restart_total is NOT bumped here. ld-eventsource
      # auto-reconnects after most non-closing errors, and that reconnect
      # is already counted via ReconnectCountingLogger; bumping here too
      # would double-count. For closing errors (HTTP::ConnectionError) the
      # reconnect is counted in @retry_thread instead. on_error's job is
      # purely to notify the parent client of the disconnect edge.

      # Notify the parent client BEFORE deciding whether to close — every
      # error edge is a disconnect signal as far as @sse_state goes, even
      # if we let the underlying SSE library handle reconnect itself.
      # qfg-47c2.27
      if @on_error
        begin
          @on_error.call(error)
        rescue StandardError => e
          @logger.error "SSE on_error callback raised: #{e.inspect}"
        end
      end

      if @options.errors_to_close_connection.any? { |klass| error.is_a?(klass) }
        @logger.debug "Closing SSE connection for url #{url}"
        client.close
      end
    end
  end
end
#count_restart! ⇒ Object
Bump the Layer 1 reconnect counter. Called from the ld-eventsource worker thread (via ReconnectCountingLogger) and from @retry_thread.
146 147 148 |
# File 'lib/quonfig/sse_config_client.rb', line 146

# Increment the restart counter by one while holding @restart_mutex.
#
# @return [Integer] the counter value after the increment
def count_restart!
  @restart_mutex.synchronize { @restart_total += 1 }
end
#current_cursor ⇒ Object
Compute a Last-Event-ID to resume the stream from. Three sources, in priority order:
1. config_loader.version -- string ETag from last HTTP fetch (new path)
2. config_loader.highwater_mark -- legacy numeric cursor
3. nil -- no prior state; stream from HEAD
286 287 288 289 290 291 292 293 294 295 296 297 298 299 |
# File 'lib/quonfig/sse_config_client.rb', line 286

# Pick the Last-Event-ID to resume the stream from.
#
# Checked in order, each only if @config_loader responds to it:
#   1. +version+        — used when it is a non-empty String
#   2. +highwater_mark+ — a positive Numeric is returned as a String;
#                         a non-empty String is returned unchanged
#
# @return [String, nil] the cursor, or nil when neither source yields one
def current_cursor
  if @config_loader.respond_to?(:version)
    v = @config_loader.version
    return v if v.is_a?(String) && !v.empty?
  end

  if @config_loader.respond_to?(:highwater_mark)
    hw = @config_loader.highwater_mark
    return hw.to_s if hw.is_a?(Numeric) && hw.positive?
    return hw if hw.is_a?(String) && !hw.empty?
  end

  nil
end
#headers ⇒ Object
263 264 265 266 267 268 269 270 271 |
# File 'lib/quonfig/sse_config_client.rb', line 263

# Build the HTTP request headers for the SSE connection.
#
# The Authorization value is HTTP Basic over the string "1:<sdk_key>",
# encoded with strict (no line breaks) Base64.
#
# @return [Hash{String => String}] Authorization, Accept and SDK-version headers
def headers
  auth = "1:#{@prefab_options.sdk_key}"
  auth_string = Base64.strict_encode64(auth)

  {
    'Authorization' => "Basic #{auth_string}",
    'Accept' => 'text/event-stream',
    'X-Quonfig-SDK-Version' => "ruby-#{Quonfig::VERSION}"
  }
end
#restart_total ⇒ Object
qfg-ll6r / qfg-ie49: Layer 1 (SSE) restart counter — counts every reconnect, from two sources:
1. ld-eventsource's own internal reconnect (clean FIN, read timeout,
transient errors it doesn't surface) — observed via the
ReconnectCountingLogger "Will retry connection after" signal.
2. SDK-driven reconnects in @retry_thread, after a closing error
(HTTP::ConnectionError) made us close the SSE::Client outright.
These two are mutually exclusive per disconnect, so there is no double-count. on_error is deliberately NOT a source — ld-eventsource reconnects internally after most non-closing errors, so counting the error edge AND the reconnect would double up (qfg-ie49).
The chaos harness pulls this via Client#worker_restart_total(layer: '1') so kill-storm scenarios (e.g. scenario 09 — proxy killed 5x in 30s) can assert restart_total >= 5 even when the kills produce clean FINs that never reach on_error.
140 141 142 |
# File 'lib/quonfig/sse_config_client.rb', line 140

# Read the restart counter while holding @restart_mutex, the same lock
# #count_restart! takes when incrementing it.
#
# @return [Integer] current value of the restart counter
def restart_total
  @restart_mutex.synchronize { @restart_total }
end
#source ⇒ Object
273 274 275 276 277 278 279 |
# File 'lib/quonfig/sse_config_client.rb', line 273

# Return the next SSE base URL, rotating through
# @prefab_options.sse_api_urls round-robin. The first call yields the first
# URL; each later call advances by one and wraps to the start after the last.
#
# @return [String, nil] the selected URL (nil if the list is empty)
def source
  @source_index = @source_index.nil? ? 0 : @source_index + 1
  # Wrap around once we step past the last configured URL.
  @source_index = 0 if @source_index >= @prefab_options.sse_api_urls.size

  @prefab_options.sse_api_urls[@source_index]
end
#start(&load_configs) ⇒ Object
155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 |
# File 'lib/quonfig/sse_config_client.rb', line 155

# Connect to the SSE stream and spawn a watcher thread that reconnects after
# the client has been closed.
#
# The watcher wakes every +sleep_delay_for_new_connection_check+ seconds.
# While the client reports closed it accumulates that delay; once the total
# exceeds +seconds_between_new_connection+ it resets the total, counts a
# restart and opens a fresh connection via #connect.
#
# @yield forwarded unchanged to #connect for every (re)connection
# @return [Thread, nil] the watcher thread, or nil when no SSE URLs are configured
def start(&load_configs)
  if @prefab_options.sse_api_urls.empty?
    @logger.debug 'No SSE api_urls configured'
    return
  end

  @client = connect(&load_configs)

  closed_count = 0

  @retry_thread = Thread.new do
    loop do
      sleep @options.sleep_delay_for_new_connection_check

      next unless @client.closed?

      closed_count += @options.sleep_delay_for_new_connection_check

      next unless closed_count > @options.seconds_between_new_connection

      closed_count = 0
      @logger.debug 'Reconnecting SSE client'
      # SDK-driven reconnect: a closing error (HTTP::ConnectionError)
      # closed the previous SSE::Client, so ld-eventsource's own
      # reconnect loop has exited and won't emit the "Will retry" signal.
      # Count it here instead (qfg-ie49).
      count_restart!
      @client = connect(&load_configs)
    end
  end
end