Module: Elastic::Transport::Transport::Base Abstract
- Includes:
- Loggable
- Included in:
- HTTP::Curb, HTTP::Faraday, HTTP::Manticore
- Defined in:
- lib/elastic/transport/transport/base.rb
Overview
Module with common functionality for transport implementations.
Constant Summary
- DEFAULT_PORT = 9200
- DEFAULT_PROTOCOL = 'http'.freeze
- DEFAULT_RELOAD_AFTER = 10_000 (requests)
- DEFAULT_RESURRECT_AFTER = 60 (seconds)
- DEFAULT_MAX_RETRIES = 3 (requests)
- DEFAULT_SERIALIZER_CLASS = Serializer::MultiJson
- SANITIZED_PASSWORD = '*' * (rand(14) + 1)
Instance Attribute Summary
-
#connections ⇒ Object
readonly
Returns the value of attribute connections.
-
#counter ⇒ Object
readonly
Returns the value of attribute counter.
-
#hosts ⇒ Object
readonly
Returns the value of attribute hosts.
-
#last_request_at ⇒ Object
readonly
Returns the value of attribute last_request_at.
-
#logger ⇒ Object
Returns the value of attribute logger.
-
#options ⇒ Object
readonly
Returns the value of attribute options.
-
#protocol ⇒ Object
readonly
Returns the value of attribute protocol.
-
#reload_after ⇒ Object
Returns the value of attribute reload_after.
-
#reload_connections ⇒ Object
Returns the value of attribute reload_connections.
-
#resurrect_after ⇒ Object
Returns the value of attribute resurrect_after.
-
#serializer ⇒ Object
Returns the value of attribute serializer.
-
#sniffer ⇒ Object
Returns the value of attribute sniffer.
-
#tracer ⇒ Object
Returns the value of attribute tracer.
Instance Method Summary
- #__build_connection(host, options = {}, block = nil) ⇒ Connections::Connection abstract private
-
#__build_connections ⇒ Connections::Collection
private
Builds and returns a collection of connections.
-
#__close_connections ⇒ Object
private
Closes the connections collection.
-
#__connections_from_host ⇒ Array<Connection>
private
Helper function: Maps over hosts, sets protocol, port and user/password and builds connections.
-
#__convert_to_json(o = nil, options = {}) ⇒ Object
private
Converts any non-String object to JSON.
-
#__full_url(host) ⇒ Object
private
Returns a full URL based on information from host.
-
#__log_response(method, path, params, body, url, response, json, took, duration) ⇒ Object
private
Log request and response information.
-
#__raise_transport_error(response) ⇒ Object
private
Raise error specific for the HTTP response status or a generic server error.
-
#__rebuild_connections(arguments = {}) ⇒ Connections::Collection
private
Rebuilds the connections collection in the transport.
-
#__trace(method, path, params, headers, body, url, response, json, took, duration) ⇒ Object
private
Trace the request in the `curl` format.
-
#get_connection(options = {}) ⇒ Connections::Connection
Returns a connection from the connection pool by delegating to Connections::Collection#get_connection.
- #host_unreachable_exceptions ⇒ Array abstract
-
#initialize(arguments = {}, &block) ⇒ Object
Creates a new transport object.
-
#perform_request(method, path, params = {}, body = nil, headers = nil, opts = {}, &block) ⇒ Response
abstract
Performs a request to Elasticsearch, while handling logging, tracing, marking dead connections, retrying the request and reloading the connections.
-
#reload_connections! ⇒ Object
Reloads and replaces the connection collection based on cluster information.
-
#resurrect_dead_connections! ⇒ Object
Tries to “resurrect” all eligible dead connections.
Methods included from Loggable
#log_debug, #log_error, #log_fatal, #log_info, #log_warn
Instance Attribute Details
#connections ⇒ Object (readonly)
Returns the value of attribute connections.
# File 'lib/elastic/transport/transport/base.rb', line 34
def connections
  @connections
end
#counter ⇒ Object (readonly)
Returns the value of attribute counter.
# File 'lib/elastic/transport/transport/base.rb', line 34
def counter
  @counter
end
#hosts ⇒ Object (readonly)
Returns the value of attribute hosts.
# File 'lib/elastic/transport/transport/base.rb', line 34
def hosts
  @hosts
end
#last_request_at ⇒ Object (readonly)
Returns the value of attribute last_request_at.
# File 'lib/elastic/transport/transport/base.rb', line 34
def last_request_at
  @last_request_at
end
#logger ⇒ Object
Returns the value of attribute logger.
# File 'lib/elastic/transport/transport/base.rb', line 35
def logger
  @logger
end
#options ⇒ Object (readonly)
Returns the value of attribute options.
# File 'lib/elastic/transport/transport/base.rb', line 34
def options
  @options
end
#protocol ⇒ Object (readonly)
Returns the value of attribute protocol.
# File 'lib/elastic/transport/transport/base.rb', line 34
def protocol
  @protocol
end
#reload_after ⇒ Object
Returns the value of attribute reload_after.
# File 'lib/elastic/transport/transport/base.rb', line 35
def reload_after
  @reload_after
end
#reload_connections ⇒ Object
Returns the value of attribute reload_connections.
# File 'lib/elastic/transport/transport/base.rb', line 35
def reload_connections
  @reload_connections
end
#resurrect_after ⇒ Object
Returns the value of attribute resurrect_after.
# File 'lib/elastic/transport/transport/base.rb', line 35
def resurrect_after
  @resurrect_after
end
#serializer ⇒ Object
Returns the value of attribute serializer.
# File 'lib/elastic/transport/transport/base.rb', line 35
def serializer
  @serializer
end
#sniffer ⇒ Object
Returns the value of attribute sniffer.
# File 'lib/elastic/transport/transport/base.rb', line 35
def sniffer
  @sniffer
end
#tracer ⇒ Object
Returns the value of attribute tracer.
# File 'lib/elastic/transport/transport/base.rb', line 35
def tracer
  @tracer
end
Instance Method Details
#__build_connection(host, options = {}, block = nil) ⇒ Connections::Connection
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Build and return a connection. A transport implementation must implement this method. See HTTP::Faraday#__build_connection for an example.
# File 'lib/elastic/transport/transport/base.rb', line 180
def __build_connection(host, options = {}, block = nil)
  raise NoMethodError, 'Implement this method in your class'
end
#__build_connections ⇒ Connections::Collection
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Builds and returns a collection of connections
The adapters have to implement the #__build_connection method.
# File 'lib/elastic/transport/transport/base.rb', line 147
def __build_connections
  Connections::Collection.new(
    connections: __connections_from_host,
    selector_class: options[:selector_class],
    selector: options[:selector]
  )
end
#__close_connections ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Closes the connections collection
# File 'lib/elastic/transport/transport/base.rb', line 188
def __close_connections
  # A hook point for specific adapters when they need to close connections
end
#__connections_from_host ⇒ Array<Connection>
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Helper function: Maps over hosts, sets protocol, port and user/password and builds connections
# File 'lib/elastic/transport/transport/base.rb', line 160
def __connections_from_host
  hosts.map do |host|
    host[:protocol] = host[:scheme] || options[:scheme] || options[:http][:scheme] || DEFAULT_PROTOCOL
    host[:port] ||= options[:port] || options[:http][:port] || DEFAULT_PORT
    if (options[:user] || options[:http][:user]) && !host[:user]
      host[:user] ||= options[:user] || options[:http][:user]
      host[:password] ||= options[:password] || options[:http][:password]
    end
    __build_connection(host, (options[:transport_options] || {}), @block)
  end
end
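The fallback order used for each host can be illustrated in isolation. The following is a minimal sketch, not the library's code: the helper name `apply_host_defaults` is hypothetical, the defaults are written as literals, and, unlike the method above, it copies the host hash and tolerates a missing `:http` key.

```ruby
# Hypothetical helper illustrating the per-host fallback chain:
# host setting, then top-level option, then options[:http], then the default.
def apply_host_defaults(host, options)
  http = options[:http] || {}  # assumption: tolerate a missing :http key
  host = host.dup              # assumption: do not mutate the caller's hash
  host[:protocol] = host[:scheme] || options[:scheme] || http[:scheme] || 'http'
  host[:port] ||= options[:port] || http[:port] || 9200
  if (options[:user] || http[:user]) && !host[:user]
    host[:user] = options[:user] || http[:user]
    host[:password] ||= options[:password] || http[:password]
  end
  host
end

p apply_host_defaults({ host: 'es1' }, {})
p apply_host_defaults({ host: 'es1', port: 443 }, { http: { scheme: 'https', user: 'u', password: 'p' } })
```

Credentials from the options are applied only when the host does not already carry a user of its own.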
#__convert_to_json(o = nil, options = {}) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Converts any non-String object to JSON
# File 'lib/elastic/transport/transport/base.rb', line 235
def __convert_to_json(o = nil, options = {})
  o.is_a?(String) ? o : serializer.dump(o, options)
end
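The pass-through behaviour for strings can be shown with a small sketch. It swaps in Ruby's standard `JSON` module for the transport's configured serializer, and the helper name `convert_to_json` is an assumption made for this example.

```ruby
require 'json'

# Hypothetical stand-in: strings are assumed to be JSON already and are
# returned untouched; any other object is serialized.
def convert_to_json(object)
  object.is_a?(String) ? object : JSON.generate(object)
end

puts convert_to_json('{"query":{"match_all":{}}}')
puts convert_to_json({ 'size' => 1 })
```

Because strings are never re-encoded, a caller that passes a pre-serialized body is responsible for its validity.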
#__full_url(host) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Returns a full URL based on information from host
# File 'lib/elastic/transport/transport/base.rb', line 244
def __full_url(host)
  url = "#{host[:protocol]}://"
  url += "#{CGI.escape(host[:user])}:#{CGI.escape(host[:password])}@" if host[:user]
  url += host[:host]
  url += ":#{host[:port]}" if host[:port]
  url += host[:path] if host[:path]
  url
end
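As a rough illustration of the URL assembly, the same steps can be run outside the transport. The helper name `full_url` and the sample host hash are assumptions made for this sketch; `CGI.escape` is from Ruby's standard library.

```ruby
require 'cgi'

# Hypothetical standalone helper mirroring the steps of __full_url.
def full_url(host)
  url = "#{host[:protocol]}://"
  # Credentials are percent-encoded so that characters such as "@" stay unambiguous.
  url += "#{CGI.escape(host[:user])}:#{CGI.escape(host[:password])}@" if host[:user]
  url += host[:host]
  url += ":#{host[:port]}" if host[:port]
  url += host[:path] if host[:path]
  url
end

host = { protocol: 'https', user: 'elastic', password: 'p@ss word',
         host: 'localhost', port: 9200, path: '/es' }
puts full_url(host)
```

Note that `CGI.escape` encodes a space as `+`, so the password above appears as `p%40ss+word` in the result.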
#__log_response(method, path, params, body, url, response, json, took, duration) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Log request and response information
# File 'lib/elastic/transport/transport/base.rb', line 196
def __log_response(method, path, params, body, url, response, json, took, duration)
  if logger
    sanitized_url = url.to_s.gsub(/\/\/(.+):(.+)@/, '//' + '\1:' + SANITIZED_PASSWORD + '@')
    log_info "#{method.to_s.upcase} #{sanitized_url} " +
             "[status:#{response.status}, request:#{sprintf('%.3fs', duration)}, query:#{took}]"
    log_debug "> #{__convert_to_json(body)}" if body
    log_debug "< #{response.body}"
  end
end
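The password masking applied before logging can be tried on its own. This sketch reuses the regular expression from the method above; the helper name `sanitize_url` and the fixed eight-character mask are assumptions (the library builds its mask with a random length).

```ruby
# Hypothetical helper: replaces the password part of "//user:password@"
# with a mask and leaves URLs without credentials unchanged.
def sanitize_url(url)
  mask = '*' * 8 # assumption: fixed length so the output is reproducible
  url.to_s.gsub(/\/\/(.+):(.+)@/, '//' + '\1:' + mask + '@')
end

puts sanitize_url('http://elastic:secret@localhost:9200/_search')
puts sanitize_url('http://localhost:9200/_search')
```

A mask of varying length avoids leaking the real password length into the logs, which is presumably why the constant is defined with `rand`.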
#__raise_transport_error(response) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Raise error specific for the HTTP response status or a generic server error
# File 'lib/elastic/transport/transport/base.rb', line 226
def __raise_transport_error(response)
  error = ERRORS[response.status] || ServerError
  raise error.new "[#{response.status}] #{response.body}"
end
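The status lookup with a generic fallback can be sketched as follows. All class names, the two-entry table, and the `SketchResponse` struct are hypothetical stand-ins; the library's own `ERRORS` table is not reproduced here.

```ruby
# Hypothetical error hierarchy and lookup table for illustration only.
class SketchServerError < StandardError; end
class SketchBadRequest < SketchServerError; end
class SketchNotFound < SketchServerError; end

SKETCH_ERRORS = { 400 => SketchBadRequest, 404 => SketchNotFound }.freeze
SketchResponse = Struct.new(:status, :body)

# Raises the class registered for the status, or the generic error otherwise.
def raise_transport_error(response)
  error = SKETCH_ERRORS[response.status] || SketchServerError
  raise error, "[#{response.status}] #{response.body}"
end

begin
  raise_transport_error(SketchResponse.new(404, 'index_not_found'))
rescue SketchServerError => e
  puts "#{e.class}: #{e.message}"
end
```

Since every specific error inherits from the generic one, callers can rescue either a single status class or the whole family.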
#__rebuild_connections(arguments = {}) ⇒ Connections::Collection
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Rebuilds the connections collection in the transport.
The method adds new connections from the passed hosts to the collection, and removes all connections not contained in the passed hosts.
# File 'lib/elastic/transport/transport/base.rb', line 123
def __rebuild_connections(arguments={})
  @state_mutex.synchronize do
    @hosts = arguments[:hosts] || []
    @options = arguments[:options] || {}
    __close_connections
    new_connections = __build_connections
    stale_connections = @connections.all.reject { |c| new_connections.include?(c) }
    new_connections = new_connections.reject { |c| @connections.all.include?(c) }
    @connections.remove(stale_connections)
    @connections.add(new_connections)
    @connections
  end
end
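The add-and-remove step amounts to two set differences. In this minimal sketch, connections are represented by plain host strings and the helper name `rebuild` is hypothetical; the real method works on a `Connections::Collection` under a mutex.

```ruby
# Hypothetical helper: keeps connections that are still present, drops the
# stale ones, and appends the ones not seen before.
def rebuild(current, sniffed)
  stale = current.reject { |c| sniffed.include?(c) }
  fresh = sniffed.reject { |c| current.include?(c) }
  (current - stale) + fresh
end

p rebuild(['es1:9200', 'es2:9200'], ['es2:9200', 'es3:9200'])
```

Connections that appear in both lists are left in place rather than rebuilt, so their state (for example failure counts) survives a reload.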
#__trace(method, path, params, headers, body, url, response, json, took, duration) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Trace the request in the `curl` format
# File 'lib/elastic/transport/transport/base.rb', line 210
def __trace(method, path, params, headers, body, url, response, json, took, duration)
  trace_url = "http://localhost:9200/#{path}?pretty" +
              (params.empty? ? '' : "&#{::Faraday::Utils::ParamsHash[params].to_query}")
  trace_body = body ? " -d '#{__convert_to_json(body, :pretty => true)}'" : ''
  trace_command = "curl -X #{method.to_s.upcase}"
  trace_command += " -H '#{headers.collect { |k, v| "#{k}: #{v}" }.join(", ")}'" if headers && !headers.empty?
  trace_command += " '#{trace_url}'#{trace_body}\n"
  tracer.info trace_command
  tracer.debug "# #{Time.now.iso8601} [#{response.status}] (#{format('%.3f', duration)}s)\n#"
  tracer.debug json ? serializer.dump(json, pretty: true).gsub(/^/, '# ').sub(/\}$/, "\n# }")+"\n" : "# #{response.body}\n"
end
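The shape of the traced command can be reproduced with a simplified sketch. The helper name `trace_command` and its keyword arguments are assumptions; query parameters are left out, and the body is pretty-printed with the standard `JSON` module rather than the transport's serializer.

```ruby
require 'json'

# Hypothetical helper that assembles a curl command the way __trace does:
# method, optional headers, a fixed localhost URL, then an optional body.
def trace_command(method, path, headers: {}, body: nil)
  url = "http://localhost:9200/#{path}?pretty"
  command = "curl -X #{method.to_s.upcase}"
  command += " -H '#{headers.map { |k, v| "#{k}: #{v}" }.join(', ')}'" unless headers.empty?
  command += " '#{url}'"
  command += " -d '#{JSON.pretty_generate(body)}'" if body
  command
end

puts trace_command(:get, '_cluster/health')
puts trace_command(:post, 'idx/_search',
                   headers: { 'Content-Type' => 'application/json' },
                   body: { 'query' => { 'match_all' => {} } })
```

The traced URL always points at `localhost:9200`, regardless of the host that served the request, so the output can be pasted into a local shell.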
#get_connection(options = {}) ⇒ Connections::Connection
Returns a connection from the connection pool by delegating to Connections::Collection#get_connection.
Resurrects dead connections if the `resurrect_after` timeout has passed. Increments the counter and reloads the connections if the `reload_connections` option is set.
# File 'lib/elastic/transport/transport/base.rb', line 86
def get_connection(options = {})
  resurrect_dead_connections! if Time.now > @last_request_at + @resurrect_after
  @counter_mtx.synchronize { @counter += 1 }
  reload_connections! if reload_connections && (counter % reload_after).zero?
  connections.get_connection(options)
end
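The counter-based reload trigger can be illustrated with a small standalone class. `ReloadCounter`, `tick`, and `reloads` are hypothetical names for this sketch; a reload is only counted here, not performed.

```ruby
# Hypothetical class: counts requests under a mutex and records a "reload"
# every time the counter reaches a multiple of reload_after.
class ReloadCounter
  attr_reader :counter, :reloads

  def initialize(reload_after)
    @reload_after = reload_after
    @counter = 0
    @reloads = 0
    @mutex = Mutex.new
  end

  def tick
    @mutex.synchronize { @counter += 1 }
    @reloads += 1 if (@counter % @reload_after).zero?
    @counter
  end
end

counter = ReloadCounter.new(3)
7.times { counter.tick }
puts "requests: #{counter.counter}, reloads: #{counter.reloads}"
```

With the default of 10,000 requests, a reload would therefore be triggered on the 10,000th, 20,000th, and each subsequent multiple.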
#host_unreachable_exceptions ⇒ Array
Returns an Array of connection errors specific to the transport implementation. See HTTP::Faraday#host_unreachable_exceptions for an example.
# File 'lib/elastic/transport/transport/base.rb', line 375
def host_unreachable_exceptions
  [Errno::ECONNREFUSED]
end
#initialize(arguments = {}, &block) ⇒ Object
Creates a new transport object
# File 'lib/elastic/transport/transport/base.rb', line 49
def initialize(arguments = {}, &block)
  @state_mutex = Mutex.new
  @hosts = arguments[:hosts] || []
  @options = arguments[:options] || {}
  @options[:http] ||= {}
  @options[:retry_on_status] ||= []
  @options[:delay_on_retry] ||= 0
  @block = block
  @compression = !!@options[:compression]
  @connections = __build_connections
  @serializer = options[:serializer] ||
                (options[:serializer_class] ? options[:serializer_class].new(self) : DEFAULT_SERIALIZER_CLASS.new(self))
  @protocol = options[:protocol] || DEFAULT_PROTOCOL
  @logger = options[:logger]
  @tracer = options[:tracer]
  @sniffer = options[:sniffer_class] ? options[:sniffer_class].new(self) : Sniffer.new(self)
  @counter = 0
  @counter_mtx = Mutex.new
  @last_request_at = Time.now
  @reload_connections = options[:reload_connections]
  @reload_after = options[:reload_connections].is_a?(Integer) ? options[:reload_connections] : DEFAULT_RELOAD_AFTER
  @resurrect_after = options[:resurrect_after] || DEFAULT_RESURRECT_AFTER
  @retry_on_status = Array(options[:retry_on_status]).map(&:to_i)
end
#perform_request(method, path, params = {}, body = nil, headers = nil, opts = {}, &block) ⇒ Response
The transport implementation has to implement this method either in full, or by invoking this method with a block. See HTTP::Faraday#perform_request for an example.
Performs a request to Elasticsearch, while handling logging, tracing, marking dead connections, retrying the request and reloading the connections.
# File 'lib/elastic/transport/transport/base.rb', line 271
def perform_request(method, path, params = {}, body = nil, headers = nil, opts = {}, &block)
  raise NoMethodError, 'Implement this method in your transport class' unless block_given?
  start = Time.now
  tries = 0
  reload_on_failure = opts.fetch(:reload_on_failure, @options[:reload_on_failure])
  delay_on_retry = opts.fetch(:delay_on_retry, @options[:delay_on_retry])
  max_retries = max_retries(opts) || max_retries(options)
  params = params.clone
  # Transforms ignore status codes to Integer
  ignore = Array(params.delete(:ignore)).compact.map(&:to_i)
  begin
    sleep(delay_on_retry / 1000.0) if tries > 0
    tries += 1
    connection = get_connection or raise Error.new('Cannot get new connection from pool.')
    if connection.connection.respond_to?(:params) && connection.connection.params.respond_to?(:to_hash)
      params = connection.connection.params.merge(params.to_hash)
    end
    url = connection.full_url(path, params)
    response = block.call(connection, url)
    connection.healthy! if connection.failures.positive?
    # Raise an exception so we can catch it for `retry_on_status`
    __raise_transport_error(response) if response.status.to_i >= 300 && @retry_on_status.include?(response.status.to_i)
  rescue Elastic::Transport::Transport::ServerError => e
    raise e unless response && @retry_on_status.include?(response.status)
    log_warn "[#{e.class}] Attempt #{tries} to get response from #{url}"
    if tries <= (max_retries || DEFAULT_MAX_RETRIES)
      retry
    else
      log_fatal "[#{e.class}] Cannot get response from #{url} after #{tries} tries"
      raise e
    end
  rescue *host_unreachable_exceptions => e
    log_error "[#{e.class}] #{e.message} #{connection.host.inspect}"
    connection.dead!
    if reload_on_failure && tries < connections.all.size
      log_warn "[#{e.class}] Reloading connections (attempt #{tries} of #{connections.all.size})"
      reload_connections! and retry
    end
    exception = Elastic::Transport::Transport::Error.new(e.message)
    raise exception unless max_retries
    log_warn "[#{e.class}] Attempt #{tries} connecting to #{connection.host.inspect}"
    if tries <= max_retries
      retry
    else
      log_fatal "[#{e.class}] Cannot connect to #{connection.host.inspect} after #{tries} tries"
      raise exception
    end
  rescue Exception => e
    log_fatal "[#{e.class}] #{e.message} (#{connection.host.inspect if connection})"
    raise e
  end #/begin
  duration = Time.now - start
  if response.status.to_i >= 300
    __log_response(method, path, params, body, url, response, nil, 'N/A', duration)
    __trace(method, path, params, connection_headers(connection), body, url, response, nil, 'N/A', duration) if tracer
    # Log the failure only when `ignore` doesn't match the response status
    log_fatal("[#{response.status}] #{response.body}") unless ignore.include?(response.status.to_i)
    __raise_transport_error response unless ignore.include?(response.status.to_i)
  end
  if response.body && !response.body.empty? && response.headers && response.headers["content-type"] =~ /json/
    # Prevent Float value from automatically becoming BigDecimal when using Oj
    load_options = {}
    load_options[:mode] = :compat if ::MultiJson.adapter.to_s == "MultiJson::Adapters::Oj"
    json = serializer.load(response.body, load_options)
  end
  took = (json['took'] ? sprintf('%.3fs', json['took'] / 1000.0) : 'n/a') rescue 'n/a'
  __log_response(method, path, params, body, url, response, json, took, duration) unless ignore.include?(response.status.to_i)
  __trace(method, path, params, connection_headers(connection), body, url, response, nil, 'N/A', duration) if tracer
  log_warn(response.headers['warning']) if response.headers&.[]('warning')
  Response.new response.status, json || response.body, response.headers
ensure
  @last_request_at = Time.now
end
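The retry behaviour for unreachable hosts follows a common `begin`/`rescue`/`retry` pattern. The sketch below isolates that pattern; `HostUnreachable` and `with_retries` are hypothetical names, and logging, dead-connection marking, and connection reloading are omitted.

```ruby
# Hypothetical error class standing in for host_unreachable_exceptions.
class HostUnreachable < StandardError; end

# Runs the block, retrying on HostUnreachable while the number of attempts
# so far does not exceed max_retries; sleeps delay_ms before each retry.
def with_retries(max_retries:, delay_ms: 0)
  tries = 0
  begin
    sleep(delay_ms / 1000.0) if tries > 0
    tries += 1
    yield tries
  rescue HostUnreachable
    retry if tries <= max_retries
    raise
  end
end

result = with_retries(max_retries: 3) do |attempt|
  raise HostUnreachable, 'connection refused' if attempt < 3
  "succeeded on attempt #{attempt}"
end
puts result
```

With the `tries <= max_retries` comparison, the block runs at most `max_retries + 1` times: the initial attempt plus the allowed retries.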
#reload_connections! ⇒ Object
Reloads and replaces the connection collection based on cluster information
# File 'lib/elastic/transport/transport/base.rb', line 98
def reload_connections!
  hosts = sniffer.hosts
  __rebuild_connections(hosts: hosts, options: options)
  self
rescue SnifferTimeoutError
  log_error('[SnifferTimeoutError] Timeout when reloading connections.')
  self
end
#resurrect_dead_connections! ⇒ Object
Tries to “resurrect” all eligible dead connections
# File 'lib/elastic/transport/transport/base.rb', line 111
def resurrect_dead_connections!
  connections.dead.each(&:resurrect!)
end