Class: Quonfig::SSEConfigClient
- Inherits:
-
Object
- Object
- Quonfig::SSEConfigClient
- Defined in:
- lib/quonfig/sse_config_client.rb
Defined Under Namespace
Classes: Options
Constant Summary collapse
- LOG =
Quonfig::InternalLogger.new(self)
Instance Method Summary collapse
- #close ⇒ Object
- #connect(&load_configs) ⇒ Object
-
#current_cursor ⇒ Object
Compute a Last-Event-ID to resume the stream from.
- #headers ⇒ Object
-
#initialize(prefab_options, config_loader, options = nil, logger = nil) ⇒ SSEConfigClient
constructor
A new instance of SSEConfigClient.
- #source ⇒ Object
- #start(&load_configs) ⇒ Object
Constructor Details
#initialize(prefab_options, config_loader, options = nil, logger = nil) ⇒ SSEConfigClient
Returns a new instance of SSEConfigClient.
28 29 30 31 32 33 34 |
# File 'lib/quonfig/sse_config_client.rb', line 28 def initialize(, config_loader, = nil, logger = nil) @prefab_options = @options = || Options.new @config_loader = config_loader @connected = false @logger = logger || LOG end |
Instance Method Details
#close ⇒ Object
36 37 38 39 |
# File 'lib/quonfig/sse_config_client.rb', line 36 def close @retry_thread&.kill @client&.close end |
#connect(&load_configs) ⇒ Object
68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 |
# File 'lib/quonfig/sse_config_client.rb', line 68 def connect(&load_configs) url = "#{source}/api/v2/sse/config" cursor = current_cursor @logger.debug "SSE Streaming Connect to #{url} start_at #{cursor.inspect}" SSE::Client.new(url, headers: headers, read_timeout: @options.sse_read_timeout, reconnect_time: @options.sse_default_reconnect_time, last_event_id: cursor, logger: Quonfig::InternalLogger.new(SSE::Client)) do |client| client.on_event do |event| if event.data.nil? || event.data.empty? @logger.error "SSE Streaming Error: Received empty data for url #{url}" client.close next end begin parsed = JSON.parse(event.data) rescue JSON::ParserError => e @logger.error "SSE Streaming Error: Failed to parse JSON for url #{url}: #{e.}" client.close next end envelope = Quonfig::ConfigEnvelope.new( configs: parsed['configs'] || [], meta: parsed['meta'] || {} ) load_configs.call(envelope, event, :sse) end client.on_error do |error| # SSL "unexpected eof" is expected when SSE sessions timeout normally if error.is_a?(OpenSSL::SSL::SSLError) && error..include?('unexpected eof') @logger.debug "SSE Streaming: Connection closed (expected timeout) for url #{url}" else @logger.error "SSE Streaming Error: #{error.inspect} for url #{url}" end if @options.errors_to_close_connection.any? { |klass| error.is_a?(klass) } @logger.debug "Closing SSE connection for url #{url}" client.close end end end end |
#current_cursor ⇒ Object
Compute a Last-Event-ID to resume the stream from. Three sources, in priority order:
1. config_loader.version -- string ETag from last HTTP fetch (new path)
2. config_loader.highwater_mark -- legacy numeric cursor
3. nil -- no prior state; stream from HEAD
140 141 142 143 144 145 146 147 148 149 150 151 152 153 |
# File 'lib/quonfig/sse_config_client.rb', line 140 def current_cursor if @config_loader.respond_to?(:version) v = @config_loader.version return v if v.is_a?(String) && !v.empty? end if @config_loader.respond_to?(:highwater_mark) hw = @config_loader.highwater_mark return hw.to_s if hw.is_a?(Numeric) && hw.positive? return hw if hw.is_a?(String) && !hw.empty? end nil end |
#headers ⇒ Object
117 118 119 120 121 122 123 124 125 |
# File 'lib/quonfig/sse_config_client.rb', line 117 def headers auth = "1:#{@prefab_options.sdk_key}" auth_string = Base64.strict_encode64(auth) { 'Authorization' => "Basic #{auth_string}", 'Accept' => 'text/event-stream', 'X-Quonfig-SDK-Version' => "ruby-#{Quonfig::VERSION}" } end |
#source ⇒ Object
127 128 129 130 131 132 133 |
# File 'lib/quonfig/sse_config_client.rb', line 127 def source @source_index = @source_index.nil? ? 0 : @source_index + 1 @source_index = 0 if @source_index >= @prefab_options.sse_api_urls.size @prefab_options.sse_api_urls[@source_index] end |
#start(&load_configs) ⇒ Object
41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 |
# File 'lib/quonfig/sse_config_client.rb', line 41 def start(&load_configs) if @prefab_options.sse_api_urls.empty? @logger.debug 'No SSE api_urls configured' return end @client = connect(&load_configs) closed_count = 0 @retry_thread = Thread.new do loop do sleep @options.sleep_delay_for_new_connection_check next unless @client.closed? closed_count += @options.sleep_delay_for_new_connection_check next unless closed_count > @options.seconds_between_new_connection closed_count = 0 @logger.debug 'Reconnecting SSE client' @client = connect(&load_configs) end end end |