Class: Zelastic::IndexManager
- Inherits:
-
Object
- Object
- Zelastic::IndexManager
- Extended by:
- Forwardable
- Defined in:
- lib/zelastic/index_manager.rb
Instance Method Summary
- #cleanup_old_indices ⇒ Object
- #create_index(unique_name) ⇒ Object
- #current_read_index ⇒ Object
- #current_write_index ⇒ Object
-
#initialize(config, client: nil) ⇒ IndexManager
constructor
A new instance of IndexManager.
- #populate_index(unique_name = nil, batch_size: 3000, refresh: false) ⇒ Object
- #reindex_from_local(source_index:, dest_index:, wait_for_completion: false, op_type: nil, conflicts: nil) ⇒ Object
- #reindex_from_remote(source_host:, source_index:, dest_index:, username: nil, password: nil, wait_for_completion: false, op_type: nil, conflicts: nil) ⇒ Object
- #stop_dual_writes ⇒ Object
- #switch_read_index(new_name) ⇒ Object
Constructor Details
#initialize(config, client: nil) ⇒ IndexManager
Returns a new instance of IndexManager.
7 8 9 10 |
# File 'lib/zelastic/index_manager.rb', line 7
# Stores the configuration and picks the client this manager talks to.
#
# @param config [Object] configuration object; must respond to +clients+
# @param client [Object, nil] explicit client; when omitted, the first entry
#   of +config.clients+ is used
def initialize(config, client: nil)
  @client = client || config.clients.first
  @config = config
end
Instance Method Details
#cleanup_old_indices ⇒ Object
72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 |
# File 'lib/zelastic/index_manager.rb', line 72
# Deletes every index whose name starts with the read alias, except the
# indices the read alias currently points at.
#
# All indices behind the read alias are kept, not only the first one the
# client returns, so an alias spanning several indices never loses a live one.
#
# @return [Object, nil] the client's delete response, or nil when there is
#   nothing to delete
def cleanup_old_indices
  logger.info('Cleaning up old indices in Elasticsearch')

  live_indices = client.indices.get_alias(name: config.read_alias).keys
  # With a single live index this renders exactly the same message as before.
  logger.info("Currently used index is #{live_indices.join(', ')}")

  indices_to_delete = client
                      .cat
                      .indices(format: :json)
                      .map { |index| index['index'] }
                      .select { |name| name.start_with?(config.read_alias) }
                      .reject { |name| live_indices.include?(name) }

  if indices_to_delete.empty?
    logger.info('Nothing to do: no old indices')
    return
  end

  logger.info(
    "Deleting #{indices_to_delete.count} old indices: #{indices_to_delete.join(', ')}"
  )
  client.indices.delete(index: indices_to_delete)
end
#create_index(unique_name) ⇒ Object
12 13 14 15 16 17 |
# File 'lib/zelastic/index_manager.rb', line 12
# Creates a new index from the configured index definition and attaches the
# write alias to it.
#
# @param unique_name [Object] value handed to +index_name_from_unique+ to
#   resolve the concrete index name
# @return [Object] the client's response to the +put_alias+ call
def create_index(unique_name)
  target = index_name_from_unique(unique_name)

  client.indices.create(
    index: target,
    body: config.index_definition
  )
  client.indices.put_alias(
    index: target,
    name: config.write_alias
  )
end
#current_read_index ⇒ Object
130 131 132 |
# File 'lib/zelastic/index_manager.rb', line 130
# Looks up the read alias and returns the first index name in the response.
#
# @return [Object, nil] first key of the +get_alias+ response
def current_read_index
  aliased = client.indices.get_alias(name: config.read_alias)
  aliased.keys.first
end
#current_write_index ⇒ Object
134 135 136 |
# File 'lib/zelastic/index_manager.rb', line 134
# Looks up the write alias and returns the first index name in the response.
#
# @return [Object, nil] first key of the +get_alias+ response
def current_write_index
  aliased = client.indices.get_alias(name: config.write_alias)
  aliased.keys.first
end
#populate_index(unique_name = nil, batch_size: 3000, refresh: false) ⇒ Object
19 20 21 22 23 24 25 26 |
# File 'lib/zelastic/index_manager.rb', line 19
# Walks the configured data source in batches and hands each batch to the
# indexer, logging progress before every batch.
#
# @param unique_name [Object, nil] value handed to +index_name_from_unique+
# @param batch_size [Integer] batch size passed to +find_in_batches+
# @param refresh [Boolean] forwarded unchanged to +indexer.index_batch+
def populate_index(unique_name = nil, batch_size: 3000, refresh: false)
  index_name = index_name_from_unique(unique_name)
  batches = config.data_source.find_in_batches(batch_size:)

  batches.with_index do |records, position|
    # Batch numbers in the log are 1-based.
    batch_number = position + 1
    logger.info(populate_index_log(batch_size:, batch_number:))
    indexer.index_batch(records, client:, index_name:, refresh:)
  end
end
#reindex_from_local(source_index:, dest_index:, wait_for_completion: false, op_type: nil, conflicts: nil) ⇒ Object
95 96 97 98 99 100 101 102 103 |
# File 'lib/zelastic/index_manager.rb', line 95
# Logs the operation and delegates to +reindex+ with a source that names
# only the source index.
#
# @param source_index [String] index to read from
# @param dest_index [String] index to write to
# @param wait_for_completion [Boolean] forwarded to +reindex+
# @param op_type [Object, nil] forwarded to +reindex+
# @param conflicts [Object, nil] forwarded to +reindex+
# @return [Object] whatever +reindex+ returns
def reindex_from_local(source_index:, dest_index:, wait_for_completion: false, op_type: nil,
                       conflicts: nil)
  logger.info("Reindexing from #{source_index} to #{dest_index}")

  source = { index: source_index }
  reindex(source:, dest_index:, wait_for_completion:, op_type:, conflicts:)
end
#reindex_from_remote(source_host:, source_index:, dest_index:, username: nil, password: nil, wait_for_completion: false, op_type: nil, conflicts: nil) ⇒ Object
105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 |
# File 'lib/zelastic/index_manager.rb', line 105
# Logs the operation and delegates to +reindex+ with a source that carries a
# +remote+ section (host plus any credentials given) and the source index.
#
# @param source_host [String] remote host to read from
# @param source_index [String] index on the remote host
# @param dest_index [String] index to write to
# @param username [String, nil] added to the remote section only when truthy
# @param password [String, nil] added to the remote section only when truthy
# @param wait_for_completion [Boolean] forwarded to +reindex+
# @param op_type [Object, nil] forwarded to +reindex+
# @param conflicts [Object, nil] forwarded to +reindex+
# @return [Object] whatever +reindex+ returns
def reindex_from_remote(
  source_host:,
  source_index:,
  dest_index:,
  username: nil,
  password: nil,
  wait_for_completion: false,
  op_type: nil,
  conflicts: nil
)
  logger.info("Reindexing from remote #{source_host}/#{source_index} to #{dest_index}")

  # Only credentials that were actually supplied end up in the request.
  credentials = { username: username, password: password }.select { |_key, value| value }
  remote = { host: source_host }.merge(credentials)
  source = { remote: remote, index: source_index }

  reindex(source:, dest_index:, wait_for_completion:, op_type:, conflicts:)
end
#stop_dual_writes ⇒ Object
49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 |
# File 'lib/zelastic/index_manager.rb', line 49
# Removes the write alias from every index other than the first index behind
# the read alias.
#
# @return [Object, nil] the client's +update_aliases+ response, or nil when
#   no other index carries the write alias
def stop_dual_writes
  logger.info('Stopping dual writes - making index read and write aliases the same')

  read_index = client.indices.get_alias(name: config.read_alias).keys.first
  logger.info("Currently used index is #{read_index}")

  write_targets = client.indices.get_alias(name: config.write_alias).keys
  other_write_indices = write_targets.reject { |name| name == read_index }

  if other_write_indices.none?
    logger.info("No write indexes that aren't the read index. Nothing to do!")
    return
  end

  summary = "Stopping writes to #{other_write_indices.count} old ES indices: " \
            "#{other_write_indices.join(', ')}"
  logger.info(summary)

  removals = other_write_indices.map do |stale_index|
    { remove: { index: stale_index, alias: config.write_alias } }
  end
  client.indices.update_aliases(body: { actions: removals })
end
#switch_read_index(new_name) ⇒ Object
28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 |
# File 'lib/zelastic/index_manager.rb', line 28
# Points the read alias at the index named "<read_alias>_<new_name>".
#
# Every index currently behind the read alias is detached in the same
# +update_aliases+ request that attaches the new one. Detaching only the
# first index would leave the alias spanning old and new indices whenever it
# pointed at more than one index beforehand.
#
# @param new_name [Object] suffix joined to the read alias with "_" to form
#   the new index name
# @return [Object] the client's +update_aliases+ response
def switch_read_index(new_name)
  new_index = [config.read_alias, new_name].join('_')

  old_indices =
    if client.indices.exists_alias?(name: config.read_alias)
      client.indices.get_alias(name: config.read_alias).keys
    else
      # No alias yet: there is nothing to detach.
      []
    end

  removals = old_indices.map do |old_index|
    { remove: { index: old_index, alias: config.read_alias } }
  end

  client.indices.update_aliases(
    body: {
      actions: [
        *removals,
        { add: { index: new_index, alias: config.read_alias } }
      ]
    }
  )
end