Class: Chewy::Index::Import::Routine
- Inherits:
- Object
- Defined in:
- lib/chewy/index/import/routine.rb
Overview
This class performs the import routine for the options and objects given.
- Create the target and journal indexes if needed.
- Iterate over all the passed objects in batches.
- For each batch, the #process method is called. It:
  * creates a bulk request body;
  * appends journal entries for the current batch to the request body;
  * prepends a leftovers bulk to the request body, which is calculated based on the errors from the previous iteration;
  * performs the bulk request;
  * composes a new leftovers bulk for the next iteration based on the response errors if `update_failover` is true;
  * appends the remaining unfixable errors to the instance-level errors array.
- Perform the request for the last leftovers bulk, if present, using #extract_leftovers.
- Return the resulting errors array.
At the moment, it tries to recover only from partial document update errors that occur when the document doesn't exist, and only if the `update_failover` option is true. To recover, it indexes such objects completely on the next iteration.
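The batch loop and the failover behaviour described above can be sketched in plain Ruby. This is a minimal illustration under stated assumptions, not Chewy's implementation: `FakeBulk`, `import_sketch`, the `:op` action format and the `'document_missing_exception'` error type are all hypothetical stand-ins for the real bulk client and its responses.

```ruby
# Hypothetical stand-in for the bulk endpoint: partial updates fail for ids
# that are not stored yet, full index actions always succeed.
class FakeBulk
  attr_reader :stored

  def initialize(stored_ids)
    @stored = stored_ids.dup
  end

  # Returns an array of error hashes; empty when the whole body succeeded.
  def perform(body)
    body.each_with_object([]) do |action, errors|
      case action[:op]
      when :index
        @stored << action[:id] unless @stored.include?(action[:id])
      when :update
        unless @stored.include?(action[:id])
          errors << {id: action[:id], type: 'document_missing_exception'}
        end
      end
    end
  end
end

# Hypothetical helper mirroring the steps listed in the overview.
def import_sketch(bulk, ids, batch_size:, update_failover: true)
  errors = []
  leftovers = []

  ids.each_slice(batch_size) do |batch|
    body = batch.map { |id| {op: :update, id: id} }
    body.unshift(*leftovers) # prepend the bulk built from the previous errors
    leftovers = []

    bulk.perform(body).each do |error|
      if update_failover && error[:type] == 'document_missing_exception'
        leftovers << {op: :index, id: error[:id]} # retry as a full index
      else
        errors << error # unfixable: keep it in the result
      end
    end
  end

  # Send whatever is still pending after the last batch.
  errors.concat(bulk.perform(leftovers)) unless leftovers.empty?
  errors
end

bulk = FakeBulk.new([1, 2])
result = import_sketch(bulk, [1, 2, 3, 4, 5], batch_size: 2)
```

In this sketch, the missing documents are fully indexed one iteration later, and the final leftovers are sent in a separate request after the loop.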
Constant Summary
- BULK_OPTIONS =
%i[ suffix bulk_size refresh timeout fields pipeline consistency replication wait_for_active_shards routing _source _source_exclude _source_include ].freeze
- DEFAULT_OPTIONS =
{ refresh: true, update_fields: [], update_failover: true, batch_size: Chewy::Index::Adapter::Base::BATCH_SIZE }.freeze
Instance Attribute Summary
-
#errors ⇒ Object
readonly
Returns the value of attribute errors.
-
#leftovers ⇒ Object
readonly
Returns the value of attribute leftovers.
-
#options ⇒ Object
readonly
Returns the value of attribute options.
-
#parallel_options ⇒ Object
readonly
Returns the value of attribute parallel_options.
-
#stats ⇒ Object
readonly
Returns the value of attribute stats.
Instance Method Summary
-
#create_indexes! ⇒ Object
Creates the journal index and the corresponding index if necessary.
-
#initialize(index, **options) ⇒ Routine
constructor
Processes the passed options and extracts the options specific to bulk requests.
-
#perform_bulk(body) {|response| ... } ⇒ true, false
Performs a bulk request for the passed body.
-
#process(index: [], delete: []) ⇒ true, false
The main process method.
Constructor Details
#initialize(index, **options) ⇒ Routine
Processes the passed options and extracts the options specific to bulk requests.
# File 'lib/chewy/index/import/routine.rb', line 44
def initialize(index, **options)
  @index = index
  @options = options
  @options.reverse_merge!(@index.)
  @options.reverse_merge!(journal: Chewy.configuration[:journal])
  @options.reverse_merge!(DEFAULT_OPTIONS)
  @bulk_options = @options.slice(*BULK_OPTIONS)
  @parallel_options = @options.delete(:parallel)
  if @parallel_options && !@parallel_options.is_a?(Hash)
    @parallel_options = if @parallel_options.is_a?(Integer)
      {in_processes: @parallel_options}
    else
      {}
    end
  end
  @context = @options[:context] || {}
  @errors = []
  @stats = {}
  @leftovers = []
end
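The option handling in the constructor can be illustrated without Chewy or ActiveSupport. The following is a rough sketch only: `SKETCH_BULK_OPTIONS`, `SKETCH_DEFAULTS` (including its `batch_size` value) and `normalize_options` are hypothetical names, and `Hash#merge` on the defaults is used here as a plain-Ruby substitute for `reverse_merge!`.

```ruby
# Hypothetical, shortened stand-ins for BULK_OPTIONS and DEFAULT_OPTIONS.
SKETCH_BULK_OPTIONS = %i[suffix bulk_size refresh timeout].freeze
SKETCH_DEFAULTS = {
  refresh: true,
  update_fields: [],
  update_failover: true,
  batch_size: 1000 # assumed placeholder value
}.freeze

def normalize_options(options)
  # Caller-supplied keys win over defaults, as with reverse_merge.
  merged = SKETCH_DEFAULTS.merge(options)

  # Only the whitelisted keys are passed on to the bulk request.
  bulk_options = merged.slice(*SKETCH_BULK_OPTIONS)

  # :parallel may be a Hash (used as is), an Integer (process count),
  # or any other truthy value (empty settings hash).
  parallel = merged.delete(:parallel)
  if parallel && !parallel.is_a?(Hash)
    parallel = parallel.is_a?(Integer) ? {in_processes: parallel} : {}
  end

  {options: merged, bulk_options: bulk_options, parallel_options: parallel}
end

with_count = normalize_options(parallel: 4)
with_flag = normalize_options(parallel: true)
```

Under these assumptions, `parallel: 4` becomes `{in_processes: 4}`, `parallel: true` becomes an empty hash, and omitting the option leaves the parallel settings as `nil`.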
Instance Attribute Details
#errors ⇒ Object (readonly)
Returns the value of attribute errors.
# File 'lib/chewy/index/import/routine.rb', line 39
def errors
  @errors
end
#leftovers ⇒ Object (readonly)
Returns the value of attribute leftovers.
# File 'lib/chewy/index/import/routine.rb', line 39
def leftovers
  @leftovers
end
#options ⇒ Object (readonly)
Returns the value of attribute options.
# File 'lib/chewy/index/import/routine.rb', line 39
def options
  @options
end
#parallel_options ⇒ Object (readonly)
Returns the value of attribute parallel_options.
# File 'lib/chewy/index/import/routine.rb', line 39
def parallel_options
  @parallel_options
end
#stats ⇒ Object (readonly)
Returns the value of attribute stats.
# File 'lib/chewy/index/import/routine.rb', line 39
def stats
  @stats
end
Instance Method Details
#create_indexes! ⇒ Object
Creates the journal index and the corresponding index if necessary.
# File 'lib/chewy/index/import/routine.rb', line 67
def create_indexes!
  Chewy::Stash::Journal.create if @options[:journal] && !Chewy.configuration[:skip_journal_creation_on_import]
  return if Chewy.configuration[:skip_index_creation_on_import]

  @index.create!(**@bulk_options.slice(:suffix)) unless @index.exists?
end
#perform_bulk(body) {|response| ... } ⇒ true, false
Performs a bulk request for the passed body.
# File 'lib/chewy/index/import/routine.rb', line 103
def perform_bulk(body)
  response = bulk.perform(body)
  yield response if block_given?
  Chewy.wait_for_status
  @errors.concat(response)
  response.blank?
end
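The contract of #perform_bulk (yield the response, accumulate its errors, return whether the request was error-free) can be tried in isolation. This is a simplified sketch, not the library code: `BulkRunnerSketch` and `StubClient` are hypothetical, the `Chewy.wait_for_status` call is left out, and `empty?` is used in place of ActiveSupport's `blank?`.

```ruby
class BulkRunnerSketch
  attr_reader :errors

  # client is a hypothetical object whose #perform returns an array of errors.
  def initialize(client)
    @client = client
    @errors = []
  end

  def perform_bulk(body)
    response = @client.perform(body)
    yield response if block_given? # let the caller inspect the raw errors
    @errors.concat(response)       # accumulate across calls
    response.empty?                # true only when nothing failed
  end
end

# Stub that replays canned responses, one per call.
StubClient = Struct.new(:responses) do
  def perform(_body)
    responses.shift
  end
end

runner = BulkRunnerSketch.new(StubClient.new([[], [{id: 7, error: 'boom'}]]))
seen = []
first_ok = runner.perform_bulk([{index: {_id: 1}}]) { |response| seen << response }
second_ok = runner.perform_bulk([{index: {_id: 7}}]) { |response| seen << response }
```

Because the block runs before the errors are appended, a caller such as #process can derive leftovers from the response of the current request alone.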
#process(index: [], delete: []) ⇒ true, false
The main process method. Converts the passed objects to a bulk request body, appends journal entries, performs the request, and handles errors, running failover procedures if applicable.
# File 'lib/chewy/index/import/routine.rb', line 81
def process(index: [], delete: [])
  bulk_builder = BulkBuilder.new(@index, to_index: index, delete: delete, fields: @options[:update_fields], context: @context)
  bulk_body = bulk_builder.bulk_body

  if @options[:journal]
    journal_builder = JournalBuilder.new(@index, to_index: index, delete: delete)
    bulk_body.concat(journal_builder.bulk_body)
  end

  bulk_body.unshift(*flush_leftovers)

  perform_bulk(bulk_body) do |response|
    @leftovers = extract_leftovers(response, bulk_builder.index_objects_by_id)
    @stats[:index] = @stats[:index].to_i + index.count if index.present?
    @stats[:delete] = @stats[:delete].to_i + delete.count if delete.present?
  end
end
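Two details of #process are easy to miss: the order in which the request body is assembled, and the way the stats counters start from `nil`. The sketch below shows both with plain arrays and hashes. `assemble_body` and `bump_stats` are hypothetical helpers written for this illustration, and `empty?` replaces ActiveSupport's `present?`.

```ruby
# Leftovers go first, then the current batch, then the journal entries.
def assemble_body(batch_actions, journal_actions, leftovers)
  body = batch_actions.dup
  body.concat(journal_actions)
  body.unshift(*leftovers)
  body
end

# nil.to_i is 0, so a counter is created on first use and summed afterwards.
def bump_stats(stats, index:, delete:)
  stats[:index] = stats[:index].to_i + index.count unless index.empty?
  stats[:delete] = stats[:delete].to_i + delete.count unless delete.empty?
  stats
end

body = assemble_body(%i[batch_1 batch_2], %i[journal_1], %i[leftover_1])

stats = {}
bump_stats(stats, index: [1, 2], delete: [])
bump_stats(stats, index: [3], delete: [9])
```

A key appears in the stats only once at least one object of that kind has been processed, so an import that deletes nothing reports no `:delete` counter at all.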