Class: Zelastic::IndexManager

Inherits:
Object
  • Object
show all
Extended by:
Forwardable
Defined in:
lib/zelastic/index_manager.rb

Instance Method Summary collapse

Constructor Details

#initialize(config, client: nil) ⇒ IndexManager

Returns a new instance of IndexManager.



7
8
9
10
# File 'lib/zelastic/index_manager.rb', line 7

# Stores the configuration and picks the client this manager talks to.
#
# @param config [Object] kept as-is; only asked for +clients+ when no
#   explicit client is supplied
# @param client [Object, nil] client to use; when nil (or false) the first
#   entry of +config.clients+ is used instead
def initialize(config, client: nil)
  @config = config
  @client = client
  # Only consult the config's client list when the caller gave us nothing.
  @client ||= config.clients.first
end

Instance Method Details

#cleanup_old_indices ⇒ Object



72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
# File 'lib/zelastic/index_manager.rb', line 72

# Deletes every index whose name starts with the read alias, except the
# indices the read alias currently resolves to.
#
# An alias may resolve to more than one index. All of them are treated as
# live and kept; previously only the first one was protected and the rest
# were deleted along with the genuinely old indices.
#
# NOTE(review): the prefix match also selects indices belonging to any other
# alias whose name begins with the same string - confirm the naming scheme
# rules that out.
#
# @return [Object, nil] nil when there is nothing to delete, otherwise the
#   result of +client.indices.delete+
def cleanup_old_indices
  logger.info('Cleaning up old indices in Elasticsearch')
  read_alias = config.read_alias
  current_indices = client.indices.get_alias(name: read_alias).keys

  # With a single live index this logs exactly the index name.
  logger.info("Currently used index is #{current_indices.join(', ')}")

  indices_to_delete = client
    .cat
    .indices(format: :json)
    .map { |index| index['index'] }
    .select { |name| name.start_with?(read_alias) }
    .reject { |name| current_indices.include?(name) }

  if indices_to_delete.empty?
    logger.info('Nothing to do: no old indices')
    return
  end

  logger.info(
    "Deleting #{indices_to_delete.count} old indices: #{indices_to_delete.join(', ')}"
  )
  client.indices.delete(index: indices_to_delete)
end

#create_index(unique_name) ⇒ Object



12
13
14
15
16
17
# File 'lib/zelastic/index_manager.rb', line 12

# Creates the index derived from +unique_name+ using the configured index
# definition, then points the write alias at it.
#
# @param unique_name [Object] handed to +index_name_from_unique+ to build the
#   index name
# @return [Object] whatever +put_alias+ returns
def create_index(unique_name)
  target = index_name_from_unique(unique_name)
  indices_api = client.indices

  indices_api.create(index: target, body: config.index_definition)
  indices_api.put_alias(index: target, name: config.write_alias)
end

#current_read_index ⇒ Object



130
131
132
# File 'lib/zelastic/index_manager.rb', line 130

# Looks up the configured read alias and returns the first key of the
# response.
#
# @return [Object, nil] first key of the +get_alias+ response, or nil when
#   the response has no keys
def current_read_index
  alias_lookup = client.indices.get_alias(name: config.read_alias)
  alias_lookup.keys.first
end

#current_write_index ⇒ Object



134
135
136
# File 'lib/zelastic/index_manager.rb', line 134

# Looks up the configured write alias and returns the first key of the
# response.
#
# @return [Object, nil] first key of the +get_alias+ response, or nil when
#   the response has no keys
def current_write_index
  alias_lookup = client.indices.get_alias(name: config.write_alias)
  alias_lookup.keys.first
end

#populate_index(unique_name = nil, batch_size: 3000, refresh: false) ⇒ Object



19
20
21
22
23
24
25
26
# File 'lib/zelastic/index_manager.rb', line 19

# Walks +config.data_source+ in batches and hands each batch to the indexer,
# logging one line per batch (batches are numbered from 1).
#
# @param unique_name [Object, nil] passed to +index_name_from_unique+ to
#   obtain the index name given to the indexer
# @param batch_size [Integer] forwarded to +find_in_batches+ and to the log
#   line builder
# @param refresh [Object] forwarded untouched to +indexer.index_batch+
# @return [Object] whatever the batch enumeration returns
def populate_index(unique_name = nil, batch_size: 3000, refresh: false)
  target_index = index_name_from_unique(unique_name)
  batches = config.data_source.find_in_batches(batch_size: batch_size)

  # Start counting at 1 so the logged batch number is human-friendly.
  batches.with_index(1) do |records, batch_number|
    logger.info(populate_index_log(batch_size: batch_size, batch_number: batch_number))
    indexer.index_batch(records, client: client, index_name: target_index, refresh: refresh)
  end
end

#reindex_from_local(source_index:, dest_index:, wait_for_completion: false, op_type: nil, conflicts: nil) ⇒ Object



95
96
97
98
99
100
101
102
103
# File 'lib/zelastic/index_manager.rb', line 95

# Logs the operation and delegates to +reindex+ with +source_index+ wrapped
# as the source description.
#
# @param source_index [Object] index to copy from
# @param dest_index [Object] forwarded to +reindex+
# @param wait_for_completion [Object] forwarded to +reindex+
# @param op_type [Object, nil] forwarded to +reindex+
# @param conflicts [Object, nil] forwarded to +reindex+
# @return [Object] whatever +reindex+ returns
def reindex_from_local(source_index:, dest_index:, wait_for_completion: false, op_type: nil, conflicts: nil)
  logger.info("Reindexing from #{source_index} to #{dest_index}")

  reindex_options = {
    source: { index: source_index },
    dest_index: dest_index,
    wait_for_completion: wait_for_completion,
    op_type: op_type,
    conflicts: conflicts
  }
  reindex(**reindex_options)
end

#reindex_from_remote(source_host:, source_index:, dest_index:, username: nil, password: nil, wait_for_completion: false, op_type: nil, conflicts: nil) ⇒ Object



105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
# File 'lib/zelastic/index_manager.rb', line 105

# Logs the operation and delegates to +reindex+ with a source description
# that names a remote host, adding credentials only when they are given.
#
# @param source_host [Object] remote host to copy from
# @param source_index [Object] index on the remote host
# @param dest_index [Object] forwarded to +reindex+
# @param username [Object, nil] included in the remote description when truthy
# @param password [Object, nil] included in the remote description when truthy
# @param wait_for_completion [Object] forwarded to +reindex+
# @param op_type [Object, nil] forwarded to +reindex+
# @param conflicts [Object, nil] forwarded to +reindex+
# @return [Object] whatever +reindex+ returns
def reindex_from_remote(
  source_host:,
  source_index:,
  dest_index:,
  username: nil,
  password: nil,
  wait_for_completion: false,
  op_type: nil,
  conflicts: nil
)
  logger.info("Reindexing from remote #{source_host}/#{source_index} to #{dest_index}")

  # Drop credentials that were not supplied so they never reach the request.
  credentials = { username: username, password: password }.select { |_key, value| value }
  remote = { host: source_host }.merge(credentials)

  reindex(
    source: { remote: remote, index: source_index },
    dest_index: dest_index,
    wait_for_completion: wait_for_completion,
    op_type: op_type,
    conflicts: conflicts
  )
end

#stop_dual_writes ⇒ Object



49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
# File 'lib/zelastic/index_manager.rb', line 49

# Removes the write alias from every index that carries it other than the
# first index returned for the read alias.
#
# @return [Object, nil] nil when no other index carries the write alias,
#   otherwise the result of +update_aliases+
def stop_dual_writes
  logger.info('Stopping dual writes - making index read and write aliases the same')
  read_index = client.indices.get_alias(name: config.read_alias).keys.first

  logger.info("Currently used index is #{read_index}")

  write_indices = client.indices.get_alias(name: config.write_alias).keys
  stale_indices = write_indices.select { |name| name != read_index }

  unless stale_indices.any?
    logger.info("No write indexes that aren't the read index. Nothing to do!")
    return
  end

  logger.info("Stopping writes to #{stale_indices.count} old ES indices: #{stale_indices.join(', ')}")

  removals = stale_indices.map do |stale|
    { remove: { index: stale, alias: config.write_alias } }
  end
  client.indices.update_aliases(body: { actions: removals })
end

#switch_read_index(new_name) ⇒ Object



28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
# File 'lib/zelastic/index_manager.rb', line 28

# Points the read alias at the index named "<read_alias>_<new_name>" in a
# single +update_aliases+ call, removing the alias from the index it was
# previously found on (the first one returned) when the alias already exists.
#
# @param new_name [Object] suffix joined to the read alias with "_" to form
#   the new index name
# @return [Object] whatever +update_aliases+ returns
def switch_read_index(new_name)
  new_index = [config.read_alias, new_name].join('_')
  actions = []

  if client.indices.exists_alias?(name: config.read_alias)
    previous = client.indices.get_alias(name: config.read_alias).keys.first
    # An existing alias with no index behind it needs no removal step.
    actions << { remove: { index: previous, alias: config.read_alias } } if previous
  end

  actions << { add: { index: new_index, alias: config.read_alias } }
  client.indices.update_aliases(body: { actions: actions })
end