Class: Quonfig::SSEConfigClient

Inherits:
Object
Defined in:
lib/quonfig/sse_config_client.rb

Overview

SSE client for real-time config delivery from api-delivery-sse.

Owns its reconnect loop end-to-end. sdk-go, sdk-python, and sdk-node all reached the same conclusion: the wire format we consume (plain JSON envelopes in single-line data: frames, no named events, no retry directives) is simple enough that an SDK-owned loop is clearer than a library wrapper. The operator-facing reconnect counter also becomes trivially correct, because exactly one place increments it.

(qfg-35sm; replaces the ld-eventsource integration from qfg-ie49 + qfg-cf52, which required log-line scraping and a raise-proof logger wrapper to observe reconnects through the upstream library.)
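To make the wire format concrete, here is a minimal sketch of pulling JSON envelopes out of single-line data: frames. The helper name parse_data_lines and the envelope keys in the sample are hypothetical. The class's real EventParser and LineReader are not shown on this page and may behave differently (for example, they presumably also track id: lines, which this sketch simply skips).

```ruby
require 'json'

# Hypothetical helper (not part of the SDK): extracts JSON envelopes from
# raw SSE lines. Only single-line `data:` frames are handled; comments,
# blank lines, and every other field are skipped.
def parse_data_lines(lines)
  lines.filter_map do |line|
    line = line.chomp
    next unless line.start_with?('data:')

    # Per the SSE format, one optional space after the colon is dropped.
    payload = line.delete_prefix('data:').sub(/\A /, '')
    JSON.parse(payload)
  end
end

# The envelope keys below are made up for illustration.
sample = [
  ": keep-alive comment\n",
  "id: 7\n",
  "data: {\"version\":\"v1\",\"configs\":[]}\n",
  "\n"
]
envelopes = parse_data_lines(sample)
puts envelopes.length
```

With no named events and no retry directives to honor, the parsing side stays this small, which is the argument for an SDK-owned loop.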

Defined Under Namespace

Classes: EventParser, LineReader, Options, ReadDeadlineWatchdog, SSEHTTPStatusError, SSEHTTPTerminalError, SSEReadDeadlineExceeded

Constant Summary

LOG =
Quonfig::InternalLogger.new(self)
TERMINAL_HTTP_CODES =

qfg-i5xv: HTTP status codes the SDK classifies as terminal — these will not heal by retrying (bad key, revoked permission, missing endpoint). Anything else (5xx, 429, network errors) stays on the transient path.

[401, 403, 404].freeze
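The terminal/transient split can be sketched as a small classifier. This is an illustration only: classify_status and the :ok branch for 200 are assumptions, and the SDK's actual handling (via SSEHTTPStatusError and SSEHTTPTerminalError) is not shown on this page.

```ruby
# Mirrors the documented constant; redefined here so the sketch is
# self-contained.
TERMINAL_CODES_SKETCH = [401, 403, 404].freeze

# Hypothetical classifier: maps an HTTP status code to the path the
# reconnect loop would take. Treating 200 as :ok is an assumption.
def classify_status(code)
  return :ok if code == 200

  TERMINAL_CODES_SKETCH.include?(code) ? :terminal : :transient
end

puts classify_status(403) # terminal: retrying will not heal a revoked key
puts classify_status(503) # transient: worth another reconnect attempt
```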
READ_TIMEOUT_HEADROOM =

qfg-6y44: headroom added to sse_read_timeout when configuring Net::HTTP#read_timeout. The ReadDeadlineWatchdog already covers both the header and body reads at exactly sse_read_timeout; Net’s own read_timeout is only a redundant backstop. Arming it at the same value as the watchdog makes the two race — and on the body path Net’s (unreliable) timeout can fire first, surfacing a Net::ReadTimeout instead of the SSEReadDeadlineExceeded the SDK is instrumented around. Giving Net’s read_timeout this much headroom keeps it a backstop without ever letting it win the race.

30
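As a worked example of the headroom arithmetic, the sketch below arms Net::HTTP#read_timeout at the watchdog deadline plus 30 seconds. The helper build_http, the host, and the 90-second sse_read_timeout are assumptions chosen for illustration. No connection is opened.

```ruby
require 'net/http'

HEADROOM_SECONDS = 30 # same value as READ_TIMEOUT_HEADROOM above

# Hypothetical helper: builds an unconnected Net::HTTP object whose own
# read timeout sits safely behind the watchdog's deadline.
def build_http(host, port, sse_read_timeout)
  http = Net::HTTP.new(host, port)
  http.read_timeout = sse_read_timeout + HEADROOM_SECONDS
  http
end

# With an assumed 90s watchdog deadline, Net's backstop lands at 120s,
# so the watchdog always gets to fire first.
http = build_http('example.invalid', 443, 90)
puts http.read_timeout
```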

Instance Method Summary

Constructor Details

#initialize(prefab_options, config_loader, options = nil, logger = nil, on_error: nil) ⇒ SSEConfigClient

on_error: optional callable invoked on every SSE error edge. Parent Quonfig::Client wires this to drive @sse_state -> :error so that connection_state reflects the disconnect (qfg-47c2.27).



# File 'lib/quonfig/sse_config_client.rb', line 73

def initialize(prefab_options, config_loader, options = nil, logger = nil, on_error: nil)
  @prefab_options = prefab_options
  @options = options || Options.new
  @config_loader = config_loader
  @logger = logger || LOG
  @on_error = on_error

  @stopped = Concurrent::AtomicBoolean.new(false)
  @restart_total = 0
  @restart_mutex = Mutex.new

  @on_envelope_error_total = 0
  @on_envelope_error_mutex = Mutex.new

  @conn_mutex = Mutex.new
  @active_http = nil

  @source_index = -1
  @last_event_id = nil
end

Instance Method Details

#close ⇒ Object

Shut down. Interrupts the in-flight stream by closing the underlying socket from this thread — the worker thread observes the resulting IOError, sees @stopped == true, and exits cleanly.



# File 'lib/quonfig/sse_config_client.rb', line 120

def close
  @stopped.make_true
  @conn_mutex.synchronize do
    begin
      @active_http&.finish
    rescue StandardError
      # already closed / never started — idempotent
    end
    @active_http = nil
  end
  @worker&.join(2)
  @worker = nil
end
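The interrupt-by-close pattern can be seen in isolation with a pipe standing in for the socket. This is a sketch under that assumption: interrupt_blocked_reader is a hypothetical name, and the real worker reads from a Net::HTTP connection rather than an IO.pipe.

```ruby
# Hypothetical demonstration: a worker blocks on a read, another thread
# sets a stop flag and closes the stream, and the worker treats the
# resulting IOError as a clean shutdown.
def interrupt_blocked_reader
  reader, writer = IO.pipe
  stopped = false
  outcome = nil

  worker = Thread.new do
    begin
      reader.gets # blocks: nothing is ever written to the pipe
      outcome = :stream_ended
    rescue IOError
      # Closing the stream from another thread surfaces here as IOError.
      outcome = stopped ? :clean_exit : :unexpected_error
    end
  end

  sleep 0.1      # give the worker time to block on the read
  stopped = true # set the flag before closing, as #close does
  reader.close
  worker.join(2) # bounded wait, mirroring the join(2) above
  outcome
ensure
  writer&.close
end

puts interrupt_blocked_reader
```

Setting the stop flag before closing matters: the worker needs to see it already set when the IOError arrives, otherwise it cannot tell a shutdown from a genuine transport failure.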

#current_cursor ⇒ Object

Compute a Last-Event-ID for the next request. Three sources, in priority order:

1. @last_event_id  -- set by the most recent event we processed
2. config_loader.version  -- string ETag from last HTTP fetch
3. config_loader.highwater_mark  -- legacy numeric cursor

Returns nil if no prior state exists.



# File 'lib/quonfig/sse_config_client.rb', line 156

def current_cursor
  return @last_event_id if @last_event_id && !@last_event_id.empty?

  if @config_loader.respond_to?(:version)
    v = @config_loader.version
    return v if v.is_a?(String) && !v.empty?
  end

  if @config_loader.respond_to?(:highwater_mark)
    hw = @config_loader.highwater_mark
    return hw.to_s if hw.is_a?(Numeric) && hw.positive?
    return hw if hw.is_a?(String) && !hw.empty?
  end

  nil
end
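The priority order can be exercised with a stand-in loader. StubLoader and pick_cursor are hypothetical names. The function follows the same order as the method above but takes its inputs as arguments, and it omits the respond_to? guards and the string highwater_mark branch for brevity.

```ruby
# Hypothetical stand-in for the config loader.
StubLoader = Struct.new(:version, :highwater_mark)

# Same priority order as current_cursor: last event id, then the string
# version, then the numeric highwater mark, else nil.
def pick_cursor(last_event_id, loader)
  return last_event_id if last_event_id && !last_event_id.empty?

  v = loader.version
  return v if v.is_a?(String) && !v.empty?

  hw = loader.highwater_mark
  return hw.to_s if hw.is_a?(Numeric) && hw.positive?

  nil
end

p pick_cursor('evt-9', StubLoader.new('etag-3', 42)) # last event id wins
p pick_cursor(nil, StubLoader.new('etag-3', 42))     # falls back to version
p pick_cursor('', StubLoader.new(nil, 42))           # falls back to "42"
p pick_cursor(nil, StubLoader.new(nil, 0))           # no prior state: nil
```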

#headers ⇒ Object

Builds the request headers. Public so tests can assert their shape. The request body is always empty, so these headers are everything api-delivery-sse sees.



# File 'lib/quonfig/sse_config_client.rb', line 136

def headers
  auth = "1:#{@prefab_options.sdk_key}"
  auth_string = Base64.strict_encode64(auth)
  h = {
    'Authorization' => "Basic #{auth_string}",
    'Accept' => 'text/event-stream',
    'Cache-Control' => 'no-cache',
    'X-Quonfig-SDK-Version' => "ruby-#{Quonfig::VERSION}"
  }
  cursor = current_cursor
  h['Last-Event-Id'] = cursor if cursor
  h
end
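The Authorization value is ordinary HTTP Basic auth with the fixed user name "1" and the SDK key as the password. A minimal sketch follows. The helper basic_auth_header and the key 'demo-key' are placeholders.

```ruby
require 'base64'

# Hypothetical helper reproducing the Authorization value built above.
def basic_auth_header(sdk_key)
  "Basic #{Base64.strict_encode64("1:#{sdk_key}")}"
end

header = basic_auth_header('demo-key')

# Decoding the token recovers the "1:<sdk_key>" pair, which is a handy
# shape assertion for tests.
decoded = Base64.strict_decode64(header.delete_prefix('Basic '))
puts decoded
```

strict_encode64 is the right variant here because it emits no line feeds, so long keys cannot break the header onto multiple lines.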

#on_envelope_error_total ⇒ Object

qfg-m3lk: count of user-supplied on_envelope callback invocations that raised. Surfaced for operator visibility — a non-zero value here with restart_total stable means a caller-side listener bug, not a transport problem. (Pre-fix, those raises propagated into run_loop’s rescue and masqueraded as transport errors, causing reconnect storms.)



# File 'lib/quonfig/sse_config_client.rb', line 107

def on_envelope_error_total
  @on_envelope_error_mutex.synchronize { @on_envelope_error_total }
end
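One way to keep listener bugs from masquerading as transport errors is to rescue around the callback and count the raise separately. The sketch below assumes that shape. EnvelopeDispatcher is a hypothetical class, and the real run_loop's handling is not shown on this page.

```ruby
# Hypothetical dispatcher: isolates raises from a user-supplied callback
# so they never reach the transport's rescue path.
class EnvelopeDispatcher
  def initialize(&on_envelope)
    @on_envelope = on_envelope
    @error_total = 0
    @mutex = Mutex.new
  end

  # Thread-safe read of the callback failure count.
  def error_total
    @mutex.synchronize { @error_total }
  end

  # Returns true when the callback succeeded, false when it raised.
  def dispatch(envelope)
    @on_envelope.call(envelope)
    true
  rescue StandardError
    @mutex.synchronize { @error_total += 1 }
    false
  end
end

dispatcher = EnvelopeDispatcher.new { |env| raise 'listener bug' if env == :bad }
dispatcher.dispatch(:good)
dispatcher.dispatch(:bad)
puts dispatcher.error_total
```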

#restart_total ⇒ Object

Layer 1 (SSE) reconnect counter. Bumped exactly once per reconnect attempt, never per error edge, never per envelope. Read by Quonfig::Client#worker_restart_total(layer: '1') and asserted by chaos scenario 09 (>= 5 after 5 proxy flaps in 30s).



# File 'lib/quonfig/sse_config_client.rb', line 98

def restart_total
  @restart_mutex.synchronize { @restart_total }
end
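The "exactly one place increments it" property can be sketched with a loop that bumps the counter only when it starts a reconnect attempt. ReconnectCounter and run_attempts are hypothetical. Treating the first connection as not a reconnect is an assumption, since run_loop itself is not shown on this page.

```ruby
# Hypothetical mutex-guarded counter, shaped like restart_total.
class ReconnectCounter
  def initialize
    @total = 0
    @mutex = Mutex.new
  end

  def total
    @mutex.synchronize { @total }
  end

  def bump
    @mutex.synchronize { @total += 1 }
  end
end

# Hypothetical loop: each lambda stands in for one connection attempt.
def run_attempts(counter, attempts)
  attempts.each_with_index do |attempt, index|
    # The single increment site. Assumption: the initial connect (index 0)
    # is not counted as a reconnect.
    counter.bump if index.positive?
    begin
      attempt.call
    rescue StandardError
      # Error edges never touch the counter.
    end
  end
end

counter = ReconnectCounter.new
flapping = Array.new(6) { -> { raise IOError, 'proxy flap' } }
run_attempts(counter, flapping)
puts counter.total
```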

#start(&on_envelope) ⇒ Object

Starts the worker thread, which runs the reconnect loop with on_envelope as its block. Returns without starting anything when sse_api_urls is nil or empty.



# File 'lib/quonfig/sse_config_client.rb', line 111

def start(&on_envelope)
  return if @prefab_options.sse_api_urls.nil? || @prefab_options.sse_api_urls.empty?

  @worker = Thread.new { run_loop(&on_envelope) }
end