Class: Quonfig::SSEConfigClient

Inherits:
Object
  • Object
show all
Defined in:
lib/quonfig/sse_config_client.rb

Defined Under Namespace

Classes: Options

Constant Summary collapse

LOG =
Quonfig::InternalLogger.new(self)

Instance Method Summary collapse

Constructor Details

#initialize(prefab_options, config_loader, options = nil, logger = nil) ⇒ SSEConfigClient

Returns a new instance of SSEConfigClient.



27
28
29
30
31
32
33
# File 'lib/quonfig/sse_config_client.rb', line 27

# Sets up the client's state; no connection is opened here.
#
# @param prefab_options [Object] stored as-is; other methods read `sdk_key` and `sse_api_urls` from it
# @param config_loader [Object] stored as-is; #connect reads `highwater_mark` from it
# @param options [Options, nil] tuning options; a fresh Options is built when none is given
# @param logger [Object, nil] logger for this client; falls back to LOG when none is given
def initialize(prefab_options, config_loader, options = nil, logger = nil)
  @connected = false
  @config_loader = config_loader
  @prefab_options = prefab_options
  @options = options || Options.new
  @logger = logger || LOG
end

Instance Method Details

#close ⇒ Object



35
36
37
38
# File 'lib/quonfig/sse_config_client.rb', line 35

# Stops the reconnect watchdog thread (if one was started) and closes the
# current SSE client (if one exists). Safe to call before #start.
#
# @return [Object, nil] whatever the client's `close` returns, or nil when there is no client
def close
  @retry_thread.kill unless @retry_thread.nil?
  @client.close unless @client.nil?
end

#connect(&load_configs) ⇒ Object



67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
# File 'lib/quonfig/sse_config_client.rb', line 67

# Opens an SSE stream against the next configured source URL and wires up
# the event and error handlers.
#
# Each event's data is parsed as JSON, wrapped in a Quonfig::ConfigEnvelope
# and handed to +load_configs+ as (envelope, event, :sse). Events with empty
# or unparseable data are logged, and the client is closed.
#
# @yieldparam envelope [Quonfig::ConfigEnvelope] configs and meta taken from the event payload
# @yieldparam event [Object] the raw SSE event
# @yieldparam source [Symbol] always :sse
# @return [SSE::Client] the client that was created
def connect(&load_configs)
  url = "#{source}/api/v2/sse/config"
  @logger.debug "SSE Streaming Connect to #{url} start_at #{@config_loader.highwater_mark}"

  SSE::Client.new(url,
                  headers: headers,
                  read_timeout: @options.sse_read_timeout,
                  reconnect_time: @options.sse_default_reconnect_time,
                  # Only resume from a highwater mark that is a positive number.
                  last_event_id: (@config_loader.highwater_mark&.positive? ? @config_loader.highwater_mark.to_s : nil),
                  logger: Quonfig::InternalLogger.new(SSE::Client)) do |client|
    client.on_event do |event|
      if event.data.nil? || event.data.empty?
        @logger.error "SSE Streaming Error: Received empty data for url #{url}"
        client.close
        # `next`, not `return`: this handler is stored and invoked after
        # #connect has already returned, where `return` would raise
        # LocalJumpError instead of just skipping the event.
        next
      end

      begin
        parsed = JSON.parse(event.data)
      rescue JSON::ParserError => e
        @logger.error "SSE Streaming Error: Failed to parse JSON for url #{url}: #{e.message}"
        client.close
        next # same reason as above: leave the handler, not the finished method
      end

      envelope = Quonfig::ConfigEnvelope.new(
        configs: parsed['configs'] || [],
        meta: parsed['meta'] || {}
      )
      load_configs.call(envelope, event, :sse)
    end

    client.on_error do |error|
      # SSL "unexpected eof" is expected when SSE sessions timeout normally
      if error.is_a?(OpenSSL::SSL::SSLError) && error.message.include?('unexpected eof')
        @logger.debug "SSE Streaming: Connection closed (expected timeout) for url #{url}"
      else
        @logger.error "SSE Streaming Error: #{error.inspect} for url #{url}"
      end

      # Closing here leaves reconnection to whoever watches `closed?` (see #start).
      if @options.errors_to_close_connection.any? { |klass| error.is_a?(klass) }
        @logger.debug "Closing SSE connection for url #{url}"
        client.close
      end
    end
  end
end

#headers ⇒ Object



115
116
117
118
119
120
121
122
123
# File 'lib/quonfig/sse_config_client.rb', line 115

# Builds the HTTP headers sent when opening the SSE stream.
#
# @return [Hash{String => String}] an Authorization header carrying Basic
#   credentials built from "1:<sdk_key>", the event-stream Accept header and
#   the SDK version header
def headers
  credentials = Base64.strict_encode64("1:#{@prefab_options.sdk_key}")

  {
    'Authorization' => "Basic #{credentials}",
    'Accept' => 'text/event-stream',
    'X-Quonfig-SDK-Version' => "sdk-ruby-#{Quonfig::VERSION}"
  }
end

#source ⇒ Object



125
126
127
128
129
130
131
132
133
# File 'lib/quonfig/sse_config_client.rb', line 125

# Returns the next SSE base URL, cycling round-robin through
# `@prefab_options.sse_api_urls`. The first call yields the first URL; each
# later call advances by one and wraps back to the start after the last.
#
# @return [Object, nil] the selected URL entry, or nil when the list is empty
def source
  urls = @prefab_options.sse_api_urls

  candidate = @source_index.nil? ? 0 : @source_index + 1
  candidate = 0 if candidate >= urls.size # wrap around past the last entry

  @source_index = candidate
  urls[candidate]
end

#start(&load_configs) ⇒ Object



40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
# File 'lib/quonfig/sse_config_client.rb', line 40

# Opens the initial SSE connection and spawns a watchdog thread that
# reconnects once the client has stayed closed for long enough.
#
# The watchdog wakes every `sleep_delay_for_new_connection_check` seconds.
# While the client reports closed, it adds that delay to a running total and
# reconnects once the total exceeds `seconds_between_new_connection`.
#
# @yield forwarded to #connect as the config-loading callback
# @return [Thread, nil] the watchdog thread, or nil when no SSE URLs are configured
def start(&load_configs)
  if @prefab_options.sse_api_urls.empty?
    @logger.debug 'No SSE api_urls configured'
    return
  end

  @client = connect(&load_configs)
  seconds_closed = 0

  @retry_thread = Thread.new do
    loop do
      sleep @options.sleep_delay_for_new_connection_check
      next unless @client.closed?

      # Approximate time spent closed, counted in polling intervals.
      seconds_closed += @options.sleep_delay_for_new_connection_check
      next unless seconds_closed > @options.seconds_between_new_connection

      seconds_closed = 0
      @logger.debug 'Reconnecting SSE client'
      @client = connect(&load_configs)
    end
  end
end