Class: Esse::Import::Bulk
- Inherits: Object
- Defined in: lib/esse/import/bulk.rb
Instance Method Summary
- #each_request(max_retries: 4, last_retry_in_small_chunks: true, last_retry_per_document: true) {|RequestBody| ... } ⇒ Object — Return an array of RequestBody instances.
- #initialize(index: nil, delete: nil, create: nil, update: nil) ⇒ Bulk (constructor) — A new instance of Bulk.
Constructor Details
#initialize(index: nil, delete: nil, create: nil, update: nil) ⇒ Bulk
Returns a new instance of Bulk.
```ruby
# File 'lib/esse/import/bulk.rb', line 4

def initialize(index: nil, delete: nil, create: nil, update: nil)
  @index = Esse::ArrayUtils.wrap(index).map { |payload| { index: payload } }
  @create = Esse::ArrayUtils.wrap(create).map { |payload| { create: payload } }
  @update = Esse::ArrayUtils.wrap(update).map { |payload| { update: payload } }
  @delete = Esse::ArrayUtils.wrap(delete).map { |payload| { delete: payload } }
end
```
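Each keyword argument accepts either a single payload or an array of payloads, and each payload is wrapped into a one-key operation hash. The exact behavior of `Esse::ArrayUtils.wrap` is internal to the gem, but it presumably works like ActiveSupport's `Array.wrap`: `nil` becomes an empty array, and a single `Hash` is wrapped whole rather than splatted into key/value pairs (as `Kernel#Array` would do). A minimal sketch of that wrapping, for illustration only:

```ruby
# Illustrative re-implementation of an Array.wrap-style helper.
# The real Esse::ArrayUtils.wrap may differ in detail.
def wrap(object)
  if object.nil?
    []
  elsif object.respond_to?(:to_ary)
    object.to_ary || [object]
  else
    [object]
  end
end

# A single document payload becomes a one-element operations list,
# mirroring what #initialize does for the :index keyword:
ops = wrap({ _id: 1, name: 'Hello' }).map { |payload| { index: payload } }
# ops == [{ index: { _id: 1, name: 'Hello' } }]
```

Note that `Kernel#Array({ _id: 1 })` would instead produce `[[:_id, 1]]`, which is why a dedicated wrap helper is needed for hash payloads.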
Instance Method Details
#each_request(max_retries: 4, last_retry_in_small_chunks: true, last_retry_per_document: true) {|RequestBody| ... } ⇒ Object
Return an array of RequestBody instances
In case of a timeout error, it retries with an exponential backoff using the following formula:
wait_interval = (retry_count**4) + 15 + (rand(10) * (retry_count + 1)) seconds. It retries up to max_retries times (default: 4).
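To get a feel for the formula, the wait intervals can be computed directly; `rand(10)` adds 0-9 seconds of jitter, scaled by the attempt number:

```ruby
# Wait interval per retry attempt, as in the formula above.
def wait_interval(retry_count)
  (retry_count**4) + 15 + (rand(10) * (retry_count + 1))
end

(1..3).each do |attempt|
  puts "retry ##{attempt}: ~#{wait_interval(attempt)}s"
end
# retry 1 waits 16..34s, retry 2 waits 31..58s, retry 3 waits 96..132s
```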
Bulk requests that are too large are first split into multiple size-balanced requests; if a split request still returns 413, the bulk is retried one document per request as a last resort. Only when a single document still returns 413 does the error bubble up.
```ruby
# File 'lib/esse/import/bulk.rb', line 21

def each_request(max_retries: 4, last_retry_in_small_chunks: true, last_retry_per_document: true)
  # @TODO create indexes when by checking all the index suffixes (if mapping is not empty)
  requests = [optimistic_request]
  retry_count = 0
  too_large_retry_count = 0
  begin
    requests.each do |request|
      next unless request.body?

      resp = yield request
      raise Esse::Transport::BulkResponseError.new(resp) if resp&.[]('errors')
    end
  rescue Faraday::TimeoutError, Esse::Transport::RequestTimeoutError => e
    retry_count += 1
    raise Esse::Transport::RequestTimeoutError.new(e.message) if retry_count >= max_retries

    # Timeout error may be caused by a too large request, so we split the requests
    # into small chunks as a last attempt
    requests = requests_in_small_chunks if last_retry_in_small_chunks && max_retries > 2 && retry_count == max_retries - 2
    wait_interval = (retry_count**4) + 15 + (rand(10) * (retry_count + 1))
    Esse.logger.warn "Timeout error, retrying in #{wait_interval} seconds"
    sleep(wait_interval)
    retry
  rescue Esse::Transport::RequestEntityTooLargeError => e
    too_large_retry_count += 1
    raise e if too_large_retry_count > 2

    if too_large_retry_count == 1
      balanced = balance_requests_size(e)
      if balanced && !balanced.empty?
        requests = balanced
        Esse.logger.warn <<~MSG
          Request entity too large, retrying with a bulk with: #{requests.map(&:bytesize).join(' + ')}.
          Note that this cause performance degradation, consider adjusting the batch_size of the index or increasing the bulk size.
        MSG
        retry
      end
      raise e unless last_retry_per_document
      too_large_retry_count = 2
    end

    raise e unless last_retry_per_document
    requests = requests_per_document
    Esse.logger.warn <<~MSG
      Request entity too large after balancing, retrying one document per request as a last resort.
      If a single document still exceeds the bulk size, the error will be raised.
    MSG
    retry
  end
end
```
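The 413 handling follows a generic escalation pattern: try the whole batch, then smaller size-balanced chunks, then one document per request, and only then let the error propagate. A self-contained sketch of that pattern (the `send_bulk` helper, `TooLargeError`, and the 20-byte limit are hypothetical stand-ins for the real transport, which splits by balancing byte sizes rather than by simple halving):

```ruby
class TooLargeError < StandardError; end

# Hypothetical payload limit and sender, standing in for the transport layer.
MAX_BYTES = 20

def send_bulk(docs)
  payload = docs.join("\n")
  raise TooLargeError if payload.bytesize > MAX_BYTES
  payload
end

# Escalate: whole batch -> two halves -> one doc per request.
# Only a single oversized document lets the error bubble up.
def deliver(docs)
  attempts = [
    [docs],                                      # everything in one request
    docs.each_slice((docs.size / 2.0).ceil).to_a, # split into halves
    docs.map { |d| [d] }                          # one document per request
  ]
  attempts.each_with_index do |requests, i|
    begin
      return requests.map { |chunk| send_bulk(chunk) }
    rescue TooLargeError
      raise if i == attempts.size - 1
    end
  end
end

deliver(%w[alpha beta gamma delta]) # succeeds on the second attempt
```

The joined batch `"alpha\nbeta\ngamma\ndelta"` is 22 bytes, so the first attempt raises; the two halves fit under the limit and are sent separately.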