Class: LogStash::Inputs::Elasticsearch
- Inherits: LogStash::Inputs::Base
- Object
- LogStash::Inputs::Base
- LogStash::Inputs::Elasticsearch
- Extended by:
- PositiveWholeNumberValidator, URIOrEmptyValidator, PluginMixins::ValidatorSupport::FieldReferenceValidationAdapter
- Includes:
- PluginMixins::CATrustedFingerprintSupport, PluginMixins::ECSCompatibilitySupport::TargetCheck, PluginMixins::EventSupport::EventFactoryAdapter, PluginMixins::NormalizeConfigSupport, PluginMixins::Scheduler
- Defined in:
- lib/logstash/inputs/elasticsearch.rb,
lib/logstash/inputs/elasticsearch/esql.rb,
lib/logstash/inputs/elasticsearch/aggregation.rb,
lib/logstash/inputs/elasticsearch/cursor_tracker.rb,
lib/logstash/inputs/elasticsearch/paginated_search.rb
Overview
Compatibility Note
- NOTE
-
Starting with Elasticsearch 5.3, there is an HTTP setting called `http.content_type.required`. If this option is set to `true`, and you are using Logstash 2.4 through 5.2, you need to update the Elasticsearch input plugin to version 4.0.2 or higher.
Read from an Elasticsearch cluster, based on search query results. This is useful for replaying test logs, reindexing, etc. It also supports periodically scheduling lookup enrichments using a cron syntax (see the `schedule` setting).
Example:
[source,ruby]
input {
  # Read all documents from Elasticsearch matching the given query
  elasticsearch {
    hosts => "localhost"
    query => '{ "query": { "match": { "statuscode": 200 } }, "sort": [ "_doc" ] }'
  }
}
This would create an Elasticsearch query with the following format:
[source,json]
curl 'localhost:9200/logstash-*/_search?&scroll=1m&size=1000' -d '{
  "query": { "match": { "statuscode": 200 } },
  "sort": [ "_doc" ]
}'
Scheduling
Input from this plugin can be scheduled to run periodically according to a specific schedule. This scheduling syntax is powered by github.com/jmettraux/rufus-scheduler[rufus-scheduler]. The syntax is cron-like with some extensions specific to Rufus (e.g. timezone support).
Examples:
|==========================================================
| `* 5 * 1-3 *` | will execute every minute of 5am every day of January through March.
| `0 * * * *` | will execute on the 0th minute of every hour every day.
| `0 6 * * * America/Chicago` | will execute at 6:00am (UTC/GMT -5) every day.
|==========================================================
Further documentation describing this syntax can be found github.com/jmettraux/rufus-scheduler#parsing-cronlines-and-time-strings[here].
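For instance, a pipeline that re-runs the query every five minutes could be configured as follows (a sketch; the host and query values are illustrative):

```
input {
  elasticsearch {
    hosts    => "localhost"
    query    => '{ "query": { "match_all": {} } }'
    # Cron-like schedule, powered by rufus-scheduler: run every 5 minutes
    schedule => "*/5 * * * *"
  }
}
```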
Defined Under Namespace
Modules: PositiveWholeNumberValidator, URIOrEmptyValidator Classes: Aggregation, ColumnSpec, CursorTracker, Esql, PaginatedSearch, Scroll, SearchAfter
Constant Summary collapse
- BUILD_FLAVOR_SERVERLESS =
'serverless'.freeze
- DEFAULT_EAV_HEADER =
{ "Elastic-Api-Version" => "2023-10-31" }.freeze
- INTERNAL_ORIGIN_HEADER =
{ 'x-elastic-product-origin' => 'logstash-input-elasticsearch' }.freeze
- LS_ESQL_SUPPORT_VERSION =
"8.17.4" # the Logstash version that started using elasticsearch-ruby v8
- ES_ESQL_SUPPORT_VERSION =
"8.11.0"
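The two `*_ESQL_SUPPORT_VERSION` constants gate ES|QL usage on both the Logstash and Elasticsearch side. A minimal sketch of how such a minimum-version check can be performed with `Gem::Version` (the plugin's actual validation helpers may differ; `esql_supported?` is a hypothetical name):

```ruby
require 'rubygems' # Gem::Version ships with RubyGems

LS_ESQL_SUPPORT_VERSION = "8.17.4".freeze
ES_ESQL_SUPPORT_VERSION = "8.11.0".freeze

# Hypothetical helper: true when `current` meets the required minimum.
# Gem::Version compares release segments numerically, not lexically.
def esql_supported?(current, minimum)
  Gem::Version.new(current) >= Gem::Version.new(minimum)
end

puts esql_supported?("8.18.0", LS_ESQL_SUPPORT_VERSION) # => true
puts esql_supported?("8.10.2", ES_ESQL_SUPPORT_VERSION) # => false
```

Numeric comparison matters here: a naive string comparison would order "8.9.0" after "8.11.0".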
Instance Attribute Summary collapse
-
#pipeline_id ⇒ Object
readonly
Returns the value of attribute pipeline_id.
Instance Method Summary collapse
- #decorate_event(event) ⇒ Object
-
#initialize(params = {}) ⇒ Elasticsearch
constructor
A new instance of Elasticsearch.
- #push_hit(hit, output_queue, root_field = '_source') ⇒ Object
- #register ⇒ Object
- #run(output_queue) ⇒ Object
Methods included from URIOrEmptyValidator
Methods included from PositiveWholeNumberValidator
Constructor Details
#initialize(params = {}) ⇒ Elasticsearch
Returns a new instance of Elasticsearch.
# File 'lib/logstash/inputs/elasticsearch.rb', line 299

def initialize(params = {})
  super(params)

  if docinfo_target.nil?
    @docinfo_target = ecs_select[disabled: '@metadata', v1: '[@metadata][input][elasticsearch]']
  end
end
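The `ecs_select[...]` call above comes from the ECS compatibility mixin: it picks a value keyed by the active `ecs_compatibility` mode. A minimal stand-in (not the plugin's actual mixin; `EcsSelect` is a hypothetical name for illustration) behaves like this:

```ruby
# Sketch of ecs_select semantics: resolve a per-mode choice, as used for the
# docinfo_target default in the constructor above.
EcsSelect = Struct.new(:mode) do
  # Look up the value for the currently active ECS compatibility mode.
  def [](choices)
    choices.fetch(mode)
  end
end

ecs_select = EcsSelect.new(:v1)
docinfo_target = ecs_select[disabled: '@metadata', v1: '[@metadata][input][elasticsearch]']
puts docinfo_target # => [@metadata][input][elasticsearch]
```

With ECS compatibility disabled, the same lookup yields the legacy `@metadata` target instead.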
Instance Attribute Details
#pipeline_id ⇒ Object (readonly)
Returns the value of attribute pipeline_id.
# File 'lib/logstash/inputs/elasticsearch.rb', line 290

def pipeline_id
  @pipeline_id
end
Instance Method Details
#decorate_event(event) ⇒ Object
# File 'lib/logstash/inputs/elasticsearch.rb', line 395

def decorate_event(event)
  decorate(event)
end
#push_hit(hit, output_queue, root_field = '_source') ⇒ Object
# File 'lib/logstash/inputs/elasticsearch.rb', line 388

def push_hit(hit, output_queue, root_field = '_source')
  event = event_from_hit(hit, root_field)
  decorate(event)
  output_queue << event
  record_last_value(event)
end
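The hand-off in `push_hit` can be sketched with plain Ruby objects: an event is built from the hit's root field and pushed onto the output queue. This simplified version uses a `Hash` in place of `LogStash::Event` and skips decoration and cursor tracking:

```ruby
# Sketch of the push_hit flow: extract the root field from a hit document
# (default '_source') and enqueue it for downstream pipeline stages.
def push_hit(hit, output_queue, root_field = '_source')
  event = hit.fetch(root_field, {}) # stand-in for event_from_hit
  output_queue << event             # same queue hand-off as the plugin
  event
end

queue = Queue.new
hit = { '_id' => '1', '_source' => { 'statuscode' => 200 } }
push_hit(hit, queue)
# queue.pop now yields { 'statuscode' => 200 }
```

In the real plugin, `record_last_value` additionally feeds the cursor tracker so scheduled runs can resume from the last seen value.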
#register ⇒ Object
# File 'lib/logstash/inputs/elasticsearch.rb', line 307

def register
  require "rufus/scheduler"

  @pipeline_id = execution_context&.pipeline_id || 'main'

  fill_hosts_from_cloud_id
  setup_ssl_params!

  if @query_type == 'esql'
    validate_ls_version_for_esql_support!
    validate_esql_query!
    not_allowed_options = original_params.keys & %w(index size slices search_api docinfo docinfo_target docinfo_fields response_type tracking_field)
    raise(LogStash::ConfigurationError, "Configured #{not_allowed_options} params are not allowed while using ES|QL query") if not_allowed_options&.size > 1
  else
    @base_query = LogStash::Json.load(@query)
    if @slices
      @base_query.include?('slice') && fail(LogStash::ConfigurationError, "Elasticsearch Input Plugin's `query` option cannot specify specific `slice` when configured to manage parallel slices with `slices` option")
      @slices < 1 && fail(LogStash::ConfigurationError, "Elasticsearch Input Plugin's `slices` option must be greater than zero, got `#{@slices}`")
    end
  end

  @retries < 0 && fail(LogStash::ConfigurationError, "Elasticsearch Input Plugin's `retries` option must be equal or greater than zero, got `#{@retries}`")

  validate_authentication
  fill_user_password_from_cloud_auth

  transport_options = {:headers => {}}
  transport_options[:headers].merge!(INTERNAL_ORIGIN_HEADER)
  transport_options[:headers].merge!(setup_basic_auth(user, password))
  transport_options[:headers].merge!(setup_api_key(api_key))
  transport_options[:headers].merge!({'user-agent' => prepare_user_agent()})
  transport_options[:headers].merge!(@custom_headers) unless @custom_headers.empty?
  transport_options[:request_timeout] = @request_timeout_seconds unless @request_timeout_seconds.nil?
  transport_options[:connect_timeout] = @connect_timeout_seconds unless @connect_timeout_seconds.nil?
  transport_options[:socket_timeout] = @socket_timeout_seconds unless @socket_timeout_seconds.nil?

  hosts = setup_hosts
  ssl_options = setup_client_ssl

  @logger.warn "Supplied proxy setting (proxy => '') has no effect" if @proxy.eql?('')
  transport_options[:proxy] = @proxy.to_s if @proxy && !@proxy.eql?('')

  @client_options = {
    :hosts => hosts,
    :transport_options => transport_options,
    :transport_class => get_transport_client_class,
    :ssl => ssl_options
  }

  @client = Elasticsearch::Client.new(@client_options)
  test_connection!
  validate_es_for_esql_support!
  setup_serverless
  setup_search_api
  @query_executor = create_query_executor
  setup_cursor_tracker

  @client
end
#run(output_queue) ⇒ Object
# File 'lib/logstash/inputs/elasticsearch.rb', line 374

def run(output_queue)
  if @schedule
    scheduler.cron(@schedule, :overlap => @schedule_overlap) do
      @query_executor.do_run(output_queue, get_query_object())
    end
    scheduler.join
  else
    @query_executor.do_run(output_queue, get_query_object())
  end
end