Class: LogStash::Inputs::Elasticsearch::SearchAfter
Constant Summary
collapse
- PIT_JOB =
"create point in time (PIT)"
- SEARCH_AFTER_JOB =
"search_after paginated search"
Instance Attribute Summary collapse
Instance Method Summary
collapse
#initialize, #retryable
Instance Attribute Details
#cursor_tracker ⇒ Object
Returns the value of attribute cursor_tracker.
126
127
128
|
# File 'lib/logstash/inputs/elasticsearch/paginated_search.rb', line 126
def cursor_tracker
@cursor_tracker
end
|
Instance Method Details
#clear(pit_id) ⇒ Object
231
232
233
234
235
236
|
# File 'lib/logstash/inputs/elasticsearch/paginated_search.rb', line 231
def clear(pit_id)
logger.info("Closing point in time (PIT)")
@client.close_point_in_time(:body => {:id => pit_id} ) if pit?(pit_id)
rescue => e
logger.debug("Ignoring close_point_in_time exception", message: e.message, exception: e.class)
end
|
#create_pit ⇒ Object
137
138
139
140
141
|
# File 'lib/logstash/inputs/elasticsearch/paginated_search.rb', line 137
def create_pit
logger.info("Create point in time (PIT)")
r = @client.open_point_in_time(index: @index, keep_alive: @scroll)
r['id']
end
|
#do_run(output_queue, query) ⇒ Object
128
129
130
131
|
# File 'lib/logstash/inputs/elasticsearch/paginated_search.rb', line 128
def do_run(output_queue, query)
super(output_queue, query)
@cursor_tracker.checkpoint_cursor(intermediate: false) if @cursor_tracker
end
|
#next_page(pit_id:, search_after: nil, slice_id: nil) ⇒ Object
165
166
167
168
169
|
# File 'lib/logstash/inputs/elasticsearch/paginated_search.rb', line 165
def next_page(pit_id: , search_after: nil, slice_id: nil)
options = search_options(pit_id: pit_id, search_after: search_after, slice_id: slice_id)
logger.trace("search options", options)
@client.search(options)
end
|
#pit?(id) ⇒ Boolean
133
134
135
|
# File 'lib/logstash/inputs/elasticsearch/paginated_search.rb', line 133
def pit?(id)
!!id&.is_a?(String)
end
|
#process_page(output_queue) ⇒ Object
171
172
173
174
175
176
177
178
179
|
# File 'lib/logstash/inputs/elasticsearch/paginated_search.rb', line 171
def process_page(output_queue)
r = yield
r['hits']['hits'].each { |hit| @plugin.push_hit(hit, output_queue) }
has_hits = r['hits']['hits'].any?
search_after = r['hits']['hits'][-1]['sort'] rescue nil
logger.warn("Query got data but the sort value is empty") if has_hits && search_after.nil?
[ has_hits, search_after ]
end
|
#retryable_search(output_queue) ⇒ Object
208
209
210
211
212
213
214
|
# File 'lib/logstash/inputs/elasticsearch/paginated_search.rb', line 208
def retryable_search(output_queue)
with_pit do |pit_id|
retryable(SEARCH_AFTER_JOB) do
search(output_queue: output_queue, pit_id: pit_id)
end
end
end
|
#retryable_slice_search(output_queue) ⇒ Object
216
217
218
219
220
221
222
223
224
225
226
227
228
229
|
# File 'lib/logstash/inputs/elasticsearch/paginated_search.rb', line 216
def retryable_slice_search(output_queue)
with_pit do |pit_id|
@slices.times.map do |slice_id|
Thread.new do
LogStash::Util::set_thread_name("[#{@pipeline_id}]|input|elasticsearch|slice_#{slice_id}")
retryable(SEARCH_AFTER_JOB) do
search(output_queue: output_queue, slice_id: slice_id, pit_id: pit_id)
end
end
end.map(&:join)
end
logger.trace("#{@slices} slices completed")
end
|
#search(output_queue:, slice_id: nil, pit_id:) ⇒ Object
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
|
# File 'lib/logstash/inputs/elasticsearch/paginated_search.rb', line 188
def search(output_queue:, slice_id: nil, pit_id:)
log_details = {}
log_details = log_details.merge({ slice_id: slice_id, slices: @slices }) unless slice_id.nil?
logger.info("Query start", log_details)
has_hits = true
search_after = nil
while has_hits && !@plugin.stop?
logger.debug("Query progress", log_details)
has_hits, search_after = process_page(output_queue) do
next_page(pit_id: pit_id, search_after: search_after, slice_id: slice_id)
end
end
@cursor_tracker.checkpoint_cursor(intermediate: true) if @cursor_tracker
logger.info("Query completed", log_details)
end
|
#search_options(pit_id:, search_after: nil, slice_id: nil) ⇒ Object
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
|
# File 'lib/logstash/inputs/elasticsearch/paginated_search.rb', line 143
def search_options(pit_id: , search_after: nil, slice_id: nil)
body = @query.merge({
:pit => {
:id => pit_id,
:keep_alive => @scroll
}
})
body = body.merge(:sort => {"_shard_doc": "asc"}) if @query&.dig("sort").nil?
body = body.merge(:search_after => search_after) unless search_after.nil?
body = body.merge(:slice => {:id => slice_id, :max => @slices}) unless slice_id.nil?
{
:size => @size,
:body => body
}
end
|
#with_pit ⇒ Object
181
182
183
184
185
186
|
# File 'lib/logstash/inputs/elasticsearch/paginated_search.rb', line 181
def with_pit
pit_id = retryable(PIT_JOB) { create_pit }
yield pit_id if pit?(pit_id)
ensure
clear(pit_id)
end
|