Module: Elastic::Transport::Transport::Base Abstract

Includes:
Loggable
Included in:
HTTP::Curb, HTTP::Faraday, HTTP::Manticore
Defined in:
lib/elastic/transport/transport/base.rb

Overview

This module is abstract.

Module with common functionality for transport implementations.

Constant Summary collapse

DEFAULT_PORT =
9200
DEFAULT_PROTOCOL =
'http'.freeze
DEFAULT_RELOAD_AFTER =

Requests

10_000
DEFAULT_RESURRECT_AFTER =

Seconds

60
DEFAULT_MAX_RETRIES =

Requests

3
DEFAULT_SERIALIZER_CLASS =
Serializer::MultiJson
SANITIZED_PASSWORD =
'*' * (rand(14) + 1)

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from Loggable

#log_debug, #log_error, #log_fatal, #log_info, #log_warn

Instance Attribute Details

#connections ⇒ Object (readonly)

Returns the value of attribute connections.



34
35
36
# File 'lib/elastic/transport/transport/base.rb', line 34

# Reader for +@connections+, which #initialize assigns from
# #__build_connections and #__rebuild_connections updates in place.
#
# @return [Object] the current value of @connections
def connections
  @connections
end

#counter ⇒ Object (readonly)

Returns the value of attribute counter.



34
35
36
# File 'lib/elastic/transport/transport/base.rb', line 34

# Reader for +@counter+, which starts at 0 in #initialize and is
# incremented by one on every #get_connection call.
#
# @return [Object] the current value of @counter
def counter
  @counter
end

#hosts ⇒ Object (readonly)

Returns the value of attribute hosts.



34
35
36
# File 'lib/elastic/transport/transport/base.rb', line 34

# Reader for +@hosts+, taken from +arguments[:hosts]+ (defaulting to an
# empty Array) in #initialize and #__rebuild_connections.
#
# @return [Object] the current value of @hosts
def hosts
  @hosts
end

#last_request_at ⇒ Object (readonly)

Returns the value of attribute last_request_at.



34
35
36
# File 'lib/elastic/transport/transport/base.rb', line 34

# Reader for +@last_request_at+, set to +Time.now+ in #initialize and
# again in the +ensure+ clause of #perform_request.
#
# @return [Object] the current value of @last_request_at
def last_request_at
  @last_request_at
end

#logger ⇒ Object

Returns the value of attribute logger.



35
36
37
# File 'lib/elastic/transport/transport/base.rb', line 35

# Reader for +@logger+, which #initialize takes from +options[:logger]+.
# #__log_response only emits output when this value is truthy.
#
# @return [Object] the current value of @logger
def logger
  @logger
end

#options ⇒ Object (readonly)

Returns the value of attribute options.



34
35
36
# File 'lib/elastic/transport/transport/base.rb', line 34

# Reader for +@options+, taken from +arguments[:options]+ (defaulting to
# an empty Hash). #initialize additionally fills in the +:http+,
# +:retry_on_status+ and +:delay_on_retry+ keys when they are missing.
#
# @return [Object] the current value of @options
def options
  @options
end

#protocol ⇒ Object (readonly)

Returns the value of attribute protocol.



34
35
36
# File 'lib/elastic/transport/transport/base.rb', line 34

# Reader for +@protocol+, which #initialize sets to +options[:protocol]+
# or DEFAULT_PROTOCOL.
#
# @return [Object] the current value of @protocol
def protocol
  @protocol
end

#reload_after ⇒ Object

Returns the value of attribute reload_after.



35
36
37
# File 'lib/elastic/transport/transport/base.rb', line 35

# Reader for +@reload_after+. #initialize uses
# +options[:reload_connections]+ when that is an Integer, otherwise
# DEFAULT_RELOAD_AFTER; #get_connection reloads whenever the request
# counter is a multiple of this value.
#
# @return [Object] the current value of @reload_after
def reload_after
  @reload_after
end

#reload_connections ⇒ Object

Returns the value of attribute reload_connections.



35
36
37
# File 'lib/elastic/transport/transport/base.rb', line 35

# Reader for +@reload_connections+, copied from
# +options[:reload_connections]+ in #initialize. #get_connection treats
# it as a flag: periodic reloading only happens when it is truthy.
#
# @return [Object] the current value of @reload_connections
def reload_connections
  @reload_connections
end

#resurrect_after ⇒ Object

Returns the value of attribute resurrect_after.



35
36
37
# File 'lib/elastic/transport/transport/base.rb', line 35

# Reader for +@resurrect_after+, which #initialize sets to
# +options[:resurrect_after]+ or DEFAULT_RESURRECT_AFTER. #get_connection
# adds it to +@last_request_at+ to decide when to resurrect dead
# connections.
#
# @return [Object] the current value of @resurrect_after
def resurrect_after
  @resurrect_after
end

#serializer ⇒ Object

Returns the value of attribute serializer.



35
36
37
# File 'lib/elastic/transport/transport/base.rb', line 35

# Reader for +@serializer+. #initialize uses +options[:serializer]+ when
# given, otherwise an instance of +options[:serializer_class]+ or
# DEFAULT_SERIALIZER_CLASS constructed with this transport.
#
# @return [Object] the current value of @serializer
def serializer
  @serializer
end

#sniffer ⇒ Object

Returns the value of attribute sniffer.



35
36
37
# File 'lib/elastic/transport/transport/base.rb', line 35

# Reader for +@sniffer+, an instance of +options[:sniffer_class]+ or
# Sniffer built in #initialize. #reload_connections! asks it for hosts.
#
# @return [Object] the current value of @sniffer
def sniffer
  @sniffer
end

#tracer ⇒ Object

Returns the value of attribute tracer.



35
36
37
# File 'lib/elastic/transport/transport/base.rb', line 35

# Reader for +@tracer+, which #initialize takes from +options[:tracer]+.
# #perform_request only calls #__trace when this value is truthy.
#
# @return [Object] the current value of @tracer
def tracer
  @tracer
end

Instance Method Details

#__build_connection(host, options = {}, block = nil) ⇒ Connections::Connection

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

This method is abstract.

Build and return a connection. A transport implementation must implement this method. See HTTP::Faraday#__build_connection for an example.

Returns:

Raises:

  • (NoMethodError)


180
181
182
# File 'lib/elastic/transport/transport/base.rb', line 180

# Abstract hook: builds a single connection for +host+.
#
# This base version never returns; it always raises so that a transport
# which forgot to override it fails loudly. #__connections_from_host calls
# it once per host with the host Hash, the transport options and the block
# captured by #initialize.
#
# @param host [Hash] host configuration
# @param options [Hash] transport options
# @param block [Proc, nil] configuration block passed to the transport
# @raise [NoMethodError] always, in this base implementation
# @api private
def __build_connection(host, options = {}, block = nil)
  raise NoMethodError, 'Implement this method in your class'
end

#__build_connections ⇒ Connections::Collection

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Builds and returns a collection of connections

The adapters have to implement the #__build_connection method.



147
148
149
150
151
152
153
# File 'lib/elastic/transport/transport/base.rb', line 147

# Wraps the connections produced by #__connections_from_host in a new
# Connections::Collection, forwarding the +:selector_class+ and +:selector+
# entries of #options unchanged.
#
# @return [Connections::Collection]
# @api private
def __build_connections
  built = __connections_from_host

  Connections::Collection.new(
    connections: built, selector_class: options[:selector_class], selector: options[:selector]
  )
end

#__close_connections ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Closes the connections collection



188
189
190
# File 'lib/elastic/transport/transport/base.rb', line 188

# No-op in the base module. #__rebuild_connections calls it before the new
# connection set is built, so an adapter can override it to release
# whatever its connections hold.
#
# @return [nil]
# @api private
def __close_connections
  # A hook point for specific adapters when they need to close connections
end

#__connections_from_host ⇒ Array<Connection>

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Helper function: Maps over hosts, sets protocol, port and user/password and builds connections

Returns:

  • (Array<Connection>)

    Array<Connection>



160
161
162
163
164
165
166
167
168
169
170
171
# File 'lib/elastic/transport/transport/base.rb', line 160

# Fills in protocol, port and credentials on every host Hash and builds one
# connection per host via #__build_connection.
#
# Each host Hash is modified in place:
# * +:protocol+ is always overwritten, from the host's +:scheme+, then
#   +options[:scheme]+, then +options[:http][:scheme]+, then DEFAULT_PROTOCOL.
# * +:port+ is only filled in when missing.
# * +:user+ / +:password+ are only filled in when the host has no user and a
#   default user is configured in the options.
#
# A missing +options[:http]+ is treated as an empty Hash. #initialize always
# sets that key, but #__rebuild_connections replaces the options wholesale and
# does not, which previously made this method fail with NoMethodError on nil.
#
# @return [Array] whatever #__build_connection returns, one entry per host
# @api private
def __connections_from_host
  http_options = options[:http] || {}
  default_user = options[:user] || http_options[:user]

  hosts.map do |host|
    host[:protocol] = host[:scheme] || options[:scheme] || http_options[:scheme] || DEFAULT_PROTOCOL
    host[:port] ||= options[:port] || http_options[:port] || DEFAULT_PORT
    if default_user && !host[:user]
      host[:user] = default_user
      host[:password] ||= options[:password] || http_options[:password]
    end

    # A fresh Hash per host when no transport options are configured, so an
    # adapter mutating it cannot affect the other hosts.
    __build_connection(host, (options[:transport_options] || {}), @block)
  end
end

#__convert_to_json(o = nil, options = {}) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Converts any non-String object to JSON



235
236
237
# File 'lib/elastic/transport/transport/base.rb', line 235

# Returns +o+ untouched when it is already a String; anything else
# (including nil) is handed to +serializer.dump+ together with +options+.
#
# @param o [Object, nil] value to convert
# @param options [Hash] passed through to the serializer
# @return [Object] +o+ itself, or the result of +serializer.dump+
# @api private
def __convert_to_json(o=nil, options={})
  return o if o.is_a?(String)

  serializer.dump(o, options)
end

#__full_url(host) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Returns a full URL based on information from host

Parameters:

  • host (Hash)

    Host configuration passed in from Client



244
245
246
247
248
249
250
251
# File 'lib/elastic/transport/transport/base.rb', line 244

# Assembles a URL string from the parts of a host Hash.
#
# The result is +protocol://[user[:password]@]host[:port][path]+. User and
# password are escaped with +CGI.escape+. The password part is left out when
# the host has a user but no password; previously that case raised a
# TypeError from +CGI.escape(nil)+.
#
# @param host [Hash] reads +:protocol+, +:user+, +:password+, +:host+,
#   +:port+ and +:path+
# @return [String]
# @api private
def __full_url(host)
  url = "#{host[:protocol]}://"
  if host[:user]
    url += CGI.escape(host[:user])
    url += ":#{CGI.escape(host[:password])}" if host[:password]
    url += '@'
  end
  url += host[:host]
  url += ":#{host[:port]}" if host[:port]
  url += host[:path] if host[:path]
  url
end

#__log_response(method, path, params, body, url, response, json, took, duration) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Log request and response information



196
197
198
199
200
201
202
203
204
# File 'lib/elastic/transport/transport/base.rb', line 196

# Logs one request/response pair; does nothing when no logger is set.
#
# Emits an info line with the upper-cased method, the URL with its password
# replaced by SANITIZED_PASSWORD, the response status, the request duration
# and +took+. The request body (when present) and the response body go to
# the debug level.
#
# The credential pattern is confined to the URL authority (it cannot cross a
# +/+, +?+ or +#+). The previous greedy +(.+):(.+)@+ could extend to an +@+ in
# the path or query, which left the real password in the log and rewrote
# URLs that carried no credentials at all.
#
# +path+, +params+ and +json+ are accepted but not read here.
#
# @param method [#to_s] request method
# @param body [Object, nil] request body; serialized via #__convert_to_json
# @param url [#to_s] full request URL, possibly containing credentials
# @param response [#status, #body] response object
# @param took [Object] interpolated as-is into the info line
# @param duration [Numeric] formatted with three decimals
# @api private
def __log_response(method, path, params, body, url, response, json, took, duration)
  return unless logger

  sanitized_url = url.to_s.gsub(%r{//([^/?#:]+):([^/?#]+)@}, "//\\1:#{SANITIZED_PASSWORD}@")
  log_info "#{method.to_s.upcase} #{sanitized_url} " \
           "[status:#{response.status}, request:#{format('%.3fs', duration)}, query:#{took}]"
  log_debug "> #{__convert_to_json(body)}" if body
  log_debug "< #{response.body}"
end

#__raise_transport_error(response) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Raise error specific for the HTTP response status or a generic server error



226
227
228
229
# File 'lib/elastic/transport/transport/base.rb', line 226

# Raises the error class registered in ERRORS for the response status,
# falling back to ServerError when the status has no entry. The message is
# the status in square brackets followed by the response body.
#
# @param response [#status, #body]
# @raise [ServerError] or the class mapped in ERRORS
# @api private
def __raise_transport_error(response)
  error_class = ERRORS[response.status] || ServerError
  message = "[#{response.status}] #{response.body}"

  raise error_class.new(message)
end

#__rebuild_connections(arguments = {}) ⇒ Connections::Collection

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Rebuilds the connections collection in the transport.

The method adds new connections from the passed hosts to the collection, and removes all connections not contained in the passed hosts.



123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
# File 'lib/elastic/transport/transport/base.rb', line 123

# Replaces the transport's hosts and options and brings the existing
# connections collection in line with them, all while holding +@state_mutex+.
#
# After calling the #__close_connections hook, a fresh set is built with
# #__build_connections. Connections currently in +@connections+ that are not
# part of the fresh set are removed, and fresh ones not yet present are
# added; the same collection object is kept and returned.
#
# @param arguments [Hash]
# @option arguments [Array] :hosts replacement hosts (default: empty Array)
# @option arguments [Hash] :options replacement options (default: empty Hash)
# @return [Object] +@connections+
# @api private
def __rebuild_connections(arguments={})
  @state_mutex.synchronize do
    @hosts   = arguments[:hosts] || []
    @options = arguments[:options] || {}

    __close_connections

    rebuilt   = __build_connections
    to_remove = @connections.all.reject { |existing| rebuilt.include?(existing) }
    to_add    = rebuilt.reject { |candidate| @connections.all.include?(candidate) }

    @connections.remove(to_remove)
    @connections.add(to_add)
    @connections
  end
end

#__trace(method, path, params, headers, body, url, response, json, took, duration) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Trace the request in the `curl` format



210
211
212
213
214
215
216
217
218
219
220
# File 'lib/elastic/transport/transport/base.rb', line 210

# Writes a copy-pasteable +curl+ command for the request to the tracer at
# info level, followed by commented-out response details at debug level.
#
# The traced URL is always +http://localhost:9200/<path>?pretty+ plus the
# query parameters; the real +url+ argument and +took+ are not used. Each
# request header is emitted as its own +-H+ argument. Previously all headers
# were joined with ", " inside a single +-H '...'+, which curl sends as one
# header.
#
# When +json+ is given it is re-serialized in pretty form with every line
# prefixed by "# "; otherwise the raw response body is traced.
#
# @param method [#to_s] request method
# @param path [String] request path, appended to the localhost URL
# @param params [Hash] query parameters
# @param headers [Hash, nil] request headers
# @param body [Object, nil] request body
# @param response [#status, #body]
# @param json [Object, nil] parsed response body, if any
# @param duration [Numeric] request duration in seconds
# @api private
def __trace(method, path, params, headers, body, url, response, json, took, duration)
  trace_url  = "http://localhost:9200/#{path}?pretty" +
      (params.empty? ? '' : "&#{::Faraday::Utils::ParamsHash[params].to_query}")
  trace_body = body ? " -d '#{__convert_to_json(body, :pretty => true)}'" : ''
  trace_command = "curl -X #{method.to_s.upcase}"
  # One -H flag per header so the command reproduces the request's headers.
  trace_command += headers.map { |k, v| " -H '#{k}: #{v}'" }.join if headers && !headers.empty?
  trace_command += " '#{trace_url}'#{trace_body}\n"
  tracer.info trace_command
  tracer.debug "# #{Time.now.iso8601} [#{response.status}] (#{format('%.3f', duration)}s)\n#"
  tracer.debug json ? serializer.dump(json, pretty: true).gsub(/^/, '# ').sub(/\}$/, "\n# }")+"\n" : "# #{response.body}\n"
end

#get_connection(options = {}) ⇒ Connections::Connection

Returns a connection from the connection pool by delegating to Connections::Collection#get_connection.

Resurrects dead connections if the `resurrect_after` timeout has passed. Increments the counter and performs connection reloading if the `reload_connections` option is set.



86
87
88
89
90
91
92
# File 'lib/elastic/transport/transport/base.rb', line 86

# Picks a connection from the collection, doing periodic housekeeping first.
#
# 1. When more than +@resurrect_after+ seconds have passed since
#    +@last_request_at+, dead connections are resurrected.
# 2. The request counter is incremented under +@counter_mtx+.
# 3. When #reload_connections is truthy and the counter is a multiple of
#    #reload_after, the connections are reloaded.
#
# @param options [Hash] forwarded to the collection's +get_connection+
# @return [Object] whatever the collection's +get_connection+ returns
def get_connection(options={})
  resurrect_due_at = @last_request_at + @resurrect_after
  resurrect_dead_connections! if Time.now > resurrect_due_at

  @counter_mtx.synchronize { @counter += 1 }

  if reload_connections && (counter % reload_after).zero?
    reload_connections!
  end

  connections.get_connection(options)
end

#host_unreachable_exceptions ⇒ Array

This method is abstract.

Returns an Array of connection errors specific to the transport implementation. See HTTP::Faraday#host_unreachable_exceptions for an example.

Returns:

  • (Array)


375
376
377
# File 'lib/elastic/transport/transport/base.rb', line 375

# Exception classes that #perform_request treats as "host unreachable":
# when one of them is raised, the connection is marked dead and the request
# may be retried. The base list contains only Errno::ECONNREFUSED.
#
# @return [Array<Class>]
def host_unreachable_exceptions
  [Errno::ECONNREFUSED]
end

#initialize(arguments = {}, &block) ⇒ Object

Creates a new transport object

Parameters:

  • arguments (Hash) (defaults to: {})

    Settings and options for the transport

  • block (Proc)

    Lambda or Proc which can be evaluated in the context of the “session” object

Options Hash (arguments):

  • :hosts (Array)

    An Array of normalized hosts information

  • :options (Hash)

    A Hash with options (usually passed by Client)

See Also:



49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
# File 'lib/elastic/transport/transport/base.rb', line 49

# Sets up the transport state from +arguments+.
#
# The options Hash that is passed in is modified in place: missing +:http+,
# +:retry_on_status+ and +:delay_on_retry+ entries are filled with an empty
# Hash, an empty Array and 0 respectively. The connections collection is
# built before the serializer and sniffer are created.
#
# @param arguments [Hash]
# @option arguments [Array] :hosts host configurations (default: empty Array)
# @option arguments [Hash] :options transport options (default: empty Hash)
# @param block [Proc] stored and handed to #__build_connection for each host
def initialize(arguments = {}, &block)
  @state_mutex = Mutex.new

  @hosts   = arguments[:hosts] || []
  @options = arguments[:options] || {}
  @options[:http]            ||= {}
  @options[:retry_on_status] ||= []
  @options[:delay_on_retry]  ||= 0

  @block       = block
  @compression = @options[:compression] ? true : false
  @connections = __build_connections

  # An explicit serializer instance wins over a serializer class.
  @serializer = options[:serializer] ||
                (options[:serializer_class] || DEFAULT_SERIALIZER_CLASS).new(self)
  @protocol   = options[:protocol] || DEFAULT_PROTOCOL

  @logger = options[:logger]
  @tracer = options[:tracer]

  @sniffer = (options[:sniffer_class] || Sniffer).new(self)

  @counter         = 0
  @counter_mtx     = Mutex.new
  @last_request_at = Time.now

  # :reload_connections doubles as an on/off flag and, when it is an
  # Integer, as the number of requests between reloads.
  reload_setting      = options[:reload_connections]
  @reload_connections = reload_setting
  @reload_after       = reload_setting.is_a?(Integer) ? reload_setting : DEFAULT_RELOAD_AFTER

  @resurrect_after = options[:resurrect_after] || DEFAULT_RESURRECT_AFTER
  @retry_on_status = Array(options[:retry_on_status]).map(&:to_i)
end

#perform_request(method, path, params = {}, body = nil, headers = nil, opts = {}, &block) ⇒ Response

This method is abstract.

The transport implementation has to implement this method either in full, or by invoking this method with a block. See HTTP::Faraday#perform_request for an example.

Performs a request to Elasticsearch, while handling logging, tracing, marking dead connections, retrying the request and reloading the connections.

Parameters:

  • method (String)

    Request method

  • path (String)

    The API endpoint

  • params (Hash) (defaults to: {})

    Request parameters (will be serialized by Connections::Connection#full_url)

  • body (Hash) (defaults to: nil)

    Request body (will be serialized by the #serializer)

  • headers (Hash) (defaults to: nil)

    Request headers (will be serialized by the #serializer)

  • block (Proc)

    Code block to evaluate, passed from the implementation

Returns:

Raises:

  • (NoMethodError)

    If no block is passed

  • (ServerError)

    If request failed on server

  • (Error)

    If no connection is available



271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
# File 'lib/elastic/transport/transport/base.rb', line 271

# Runs a request through the given block, with retries, dead-connection
# handling, logging and tracing around it.
#
# The block receives a connection and the full URL and must return a
# response that answers +status+, +body+ and +headers+.
#
# Flow:
# * A connection is obtained via #get_connection for every attempt.
# * A response whose status is listed in +@retry_on_status+ (and is >= 300)
#   is turned into a transport error and retried up to +max_retries+
#   (DEFAULT_MAX_RETRIES when unset).
# * Exceptions listed by #host_unreachable_exceptions mark the connection
#   dead; the request is retried after reloading connections (when
#   +reload_on_failure+ is set) or up to +max_retries+.
# * Any other exception is logged at fatal level and re-raised.
# * A final status >= 300 is logged and raised unless it is listed in
#   +params[:ignore]+.
# * A JSON response body is deserialized; the parsed body and its +took+
#   value are passed to both #__log_response and #__trace.
#
# +@last_request_at+ is refreshed on every exit path.
#
# NOTE(review): for an ignored status >= 300 the request is traced twice
# (once in the error branch, once at the end) — confirm whether intended.
#
# @param method [String] request method
# @param path [String] the API endpoint
# @param params [Hash] request parameters; +:ignore+ is removed from a copy
#   and read as a list of status codes that must not raise
# @param body [Object, nil] request body, used for logging and tracing here
# @param headers [Hash, nil] not read by this method
# @param opts [Hash] per-request overrides; +:reload_on_failure+ and
#   +:delay_on_retry+ (milliseconds) are read here
# @param block [Proc] performs the actual HTTP call
# @return [Response]
# @raise [NoMethodError] if no block is passed
# @raise [ServerError] if the request failed on the server
# @raise [Error] if no connection is available or the host is unreachable
def perform_request(method, path, params = {}, body = nil, headers = nil, opts = {}, &block)
  raise NoMethodError, 'Implement this method in your transport class' unless block_given?

  start = Time.now
  tries = 0
  reload_on_failure = opts.fetch(:reload_on_failure, @options[:reload_on_failure])
  delay_on_retry = opts.fetch(:delay_on_retry, @options[:delay_on_retry])

  max_retries = max_retries(opts) || max_retries(options)

  # Work on a copy so the caller's Hash keeps its :ignore entry.
  params = params.clone
  # Transforms ignore status codes to Integer
  ignore = Array(params.delete(:ignore)).compact.map(&:to_i)

  begin
    sleep(delay_on_retry / 1000.0) if tries > 0
    tries += 1
    connection = get_connection or raise Error.new('Cannot get new connection from pool.')

    # Merge parameters configured on the underlying client connection;
    # per-request parameters win.
    if connection.connection.respond_to?(:params) &&
       connection.connection.params.respond_to?(:to_hash)
      params = connection.connection.params.merge(params.to_hash)
    end

    url = connection.full_url(path, params)
    response = block.call(connection, url)
    connection.healthy! if connection.failures.positive?

    # Raise an exception so we can catch it for `retry_on_status`
    __raise_transport_error(response) if response.status.to_i >= 300 &&
                                         @retry_on_status.include?(response.status.to_i)
  rescue Elastic::Transport::Transport::ServerError => e
    # Same Integer comparison as the raise condition above.
    raise e unless response && @retry_on_status.include?(response.status.to_i)

    log_warn "[#{e.class}] Attempt #{tries} to get response from #{url}"
    if tries <= (max_retries || DEFAULT_MAX_RETRIES)
      retry
    else
      log_fatal "[#{e.class}] Cannot get response from #{url} after #{tries} tries"
      raise e
    end
  rescue *host_unreachable_exceptions => e
    log_error "[#{e.class}] #{e.message} #{connection.host.inspect}"

    connection.dead!

    if reload_on_failure && tries < connections.all.size
      log_warn "[#{e.class}] Reloading connections (attempt #{tries} of #{connections.all.size})"
      reload_connections! and retry
    end

    exception = Elastic::Transport::Transport::Error.new(e.message)

    raise exception unless max_retries

    log_warn "[#{e.class}] Attempt #{tries} connecting to #{connection.host.inspect}"
    if tries <= max_retries
      retry
    else
      log_fatal "[#{e.class}] Cannot connect to #{connection.host.inspect} after #{tries} tries"
      raise exception
    end
  rescue Exception => e
    log_fatal "[#{e.class}] #{e.message} (#{connection.host.inspect if connection})"
    raise e
  end #/begin

  duration = Time.now - start

  if response.status.to_i >= 300
    __log_response(method, path, params, body, url, response, nil, 'N/A', duration)
    __trace(method, path, params, connection_headers(connection), body, url, response, nil, 'N/A', duration) if tracer

    # Log the failure only when `ignore` doesn't match the response status
    log_fatal("[#{response.status}] #{response.body}") unless ignore.include?(response.status.to_i)
    __raise_transport_error response unless ignore.include?(response.status.to_i)
  end

  if response.body &&
    !response.body.empty? &&
    response.headers &&
    response.headers["content-type"] =~ /json/

    # Prevent Float value from automatically becoming BigDecimal when using Oj
    load_options = {}
    load_options[:mode] = :compat if ::MultiJson.adapter.to_s == "MultiJson::Adapters::Oj"

    json = serializer.load(response.body, load_options)
  end
  # `json` may be nil or not a Hash; any failure here falls back to 'n/a'.
  took = (json['took'] ? sprintf('%.3fs', json['took'] / 1000.0) : 'n/a') rescue 'n/a'
  __log_response(method, path, params, body, url, response, json, took, duration) unless ignore.include?(response.status.to_i)
  # Pass the parsed body and timing so the trace can show the JSON response.
  __trace(method, path, params, connection_headers(connection), body, url, response, json, took, duration) if tracer
  log_warn(response.headers['warning']) if response.headers&.[]('warning')

  Response.new response.status, json || response.body, response.headers
ensure
  @last_request_at = Time.now
end

#reload_connections! ⇒ Object

Reloads and replaces the connection collection based on cluster information

See Also:



98
99
100
101
102
103
104
105
# File 'lib/elastic/transport/transport/base.rb', line 98

# Asks the sniffer for the current hosts and rebuilds the connections from
# them, keeping the current options. A SnifferTimeoutError is logged and
# swallowed, leaving the existing connections in place.
#
# @return [self] in both the success and the timeout case
def reload_connections!
  begin
    __rebuild_connections(hosts: sniffer.hosts, options: options)
  rescue SnifferTimeoutError
    log_error('[SnifferTimeoutError] Timeout when reloading connections.')
  end

  self
end

#resurrect_dead_connections! ⇒ Object

Tries to “resurrect” all eligible dead connections



111
112
113
# File 'lib/elastic/transport/transport/base.rb', line 111

# Calls +resurrect!+ on every connection the collection reports as dead.
#
# @return [Object] the list returned by +connections.dead+
def resurrect_dead_connections!
  connections.dead.each { |dead_connection| dead_connection.resurrect! }
end