Class: Quonfig::SSEConfigClient

Inherits:
Object
  • Object
show all
Defined in:
lib/quonfig/sse_config_client.rb

Defined Under Namespace

Classes: Options

Constant Summary collapse

LOG =
Quonfig::InternalLogger.new(self)

Instance Method Summary collapse

Constructor Details

#initialize(prefab_options, config_loader, options = nil, logger = nil) ⇒ SSEConfigClient

Returns a new instance of SSEConfigClient.



28
29
30
31
32
33
34
# File 'lib/quonfig/sse_config_client.rb', line 28

# Stores the collaborators needed to stream config updates over SSE.
# No connection is opened here.
#
# @param prefab_options [Object] SDK options; other methods read
#   +sdk_key+ and +sse_api_urls+ from it
# @param config_loader [Object] loader consulted for the resume cursor
# @param options [Options, nil] SSE tuning options; a fresh +Options+ is
#   built when omitted
# @param logger [Object, nil] logger to use; falls back to +LOG+
def initialize(prefab_options, config_loader, options = nil, logger = nil)
  @connected = false
  @prefab_options, @config_loader = prefab_options, config_loader
  @logger = logger || LOG
  @options = options || Options.new
end

Instance Method Details

#close ⇒ Object



36
37
38
39
# File 'lib/quonfig/sse_config_client.rb', line 36

# Stops the background reconnect thread (if one was started) and then
# closes the current SSE client (if one exists).
#
# @return [Object, nil] whatever the client's +close+ returns, or nil when
#   there is no client
def close
  @retry_thread.kill unless @retry_thread.nil?
  @client.close unless @client.nil?
end

#connect(&load_configs) ⇒ Object



68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
# File 'lib/quonfig/sse_config_client.rb', line 68

# Opens an SSE stream against the next URL returned by +source+ and wires
# up its event and error handlers.
#
# Each event carrying a JSON body is wrapped in a Quonfig::ConfigEnvelope
# and handed to the block as +(envelope, event, :sse)+. Events with nil or
# empty data, or data that is not valid JSON, are logged as errors and
# close the client.
#
# @yield [envelope, event, source] called once per successfully parsed event
# @return [Object] the value returned by +SSE::Client.new+
def connect(&load_configs)
  url = "#{source}/api/v2/sse/config"
  cursor = current_cursor
  @logger.debug "SSE Streaming Connect to #{url} start_at #{cursor.inspect}"

  stream_settings = {
    headers: headers,
    read_timeout: @options.sse_read_timeout,
    reconnect_time: @options.sse_default_reconnect_time,
    last_event_id: cursor,
    logger: Quonfig::InternalLogger.new(SSE::Client)
  }

  SSE::Client.new(url, **stream_settings) do |client|
    client.on_event do |event|
      raw = event.data

      if raw.nil? || raw.empty?
        @logger.error "SSE Streaming Error: Received empty data for url #{url}"
        client.close
        next
      end

      begin
        payload = JSON.parse(raw)
      rescue JSON::ParserError => e
        @logger.error "SSE Streaming Error: Failed to parse JSON for url #{url}: #{e.message}"
        client.close
        next
      end

      # Missing keys fall back to empty collections.
      configs = payload['configs'] || []
      meta = payload['meta'] || {}
      envelope = Quonfig::ConfigEnvelope.new(configs: configs, meta: meta)

      load_configs.call(envelope, event, :sse)
    end

    client.on_error do |error|
      # SSL "unexpected eof" is expected when SSE sessions timeout normally
      expected_timeout = error.is_a?(OpenSSL::SSL::SSLError) && error.message.include?('unexpected eof')

      if expected_timeout
        @logger.debug "SSE Streaming: Connection closed (expected timeout) for url #{url}"
      else
        @logger.error "SSE Streaming Error: #{error.inspect} for url #{url}"
      end

      # Only error classes listed in the options force the stream closed.
      next unless @options.errors_to_close_connection.any? { |fatal_class| error.is_a?(fatal_class) }

      @logger.debug "Closing SSE connection for url #{url}"
      client.close
    end
  end
end

#current_cursor ⇒ Object

Compute a Last-Event-ID to resume the stream from. Three sources, in priority order:

1. config_loader.version  -- string ETag from last HTTP fetch (new path)
2. config_loader.highwater_mark -- legacy numeric cursor
3. nil -- no prior state; stream from HEAD


140
141
142
143
144
145
146
147
148
149
150
151
152
153
# File 'lib/quonfig/sse_config_client.rb', line 140

# Picks the Last-Event-ID used to resume the stream.
#
# Preference order: a non-empty String from +config_loader.version+, then
# +config_loader.highwater_mark+ (a positive Numeric rendered via +to_s+,
# or a non-empty String), otherwise nil. Each accessor is only consulted
# when the loader responds to it.
#
# @return [String, nil] the resume cursor, or nil when no usable state exists
def current_cursor
  etag = @config_loader.version if @config_loader.respond_to?(:version)
  return etag if etag.is_a?(String) && !etag.empty?

  return nil unless @config_loader.respond_to?(:highwater_mark)

  mark = @config_loader.highwater_mark
  if mark.is_a?(Numeric)
    mark.to_s if mark.positive?
  elsif mark.is_a?(String)
    mark unless mark.empty?
  end
end

#headers ⇒ Object



117
118
119
120
121
122
123
124
125
# File 'lib/quonfig/sse_config_client.rb', line 117

# Builds the HTTP request headers for the SSE stream: HTTP Basic
# credentials of the form "1:<sdk_key>", the event-stream Accept type and
# the SDK version tag.
#
# @return [Hash{String => String}] header names mapped to values
def headers
  # pack('m0') produces single-line Base64, the same output as
  # Base64.strict_encode64.
  credentials = ["1:#{@prefab_options.sdk_key}"].pack('m0')

  {
    'Authorization' => "Basic #{credentials}",
    'Accept' => 'text/event-stream',
    'X-Quonfig-SDK-Version' => "ruby-#{Quonfig::VERSION}"
  }
end

#source ⇒ Object



127
128
129
130
131
132
133
# File 'lib/quonfig/sse_config_client.rb', line 127

# Returns the next SSE base URL, rotating through
# +@prefab_options.sse_api_urls+ round-robin. The first call yields the
# first URL; after the last URL the rotation wraps back to the first.
#
# @return [Object, nil] the selected entry, or nil when the list is empty
def source
  # nil means "never called": start at 0, otherwise advance by one.
  @source_index = (@source_index || -1) + 1

  urls = @prefab_options.sse_api_urls
  @source_index = 0 unless @source_index < urls.size

  urls[@source_index]
end

#start(&load_configs) ⇒ Object



41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
# File 'lib/quonfig/sse_config_client.rb', line 41

# Connects to the SSE stream and starts a background thread that
# re-establishes the connection after the client has stayed closed.
#
# The thread wakes every +sleep_delay_for_new_connection_check+ seconds.
# While the client reports closed it accumulates that delay, and once the
# total exceeds +seconds_between_new_connection+ it resets the total and
# calls +connect+ again with the same block.
#
# @yield forwarded unchanged to +connect+
# @return [Thread, nil] the reconnect thread, or nil when no SSE URLs are
#   configured
def start(&load_configs)
  if @prefab_options.sse_api_urls.empty?
    @logger.debug 'No SSE api_urls configured'
    return
  end

  @client = connect(&load_configs)

  seconds_closed = 0

  @retry_thread = Thread.new do
    loop do
      sleep @options.sleep_delay_for_new_connection_check

      if @client.closed?
        seconds_closed += @options.sleep_delay_for_new_connection_check

        if seconds_closed > @options.seconds_between_new_connection
          seconds_closed = 0
          @logger.debug 'Reconnecting SSE client'
          @client = connect(&load_configs)
        end
      end
    end
  end
end