Class: Deimos::ActiveRecordConsume::MassUpdater

Inherits:
Object
  • Object
show all
Defined in:
lib/deimos/active_record_consume/mass_updater.rb

Overview

Responsible for updating the database itself.

Instance Method Summary collapse

Constructor Details

#initialize(klass, key_col_proc: nil, col_proc: nil, replace_associations: true, bulk_import_id_generator: nil, save_associations_first: false, bulk_import_id_column: nil) ⇒ MassUpdater

Returns a new instance of MassUpdater.

Parameters:

  • klass (Class < ActiveRecord::Base])

    lass [Class < ActiveRecord::Base]

  • key_col_proc (Proc<Class < ActiveRecord::Base>]) (defaults to: nil)

    ey_col_proc [Proc<Class < ActiveRecord::Base>]

  • col_proc (Proc<Class < ActiveRecord::Base>]) (defaults to: nil)

    ol_proc [Proc<Class < ActiveRecord::Base>]

  • replace_associations (Boolean) (defaults to: true)


22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
# File 'lib/deimos/active_record_consume/mass_updater.rb', line 22

def initialize(klass, key_col_proc: nil, col_proc: nil,
               replace_associations: true, bulk_import_id_generator: nil, save_associations_first: false,
               bulk_import_id_column: nil)
  @klass = klass
  @replace_associations = replace_associations
  @bulk_import_id_generator = bulk_import_id_generator
  @save_associations_first = save_associations_first
  @bulk_import_id_column = bulk_import_id_column&.to_s

  @key_cols = {}
  @key_col_proc = key_col_proc

  @columns = {}
  @col_proc = col_proc
end

Instance Method Details

#assign_associations(record_list) ⇒ Hash

Assign associated records to corresponding primary records

Parameters:

  • record_list (BatchRecordList)

    RecordList of primary records for this consumer

Returns:

  • (Hash)


93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
# File 'lib/deimos/active_record_consume/mass_updater.rb', line 93

def assign_associations(record_list)
  associations_info = {}
  record_list.associations.each do |assoc|
    col = @bulk_import_id_column if assoc.klass.column_names.include?(@bulk_import_id_column)
    associations_info[[assoc, col]] = []
  end
  record_list.batch_records.each do |primary_batch_record|
    associations_info.each_key do |assoc, col|
      batch_record = BatchRecord.new(klass: assoc.klass,
                                     attributes: primary_batch_record.associations[assoc.name],
                                     bulk_import_column: col,
                                     bulk_import_id_generator: @bulk_import_id_generator)
      # Associate this associated batch record's record with the primary record to
      # retrieve foreign_key after associated records have been saved and primary
      # keys have been filled
      primary_batch_record.record.assign_attributes({ assoc.name => batch_record.record })
      associations_info[[assoc, col]] << batch_record
    end
  end
  associations_info
end

#columns(klass) ⇒ Object

Parameters:

  • klass (Class < ActiveRecord::Base])

    lass [Class < ActiveRecord::Base]



39
40
41
# File 'lib/deimos/active_record_consume/mass_updater.rb', line 39

def columns(klass)
  @columns[klass] ||= @col_proc&.call(klass) || self.default_cols(klass)
end

#default_cols(klass) ⇒ Object

Parameters:

  • klass (Class < ActiveRecord::Base])

    lass [Class < ActiveRecord::Base]



14
15
16
# File 'lib/deimos/active_record_consume/mass_updater.rb', line 14

def default_cols(klass)
  klass.column_names - %w(created_at updated_at)
end

#default_keys(klass) ⇒ Object

Parameters:

  • klass (Class < ActiveRecord::Base])

    lass [Class < ActiveRecord::Base]



9
10
11
# File 'lib/deimos/active_record_consume/mass_updater.rb', line 9

def default_keys(klass)
  [klass.primary_key]
end

#import_associations(record_list) ⇒ Object

Imports associated objects and import them to database table The base table is expected to contain bulk_import_id column for indexing associated objects with id

Parameters:



75
76
77
78
79
80
81
82
83
84
85
86
87
88
# File 'lib/deimos/active_record_consume/mass_updater.rb', line 75

def import_associations(record_list)
  record_list.fill_primary_keys!

  import_id = @replace_associations ? @bulk_import_id_generator&.call : nil
  record_list.associations.each do |assoc|
    sub_records = record_list.map { |r| r.sub_records(assoc.name, import_id) }.flatten
    next unless sub_records.any?

    sub_record_list = BatchRecordList.new(sub_records)

    save_records_to_database(sub_record_list)
    record_list.delete_old_records(assoc, import_id) if import_id
  end
end

#key_cols(klass) ⇒ Object

Parameters:

  • klass (Class < ActiveRecord::Base])

    lass [Class < ActiveRecord::Base]



44
45
46
# File 'lib/deimos/active_record_consume/mass_updater.rb', line 44

def key_cols(klass)
  @key_cols[klass] ||= @key_col_proc&.call(klass) || self.default_keys(klass)
end

#mass_update(record_list) ⇒ Array<ActiveRecord::Base>

Parameters:

Returns:

  • (Array<ActiveRecord::Base>)


135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
# File 'lib/deimos/active_record_consume/mass_updater.rb', line 135

def mass_update(record_list)
  # The entire batch should be treated as one transaction so that if
  # any message fails, the whole thing is rolled back or retried
  # if there is deadlock

  if @save_associations_first
    associations_info = assign_associations(record_list)
    save_associations_first(record_list, associations_info)
    Deimos::Utils::DeadlockRetry.wrap(Deimos.config.tracer.active_span.get_tag('topic')) do
      save_records_to_database(record_list)
    end
  else
    Deimos::Utils::DeadlockRetry.wrap(Deimos.config.tracer.active_span.get_tag('topic')) do
      save_records_to_database(record_list)
      import_associations(record_list) if record_list.associations.any?
    end
  end
  record_list.records
end

#save_associations_first(record_list, associations_info) ⇒ Object

Save associated records and fill foreign keys on RecordList records

Parameters:

  • record_list (BatchRecordList)

    RecordList of primary records for this consumer

  • associations_info (Hash)

    Contains association info



118
119
120
121
122
123
124
125
126
127
128
129
130
131
# File 'lib/deimos/active_record_consume/mass_updater.rb', line 118

def save_associations_first(record_list, associations_info)
  associations_info.each_value do |records|
    assoc_record_list = BatchRecordList.new(records)
    Deimos::Utils::DeadlockRetry.wrap(Deimos.config.tracer.active_span.get_tag('topic')) do
      save_records_to_database(assoc_record_list)
    end
    import_associations(assoc_record_list)
  end
  record_list.records.each do |record|
    associations_info.each_key do |assoc, _|
      record.assign_attributes({ assoc.foreign_key => record.send(assoc.name).id })
    end
  end
end

#save_records_to_database(record_list) ⇒ Object

Parameters:



49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
# File 'lib/deimos/active_record_consume/mass_updater.rb', line 49

def save_records_to_database(record_list)
  columns = self.columns(record_list.klass)
  key_cols = self.key_cols(record_list.klass)
  record_list.records.each(&:validate!)

  options = if @key_cols.empty?
              {} # Can't upsert with no key, just do regular insert
            elsif ActiveRecord::Base.connection.adapter_name.downcase =~ /mysql/ ||
                  ActiveRecord::Base.connection.adapter_name.downcase =~ /trilogy/
              {
                on_duplicate_key_update: columns
              }
            else
              {
                on_duplicate_key_update: {
                  conflict_target: key_cols,
                  columns: columns
                }
              }
            end
  record_list.klass.import!(columns, record_list.records, options)
end