Class: Quonfig::SSEConfigClient

Inherits:
Object
  • Object
show all
Defined in:
lib/quonfig/sse_config_client.rb

Defined Under Namespace

Classes: Options, ReconnectCountingLogger

Constant Summary collapse

LOG =
Quonfig::InternalLogger.new(self)

Instance Method Summary collapse

Constructor Details

#initialize(prefab_options, config_loader, options = nil, logger = nil, on_error: nil) ⇒ SSEConfigClient

on_error: optional callable invoked on every SSE error edge. Parent Quonfig::Client wires this to drive @sse_state -> :error so that connection_state reflects the disconnect (qfg-47c2.27). Without it the SDK’s public health primitive would lie about its own state during a mid-run socket drop.



113
114
115
116
117
118
119
120
121
122
# File 'lib/quonfig/sse_config_client.rb', line 113

# Sets up client state only; nothing is connected here.
#
# @param prefab_options [Object] stored as @prefab_options (other methods read
#   +sdk_key+ and +sse_api_urls+ from it)
# @param config_loader [Object] stored as @config_loader (consulted for the
#   resume cursor)
# @param options [Options, nil] SSE tuning options; a fresh +Options+ is used
#   when none is given
# @param logger [Object, nil] logger; falls back to the class-level +LOG+
# @param on_error [#call, nil] optional callable stored for the error handler
def initialize(prefab_options, config_loader, options = nil, logger = nil, on_error: nil)
  # Collaborators handed in by the caller.
  @prefab_options = prefab_options
  @config_loader = config_loader
  @on_error = on_error

  # Optional collaborators with defaults.
  @options = options
  @options ||= Options.new
  @logger = logger
  @logger ||= LOG

  # Internal bookkeeping: connection flag and the mutex-guarded restart counter.
  @connected = false
  @restart_mutex = Mutex.new
  @restart_total = 0
end

Instance Method Details

#close ⇒ Object



150
151
152
153
# File 'lib/quonfig/sse_config_client.rb', line 150

# Shuts the client down: kills the reconnect watcher thread first (so it
# cannot open a new connection), then closes the current SSE client. Either
# step is skipped when the corresponding object was never created.
def close
  @retry_thread.kill unless @retry_thread.nil?
  @client.close unless @client.nil?
end

#connect(&load_configs) ⇒ Object



187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
# File 'lib/quonfig/sse_config_client.rb', line 187

# Opens a new SSE stream and wires its event and error handlers.
#
# The stream URL is the next entry from #source plus "/api/v2/sse/config";
# the stream resumes from #current_cursor (passed as last_event_id).
#
# @yield [envelope, event, source] called for every event whose data parses
#   as JSON; +source+ is always +:sse+
# @return [SSE::Client] the newly created client
def connect(&load_configs)
  endpoint = "#{source}/api/v2/sse/config"
  resume_from = current_cursor
  @logger.debug "SSE Streaming Connect to #{endpoint} start_at #{resume_from.inspect}"

  # ld-eventsource reconnects on its own (clean FIN, read-timeout, transient
  # errors) without ever calling on_error. Wrapping its logger lets those
  # internal reconnects bump restart_total (qfg-ie49).
  counting_logger = ReconnectCountingLogger.new(Quonfig::InternalLogger.new(SSE::Client)) do
    count_restart!
  end

  stream_settings = {
    headers: headers,
    read_timeout: @options.sse_read_timeout,
    reconnect_time: @options.sse_default_reconnect_time,
    reconnect_reset_interval: @options.sse_reconnect_reset_interval,
    last_event_id: resume_from,
    logger: counting_logger
  }

  SSE::Client.new(endpoint, **stream_settings) do |stream|
    stream.on_event do |event|
      raw = event.data

      # An event without a payload is treated as a broken stream: log and close.
      if raw.nil? || raw.empty?
        @logger.error "SSE Streaming Error: Received empty data for url #{endpoint}"
        stream.close
        next
      end

      # Same treatment for a payload that is not valid JSON.
      decoded =
        begin
          JSON.parse(raw)
        rescue JSON::ParserError => e
          @logger.error "SSE Streaming Error: Failed to parse JSON for url #{endpoint}: #{e.message}"
          stream.close
          next
        end

      configs = decoded['configs'] || []
      meta = decoded['meta'] || {}
      load_configs.call(Quonfig::ConfigEnvelope.new(configs: configs, meta: meta), event, :sse)
    end

    stream.on_error do |error|
      # An SSL "unexpected eof" is what a normally timed-out SSE session looks
      # like, so it is logged at debug level rather than as an error.
      routine_eof = error.is_a?(OpenSSL::SSL::SSLError) && error.message.include?('unexpected eof')
      if routine_eof
        @logger.debug "SSE Streaming: Connection closed (expected timeout) for url #{endpoint}"
      else
        @logger.error "SSE Streaming Error: #{error.inspect} for url #{endpoint}"
      end

      # restart_total is deliberately NOT bumped here (qfg-ie49): reconnects
      # are counted either by ReconnectCountingLogger (library-driven) or in
      # @retry_thread (after a closing error). Counting the error edge as well
      # would double-count.

      # Tell the parent client about the disconnect edge BEFORE deciding
      # whether to close, so every error is reported even when the SSE library
      # is left to reconnect by itself (qfg-47c2.27). A failing callback is
      # logged and otherwise ignored.
      begin
        @on_error.call(error) if @on_error
      rescue StandardError => e
        @logger.error "SSE on_error callback raised: #{e.inspect}"
      end

      # Only the configured error classes shut the stream down.
      must_close = @options.errors_to_close_connection.any? { |error_class| error.is_a?(error_class) }
      if must_close
        @logger.debug "Closing SSE connection for url #{endpoint}"
        stream.close
      end
    end
  end
end

#count_restart! ⇒ Object

Bump the Layer 1 reconnect counter. Called from the ld-eventsource worker thread (via ReconnectCountingLogger) and from @retry_thread.



146
147
148
# File 'lib/quonfig/sse_config_client.rb', line 146

# Adds one to the restart counter while holding @restart_mutex, so increments
# coming from different threads are not lost.
#
# @return [Integer] the counter value after the increment
def count_restart!
  @restart_mutex.synchronize do
    bumped = @restart_total + 1
    @restart_total = bumped
  end
end

#current_cursor ⇒ Object

Compute a Last-Event-ID to resume the stream from. Three sources, in priority order:

1. config_loader.version  -- string ETag from last HTTP fetch (new path)
2. config_loader.highwater_mark -- legacy numeric cursor
3. nil -- no prior state; stream from HEAD


286
287
288
289
290
291
292
293
294
295
296
297
298
299
# File 'lib/quonfig/sse_config_client.rb', line 286

# Picks the cursor to resume the stream from, trying in order:
#
# 1. +@config_loader.version+, when it is a non-empty String
# 2. +@config_loader.highwater_mark+, when it is a positive Numeric
#    (returned as a String) or a non-empty String
# 3. +nil+ when neither yields a usable value
#
# Either accessor is skipped if the loader does not respond to it.
#
# @return [String, nil]
def current_cursor
  loader = @config_loader

  if loader.respond_to?(:version)
    etag = loader.version
    return etag if etag.is_a?(String) && !etag.empty?
  end

  return nil unless loader.respond_to?(:highwater_mark)

  mark = loader.highwater_mark
  case mark
  when Numeric then (mark.to_s if mark.positive?)
  when String then (mark unless mark.empty?)
  end
end

#headers ⇒ Object



263
264
265
266
267
268
269
270
271
# File 'lib/quonfig/sse_config_client.rb', line 263

# Request headers for the SSE connection.
#
# Authorization is HTTP Basic with the credential "1:<sdk_key>" encoded as
# strict (no line breaks) Base64; the SDK version is reported as
# "ruby-<Quonfig::VERSION>".
#
# @return [Hash{String => String}]
def headers
  credentials = Base64.strict_encode64("1:#{@prefab_options.sdk_key}")

  {
    'Authorization' => "Basic #{credentials}",
    'Accept' => 'text/event-stream',
    'X-Quonfig-SDK-Version' => "ruby-#{Quonfig::VERSION}"
  }
end

#restart_total ⇒ Object

qfg-ll6r / qfg-ie49: Layer 1 (SSE) restart counter — counts every reconnect, from two sources:

1. ld-eventsource's own internal reconnect (clean FIN, read timeout,
   transient errors it doesn't surface) — observed via the
   ReconnectCountingLogger "Will retry connection after" signal.
2. SDK-driven reconnects in @retry_thread, after a closing error
   (HTTP::ConnectionError) made us close the SSE::Client outright.

These two are mutually exclusive per disconnect, so there is no double-count. on_error is deliberately NOT a source — ld-eventsource reconnects internally after most non-closing errors, so counting the error edge AND the reconnect would double up (qfg-ie49).

The chaos harness pulls this via Client#worker_restart_total(layer: '1') so kill-storm scenarios (e.g. scenario 09 — proxy killed 5x in 30s) can assert restart_total >= 5 even when the kills produce clean FINs that never reach on_error.



140
141
142
# File 'lib/quonfig/sse_config_client.rb', line 140

# Current value of the restart counter, read while holding @restart_mutex.
#
# @return [Integer]
def restart_total
  @restart_mutex.synchronize do
    @restart_total
  end
end

#source ⇒ Object



273
274
275
276
277
278
279
# File 'lib/quonfig/sse_config_client.rb', line 273

# Returns the next SSE base URL, rotating round-robin through
# +@prefab_options.sse_api_urls+. The first call yields the first URL; each
# later call advances @source_index by one and wraps to the start once the
# end of the list is passed.
#
# @return [String, nil] the selected URL (nil when the list is empty)
def source
  urls = @prefab_options.sse_api_urls
  following = @source_index.nil? ? 0 : @source_index + 1
  @source_index = following < urls.size ? following : 0
  urls[@source_index]
end

#start(&load_configs) ⇒ Object



155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
# File 'lib/quonfig/sse_config_client.rb', line 155

# Connects the SSE stream and launches a watcher thread that replaces the
# client after it has been closed.
#
# Does nothing (beyond a debug log) when no SSE URLs are configured.
#
# The watcher wakes up every +sleep_delay_for_new_connection_check+ seconds.
# While the current client reports closed, it adds that delay to a running
# total; once the total exceeds +seconds_between_new_connection+ it resets
# the total, counts a restart and connects again.
#
# @yield forwarded unchanged to #connect for every (re)connection
# @return [Thread, nil] the watcher thread, or nil when no URLs are configured
def start(&load_configs)
  if @prefab_options.sse_api_urls.empty?
    @logger.debug 'No SSE api_urls configured'
    return
  end

  @client = connect(&load_configs)

  seconds_closed = 0

  @retry_thread = Thread.new do
    loop do
      sleep @options.sleep_delay_for_new_connection_check

      if @client.closed?
        seconds_closed += @options.sleep_delay_for_new_connection_check

        if seconds_closed > @options.seconds_between_new_connection
          seconds_closed = 0
          @logger.debug 'Reconnecting SSE client'
          # This reconnect is SDK-driven: a closing error
          # (HTTP::ConnectionError) shut the previous SSE::Client down, so
          # ld-eventsource's own retry loop is gone and will not log its
          # "Will retry" line. The restart therefore has to be counted here
          # (qfg-ie49).
          count_restart!
          @client = connect(&load_configs)
        end
      end
    end
  end
end