Class: Quonfig::SSEConfigClient
- Inherits: Object
- Inheritance chain: Object → Quonfig::SSEConfigClient
- Defined in:
- lib/quonfig/sse_config_client.rb
Defined Under Namespace
Classes: Options
Constant Summary
- LOG =
Quonfig::InternalLogger.new(self)
Instance Method Summary
- #close ⇒ Object
- #connect(&load_configs) ⇒ Object
- #headers ⇒ Object
- #initialize(prefab_options, config_loader, options = nil, logger = nil) ⇒ SSEConfigClient (constructor): A new instance of SSEConfigClient.
- #source ⇒ Object
- #start(&load_configs) ⇒ Object
Constructor Details
#initialize(prefab_options, config_loader, options = nil, logger = nil) ⇒ SSEConfigClient
Returns a new instance of SSEConfigClient.
27 28 29 30 31 32 33 |
# File 'lib/quonfig/sse_config_client.rb', line 27
#
# Stores the collaborators the client needs; no connection is opened here.
#
# @param prefab_options stored as @prefab_options
# @param config_loader stored as @config_loader
# @param options stored as @options; a fresh Options instance is used when nil
# @param logger stored as @logger; the class-level LOG is used when nil
# @return [SSEConfigClient] a new instance of SSEConfigClient
def initialize(prefab_options, config_loader, options = nil, logger = nil)
  @prefab_options = prefab_options
  @options = options || Options.new
  @config_loader = config_loader
  @connected = false
  @logger = logger || LOG
end
Instance Method Details
#close ⇒ Object
35 36 37 38 |
# File 'lib/quonfig/sse_config_client.rb', line 35
#
# Shuts the client down: kills @retry_thread when one has been set, then
# closes @client when one has been set. Both steps are skipped for nil, so
# calling this on an instance that has no thread or client yet is harmless.
#
# @return the result of @client.close, or nil when there is no client
def close
  @retry_thread.kill unless @retry_thread.nil?
  @client.close unless @client.nil?
end
#connect(&load_configs) ⇒ Object
67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 |
# File 'lib/quonfig/sse_config_client.rb', line 67
#
# Opens an SSE stream against "#{source}/api/v2/sse/config" and registers the
# event and error handlers on it.
#
# The loader's highwater mark is sent as last_event_id only when it is positive.
#
# Event handling:
# * empty or nil data    -> logged as an error, the client is closed
# * unparseable JSON     -> logged as an error, the client is closed
# * otherwise            -> the parsed 'configs' / 'meta' are wrapped in a
#                           Quonfig::ConfigEnvelope and passed to load_configs
#                           as (envelope, event, :sse)
#
# Error handling: every error is logged; the client is closed when the error is
# an instance of any class in @options.errors_to_close_connection.
#
# @yield [envelope, event, source] called for every successfully parsed event
# @return the value returned by SSE::Client.new
def connect(&load_configs)
  url = "#{source}/api/v2/sse/config"
  highwater_mark = @config_loader.highwater_mark
  @logger.debug "SSE Streaming Connect to #{url} start_at #{highwater_mark}"

  SSE::Client.new(url,
                  headers: headers,
                  read_timeout: @options.sse_read_timeout,
                  reconnect_time: @options.sse_default_reconnect_time,
                  last_event_id: (highwater_mark&.positive? ? highwater_mark.to_s : nil),
                  logger: Quonfig::InternalLogger.new(SSE::Client)) do |client|
    client.on_event do |event|
      # NOTE: handlers run after #connect has returned, so `return` here would
      # raise LocalJumpError; `next` just ends this handler invocation.
      if event.data.nil? || event.data.empty?
        @logger.error "SSE Streaming Error: Received empty data for url #{url}"
        client.close
        next
      end

      begin
        parsed = JSON.parse(event.data)
      rescue JSON::ParserError => e
        @logger.error "SSE Streaming Error: Failed to parse JSON for url #{url}: #{e.message}"
        client.close
        next
      end

      envelope = Quonfig::ConfigEnvelope.new(
        configs: parsed['configs'] || [],
        meta: parsed['meta'] || {}
      )
      load_configs.call(envelope, event, :sse)
    end

    client.on_error do |error|
      # SSL "unexpected eof" is expected when SSE sessions timeout normally
      if error.is_a?(OpenSSL::SSL::SSLError) && error.message.include?('unexpected eof')
        @logger.debug "SSE Streaming: Connection closed (expected timeout) for url #{url}"
      else
        @logger.error "SSE Streaming Error: #{error.inspect} for url #{url}"
      end

      if @options.errors_to_close_connection.any? { |klass| error.is_a?(klass) }
        @logger.debug "Closing SSE connection for url #{url}"
        client.close
      end
    end
  end
end
#headers ⇒ Object
115 116 117 118 119 120 121 122 123 |
# File 'lib/quonfig/sse_config_client.rb', line 115
#
# Builds the HTTP headers sent with the SSE request.
#
# @return [Hash{String => String}] the Authorization (HTTP Basic),
#   Accept and X-Quonfig-SDK-Version headers
def headers
  # Basic credentials are the literal "1", a colon, then the configured SDK key.
  credentials = "1:#{@prefab_options.sdk_key}"
  # 'm0' is strict Base64 (no line feeds): the same output as
  # Base64.strict_encode64, without needing the base64 library to be loaded.
  encoded = [credentials].pack('m0')

  {
    'Authorization' => "Basic #{encoded}",
    'Accept' => 'text/event-stream',
    'X-Quonfig-SDK-Version' => "sdk-ruby-#{Quonfig::VERSION}"
  }
end
#source ⇒ Object
125 126 127 128 129 130 131 132 133 |
# File 'lib/quonfig/sse_config_client.rb', line 125
#
# Returns the next SSE API URL, cycling through @prefab_options.sse_api_urls.
#
# The first call yields the first URL; each later call advances by one and
# wraps back to the first URL after the last. The position is remembered in
# @source_index.
#
# @return the selected URL, or nil when the URL list is empty
def source
  urls = @prefab_options.sse_api_urls
  candidate = @source_index ? @source_index + 1 : 0
  # Wrap by comparison rather than modulo so an empty list does not divide by zero.
  @source_index = candidate < urls.size ? candidate : 0
  urls[@source_index]
end
#start(&load_configs) ⇒ Object
40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 |
# File 'lib/quonfig/sse_config_client.rb', line 40
#
# Opens the initial SSE connection and starts a background thread that
# replaces the client once it has stayed closed for long enough.
#
# Does nothing except log a debug message when no SSE API URLs are configured.
#
# The background thread wakes every
# @options.sleep_delay_for_new_connection_check seconds. While the current
# client reports closed?, it accumulates that delay; once the total exceeds
# @options.seconds_between_new_connection it resets the counter and calls
# #connect again.
#
# @yield forwarded unchanged to #connect
# @return [Thread, nil] the retry thread, or nil when no URLs are configured
def start(&load_configs)
  if @prefab_options.sse_api_urls.empty?
    @logger.debug 'No SSE api_urls configured'
    return
  end

  @client = connect(&load_configs)
  closed_for = 0

  @retry_thread = Thread.new do
    loop do
      check_interval = @options.sleep_delay_for_new_connection_check
      sleep check_interval
      next unless @client.closed?

      closed_for += check_interval
      next unless closed_for > @options.seconds_between_new_connection

      closed_for = 0
      @logger.debug 'Reconnecting SSE client'
      begin
        @client = connect(&load_configs)
      rescue StandardError => e
        # A failed attempt must not kill this thread, otherwise the client
        # would never reconnect again; the next attempt happens after the
        # usual waiting period.
        @logger.error "SSE Streaming Error: reconnect attempt failed: #{e.inspect}"
      end
    end
  end
end