Class: Esse::Import::Bulk
- Inherits:
-
Object
- Object
- Esse::Import::Bulk
- Defined in:
- lib/esse/import/bulk.rb
Class Method Summary collapse
Instance Method Summary collapse
-
#each_request(max_retries: 4, last_retry_in_small_chunks: true) {|RequestBody| ... } ⇒ Object
Return an array of RequestBody instances.
-
#initialize(index: nil, delete: nil, create: nil, update: nil) ⇒ Bulk
constructor
A new instance of Bulk.
Constructor Details
#initialize(index: nil, delete: nil, create: nil, update: nil) ⇒ Bulk
Returns a new instance of Bulk.
28 29 30 31 32 33 |
# File 'lib/esse/import/bulk.rb', line 28 def initialize(index: nil, delete: nil, create: nil, update: nil) @index = Esse::ArrayUtils.wrap(index).map { |payload| { index: payload } } @create = Esse::ArrayUtils.wrap(create).map { |payload| { create: payload } } @update = Esse::ArrayUtils.wrap(update).map { |payload| { update: payload } } @delete = Esse::ArrayUtils.wrap(delete).map { |payload| { delete: payload } } end |
Class Method Details
.build_from_documents(type: nil, index: nil, delete: nil, create: nil, update: nil) ⇒ Object
4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 |
# File 'lib/esse/import/bulk.rb', line 4 def self.build_from_documents(type: nil, index: nil, delete: nil, create: nil, update: nil) index = Array(index).select(&Esse.method(:document?)).reject(&:ignore_on_index?).map do |doc| value = doc.to_bulk value[:_type] ||= type if type value end create = Array(create).select(&Esse.method(:document?)).reject(&:ignore_on_index?).map do |doc| value = doc.to_bulk value[:_type] ||= type if type value end update = Array(update).select(&Esse.method(:document?)).reject(&:ignore_on_index?).map do |doc| value = doc.to_bulk(operation: :update) value[:_type] ||= type if type value end delete = Array(delete).select(&Esse.method(:document?)).reject(&:ignore_on_delete?).map do |doc| value = doc.to_bulk(data: false) value[:_type] ||= type if type value end new(index: index, delete: delete, create: create, update: update) end |
Instance Method Details
#each_request(max_retries: 4, last_retry_in_small_chunks: true) {|RequestBody| ... } ⇒ Object
Return an array of RequestBody instances
In case of timeout error, will retry with an exponential backoff using the following formula:
wait_interval = (retry_count**4) + 15 + (rand(10) * (retry_count + 1)) seconds. It will retry up to max_retries times that is default 4.
Too large bulk requests will be split into multiple requests with only one attempt.
43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 |
# File 'lib/esse/import/bulk.rb', line 43 def each_request(max_retries: 4, last_retry_in_small_chunks: true) # @TODO create indexes when by checking all the index suffixes (if mapping is not empty) requests = [optimistic_request] retry_count = 0 begin requests.each do |request| next unless request.body? resp = yield request if resp&.[]('errors') raise resp&.fetch('items', [])&.select { |item| item.values.first['error'] }&.join("\n") end end rescue Faraday::TimeoutError, Esse::Transport::RequestTimeoutError => e retry_count += 1 raise Esse::Transport::RequestTimeoutError.new(e.) if retry_count >= max_retries # Timeout error may be caused by a too large request, so we split the requests in small chunks as a last attempt requests = requests_in_small_chunks if last_retry_in_small_chunks && max_retries > 2 && retry_count == max_retries - 2 wait_interval = (retry_count**4) + 15 + (rand(10) * (retry_count + 1)) Esse.logger.warn "Timeout error, retrying in #{wait_interval} seconds" sleep(wait_interval) retry rescue Esse::Transport::RequestEntityTooLargeError => e retry_count += 1 raise e if retry_count > 1 # only retry once on this error requests = balance_requests_size(e) Esse.logger.warn <<~MSG Request entity too large, retrying with a bulk with: #{requests.map(&:bytesize).join(' + ')}. Note that this cause performance degradation, consider adjusting the batch_size of the index or increasing the bulk size. MSG retry end end |