Module: Chewy::RakeHelper
- Defined in:
- lib/chewy/rake_helper.rb
Constant Summary
# Subscriber for 'import_objects.chewy' notifications. It prints the imported
# index, the rounded-up duration and the per-key import counts. It then prints
# any errors, grouped by action and by error message. The lambda must keep
# exactly six parameters: subscribed_task_stats curries it with the output
# stream, and currying depends on the arity.
IMPORT_CALLBACK = lambda do |output, _name, start, finish, _id, payload|
  elapsed = (finish - start).ceil
  counts = payload.fetch(:import, {})
  stats = counts.map { |key, count| "#{key} #{count}" }.join(', ')
  output.puts " Imported #{payload[:index]} in #{human_duration(elapsed)}, stats: #{stats}"

  errors_by_action = payload[:errors]
  next if errors_by_action.nil?

  errors_by_action.each do |action, errors|
    output.puts " #{action.to_s.humanize} errors:"
    errors.each do |error, documents|
      output.puts " `#{error}`"
      output.puts " on #{documents.count} documents: #{documents}"
    end
  end
end
# Subscriber for 'apply_journal.chewy' notifications. It reports the target
# indexes (sorted by derivable_name), the total number of journal entries
# across all groups and the current stage. The lambda keeps six parameters
# because subscribed_task_stats curries it with the output stream.
JOURNAL_CALLBACK = lambda do |output, _name, _start, _finish, _id, payload|
  groups = payload[:groups]
  entries = groups.values.sum(&:size)
  targets = groups.keys.sort_by(&:derivable_name)
  output.puts " Applying journal to #{targets}, #{entries} entries, stage #{payload[:stage]}"
end
# ENV keys that delete_by_query_options_from_env picks up and converts into
# options for the journal cleanup.
DELETE_BY_QUERY_OPTIONS = %w[
  WAIT_FOR_COMPLETION
  REQUESTS_PER_SECOND
  SCROLL_SIZE
].freeze
# Spellings treated as "false" for boolean ENV flags. Values are downcased
# before the lookup, so the comparison is case-insensitive.
FALSE_VALUES = %w[
  0
  f
  false
  off
].freeze
Class Method Summary
- .all_indexes ⇒ Array<Chewy::Index>
  Eager loads and returns all the indexes defined in the application except Chewy::Stash::Specification and Chewy::Stash::Journal.
- .create_missing_indexes!(output: $stdout, env: ENV) ⇒ Object
- .delete_by_query_options_from_env(env) ⇒ Object
  Reads the options required to run journal cleanup asynchronously from the ENV hash.
- .journal_apply(time: nil, only: nil, except: nil, output: $stdout) ⇒ Array<Chewy::Index>
  Applies changes that were made after the specified time to the specified indexes, or to all of them.
- .journal_clean(time: nil, only: nil, except: nil, delete_by_query_options: {}, output: $stdout) ⇒ Array<Chewy::Index>
  Removes journal records created before the specified timestamp for the specified indexes, or for all of them.
- .journal_create(output: $stdout) ⇒ Object
  Creates the journal index.
- .normalize_index(identifier) ⇒ Object
- .normalize_indexes(*identifiers) ⇒ Object
- .reindex(source:, dest:, output: $stdout) ⇒ Object
  Reindexes data from the source index into the destination index.
- .reset(only: nil, except: nil, parallel: nil, output: $stdout) ⇒ Array<Chewy::Index>
  Performs zero-downtime reindexing of all documents for the specified indexes.
- .subscribed_task_stats(output = $stdout, &block) ⇒ Object
- .sync(only: nil, except: nil, parallel: nil, output: $stdout) ⇒ Array<Chewy::Index>
  Performs synchronization for each passed index if it exists.
- .update(only: nil, except: nil, parallel: nil, output: $stdout) ⇒ Array<Chewy::Index>
  Performs a full update of each passed index if it exists.
- .update_mapping(name:, output: $stdout) ⇒ Object
  Adds new fields to an existing data stream or index.
- .upgrade(only: nil, except: nil, parallel: nil, output: $stdout) ⇒ Array<Chewy::Index>
  Performs zero-downtime reindexing of all documents for the specified indexes, but only for indexes whose specification has changed.
Class Method Details
.all_indexes ⇒ Array<Chewy::Index>
Eager loads and returns all the indexes defined in the application except Chewy::Stash::Specification and Chewy::Stash::Journal.
# File 'lib/chewy/rake_helper.rb', line 217
#
# Eager loads the application and returns every Chewy::Index descendant
# except the internal Chewy::Stash::Journal and Chewy::Stash::Specification.
#
# @return [Array<Chewy::Index>]
def all_indexes
  Chewy.eager_load!
  descendants = Chewy::Index.descendants
  internal = [Chewy::Stash::Journal, Chewy::Stash::Specification]
  descendants.reject { |index| internal.include?(index) }
end
.create_missing_indexes!(output: $stdout, env: ENV) ⇒ Object
# File 'lib/chewy/rake_helper.rb', line 271
#
# Creates every index that does not exist yet. Unlike all_indexes, this keeps
# Chewy::Stash::Specification. Chewy::Stash::Journal is included only when
# Chewy.configuration[:journal] is truthy.
#
# @param output [IO] destination for progress messages
# @param env [Hash] ENV-like hash; a truthy 'VERBOSE' entry also reports the
#   indexes that already exist
def create_missing_indexes!(output: $stdout, env: ENV)
  subscribed_task_stats(output) do
    Chewy.eager_load!
    candidates = Chewy::Index.descendants
    candidates -= [Chewy::Stash::Journal] unless Chewy.configuration[:journal]

    candidates.each do |index|
      unless index.exists?
        index.create!
        output.puts "#{index.name} index successfully created"
        next
      end

      output.puts "#{index.name} already exists, skipping" if env['VERBOSE']
    end
  end
end
.delete_by_query_options_from_env(env) ⇒ Object
Reads the options required to run journal cleanup asynchronously from the ENV hash.
# File 'lib/chewy/rake_helper.rb', line 258
#
# Reads the options required to run journal cleanup asynchronously from an
# ENV-like hash. Only the keys listed in DELETE_BY_QUERY_OPTIONS are
# considered. Each key is downcased and symbolized, and each value is
# converted:
#   WAIT_FOR_COMPLETION -> boolean (false for any FALSE_VALUES spelling,
#                          compared case-insensitively; true otherwise)
#   REQUESTS_PER_SECOND -> Float
#   SCROLL_SIZE         -> Integer
#
# @param env [Hash{String => String}] typically ENV
# @return [Hash{Symbol => Object}] only the options present in +env+
def delete_by_query_options_from_env(env)
  env
    .slice(*DELETE_BY_QUERY_OPTIONS)
    .transform_keys { |k| k.downcase.to_sym }
    .to_h do |key, value|
      case key
      when :wait_for_completion then [key, !FALSE_VALUES.include?(value.downcase)]
      when :requests_per_second then [key, value.to_f]
      when :scroll_size then [key, value.to_i]
      end
    end
end
.journal_apply(time: nil, only: nil, except: nil, output: $stdout) ⇒ Array<Chewy::Index>
Applies changes that were made after the specified time to the specified indexes, or to all of them.
# File 'lib/chewy/rake_helper.rb', line 163
#
# Applies journal entries created after +time+ to the selected indexes, or
# to all of them when neither +only+ nor +except+ is given.
#
# @param time [Object] starting point passed to Chewy::Journal#apply; required
# @param only [Object, nil] indexes to include, resolved by journal_indexes_from
# @param except [Object, nil] indexes to exclude, resolved by journal_indexes_from
# @param output [IO] destination for progress messages
# @raise [ArgumentError] when +time+ is nil or false
# NOTE(review): the summary advertises Array<Chewy::Index>, but the block
#   ends with a conditional `puts`, so the result is presumably nil; confirm.
def journal_apply(time: nil, only: nil, except: nil, output: $stdout)
  raise ArgumentError, 'Please specify the time to start with' unless time

  subscribed_task_stats(output) do
    output.puts "Applying journal entries created after #{time}"
    journal = Chewy::Journal.new(journal_indexes_from(only: only, except: except))
    applied = journal.apply(time)
    output.puts 'No journal entries were created after the specified time' if applied.zero?
  end
end
.journal_clean(time: nil, only: nil, except: nil, delete_by_query_options: {}, output: $stdout) ⇒ Array<Chewy::Index>
Removes journal records created before the specified timestamp for the specified indexes or all of them.
# File 'lib/chewy/rake_helper.rb', line 187
#
# Removes journal records created before +time+ for the selected indexes, or
# for all of them when neither +only+ nor +except+ is given.
#
# @param time [Object, nil] cutoff passed to Chewy::Journal#clean; when nil,
#   the "Cleaning ..." banner is not printed
# @param only [Object, nil] indexes to include, resolved by journal_indexes_from
# @param except [Object, nil] indexes to exclude, resolved by journal_indexes_from
# @param delete_by_query_options [Hash] forwarded unchanged to Chewy::Journal#clean
# @param output [IO] destination for progress messages
# NOTE(review): the summary advertises Array<Chewy::Index>, but the block
#   ends with a `puts`, so the result is presumably nil; confirm.
def journal_clean(time: nil, only: nil, except: nil, delete_by_query_options: {}, output: $stdout)
  subscribed_task_stats(output) do
    output.puts "Cleaning journal entries created before #{time}" if time
    journal = Chewy::Journal.new(journal_indexes_from(only: only, except: except))
    # Pass the value explicitly. The bare `key:` shorthand needs Ruby 3.1+,
    # and the rest of this file spells keyword arguments out in full.
    response = journal.clean(time, delete_by_query_options: delete_by_query_options)
    if response.key?('task')
      # The cleanup was started as a task: report its id, not a count.
      output.puts "Task to cleanup the journal has been created, #{response['task']}"
    else
      count = response['deleted'] || response['_indices']['_all']['deleted']
      output.puts "Cleaned up #{count} journal entries"
    end
  end
end
.journal_create(output: $stdout) ⇒ Object
Creates journal index.
# File 'lib/chewy/rake_helper.rb', line 207
#
# Creates the journal index (Chewy::Stash::Journal) while task stats are
# being reported.
#
# @param output [IO] destination for progress messages
def journal_create(output: $stdout)
  subscribed_task_stats(output) { Chewy::Stash::Journal.create! }
end
.normalize_index(identifier) ⇒ Object
# File 'lib/chewy/rake_helper.rb', line 293
#
# Resolves an index identifier to its index class. Subclasses of
# Chewy::Index are returned unchanged. Any other value is converted to a
# string, camelized, suffixed with "Index" and looked up as a constant.
#
# @param identifier [Class, String, Symbol]
# @return [Class]
def normalize_index(identifier)
  index_class = identifier.is_a?(Class) && identifier < Chewy::Index
  return identifier if index_class

  "#{identifier.to_s.camelize}Index".constantize
end
.normalize_indexes(*identifiers) ⇒ Object
# File 'lib/chewy/rake_helper.rb', line 289
#
# Maps identifiers to index classes via normalize_index. The identifiers may
# be given as separate arguments or as arrays; exactly one level of nesting
# is flattened.
def normalize_indexes(*identifiers)
  flat = identifiers.flatten(1)
  flat.map { |id| normalize_index(id) }
end
.reindex(source:, dest:, output: $stdout) ⇒ Object
Reindexes data from the source index into the destination index.
# File 'lib/chewy/rake_helper.rb', line 228
#
# Copies data from the +source+ index into the +dest+ index via
# Chewy::Index.reindex. The names of both indexes are printed before the
# copy, and a confirmation is printed after it.
#
# @param source [Object] source index name
# @param dest [Object] destination index name
# @param output [IO] destination for progress messages
def reindex(source:, dest:, output: $stdout)
  subscribed_task_stats(output) do
    banner = "Source index is #{source}\nDestination index is #{dest}"
    output.puts banner
    Chewy::Index.reindex(source: source, dest: dest)
    output.puts "#{source} index successfully reindexed with #{dest} index data"
  end
end
.reset(only: nil, except: nil, parallel: nil, output: $stdout) ⇒ Array<Chewy::Index>
Performs zero-downtime reindexing of all documents for the specified indexes.
# File 'lib/chewy/rake_helper.rb', line 40
#
# Performs zero-downtime reindexing of all documents for the selected
# indexes. Each index is handed to reset_one. A missing-index warning is
# emitted first, before task stats reporting starts.
#
# @param only [Object, nil] indexes to include, resolved by indexes_from
# @param except [Object, nil] indexes to exclude, resolved by indexes_from
# @param parallel [Object, nil] forwarded to reset_one
# @param output [IO] destination for progress messages
def reset(only: nil, except: nil, parallel: nil, output: $stdout)
  warn_missing_index(output)

  subscribed_task_stats(output) do
    targets = indexes_from(only: only, except: except)
    targets.each { |index| reset_one(index, output, parallel: parallel) }
  end
end
.subscribed_task_stats(output = $stdout, &block) ⇒ Object
# File 'lib/chewy/rake_helper.rb', line 299
#
# Runs the block with two subscriptions active, each curried with +output+:
# JOURNAL_CALLBACK on 'apply_journal.chewy' and IMPORT_CALLBACK on
# 'import_objects.chewy'. It always prints the total elapsed time, even when
# the block raises. The return value is whatever
# ActiveSupport::Notifications.subscribed returns, presumably the block's value.
#
# Elapsed time is measured with the monotonic clock, so wall-clock
# adjustments (NTP corrections, manual clock changes) cannot skew it or make
# it negative.
#
# @param output [IO] destination for progress messages
def subscribed_task_stats(output = $stdout, &block)
  started_at = Process.clock_gettime(Process::CLOCK_MONOTONIC)
  ActiveSupport::Notifications.subscribed(JOURNAL_CALLBACK.curry[output], 'apply_journal.chewy') do
    ActiveSupport::Notifications.subscribed(IMPORT_CALLBACK.curry[output], 'import_objects.chewy', &block)
  end
ensure
  elapsed = Process.clock_gettime(Process::CLOCK_MONOTONIC) - started_at
  output.puts "Total: #{human_duration(elapsed)}"
end
.sync(only: nil, except: nil, parallel: nil, output: $stdout) ⇒ Array<Chewy::Index>
Performs synchronization for each passed index if it exists.
# File 'lib/chewy/rake_helper.rb', line 129
#
# Synchronizes each selected index via its #sync method. For each index it
# reports missing and outdated documents and the time taken.
#
# @param only [Object, nil] indexes to include, resolved by indexes_from
# @param except [Object, nil] indexes to exclude, resolved by indexes_from
# @param parallel [Object, nil] forwarded to each index's #sync
# @param output [IO] destination for progress messages
# @return [Array<Chewy::Index>] indexes whose sync reported a positive
#   :count (assuming subscribed_task_stats passes the block's value through)
def sync(only: nil, except: nil, parallel: nil, output: $stdout)
  subscribed_task_stats(output) do
    indexes_from(only: only, except: except).each_with_object([]) do |index, synced_indexes|
      output.puts "Synchronizing #{index}"
      output.puts " #{index} doesn't support outdated synchronization" unless index.supports_outdated_sync?

      # Monotonic clock: per-index timings must not be skewed by wall-clock changes.
      started_at = Process.clock_gettime(Process::CLOCK_MONOTONIC)
      sync_result = index.sync(parallel: parallel)
      if !sync_result
        output.puts " Something went wrong with the #{index} synchronization"
      elsif sync_result[:count].positive?
        output.puts " Missing documents: #{sync_result[:missing]}" if sync_result[:missing].present?
        output.puts " Outdated documents: #{sync_result[:outdated]}" if sync_result[:outdated].present?
        synced_indexes.push(index)
      else
        output.puts " Skipping #{index}, up to date"
      end
      elapsed = Process.clock_gettime(Process::CLOCK_MONOTONIC) - started_at
      output.puts " Took #{human_duration(elapsed)}"
    end
  end
end
.update(only: nil, except: nil, parallel: nil, output: $stdout) ⇒ Array<Chewy::Index>
Performs a full update of each passed index if it exists.
# File 'lib/chewy/rake_helper.rb', line 103
#
# Re-imports every selected index that exists. Indexes that do not exist are
# skipped, with a hint to create them using the reset task.
#
# @param only [Object, nil] indexes to include, resolved by indexes_from
# @param except [Object, nil] indexes to exclude, resolved by indexes_from
# @param parallel [Object, nil] forwarded to each index's #import
# @param output [IO] destination for progress messages
# @return [Array<Chewy::Index>] the indexes that were imported (assuming
#   subscribed_task_stats passes the block's value through)
# NOTE(review): the skip message says "does not exists"; the grammar is off,
#   but the text is left untouched in case callers or specs match it.
def update(only: nil, except: nil, parallel: nil, output: $stdout)
  subscribed_task_stats(output) do
    updated = []
    indexes_from(only: only, except: except).each do |index|
      unless index.exists?
        output.puts "Skipping #{index}, it does not exists (use rake chewy:reset[#{index.derivable_name}] to create and update it)"
        next
      end

      output.puts "Updating #{index}"
      index.import(parallel: parallel)
      updated << index
    end
    updated
  end
end
.update_mapping(name:, output: $stdout) ⇒ Object
Adds new fields to an existing data stream or index, or changes the search settings of existing fields.
# File 'lib/chewy/rake_helper.rb', line 243
#
# Resolves +name+ with normalize_index and calls #update_mapping on the
# resulting index. The name is printed before the update, and a confirmation
# is printed after it.
#
# @param name [Object] index identifier accepted by normalize_index
# @param output [IO] destination for progress messages
def update_mapping(name:, output: $stdout)
  subscribed_task_stats(output) do
    output.puts "Index name is #{name}"
    index = normalize_index(name)
    index.update_mapping
    output.puts "#{name} index successfully updated"
  end
end
.upgrade(only: nil, except: nil, parallel: nil, output: $stdout) ⇒ Array<Chewy::Index>
Performs zero-downtime reindexing of all documents for the specified indexes only if a particular index specification was changed.
# File 'lib/chewy/rake_helper.rb', line 65
#
# Performs zero-downtime reindexing, via reset_one, of only those selected
# indexes whose specification has changed. Every other selected index gets a
# "Skipping" message.
#
# @param only [Object, nil] indexes to include, resolved by indexes_from
# @param except [Object, nil] indexes to exclude, resolved by indexes_from
# @param parallel [Object, nil] forwarded to reset_one
# @param output [IO] destination for progress messages
# @return [Array<Chewy::Index>] the indexes whose specification changed
#   (assuming subscribed_task_stats passes the block's value through)
def upgrade(only: nil, except: nil, parallel: nil, output: $stdout)
  warn_missing_index(output)

  subscribed_task_stats(output) do
    candidates = indexes_from(only: only, except: except)
    changed = candidates.select { |index| index.specification.changed? }

    if changed.empty?
      output.puts 'No index specification was changed'
    else
      candidates.each do |index|
        unless changed.include?(index)
          output.puts "Skipping #{index}, the specification didn't change"
          next
        end

        reset_one(index, output, parallel: parallel)
      end
    end

    changed
  end
end