Class: Quonfig::SSEConfigClient

Inherits:
Object
  • Object
show all
Defined in:
lib/quonfig/sse_config_client.rb

Overview

SSE client for real-time config delivery from api-delivery-sse.

Owns its reconnect loop end-to-end. sdk-go, sdk-python, and sdk-node all reached the same conclusion: the wire format we consume (plain JSON envelopes in single-line data: frames, no named events, no retry directives) is simple enough that an SDK-owned loop is clearer than a library wrapper, and the operator-facing reconnect counter becomes trivially correct because there is exactly one place that increments it (qfg-35sm; replaces the ld-eventsource integration from qfg-ie49 + qfg-cf52, which required log-line scraping and a raise-proof logger wrapper to observe reconnects through the upstream library).

Defined Under Namespace

Classes: EventParser, LineReader, Options, ReadDeadlineWatchdog, SSEHTTPStatusError, SSEHTTPTerminalError, SSEReadDeadlineExceeded

Constant Summary collapse

LOG =
Quonfig::InternalLogger.new(self)
TERMINAL_HTTP_CODES =

qfg-i5xv: HTTP status codes the SDK classifies as terminal — these will not heal by retrying (bad key, revoked permission, missing endpoint). Anything else (5xx, 429, network errors) stays on the transient path.

[401, 403, 404].freeze

Instance Method Summary collapse

Constructor Details

#initialize(prefab_options, config_loader, options = nil, logger = nil, on_error: nil) ⇒ SSEConfigClient

on_error: optional callable invoked on every SSE error edge. Parent Quonfig::Client wires this to drive @sse_state -> :error so that connection_state reflects the disconnect (qfg-47c2.27).



62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
# File 'lib/quonfig/sse_config_client.rb', line 62

def initialize(prefab_options, config_loader, options = nil, logger = nil, on_error: nil)
  @prefab_options = prefab_options
  @options = options || Options.new
  @config_loader = config_loader
  @logger = logger || LOG
  @on_error = on_error

  @stopped = Concurrent::AtomicBoolean.new(false)
  @restart_total = 0
  @restart_mutex = Mutex.new

  @on_envelope_error_total = 0
  @on_envelope_error_mutex = Mutex.new

  @conn_mutex = Mutex.new
  @active_http = nil

  @source_index = -1
  @last_event_id = nil
end

Instance Method Details

#closeObject

Shut down. Interrupts the in-flight stream by closing the underlying socket from this thread — the worker thread observes the resulting IOError, sees @stopped == true, and exits cleanly.



109
110
111
112
113
114
115
116
117
118
119
120
121
# File 'lib/quonfig/sse_config_client.rb', line 109

def close
  @stopped.make_true
  @conn_mutex.synchronize do
    begin
      @active_http&.finish
    rescue StandardError
      # already closed / never started — idempotent
    end
    @active_http = nil
  end
  @worker&.join(2)
  @worker = nil
end

#current_cursorObject

Compute a Last-Event-ID for the next request. Three sources, in priority order:

1. @last_event_id  -- set by the most recent event we processed
2. config_loader.version  -- string ETag from last HTTP fetch
3. config_loader.highwater_mark  -- legacy numeric cursor

Returns nil if no prior state exists.



145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
# File 'lib/quonfig/sse_config_client.rb', line 145

def current_cursor
  return @last_event_id if @last_event_id && !@last_event_id.empty?

  if @config_loader.respond_to?(:version)
    v = @config_loader.version
    return v if v.is_a?(String) && !v.empty?
  end

  if @config_loader.respond_to?(:highwater_mark)
    hw = @config_loader.highwater_mark
    return hw.to_s if hw.is_a?(Numeric) && hw.positive?
    return hw if hw.is_a?(String) && !hw.empty?
  end

  nil
end

#headersObject

Public so tests can assert the headers shape. Body of the request is always empty; this is the full set api-delivery-sse sees.



125
126
127
128
129
130
131
132
133
134
135
136
137
# File 'lib/quonfig/sse_config_client.rb', line 125

def headers
  auth = "1:#{@prefab_options.sdk_key}"
  auth_string = Base64.strict_encode64(auth)
  h = {
    'Authorization' => "Basic #{auth_string}",
    'Accept' => 'text/event-stream',
    'Cache-Control' => 'no-cache',
    'X-Quonfig-SDK-Version' => "ruby-#{Quonfig::VERSION}"
  }
  cursor = current_cursor
  h['Last-Event-Id'] = cursor if cursor
  h
end

#on_envelope_error_totalObject

qfg-m3lk: count of user-supplied on_envelope callback invocations that raised. Surfaced for operator visibility — a non-zero value here with restart_total stable means a caller-side listener bug, not a transport problem. (Pre-fix, those raises propagated into run_loop’s rescue and masqueraded as transport errors, causing reconnect storms.)



96
97
98
# File 'lib/quonfig/sse_config_client.rb', line 96

def on_envelope_error_total
  @on_envelope_error_mutex.synchronize { @on_envelope_error_total }
end

#restart_totalObject

Layer 1 (SSE) reconnect counter. Bumped exactly once per reconnect attempt — never per error edge, never per envelope. Read by Quonfig::Client#worker_restart_total(layer: ‘1’) and asserted by chaos scenario 09 (>= 5 after 5 proxy flaps in 30s).



87
88
89
# File 'lib/quonfig/sse_config_client.rb', line 87

def restart_total
  @restart_mutex.synchronize { @restart_total }
end

#start(&on_envelope) ⇒ Object



100
101
102
103
104
# File 'lib/quonfig/sse_config_client.rb', line 100

def start(&on_envelope)
  return if @prefab_options.sse_api_urls.nil? || @prefab_options.sse_api_urls.empty?

  @worker = Thread.new { run_loop(&on_envelope) }
end