Class: LogStash::Inputs::Elasticsearch::Scroll
Constant Summary
collapse
- SCROLL_JOB =
"scroll paginated search"
Instance Method Summary
collapse
#do_run, #initialize, #retryable
Instance Method Details
#clear(scroll_id) ⇒ Object
114
115
116
117
118
119
|
# File 'lib/logstash/inputs/elasticsearch/paginated_search.rb', line 114
def clear(scroll_id)
@client.clear_scroll(:body => { :scroll_id => scroll_id }) if scroll_id
rescue => e
logger.debug("Ignoring clear_scroll exception", message: e.message, exception: e.class)
end
|
#initial_search(slice_id) ⇒ Object
63
64
65
66
|
# File 'lib/logstash/inputs/elasticsearch/paginated_search.rb', line 63
def initial_search(slice_id)
options = search_options(slice_id)
@client.search(options)
end
|
#next_page(scroll_id) ⇒ Object
68
69
70
|
# File 'lib/logstash/inputs/elasticsearch/paginated_search.rb', line 68
def next_page(scroll_id)
@client.scroll(:body => { :scroll_id => scroll_id }, :scroll => @scroll)
end
|
#process_page(output_queue) ⇒ Object
72
73
74
75
76
|
# File 'lib/logstash/inputs/elasticsearch/paginated_search.rb', line 72
def process_page(output_queue)
r = yield
r['hits']['hits'].each { |hit| @plugin.push_hit(hit, output_queue) }
[r['hits']['hits'].any?, r['_scroll_id']]
end
|
#retryable_search(output_queue, slice_id = nil) ⇒ Object
95
96
97
98
99
|
# File 'lib/logstash/inputs/elasticsearch/paginated_search.rb', line 95
def retryable_search(output_queue, slice_id=nil)
retryable(SCROLL_JOB) do
search(output_queue, slice_id)
end
end
|
#retryable_slice_search(output_queue) ⇒ Object
101
102
103
104
105
106
107
108
109
110
111
112
|
# File 'lib/logstash/inputs/elasticsearch/paginated_search.rb', line 101
def retryable_slice_search(output_queue)
logger.warn("managed slices for query is very large (#{@slices}); consider reducing") if @slices > 8
@slices.times.map do |slice_id|
Thread.new do
LogStash::Util::set_thread_name("[#{@pipeline_id}]|input|elasticsearch|slice_#{slice_id}")
retryable_search(output_queue, slice_id)
end
end.map(&:join)
logger.trace("#{@slices} slices completed")
end
|
#search(output_queue, slice_id = nil) ⇒ Object
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
|
# File 'lib/logstash/inputs/elasticsearch/paginated_search.rb', line 78
def search(output_queue, slice_id=nil)
log_details = {}
log_details = log_details.merge({ slice_id: slice_id, slices: @slices }) unless slice_id.nil?
logger.info("Query start", log_details)
has_hits, scroll_id = process_page(output_queue) { initial_search(slice_id) }
while has_hits && scroll_id && !@plugin.stop?
logger.debug("Query progress", log_details)
has_hits, scroll_id = process_page(output_queue) { next_page(scroll_id) }
end
logger.info("Query completed", log_details)
ensure
clear(scroll_id)
end
|
#search_options(slice_id) ⇒ Object
52
53
54
55
56
57
58
59
60
61
|
# File 'lib/logstash/inputs/elasticsearch/paginated_search.rb', line 52
def search_options(slice_id)
query = @query
query = @query.merge('slice' => { 'id' => slice_id, 'max' => @slices}) unless slice_id.nil?
{
:index => @index,
:scroll => @scroll,
:size => @size,
:body => LogStash::Json.dump(query)
}
end
|