Class: Chewy::Index::Syncer
- Inherits:
- Object
  - Object
  - Chewy::Index::Syncer
- Defined in:
- lib/chewy/index/syncer.rb
Overview
In Rails 4.0, times are converted to JSON with second precision (no milliseconds), so the outdated check is less precise there.
ATTENTION: synchronization may be slow if the synchronized tables
lack a compound index on the primary key and outdated_sync_field.
This class finds missing and outdated documents in ES by comparing
ids from the data source with ids in the ES index. If outdated_sync_field
is present in the index definition, it also compares the value of this
field for each source object with the value in the corresponding ES
document. Usually this field is updated_at; if its value in the source
differs from the value in the index, the document is outdated and
should be reindexed.
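The comparison can be sketched in plain Ruby. This is a minimal illustration under stated assumptions, not Chewy's implementation: the hashes mapping ids to sync-field values, and the helper names `find_missing` and `find_outdated`, are invented for the example.

```ruby
# Hypothetical data: id => value of the sync field (for example updated_at).
source = { '1' => '2024-01-01', '2' => '2024-01-05', '3' => '2024-01-09' }
index  = { '2' => '2024-01-05', '3' => '2024-01-02', '4' => '2024-01-03' }

# Ids present on only one side: not indexed yet, or already deleted from the source.
def find_missing(source, index)
  (source.keys - index.keys) + (index.keys - source.keys)
end

# Ids present on both sides whose sync-field values differ.
def find_outdated(source, index)
  index.each_with_object([]) do |(id, index_value), result|
    next unless source.key?(id)

    result << id if source[id] != index_value
  end
end

find_missing(source, index)  # => ["1", "4"]
find_outdated(source, index) # => ["3"]
```

Id '4' exists only in the index, so it is reported as missing rather than outdated; only ids found on both sides are compared by value.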
To fetch the necessary data from the source, it uses the adapter method Adapter::Base#import_fields. If the Object adapter is used, it makes sense to read the corresponding documentation.
If the parallel option is passed to the initializer, the source and
index data are fetched in parallel and the outdated objects are then
calculated in parallel processes. Any subsequent import (if required)
is performed in parallel as well.
Constant Summary
- DEFAULT_SYNC_BATCH_SIZE =
- 20_000
- ISO_DATETIME =
- /\A(\d{4})-(\d\d)-(\d\d) (\d\d):(\d\d):(\d\d)(\.\d+)?\z/
- OUTDATED_IDS_WORKER =
    lambda do |outdated_sync_field_type, source_data_hash, index, total, index_data|
      ::Process.setproctitle("chewy [#{index}]: sync outdated calculation (#{::Parallel.worker_number + 1}/#{total})") if index
      index_data.each_with_object([]) do |(id, index_sync_value), result|
        next unless source_data_hash[id]

        outdated = if outdated_sync_field_type == 'date'
          !Chewy::Index::Syncer.dates_equal(typecast_date(source_data_hash[id]), Time.iso8601(index_sync_value))
        else
          source_data_hash[id] != index_sync_value
        end

        result.push(id) if outdated
      end
    end
- SOURCE_OR_INDEX_DATA_WORKER =
    lambda do |syncer, index, kind|
      ::Process.setproctitle("chewy [#{index}]: sync fetching data (#{kind})")
      result = case kind
      when :source
        syncer.send(:fetch_source_data)
      when :index
        syncer.send(:fetch_index_data)
      end
      {kind => result}
    end
Class Method Summary
- .dates_equal(one, two) ⇒ Object
  Compares times with ms precision.
- .typecast_date(string) ⇒ Object
Instance Method Summary
- #initialize(index, parallel: nil) ⇒ Syncer (constructor)
  A new instance of Syncer.
- #missing_ids ⇒ Array<String>
  Finds ids of all the objects that are not indexed yet or deleted from the source already.
- #outdated_ids ⇒ Array<String>
  If index supports outdated sync, it compares the values of the outdated_sync_field for each object and document in the source and index and returns the ids of entities which differ.
- #perform ⇒ Integer?
  Finds all the missing and outdated ids and performs import for them.
Constructor Details
#initialize(index, parallel: nil) ⇒ Syncer
Returns a new instance of Syncer.
    # File 'lib/chewy/index/syncer.rb', line 76
    def initialize(index, parallel: nil)
      @index = index
      @parallel = if !parallel || parallel.is_a?(Hash)
        parallel
      elsif parallel.is_a?(Integer)
        {in_processes: parallel}
      else
        {}
      end
    end
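The way the constructor normalizes the parallel option can be tried in isolation. The sketch below copies the branching into a standalone helper; the name `normalize_parallel` is hypothetical and not part of Chewy.

```ruby
# Hypothetical standalone helper mirroring the branches in #initialize.
def normalize_parallel(parallel)
  if !parallel || parallel.is_a?(Hash)
    parallel                   # nil/false pass through unchanged; a Hash is used as given
  elsif parallel.is_a?(Integer)
    { in_processes: parallel } # an Integer becomes a process count
  else
    {}                         # any other truthy value (e.g. true) becomes an empty Hash
  end
end

normalize_parallel(nil)              # => nil
normalize_parallel(4)                # => {in_processes: 4}
normalize_parallel(true)             # => {}
normalize_parallel({ in_threads: 2 }) # => {in_threads: 2}
```

Because an empty Hash is truthy in Ruby, passing `true` still selects the parallel code path in #outdated_ids; how the empty options are interpreted downstream is not shown in this section.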
Class Method Details
.dates_equal(one, two) ⇒ Object
Compares times with ms precision.
    # File 'lib/chewy/index/syncer.rb', line 70
    def self.dates_equal(one, two)
      [one.to_i, one.usec / 1000] == [two.to_i, two.usec / 1000]
    end
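To illustrate what millisecond precision means here, the following sketch copies the comparison into a top-level method (so it runs without Chewy) and applies it to a few hand-built Time values.

```ruby
# Same comparison as .dates_equal, copied into a standalone method for illustration.
def dates_equal(one, two)
  [one.to_i, one.usec / 1000] == [two.to_i, two.usec / 1000]
end

a = Time.at(1, 123_456) # 1 s + 123_456 microseconds
b = Time.at(1, 123_999) # same millisecond (123), different microseconds
c = Time.at(1, 124_000) # next millisecond (124)

dates_equal(a, b) # => true
dates_equal(a, c) # => false
```

Integer division of usec by 1000 truncates rather than rounds, so values are equal only when they fall within the same whole millisecond.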
.typecast_date(string) ⇒ Object
    # File 'lib/chewy/index/syncer.rb', line 56
    def self.typecast_date(string)
      if string.is_a?(String) && (match = ISO_DATETIME.match(string))
        microsec = (match[7].to_r * 1_000_000).to_i
        day = "#{match[1]}-#{match[2]}-#{match[3]}"
        time_with_seconds = "#{match[4]}:#{match[5]}:#{match[6]}"
        microseconds = format('%06d', microsec)
        date = "#{day}T#{time_with_seconds}.#{microseconds}+00:00"
        Time.iso8601(date)
      else
        string
      end
    end
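The conversion can be exercised outside Chewy with the sketch below, which copies the regular expression and method body into top-level definitions. It suggests that strings of the form "YYYY-MM-DD HH:MM:SS[.fraction]" are read as UTC times, while anything else is returned unchanged.

```ruby
require 'time' # provides Time.iso8601

# Copy of the ISO_DATETIME constant from the listing above.
ISO_DATETIME = /\A(\d{4})-(\d\d)-(\d\d) (\d\d):(\d\d):(\d\d)(\.\d+)?\z/

# Standalone copy of .typecast_date for illustration.
def typecast_date(string)
  if string.is_a?(String) && (match = ISO_DATETIME.match(string))
    # match[7] is the optional ".fraction"; nil.to_r is 0, so it may be absent.
    microsec = (match[7].to_r * 1_000_000).to_i
    day = "#{match[1]}-#{match[2]}-#{match[3]}"
    time_with_seconds = "#{match[4]}:#{match[5]}:#{match[6]}"
    microseconds = format('%06d', microsec)
    Time.iso8601("#{day}T#{time_with_seconds}.#{microseconds}+00:00")
  else
    string
  end
end

typecast_date('2024-01-02 03:04:05.678') == Time.utc(2024, 1, 2, 3, 4, 5, 678_000) # => true
typecast_date('not a date') # => "not a date"
```

Fractions finer than a microsecond are truncated by `to_i`, which is harmless here because .dates_equal only compares down to the millisecond.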
Instance Method Details
#missing_ids ⇒ Array<String>
Finds ids of all the objects that are not indexed yet or deleted from the source already.
    # File 'lib/chewy/index/syncer.rb', line 101
    def missing_ids
      return [] if source_data.blank?

      @missing_ids ||= begin
        source_data_ids = data_ids(source_data)
        index_data_ids = data_ids(index_data)

        (source_data_ids - index_data_ids).concat(index_data_ids - source_data_ids)
      end
    end
#outdated_ids ⇒ Array<String>
If index supports outdated sync, it compares the values of the
outdated_sync_field for each object and document in the source
and index and returns the ids of entities which differ.
    # File 'lib/chewy/index/syncer.rb', line 118
    def outdated_ids
      return [] if source_data.blank? || index_data.blank? || !@index.supports_outdated_sync?

      @outdated_ids ||= if @parallel
        parallel_outdated_ids
      else
        linear_outdated_ids
      end
    end
#perform ⇒ Integer?
Finds all the missing and outdated ids and performs import for them.
    # File 'lib/chewy/index/syncer.rb', line 90
    def perform
      ids = missing_ids | outdated_ids
      return 0 if ids.blank?

      @index.import(ids, parallel: @parallel) && ids.count
    end
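The return value of #perform follows from the `|` union and the `&&` at the end. The sketch below reproduces that control flow with a hypothetical `FakeIndex` stand-in and a hypothetical `perform_sketch` method; it assumes the real import returns a truthy value on success and a falsy one on failure, and it uses `empty?` in place of ActiveSupport's `blank?` so that it runs without Rails.

```ruby
# Hypothetical stand-in for an index whose import reports success or failure.
FakeIndex = Struct.new(:import_result) do
  def import(_ids, parallel: nil)
    import_result
  end
end

# Hypothetical standalone version of the #perform control flow.
def perform_sketch(index, missing_ids, outdated_ids, parallel: nil)
  ids = missing_ids | outdated_ids # Array union: duplicates are removed
  return 0 if ids.empty?

  # Yields the id count when import is truthy, otherwise import's falsy result.
  index.import(ids, parallel: parallel) && ids.count
end

perform_sketch(FakeIndex.new(true), %w[1 2], %w[2 3])  # => 3
perform_sketch(FakeIndex.new(true), [], [])            # => 0
perform_sketch(FakeIndex.new(false), %w[1 2], %w[2 3]) # => false
```

Under that assumption, a caller can distinguish "nothing to sync" (0) from "import failed" (a falsy value), which is consistent with the optional Integer in the documented signature.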