Class: Quonfig::SSEConfigClient
- Inherits:
- Object
- Defined in:
- lib/quonfig/sse_config_client.rb
Overview
SSE client for real-time config delivery from api-delivery-sse.
Owns its reconnect loop end-to-end. sdk-go, sdk-python, and sdk-node all reached the same conclusion: the wire format we consume (plain JSON envelopes in single-line data: frames, no named events, no retry directives) is simple enough that an SDK-owned loop is clearer than a library wrapper. The operator-facing reconnect counter also becomes trivially correct, because exactly one place increments it. (qfg-35sm; replaces the ld-eventsource integration from qfg-ie49 + qfg-cf52, which required log-line scraping and a raise-proof logger wrapper to observe reconnects through the upstream library.)
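To make the wire format concrete, here is a minimal sketch of reading single-line data: frames into JSON envelopes. It is not the SDK's EventParser: the helper name parse_sse_lines and the "version" envelope field are assumptions made for illustration, and the sketch ignores every field other than data.

```ruby
require 'json'

# Hypothetical helper (not the SDK's EventParser): turns raw SSE lines into
# parsed JSON envelopes, assuming each event carries one single-line data: frame.
def parse_sse_lines(lines)
  envelopes = []
  lines.each do |raw|
    line = raw.chomp
    next if line.empty?             # blank line marks an event boundary
    next if line.start_with?(':')   # SSE comment, typically a keep-alive

    field, value = line.split(':', 2)
    next unless field == 'data' && value

    # The SSE format allows one optional space after the colon.
    envelopes << JSON.parse(value.delete_prefix(' '))
  end
  envelopes
end

# Hypothetical stream contents; the "version" key is invented for the example.
stream = [
  ": keep-alive\n",
  "data: {\"version\":\"v1\"}\n",
  "\n",
  "data:{\"version\":\"v2\"}\n",
  "\n"
]
envelopes = parse_sse_lines(stream)
```

Because there are no named events or retry directives to honor, a line-oriented reader like this covers the whole format, which is the argument the overview makes for an SDK-owned loop.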
Defined Under Namespace
Classes: EventParser, LineReader, Options, ReadDeadlineWatchdog, SSEHTTPStatusError, SSEHTTPTerminalError, SSEReadDeadlineExceeded
Constant Summary
- LOG =
Quonfig::InternalLogger.new(self)
- TERMINAL_HTTP_CODES =
qfg-i5xv: HTTP status codes the SDK classifies as terminal — these will not heal by retrying (bad key, revoked permission, missing endpoint). Anything else (5xx, 429, network errors) stays on the transient path.
[401, 403, 404].freeze
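The terminal/transient split described above can be sketched as a small lookup. The method name classify_error_status and the returned symbols are assumptions for illustration; only the list of codes comes from the constant itself.

```ruby
# Same codes as TERMINAL_HTTP_CODES; redefined so the sketch is self-contained.
TERMINAL_CODES = [401, 403, 404].freeze

# Hypothetical classifier for a non-success HTTP status.
# :terminal  -> retrying will not help (bad key, revoked permission, missing endpoint)
# :transient -> worth retrying (5xx, 429, and anything else)
def classify_error_status(code)
  TERMINAL_CODES.include?(code) ? :terminal : :transient
end

classify_error_status(401) # => :terminal
classify_error_status(429) # => :transient
classify_error_status(503) # => :transient
```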
Instance Method Summary
-
#close ⇒ Object
Shut down.
-
#current_cursor ⇒ Object
Compute a Last-Event-ID for the next request.
-
#headers ⇒ Object
Public so tests can assert the headers shape.
-
#initialize(prefab_options, config_loader, options = nil, logger = nil, on_error: nil) ⇒ SSEConfigClient
constructor
on_error: optional callable invoked on every SSE error edge.
-
#on_envelope_error_total ⇒ Object
qfg-m3lk: count of user-supplied on_envelope callback invocations that raised.
-
#restart_total ⇒ Object
Layer 1 (SSE) reconnect counter.
-
#start(&on_envelope) ⇒ Object
Constructor Details
#initialize(prefab_options, config_loader, options = nil, logger = nil, on_error: nil) ⇒ SSEConfigClient
on_error: optional callable invoked on every SSE error edge. Parent Quonfig::Client wires this to drive @sse_state -> :error so that connection_state reflects the disconnect (qfg-47c2.27).
# File 'lib/quonfig/sse_config_client.rb', line 62
def initialize(prefab_options, config_loader, options = nil, logger = nil, on_error: nil)
  @prefab_options = prefab_options
  @options = options || Options.new
  @config_loader = config_loader
  @logger = logger || LOG
  @on_error = on_error
  @stopped = Concurrent::AtomicBoolean.new(false)
  @restart_total = 0
  @restart_mutex = Mutex.new
  @on_envelope_error_total = 0
  @on_envelope_error_mutex = Mutex.new
  @conn_mutex = Mutex.new
  @active_http = nil
  @source_index = -1
  @last_event_id = nil
end
Instance Method Details
#close ⇒ Object
Shut down. Interrupts the in-flight stream by closing the underlying socket from this thread — the worker thread observes the resulting IOError, sees @stopped == true, and exits cleanly.
# File 'lib/quonfig/sse_config_client.rb', line 109
def close
  @stopped.make_true
  @conn_mutex.synchronize do
    begin
      @active_http&.finish
    rescue StandardError
      # already closed / never started; idempotent
    end
    @active_http = nil
  end
  @worker&.join(2)
  @worker = nil
end
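The shutdown handshake can be illustrated in isolation. The sketch below is an assumption-laden stand-in, not the SDK's code: it uses an IO.pipe in place of the HTTP connection and a plain local variable in place of the atomic @stopped flag, and relies on Ruby raising IOError in a thread whose blocking read is interrupted by a close from another thread.

```ruby
reader, writer = IO.pipe   # stand-in for the SSE socket (assumption)
stopped = false            # stand-in for the @stopped flag
outcome = nil

worker = Thread.new do
  begin
    reader.read            # blocks: nothing is ever written to the pipe
    outcome = :eof
  rescue IOError
    # Closed underneath us. The flag tells a deliberate shutdown
    # apart from a genuine transport failure.
    outcome = stopped ? :clean_exit : :transport_error
  end
end

sleep 0.1                  # give the worker time to block in read
stopped = true             # set the flag first, then break the read
reader.close               # interrupts the blocked read with IOError
worker.join(2)             # bounded wait, as in #close
writer.close
```

Setting the flag before closing the stream matters: if the order were reversed, the worker could observe the IOError while the flag still reads false and treat the shutdown as a transport error.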
#current_cursor ⇒ Object
Compute a Last-Event-ID for the next request. Three sources, in priority order:
1. @last_event_id -- set by the most recent event we processed
2. config_loader.version -- string ETag from last HTTP fetch
3. config_loader.highwater_mark -- legacy numeric cursor
Returns nil if no prior state exists.
# File 'lib/quonfig/sse_config_client.rb', line 145
def current_cursor
  return @last_event_id if @last_event_id && !@last_event_id.empty?

  if @config_loader.respond_to?(:version)
    v = @config_loader.version
    return v if v.is_a?(String) && !v.empty?
  end

  if @config_loader.respond_to?(:highwater_mark)
    hw = @config_loader.highwater_mark
    return hw.to_s if hw.is_a?(Numeric) && hw.positive?
    return hw if hw.is_a?(String) && !hw.empty?
  end

  nil
end
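The priority order is easiest to see with concrete inputs. The following is a standalone mirror of the logic above, not a call into the SDK: StubLoader and pick_cursor are hypothetical names, and the cursor values are invented.

```ruby
# Hypothetical stand-in for the config loader; only these two readers matter.
StubLoader = Struct.new(:version, :highwater_mark)

# Standalone mirror of the three-source priority order.
def pick_cursor(last_event_id, loader)
  return last_event_id if last_event_id && !last_event_id.empty?

  v = loader.version
  return v if v.is_a?(String) && !v.empty?

  hw = loader.highwater_mark
  return hw.to_s if hw.is_a?(Numeric) && hw.positive?
  return hw if hw.is_a?(String) && !hw.empty?

  nil
end

pick_cursor('evt-9', StubLoader.new('etag-1', 42)) # => "evt-9"  (last event wins)
pick_cursor(nil, StubLoader.new('etag-1', 42))     # => "etag-1" (falls back to version)
pick_cursor(nil, StubLoader.new(nil, 42))          # => "42"     (numeric mark, stringified)
pick_cursor(nil, StubLoader.new('', 0))            # => nil      (no usable prior state)
```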
#headers ⇒ Object
Public so tests can assert the headers shape. Body of the request is always empty; this is the full set api-delivery-sse sees.
# File 'lib/quonfig/sse_config_client.rb', line 125
def headers
  auth = "1:#{@prefab_options.sdk_key}"
  auth_string = Base64.strict_encode64(auth)
  h = {
    'Authorization' => "Basic #{auth_string}",
    'Accept' => 'text/event-stream',
    'Cache-Control' => 'no-cache',
    'X-Quonfig-SDK-Version' => "ruby-#{Quonfig::VERSION}"
  }
  cursor = current_cursor
  h['Last-Event-Id'] = cursor if cursor
  h
end
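A test that asserts the headers shape might decode the Authorization value back to its "1:<sdk_key>" form. The sketch below builds and decodes such a value on its own, without the SDK; the key is a made-up placeholder, and Array#pack('m0') is used as the dependency-free equivalent of Base64.strict_encode64.

```ruby
sdk_key = 'example-key'                     # hypothetical placeholder key

# Basic auth credentials: fixed user "1", SDK key as the password.
token = ["1:#{sdk_key}"].pack('m0')         # strict Base64, no line breaks
authorization = "Basic #{token}"

# What a test could do to check the header it received.
scheme, encoded = authorization.split(' ', 2)
decoded = encoded.unpack1('m0')             # strict decode
user, password = decoded.split(':', 2)
```

Splitting on the first colon only keeps the check valid even if a key were to contain a colon itself.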
#on_envelope_error_total ⇒ Object
qfg-m3lk: count of user-supplied on_envelope callback invocations that raised. Surfaced for operator visibility — a non-zero value here with restart_total stable means a caller-side listener bug, not a transport problem. (Pre-fix, those raises propagated into run_loop’s rescue and masqueraded as transport errors, causing reconnect storms.)
# File 'lib/quonfig/sse_config_client.rb', line 96
def on_envelope_error_total
  @on_envelope_error_mutex.synchronize { @on_envelope_error_total }
end
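The isolation this counter implies can be sketched as follows: a raise from the caller's callback is caught and counted at the dispatch site, so it never reaches the transport loop. EnvelopeDispatcher and its method names are hypothetical; the SDK's actual dispatch code is not shown on this page.

```ruby
# Hypothetical wrapper that keeps listener bugs away from the transport loop.
class EnvelopeDispatcher
  def initialize(&on_envelope)
    @on_envelope = on_envelope
    @error_total = 0
    @mutex = Mutex.new
  end

  # Thread-safe read, in the same style as on_envelope_error_total.
  def error_total
    @mutex.synchronize { @error_total }
  end

  # Invoke the user callback; count a raise instead of propagating it.
  def dispatch(envelope)
    @on_envelope.call(envelope)
  rescue StandardError
    @mutex.synchronize { @error_total += 1 }
    nil
  end
end

dispatcher = EnvelopeDispatcher.new { |env| raise 'listener bug' if env[:bad] }
dispatcher.dispatch({ ok: true })   # callback succeeds
dispatcher.dispatch({ bad: true })  # callback raises; counted, not propagated
```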
#restart_total ⇒ Object
Layer 1 (SSE) reconnect counter. Bumped exactly once per reconnect attempt, never per error edge and never per envelope. Read by Quonfig::Client#worker_restart_total(layer: '1') and asserted by chaos scenario 09 (>= 5 after 5 proxy flaps in 30s).
# File 'lib/quonfig/sse_config_client.rb', line 87
def restart_total
  @restart_mutex.synchronize { @restart_total }
end
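A loop with a single increment site might look like the sketch below. It is an illustration under assumptions, not the SDK's run_loop: ReconnectLoop is a hypothetical class, the first connection is assumed not to count as a reconnect, and backoff between attempts is left out.

```ruby
# Hypothetical loop showing one, and only one, place that bumps the counter.
class ReconnectLoop
  def initialize(max_attempts:, &connect)
    @max_attempts = max_attempts
    @connect = connect            # runs one connection; raises when it drops
    @restart_total = 0
    @restart_mutex = Mutex.new
  end

  def restart_total
    @restart_mutex.synchronize { @restart_total }
  end

  def run
    @max_attempts.times do |attempt|
      # The only increment site: once per reconnect attempt.
      @restart_mutex.synchronize { @restart_total += 1 } if attempt.positive?
      begin
        @connect.call(attempt)
        return :finished
      rescue StandardError
        next                      # error edge: handled, but not counted here
      end
    end
    :gave_up
  end
end

# Three simulated proxy flaps, then a connection that ends normally.
runner = ReconnectLoop.new(max_attempts: 10) do |attempt|
  raise IOError, 'proxy flap' if attempt < 3
end
result = runner.run
```

With the increment tied to the attempt rather than to the rescue, several errors inside one connection cannot inflate the count.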
#start(&on_envelope) ⇒ Object
# File 'lib/quonfig/sse_config_client.rb', line 100
def start(&on_envelope)
  return if @prefab_options.sse_api_urls.nil? || @prefab_options.sse_api_urls.empty?

  @worker = Thread.new { run_loop(&on_envelope) }
end