Class: Esse::Import::Bulk

Inherits:
Object
  • Object
show all
Defined in:
lib/esse/import/bulk.rb

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(index: nil, delete: nil, create: nil, update: nil) ⇒ Bulk

Returns a new instance of Bulk.



28
29
30
31
32
33
# File 'lib/esse/import/bulk.rb', line 28

def initialize(index: nil, delete: nil, create: nil, update: nil)
  @index = Esse::ArrayUtils.wrap(index).map { |payload| { index: payload } }
  @create = Esse::ArrayUtils.wrap(create).map { |payload| { create: payload } }
  @update = Esse::ArrayUtils.wrap(update).map { |payload| { update: payload } }
  @delete = Esse::ArrayUtils.wrap(delete).map { |payload| { delete: payload } }
end

Class Method Details

.build_from_documents(type: nil, index: nil, delete: nil, create: nil, update: nil) ⇒ Object



4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
# File 'lib/esse/import/bulk.rb', line 4

def self.build_from_documents(type: nil, index: nil, delete: nil, create: nil, update: nil)
  index = Array(index).select(&Esse.method(:document?)).reject(&:ignore_on_index?).map do |doc|
    value = doc.to_bulk
    value[:_type] ||= type if type
    value
  end
  create = Array(create).select(&Esse.method(:document?)).reject(&:ignore_on_index?).map do |doc|
    value = doc.to_bulk
    value[:_type] ||= type if type
    value
  end
  update = Array(update).select(&Esse.method(:document?)).reject(&:ignore_on_index?).map do |doc|
    value = doc.to_bulk(operation: :update)
    value[:_type] ||= type if type
    value
  end
  delete = Array(delete).select(&Esse.method(:document?)).reject(&:ignore_on_delete?).map do |doc|
    value = doc.to_bulk(data: false)
    value[:_type] ||= type if type
    value
  end
  new(index: index, delete: delete, create: create, update: update)
end

Instance Method Details

#each_request(max_retries: 4, last_retry_in_small_chunks: true) {|RequestBody| ... } ⇒ Object

Return an array of RequestBody instances

In case of timeout error, will retry with an exponential backoff using the following formula:

wait_interval = (retry_count**4) + 15 + (rand(10) * (retry_count + 1)) seconds. It will retry up to max_retries times that is default 4.

Too large bulk requests will be split into multiple requests with only one attempt.

Yields:



43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
# File 'lib/esse/import/bulk.rb', line 43

def each_request(max_retries: 4, last_retry_in_small_chunks: true)
  # @TODO create indexes when by checking all the index suffixes (if mapping is not empty)
  requests = [optimistic_request]
  retry_count = 0

  begin
    requests.each do |request|
      next unless request.body?
      resp = yield request
      if resp&.[]('errors')
        raise resp&.fetch('items', [])&.select { |item| item.values.first['error'] }&.join("\n")
      end
    end
  rescue Faraday::TimeoutError, Esse::Transport::RequestTimeoutError => e
    retry_count += 1
    raise Esse::Transport::RequestTimeoutError.new(e.message) if retry_count >= max_retries
    # Timeout error may be caused by a too large request, so we split the requests in small chunks as a last attempt
    requests = requests_in_small_chunks if last_retry_in_small_chunks && max_retries > 2 && retry_count == max_retries - 2
    wait_interval = (retry_count**4) + 15 + (rand(10) * (retry_count + 1))
    Esse.logger.warn "Timeout error, retrying in #{wait_interval} seconds"
    sleep(wait_interval)
    retry
  rescue Esse::Transport::RequestEntityTooLargeError => e
    retry_count += 1
    raise e if retry_count > 1 # only retry once on this error
    requests = balance_requests_size(e)
    Esse.logger.warn <<~MSG
      Request entity too large, retrying with a bulk with: #{requests.map(&:bytesize).join(' + ')}.
      Note that this cause performance degradation, consider adjusting the batch_size of the index or increasing the bulk size.
    MSG
    retry
  end
end