Class: Pinot::JsonHttpTransport
- Inherits: Object
- Defined in: lib/pinot/transport.rb
Constant Summary
- DEFAULT_HEADERS =
{ "Content-Type" => "application/json; charset=utf-8" }.freeze
- RETRYABLE_ERRORS =
[ Errno::ECONNRESET, Errno::ECONNREFUSED, Errno::ETIMEDOUT, Net::OpenTimeout, Net::ReadTimeout, Net::WriteTimeout ].freeze
- HTTP_ERROR_MAP =
HTTP status codes that map to specific error classes and are safe to retry
{ "429" => RateLimitError, "503" => BrokerUnavailableError, "504" => BrokerUnavailableError }.freeze
- RETRYABLE_HTTP_ERRORS =
[RateLimitError, BrokerUnavailableError].freeze
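The keys of HTTP_ERROR_MAP are strings, so the lookup only matches when the HTTP client reports the status code as a String (as Net::HTTP does). The sketch below illustrates how a status code would be classified under the logic shown in #execute. The error classes are plain stand-ins defined here for illustration (their real superclasses are not shown on this page), and `classify_status` is a hypothetical helper, not part of the library.

```ruby
# Stand-in error classes (assumption: the real ones live under Pinot
# and may have a different hierarchy).
class TransportError < StandardError; end
class RateLimitError < StandardError; end
class BrokerUnavailableError < StandardError; end

HTTP_ERROR_MAP = {
  "429" => RateLimitError,
  "503" => BrokerUnavailableError,
  "504" => BrokerUnavailableError
}.freeze

# Hypothetical helper: mirrors the order of checks in #execute.
def classify_status(code)
  error_class = HTTP_ERROR_MAP[code]
  return [:retryable, error_class] if error_class # mapped codes are retried
  return [:ok, nil] if code.to_i == 200           # only 200 counts as success
  [:fatal, TransportError]                        # everything else is not retried
end

classify_status("429") # => [:retryable, RateLimitError]
classify_status("500") # => [:fatal, TransportError]
classify_status(429)   # => [:fatal, TransportError], Integer keys never match
```

The last call suggests that a client returning Integer status codes would bypass the map, so a 429 would surface as a non-retryable TransportError.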
Instance Method Summary
- #execute(broker_address, request) ⇒ Object
- #initialize(http_client:, extra_headers: {}, timeout_ms: nil, logger: nil, max_retries: 0, retry_interval_ms: 200) ⇒ JsonHttpTransport (constructor)
  A new instance of JsonHttpTransport.
Constructor Details
#initialize(http_client:, extra_headers: {}, timeout_ms: nil, logger: nil, max_retries: 0, retry_interval_ms: 200) ⇒ JsonHttpTransport
Returns a new instance of JsonHttpTransport.
# File 'lib/pinot/transport.rb', line 181
def initialize(http_client:, extra_headers: {}, timeout_ms: nil, logger: nil, max_retries: 0, retry_interval_ms: 200)
  @http_client = http_client
  @extra_headers = extra_headers
  @timeout_ms = timeout_ms
  @logger = logger
  @max_retries = max_retries
  @retry_interval_ms = retry_interval_ms
end
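The extra_headers given to the constructor are merged over DEFAULT_HEADERS inside #execute, and a fresh X-Correlation-Id is merged last. A minimal sketch of that merge order follows; `build_headers` is a hypothetical helper written for this example, and the header values are made up.

```ruby
require "securerandom"

DEFAULT_HEADERS = { "Content-Type" => "application/json; charset=utf-8" }.freeze

# Hypothetical helper reproducing the merge chain used in #execute.
def build_headers(extra_headers)
  DEFAULT_HEADERS
    .merge(extra_headers)                           # caller headers win over defaults
    .merge("X-Correlation-Id" => SecureRandom.uuid) # always overwritten last
end

headers = build_headers({
  "Authorization" => "Bearer example-token",
  "X-Correlation-Id" => "caller-supplied"
})

headers["Authorization"]    # => "Bearer example-token"
headers["X-Correlation-Id"] # a random UUID, not "caller-supplied"
```

Because later merges win, a caller can override Content-Type through extra_headers but cannot pin the correlation ID. Since the headers are built inside the retried block, each retry attempt appears to get a new correlation ID as well.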
Instance Method Details
#execute(broker_address, request) ⇒ Object
# File 'lib/pinot/transport.rb', line 191
def execute(broker_address, request)
  logger.debug "Pinot query to #{broker_address}: #{request.query}"

  attempts = 0
  max_attempts = (@max_retries || 0) + 1

  begin
    attempts += 1
    url = build_url(broker_address, request.query_format)
    body = build_body(request)
    headers = DEFAULT_HEADERS
      .merge(@extra_headers)
      .merge("X-Correlation-Id" => SecureRandom.uuid)

    resp = @http_client.post(url, body: body, headers: headers)

    # Mapped status codes (429, 503, 504) raise retryable error classes.
    if (error_class = HTTP_ERROR_MAP[resp.code])
      logger.error "Pinot broker returned HTTP #{resp.code}"
      raise error_class, "http exception with HTTP status code #{resp.code}"
    end

    # Any other non-200 status is raised as a non-retryable TransportError.
    unless resp.code.to_i == 200
      logger.error "Pinot broker returned HTTP #{resp.code}"
      raise TransportError, "http exception with HTTP status code #{resp.code}"
    end

    begin
      BrokerResponse.from_json(resp.body)
    rescue JSON::ParserError => e
      # The source listing is cut off after "raise e."; the parse error is
      # re-raised unchanged here.
      raise e
    end
  rescue *RETRYABLE_HTTP_ERRORS, *RETRYABLE_ERRORS => e
    # Exponential backoff: retry_interval_ms, then 2x, 4x, ... per attempt.
    if attempts < max_attempts
      sleep_ms = (@retry_interval_ms || 200) * (2 ** (attempts - 1))
      sleep(sleep_ms / 1000.0)
      retry
    end
    raise
  end
end
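The retry branch of #execute sleeps for retry_interval_ms * 2 ** (attempts - 1) milliseconds before each retry, and it makes at most max_retries + 1 attempts in total. The sketch below computes the resulting delay schedule; `backoff_delays_ms` is a hypothetical helper for illustration and does not exist in the library.

```ruby
# Hypothetical helper: the sleep (in ms) before each retry, in order.
# Attempt n (1-based) that fails retryably is followed by a sleep of
# retry_interval_ms * 2**(n - 1), for n = 1..max_retries.
def backoff_delays_ms(max_retries, retry_interval_ms)
  (1..max_retries).map { |attempt| retry_interval_ms * (2**(attempt - 1)) }
end

backoff_delays_ms(3, 200) # => [200, 400, 800]
backoff_delays_ms(0, 200) # => [] (default max_retries: fail on first error)
```

With max_retries: 3 and the default 200 ms interval, a request that keeps failing retryably would wait 1.4 seconds in total before the final error is raised. No jitter is applied in the code shown, so concurrent clients that fail together would retry on the same schedule.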