Class: LogStash::Filters::ElasticsearchClient

Inherits:
Object
  • Object
show all
Defined in:
lib/logstash/filters/elasticsearch/client.rb

Constant Summary collapse

BUILD_FLAVOR_SERVERLESS =
'serverless'.freeze
DEFAULT_EAV_HEADER =
{ "Elastic-Api-Version" => "2023-10-31" }.freeze
INTERNAL_ORIGIN_HEADER =
{ 'x-elastic-product-origin' => 'logstash-filter-elasticsearch'}.freeze

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(logger, hosts, options = {}) ⇒ ElasticsearchClient

Returns a new instance of ElasticsearchClient.



16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
# File 'lib/logstash/filters/elasticsearch/client.rb', line 16

# Builds the Elasticsearch client used by the filter, on the Manticore transport.
#
# @param logger [#warn, #info] receives the empty-proxy warning and the startup message
# @param hosts [Object] host specification; passed through setup_hosts before use
# @param options [Hash] optional settings. Keys read here: :user, :password,
#   :api_key, :proxy, :user_agent, :custom_headers, :serverless, :ssl,
#   :retry_on_failure, :retry_on_status
def initialize(logger, hosts, options = {})
  user = options.fetch(:user, nil)
  password = options.fetch(:password, nil)
  api_key = options.fetch(:api_key, nil)
  proxy = options.fetch(:proxy, nil)
  user_agent = options[:user_agent]
  custom_headers = options[:custom_headers]

  # Header precedence is defined by merge order: later merges win, so
  # user-supplied custom headers override everything set before them.
  headers = options.fetch(:serverless, false) ? DEFAULT_EAV_HEADER.dup : {}
  headers.merge!(setup_basic_auth(user, password))
  headers.merge!(setup_api_key(api_key))
  headers.merge!({ 'user-agent' => "#{user_agent}" })
  headers.merge!(INTERNAL_ORIGIN_HEADER)
  # :custom_headers is optional; guard against nil before asking for emptiness.
  headers.merge!(custom_headers) unless custom_headers.nil? || custom_headers.empty?

  transport_options = {
    headers: headers,
    pool_max: 1000,
    pool_max_per_route: 100
  }

  # Check the local `proxy` (no @proxy ivar is assigned in this constructor),
  # so an explicitly empty proxy setting is actually reported.
  logger.warn "Supplied proxy setting (proxy => '') has no effect" if proxy.eql?('')
  transport_options[:proxy] = proxy.to_s if proxy && !proxy.eql?('')

  ssl_options = options.fetch(:ssl, { :enabled => false })
  ssl_enabled = ssl_options.fetch(:enabled, false)

  hosts = setup_hosts(hosts, ssl_enabled)

  client_options = {
    hosts: hosts,
    transport_class: ::Elastic::Transport::Transport::HTTP::Manticore,
    transport_options: transport_options,
    ssl: ssl_options,
    retry_on_failure: options[:retry_on_failure],
    retry_on_status: options[:retry_on_status]
  }

  logger.info("New ElasticSearch filter client", :hosts => hosts)
  @client = ::Elasticsearch::Client.new(client_options)
end

Instance Attribute Details

#client ⇒ Object (readonly)

Returns the value of attribute client.



10
11
12
# File 'lib/logstash/filters/elasticsearch/client.rb', line 10

# Reader for the @client instance variable, which the constructor assigns
# the ::Elasticsearch::Client it builds.
#
# @return [Object] the value of @client
def client
  @client
end

Instance Method Details

#build_flavor ⇒ Object



73
74
75
# File 'lib/logstash/filters/elasticsearch/client.rb', line 73

# Returns the 'build_flavor' entry found under 'version' in the #info response.
# Only a truthy value is cached in @build_flavor; while it is nil, each call
# asks #info again.
#
# @return [Object, nil] the build flavor, or nil when #info returns nil or lacks the key
def build_flavor
  return @build_flavor if @build_flavor

  @build_flavor = info&.dig('version', 'build_flavor')
end

#es_version ⇒ Object



69
70
71
# File 'lib/logstash/filters/elasticsearch/client.rb', line 69

# Returns the 'number' entry found under 'version' in the #info response.
# Nothing is cached here: every call goes through #info.
#
# @return [Object, nil] the version number, or nil when #info returns nil or lacks the key
def es_version
  cluster_info = info
  cluster_info.nil? ? nil : cluster_info.dig('version', 'number')
end

#esql_query(params = {}) ⇒ Object



61
62
63
# File 'lib/logstash/filters/elasticsearch/client.rb', line 61

# Forwards +params+ to the +query+ method of the object returned by @client.esql.
#
# @param params [Hash] arguments handed to the query call unchanged
# @return [Object] whatever the underlying query call returns
def esql_query(params = {})
  esql_api = @client.esql
  esql_api.query(params)
end

#info ⇒ Object



65
66
67
# File 'lib/logstash/filters/elasticsearch/client.rb', line 65

# Calls +info+ on the wrapped client and returns the result as-is.
#
# @return [Object] whatever the wrapped client's info call returns
def info
  client.info
end

#search(params = {}) ⇒ Object



57
58
59
# File 'lib/logstash/filters/elasticsearch/client.rb', line 57

# Forwards +params+ to the +search+ method of the wrapped client.
#
# @param params [Hash] arguments handed to the search call unchanged
# @return [Object] whatever the wrapped client's search call returns
def search(params = {})
  @client.search(params)
end

#serverless? ⇒ Boolean

Returns:

  • (Boolean)


77
78
79
# File 'lib/logstash/filters/elasticsearch/client.rb', line 77

# Tells whether #build_flavor equals BUILD_FLAVOR_SERVERLESS.
# Only a true result is cached in @is_serverless; a false result is
# recomputed from #build_flavor on the next call.
#
# @return [Boolean]
def serverless?
  return @is_serverless if @is_serverless

  @is_serverless = (build_flavor == BUILD_FLAVOR_SERVERLESS)
end