Class: Quonfig::SSEConfigClient
- Inherits:
-
Object
- Object
- Quonfig::SSEConfigClient
- Defined in:
- lib/quonfig/sse_config_client.rb
Defined Under Namespace
Classes: Options
Constant Summary collapse
- LOG =
Quonfig::InternalLogger.new(self)
Instance Method Summary collapse
- #close ⇒ Object
- #connect(&load_configs) ⇒ Object
-
#current_cursor ⇒ Object
Compute a Last-Event-ID to resume the stream from.
- #headers ⇒ Object
-
#initialize(prefab_options, config_loader, options = nil, logger = nil) ⇒ SSEConfigClient
constructor
A new instance of SSEConfigClient.
- #source ⇒ Object
- #start(&load_configs) ⇒ Object
Constructor Details
#initialize(prefab_options, config_loader, options = nil, logger = nil) ⇒ SSEConfigClient
Returns a new instance of SSEConfigClient.
27 28 29 30 31 32 33 |
# File 'lib/quonfig/sse_config_client.rb', line 27 def initialize(, config_loader, = nil, logger = nil) @prefab_options = @options = || Options.new @config_loader = config_loader @connected = false @logger = logger || LOG end |
Instance Method Details
#close ⇒ Object
35 36 37 38 |
# File 'lib/quonfig/sse_config_client.rb', line 35 def close @retry_thread&.kill @client&.close end |
#connect(&load_configs) ⇒ Object
67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 |
# File 'lib/quonfig/sse_config_client.rb', line 67 def connect(&load_configs) url = "#{source}/api/v2/sse/config" cursor = current_cursor @logger.debug "SSE Streaming Connect to #{url} start_at #{cursor.inspect}" SSE::Client.new(url, headers: headers, read_timeout: @options.sse_read_timeout, reconnect_time: @options.sse_default_reconnect_time, last_event_id: cursor, logger: Quonfig::InternalLogger.new(SSE::Client)) do |client| client.on_event do |event| if event.data.nil? || event.data.empty? @logger.error "SSE Streaming Error: Received empty data for url #{url}" client.close return end begin parsed = JSON.parse(event.data) rescue JSON::ParserError => e @logger.error "SSE Streaming Error: Failed to parse JSON for url #{url}: #{e.}" client.close return end envelope = Quonfig::ConfigEnvelope.new( configs: parsed['configs'] || [], meta: parsed['meta'] || {} ) load_configs.call(envelope, event, :sse) end client.on_error do |error| # SSL "unexpected eof" is expected when SSE sessions timeout normally if error.is_a?(OpenSSL::SSL::SSLError) && error..include?('unexpected eof') @logger.debug "SSE Streaming: Connection closed (expected timeout) for url #{url}" else @logger.error "SSE Streaming Error: #{error.inspect} for url #{url}" end if @options.errors_to_close_connection.any? { |klass| error.is_a?(klass) } @logger.debug "Closing SSE connection for url #{url}" client.close end end end end |
#current_cursor ⇒ Object
Compute a Last-Event-ID to resume the stream from. Three sources, in priority order:
1. config_loader.version -- string ETag from last HTTP fetch (new path)
2. config_loader.highwater_mark -- legacy numeric cursor
3. nil -- no prior state; stream from HEAD
141 142 143 144 145 146 147 148 149 150 151 152 153 154 |
# File 'lib/quonfig/sse_config_client.rb', line 141 def current_cursor if @config_loader.respond_to?(:version) v = @config_loader.version return v if v.is_a?(String) && !v.empty? end if @config_loader.respond_to?(:highwater_mark) hw = @config_loader.highwater_mark return hw.to_s if hw.is_a?(Numeric) && hw.positive? return hw if hw.is_a?(String) && !hw.empty? end nil end |
#headers ⇒ Object
116 117 118 119 120 121 122 123 124 |
# File 'lib/quonfig/sse_config_client.rb', line 116 def headers auth = "1:#{@prefab_options.sdk_key}" auth_string = Base64.strict_encode64(auth) return { 'Authorization' => "Basic #{auth_string}", 'Accept' => 'text/event-stream', 'X-Quonfig-SDK-Version' => "sdk-ruby-#{Quonfig::VERSION}" } end |
#source ⇒ Object
126 127 128 129 130 131 132 133 134 |
# File 'lib/quonfig/sse_config_client.rb', line 126 def source @source_index = @source_index.nil? ? 0 : @source_index + 1 if @source_index >= @prefab_options.sse_api_urls.size @source_index = 0 end @prefab_options.sse_api_urls[@source_index] end |
#start(&load_configs) ⇒ Object
40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 |
# File 'lib/quonfig/sse_config_client.rb', line 40 def start(&load_configs) if @prefab_options.sse_api_urls.empty? @logger.debug 'No SSE api_urls configured' return end @client = connect(&load_configs) closed_count = 0 @retry_thread = Thread.new do loop do sleep @options.sleep_delay_for_new_connection_check if @client.closed? closed_count += @options.sleep_delay_for_new_connection_check if closed_count > @options.seconds_between_new_connection closed_count = 0 @logger.debug 'Reconnecting SSE client' @client = connect(&load_configs) end end end end end |