Class: Chewy::Index::Import::Routine
- Inherits: Object
  - Object
    - Chewy::Index::Import::Routine
- Defined in: lib/chewy/index/import/routine.rb
Overview
This class performs the import routine for the options and objects given.
- Create target and journal indexes if needed.
- Iterate over all the passed objects in batches.
- For each batch the #process method is called, which:
  - creates a bulk request body;
  - appends journal entries for the current batch to the request body;
  - prepends a leftovers bulk to the request body, calculated from the errors of the previous iteration;
  - performs the bulk request;
  - composes a new leftovers bulk for the next iteration from the response errors if update_failover is true;
  - appends the remaining unfixable errors to the instance-level errors array.
- Perform the request for the last leftovers bulk if present using #extract_leftovers.
- Return the result errors array.
At the moment, it tries to recover only from partial document update errors in cases
where the document doesn't exist, and only if the update_failover option is true. To
recover, it indexes such objects completely on the next iteration.
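The loop described above can be sketched without Elasticsearch. The following is a toy model, not Chewy code: `ToyRoutine`, its methods, and the Hash-backed `store` are all hypothetical names chosen for illustration, and the "missing document" failure is simulated. It only shows how a failed partial update might be carried over as a leftover and retried as a full index action on the next pass.

```ruby
# Toy model of the batch loop described above. This is NOT Chewy code:
# ToyRoutine, its methods, and the Hash-backed `store` are hypothetical.
class ToyRoutine
  attr_reader :errors, :leftovers

  def initialize(store, update_failover: true)
    @store = store
    @update_failover = update_failover
    @errors = []
    @leftovers = []
  end

  # Applies one batch of partial updates given as {id => fields}.
  # Returns true when every action in the batch succeeded.
  def process(updates)
    # Leftovers from the previous batch go first, as full index actions.
    actions = @leftovers.map { |id, doc| [:index, id, doc] }
    @leftovers = []
    actions.concat(updates.map { |id, fields| [:update, id, fields] })

    # Apply every action and keep the ones that failed.
    failed = actions.reject { |kind, id, fields| apply(kind, id, fields) }
    failed.each do |_kind, id, fields|
      if @update_failover
        @leftovers << [id, fields] # retry as a full index next time
      else
        @errors << "document #{id} is missing"
      end
    end
    failed.empty?
  end

  # Sends the final leftovers, if any, and returns the accumulated errors.
  def flush
    process({}) unless @leftovers.empty?
    @errors
  end

  private

  # Simulated storage: an update fails when the document does not exist.
  def apply(kind, id, fields)
    case kind
    when :index
      @store[id] = fields
      true
    when :update
      return false unless @store.key?(id)

      @store[id] = @store[id].merge(fields)
      true
    end
  end
end

store = {1 => {name: 'a'}}
routine = ToyRoutine.new(store)
routine.process({1 => {name: 'b'}, 2 => {name: 'c'}}) # id 2 does not exist yet
routine.flush # indexes the leftover for id 2 in full
```

In this simplified model the leftover carries only the update fields; the real routine, per the overview, re-indexes the whole object.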
Constant Summary
- BULK_OPTIONS = %i[suffix bulk_size refresh timeout fields pipeline consistency replication wait_for_active_shards routing _source _source_exclude _source_include].freeze
- DEFAULT_OPTIONS = {refresh: true, update_fields: [], update_failover: true, batch_size: Chewy::Index::Adapter::Base::BATCH_SIZE}.freeze
Instance Attribute Summary
- #errors ⇒ Object (readonly)
  Returns the value of attribute errors.
- #leftovers ⇒ Object (readonly)
  Returns the value of attribute leftovers.
- #options ⇒ Object (readonly)
  Returns the value of attribute options.
- #parallel_options ⇒ Object (readonly)
  Returns the value of attribute parallel_options.
- #stats ⇒ Object (readonly)
  Returns the value of attribute stats.
Instance Method Summary
- #create_indexes! ⇒ Object
  Creates the journal index and the corresponding index if necessary.
- #initialize(index, **options) ⇒ Routine (constructor)
  Basically, processes passed options, extracting bulk request specific options.
- #perform_bulk(body) {|response| ... } ⇒ true, false
  Performs a bulk request for the passed body.
- #process(index: [], delete: []) ⇒ true, false
  The main process method.
Constructor Details
#initialize(index, **options) ⇒ Routine
Basically, processes passed options, extracting bulk request specific options.
    # File 'lib/chewy/index/import/routine.rb', line 44
    def initialize(index, **options)
      @index = index
      @options = options
      @options.reverse_merge!(@index.)
      @options.reverse_merge!(journal: Chewy.configuration[:journal])
      @options.reverse_merge!(DEFAULT_OPTIONS)
      @bulk_options = @options.slice(*BULK_OPTIONS)
      @parallel_options = @options.delete(:parallel)
      if @parallel_options && !@parallel_options.is_a?(Hash)
        @parallel_options = if @parallel_options.is_a?(Integer)
          {in_processes: @parallel_options}
        else
          {}
        end
      end
      @errors = []
      @stats = {}
      @leftovers = []
    end
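The precedence created by the chained `reverse_merge!` calls can be easier to see in plain Ruby. The sketch below is an approximation under stated assumptions: `resolve_options` is a hypothetical helper, `BATCH_SIZE = 1000` is a placeholder standing in for `Chewy::Index::Adapter::Base::BATCH_SIZE`, and core `Hash#merge` (defaults first, overrides last) is swapped in for ActiveSupport's `reverse_merge!` so the example runs without dependencies.

```ruby
# Placeholder for Chewy::Index::Adapter::Base::BATCH_SIZE (assumed value).
BATCH_SIZE = 1000
BULK_OPTIONS = %i[
  suffix bulk_size refresh timeout fields pipeline consistency replication
  wait_for_active_shards routing _source _source_exclude _source_include
].freeze
DEFAULT_OPTIONS = {
  refresh: true, update_fields: [], update_failover: true, batch_size: BATCH_SIZE
}.freeze

# Hypothetical helper mirroring the constructor's option handling.
# Precedence, highest first: caller options, index defaults,
# the journal configuration value, DEFAULT_OPTIONS.
def resolve_options(options, index_defaults: {}, journal: false)
  merged = DEFAULT_OPTIONS.merge(journal: journal).merge(index_defaults).merge(options)

  # Only the keys understood by the bulk request are passed along to it.
  bulk_options = merged.slice(*BULK_OPTIONS)

  # :parallel is removed from the options and normalized to a Hash (or nil).
  parallel = merged.delete(:parallel)
  if parallel && !parallel.is_a?(Hash)
    parallel = parallel.is_a?(Integer) ? {in_processes: parallel} : {}
  end

  [merged, bulk_options, parallel]
end

options, bulk_options, parallel_options =
  resolve_options({parallel: 4, refresh: false, suffix: 'v2'})
```

Here an Integer `:parallel` value becomes `{in_processes: 4}`, any other truthy non-Hash value becomes `{}`, and a missing value stays `nil`, which matches the branches in the constructor above.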
Instance Attribute Details
#errors ⇒ Object (readonly)
Returns the value of attribute errors.
    # File 'lib/chewy/index/import/routine.rb', line 39
    def errors
      @errors
    end
#leftovers ⇒ Object (readonly)
Returns the value of attribute leftovers.
    # File 'lib/chewy/index/import/routine.rb', line 39
    def leftovers
      @leftovers
    end
#options ⇒ Object (readonly)
Returns the value of attribute options.
    # File 'lib/chewy/index/import/routine.rb', line 39
    def options
      @options
    end
#parallel_options ⇒ Object (readonly)
Returns the value of attribute parallel_options.
    # File 'lib/chewy/index/import/routine.rb', line 39
    def parallel_options
      @parallel_options
    end
#stats ⇒ Object (readonly)
Returns the value of attribute stats.
    # File 'lib/chewy/index/import/routine.rb', line 39
    def stats
      @stats
    end
Instance Method Details
#create_indexes! ⇒ Object
Creates the journal index and the corresponding index if necessary.
    # File 'lib/chewy/index/import/routine.rb', line 66
    def create_indexes!
      Chewy::Stash::Journal.create if @options[:journal] && !Chewy.configuration[:skip_journal_creation_on_import]
      return if Chewy.configuration[:skip_index_creation_on_import]

      @index.create!(**@bulk_options.slice(:suffix)) unless @index.exists?
    end
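The decision logic in this method depends on four conditions, which the following sketch makes explicit. `creation_plan` and its keyword names are hypothetical; the function only returns symbols describing what would be created and does not talk to Elasticsearch.

```ruby
# Hypothetical pure-Ruby restatement of the branching in create_indexes!.
# Returns the list of creation steps that would run for the given flags.
def creation_plan(journal:, skip_journal_creation:, skip_index_creation:, index_exists:)
  plan = []
  plan << :create_journal if journal && !skip_journal_creation
  return plan if skip_index_creation

  plan << :create_index unless index_exists
  plan
end

creation_plan(journal: true, skip_journal_creation: false,
              skip_index_creation: false, index_exists: true)
```

Note that the journal check comes before the early return, so skipping index creation does not by itself skip journal creation.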
#perform_bulk(body) {|response| ... } ⇒ true, false
Performs a bulk request for the passed body.
    # File 'lib/chewy/index/import/routine.rb', line 102
    def perform_bulk(body)
      response = bulk.perform(body)
      yield response if block_given?
      Chewy.wait_for_status
      @errors.concat(response)
      response.blank?
    end
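The contract of this method (yield the response, record it as errors, return whether it was empty) can be illustrated with a stand-in transport. `ToyBulkRunner` and its block-based transport are hypothetical, the `Chewy.wait_for_status` call is omitted, and core `empty?` replaces ActiveSupport's `blank?`, assuming the response is an array of error items.

```ruby
# Hypothetical stand-in: the transport block plays the role of bulk.perform
# and must return an array of error items (empty when all actions succeeded).
class ToyBulkRunner
  attr_reader :errors

  def initialize(&transport)
    @transport = transport
    @errors = []
  end

  def perform_bulk(body)
    response = @transport.call(body)
    yield response if block_given? # the caller sees the response first
    @errors.concat(response)       # whatever remains is recorded as errors
    response.empty?                # true means the whole bulk succeeded
  end
end

# A fake transport that reports every action flagged with :fail.
runner = ToyBulkRunner.new { |body| body.select { |action| action[:fail] } }
seen = []
runner.perform_bulk([{id: 1}, {id: 2, fail: true}]) { |response| seen = response.dup }
```

Because the block runs before the errors are recorded, a block could in principle remove items it intends to handle itself; whether and how the real routine does so is not shown in this section.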
#process(index: [], delete: []) ⇒ true, false
The main process method. Converts passed objects to the bulk request body, appends journal entries, performs this request and handles errors performing failover procedures if applicable.
    # File 'lib/chewy/index/import/routine.rb', line 80
    def process(index: [], delete: [])
      bulk_builder = BulkBuilder.new(@index, to_index: index, delete: delete, fields: @options[:update_fields])
      bulk_body = bulk_builder.bulk_body

      if @options[:journal]
        journal_builder = JournalBuilder.new(@index, to_index: index, delete: delete)
        bulk_body.concat(journal_builder.bulk_body)
      end

      bulk_body.unshift(*flush_leftovers)

      perform_bulk(bulk_body) do |response|
        @leftovers = extract_leftovers(response, bulk_builder.index_objects_by_id)
        @stats[:index] = @stats[:index].to_i + index.count if index.present?
        @stats[:delete] = @stats[:delete].to_i + delete.count if delete.present?
      end
    end
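The stats bookkeeping inside the block relies on `nil.to_i` being `0`, so counters can be accumulated across batches without initializing them. A minimal sketch, using a hypothetical `record_stats` helper and core `empty?` in place of ActiveSupport's `present?`:

```ruby
# Hypothetical helper reproducing the counter updates from #process.
def record_stats(stats, index: [], delete: [])
  # nil.to_i is 0, so a missing counter starts from zero on first use.
  stats[:index] = stats[:index].to_i + index.count unless index.empty?
  stats[:delete] = stats[:delete].to_i + delete.count unless delete.empty?
  stats
end

stats = {}
record_stats(stats, index: %w[a b])
record_stats(stats, index: %w[c], delete: [])
```

A key is only written when the corresponding batch is non-empty, so a run with no deletions leaves `stats` without a `:delete` entry rather than with a zero.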