Class: LogStash::Inputs::Elasticsearch::Esql

Inherits:
Object
  • Object
show all
Includes:
Util::Loggable
Defined in:
lib/logstash/inputs/elasticsearch/esql.rb

Constant Summary collapse

# Job name passed to `retryable`, which uses it in its log output.
ESQL_JOB = "ES|QL job"

# Value converters keyed by type name. Types without an entry fall back to the
# identity lambda installed as the hash default; 'date' values are wrapped in a
# LogStash::Timestamp, while nil/false values are passed through unchanged.
ESQL_PARSERS_BY_TYPE = Hash.new(->(x) { x }).merge(
  'date' => lambda { |value| value && LogStash::Timestamp.new(value) }
)

Instance Method Summary collapse

Constructor Details

#initialize(client, plugin) ⇒ Esql

Initialize the ESQL query executor

Parameters:



18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
# File 'lib/logstash/inputs/elasticsearch/esql.rb', line 18

# Initialize the ES|QL query executor.
#
# Captures the client, the plugin's `decorate_event` method and the "retries",
# "target" and "query" entries of the plugin params, and defines a per-instance
# `apply_target(path)` helper.
#
# @param client [Object] client used later to run the ES|QL query
# @param plugin [Object] the owning plugin; must respond to `decorate_event`
#   and expose `params` (a Hash-like object keyed by setting name)
def initialize(client, plugin)
  @client = client
  @event_decorator = plugin.method(:decorate_event)
  @retries = plugin.params["retries"]

  target_field = plugin.params["target"]
  if target_field
    # A `def` body opens a fresh scope and cannot see the `target_field` local,
    # so the helper is defined from a block, which closes over it.
    define_singleton_method(:apply_target) { |path| "[#{target_field}][#{path}]" }
  else
    # No target configured: field paths are used as they are.
    define_singleton_method(:apply_target) { |path| path }
  end

  @query = plugin.params["query"]
  unless @query.include?('METADATA')
    logger.info("`METADATA` not found the query. `_id`, `_version` and `_index` will not be available in the result", {:query => @query})
  end
  logger.debug("ES|QL executor initialized with", {:query => @query})
end

Instance Method Details

#do_run(output_queue, query) ⇒ Object

Execute the ESQL query and process results

Parameters:

  • output_queue (Queue)

    The queue to push processed events to

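  • query

    Unused; accepted only to obey the interface definition. The query that is executed is the one configured on the executor at construction time.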



40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
# File 'lib/logstash/inputs/elasticsearch/esql.rb', line 40

# Execute the configured ES|QL query and hand the result to `process_response`.
#
# @param output_queue [Queue] the queue to push processed events to
# @param query [Object] unused; accepted only to obey the interface definition
#   (the query sent is `@query`)
# @return [Object, nil] the result of `process_response`, or nil when the query
#   failed or the response carried no columns/values
def do_run(output_queue, query)
  logger.info("ES|QL executor has started")
  response = retryable(ESQL_JOB) do
    @client.esql.query({ body: { query: @query }, format: 'json', drop_null_columns: true })
  end
  # `retryable` returns false after logging the error details; a nil response is
  # treated the same way so the lookups below cannot raise NoMethodError.
  return unless response

  warning = response.headers&.dig("warning")
  logger.warn("ES|QL executor received warning", {:warning_message => warning}) if warning

  columns = response['columns']&.freeze
  values = response['values']&.freeze
  logger.debug("ES|QL query response size: #{values&.size}")

  process_response(columns, values, output_queue) if columns && values
end

#retryable(job_name) { ... } ⇒ Object, false

Execute a retryable operation with proper error handling

Parameters:

  • job_name (String)

    Name of the job for logging purposes

Yields:

  • The block to execute

Returns:

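  • (Object, false)

    the value returned by the block if it succeeds (do_run uses it as the query response), false if the operation failed and the error was logged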



62
63
64
65
66
67
68
69
70
# File 'lib/logstash/inputs/elasticsearch/esql.rb', line 62

# Run the given block through a LoggableTry, allowing `@retries + 1` attempts.
#
# @param job_name [String] name of the job, used for logging
# @yield the operation to execute
# @return [Object, false] whatever the try helper returns on success; false
#   once an error escapes, after it has been logged
def retryable(job_name, &block)
  loggable_try = ::LogStash::Helpers::LoggableTry.new(logger, job_name)
  attempts = (@retries + 1).times
  loggable_try.try(attempts) { yield }
rescue => failure
  details = { :message => failure.message, :cause => failure.cause }
  # The backtrace is only attached when debug logging is enabled.
  details[:backtrace] = failure.backtrace if logger.debug?
  logger.error("#{job_name} failed with ", details)
  false
end