Class: LogStash::Inputs::Elasticsearch::Esql
- Inherits:
-
Object
- Object
- LogStash::Inputs::Elasticsearch::Esql
- Includes:
- Util::Loggable
- Defined in:
- lib/logstash/inputs/elasticsearch/esql.rb
Constant Summary collapse
- ESQL_JOB =
"ES|QL job"
- ESQL_PARSERS_BY_TYPE =
Hash.new(lambda { |x| x }).merge( 'date' => ->(value) { value && LogStash::Timestamp.new(value) }, )
Instance Method Summary collapse
-
#do_run(output_queue, query) ⇒ Object
Execute the ESQL query and process results.
-
#initialize(client, plugin) ⇒ Esql
constructor
Initialize the ESQL query executor.
-
#retryable(job_name) { ... } ⇒ Boolean
Execute a retryable operation with proper error handling.
Constructor Details
#initialize(client, plugin) ⇒ Esql
Initialize the ESQL query executor
18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 |
# File 'lib/logstash/inputs/elasticsearch/esql.rb', line 18

# Initialize the ESQL query executor.
#
# Reads the "retries", "target" and "query" entries from +plugin.params+,
# keeps a bound reference to the plugin's +decorate_event+ method, and
# defines a per-instance +apply_target(path)+ helper.
#
# @param client [Object] client object stored in +@client+ for later queries
# @param plugin [Object] owning plugin; must respond to +params+ and +decorate_event+
def initialize(client, plugin)
  @client = client
  @event_decorator = plugin.method(:decorate_event)
  @retries = plugin.params["retries"]

  target_field = plugin.params["target"]
  # A nested `def` opens a fresh scope and cannot see the `target_field`
  # local, so the helper is defined from a block, which closes over it.
  if target_field
    define_singleton_method(:apply_target) { |path| "[#{target_field}][#{path}]" }
  else
    define_singleton_method(:apply_target) { |path| path }
  end

  @query = plugin.params["query"]
  unless @query.include?('METADATA')
    logger.info("`METADATA` not found the query. `_id`, `_version` and `_index` will not be available in the result", {:query => @query})
  end
  logger.debug("ES|QL executor initialized with", {:query => @query})
end
Instance Method Details
#do_run(output_queue, query) ⇒ Object
Execute the ESQL query and process results
40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 |
# File 'lib/logstash/inputs/elasticsearch/esql.rb', line 40

# Execute the ESQL query and process results.
#
# Sends the query held in +@query+ through +retryable+, logs any "warning"
# response header, and hands the response's columns and values to
# +process_response+ when both are present.
#
# @param output_queue [Object] passed through unchanged to +process_response+
# @param query [Object] not read by this method; +@query+ is what gets sent
# @return [Object, nil] result of +process_response+, or nil when there is
#   no usable response or the response lacks columns/values
def do_run(output_queue, query)
  logger.info("ES|QL executor has started")
  response = retryable(ESQL_JOB) do
    @client.esql.query({ body: { query: @query }, format: 'json', drop_null_columns: true })
  end
  # `retryable` returns false after logging the error details; a nil response
  # has nothing to read either, so both cases stop here.
  return unless response

  if response.headers&.dig("warning")
    logger.warn("ES|QL executor received warning", {:warning_message => response.headers["warning"]})
  end

  columns = response['columns']&.freeze
  values = response['values']&.freeze
  logger.debug("ES|QL query response size: #{values&.size}")

  process_response(columns, values, output_queue) if columns && values
end
#retryable(job_name) { ... } ⇒ Boolean
Execute a retryable operation with proper error handling
62 63 64 65 66 67 68 69 70 |
# File 'lib/logstash/inputs/elasticsearch/esql.rb', line 62

# Execute a retryable operation with proper error handling.
#
# The block is run through a LogStash::Helpers::LoggableTry, which is handed
# an enumerator of +@retries + 1+ iterations. Any StandardError that escapes
# is logged (with the backtrace only when debug logging is on) and turned
# into a +false+ return value.
#
# @param job_name [String] name used for the try helper and in the error log
# @yield the operation to attempt
# @return [Object, false] the value returned by the try helper, or false
#   when an error was rescued
def retryable(job_name, &block)
  stud_try = ::LogStash::Helpers::LoggableTry.new(logger, job_name)
  stud_try.try((@retries + 1).times) { yield }
rescue => e
  error_details = {:message => e.message, :cause => e.cause}
  error_details[:backtrace] = e.backtrace if logger.debug?
  logger.error("#{job_name} failed with ", error_details)
  false
end