Class: Deimos::ActiveRecordConsume::MassUpdater
- Inherits:
-
Object
- Object
- Deimos::ActiveRecordConsume::MassUpdater
- Defined in:
- lib/deimos/active_record_consume/mass_updater.rb
Overview
Responsible for updating the database itself.
Instance Method Summary collapse
-
#assign_associations(record_list) ⇒ Hash
Assign associated records to corresponding primary records.
- #columns(klass) ⇒ Object
- #default_cols(klass) ⇒ Object
- #default_keys(klass) ⇒ Object
-
#import_associations(record_list) ⇒ Object
Imports associated objects and import them to database table The base table is expected to contain bulk_import_id column for indexing associated objects with id.
-
#initialize(klass, key_col_proc: nil, col_proc: nil, replace_associations: true, bulk_import_id_generator: nil, save_associations_first: false, bulk_import_id_column: nil) ⇒ MassUpdater
constructor
A new instance of MassUpdater.
- #key_cols(klass) ⇒ Object
- #mass_update(record_list) ⇒ Array<ActiveRecord::Base>
-
#save_associations_first(record_list, associations_info) ⇒ Object
Save associated records and fill foreign keys on RecordList records.
- #save_records_to_database(record_list) ⇒ Object
Constructor Details
#initialize(klass, key_col_proc: nil, col_proc: nil, replace_associations: true, bulk_import_id_generator: nil, save_associations_first: false, bulk_import_id_column: nil) ⇒ MassUpdater
Returns a new instance of MassUpdater.
22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 |
# File 'lib/deimos/active_record_consume/mass_updater.rb', line 22 def initialize(klass, key_col_proc: nil, col_proc: nil, replace_associations: true, bulk_import_id_generator: nil, save_associations_first: false, bulk_import_id_column: nil) @klass = klass @replace_associations = replace_associations @bulk_import_id_generator = bulk_import_id_generator @save_associations_first = save_associations_first @bulk_import_id_column = bulk_import_id_column&.to_s @key_cols = {} @key_col_proc = key_col_proc @columns = {} @col_proc = col_proc end |
Instance Method Details
#assign_associations(record_list) ⇒ Hash
Assign associated records to corresponding primary records
93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 |
# File 'lib/deimos/active_record_consume/mass_updater.rb', line 93 def assign_associations(record_list) associations_info = {} record_list.associations.each do |assoc| col = @bulk_import_id_column if assoc.klass.column_names.include?(@bulk_import_id_column) associations_info[[assoc, col]] = [] end record_list.batch_records.each do |primary_batch_record| associations_info.each_key do |assoc, col| batch_record = BatchRecord.new(klass: assoc.klass, attributes: primary_batch_record.associations[assoc.name], bulk_import_column: col, bulk_import_id_generator: @bulk_import_id_generator) # Associate this associated batch record's record with the primary record to # retrieve foreign_key after associated records have been saved and primary # keys have been filled primary_batch_record.record.assign_attributes({ assoc.name => batch_record.record }) associations_info[[assoc, col]] << batch_record end end associations_info end |
#columns(klass) ⇒ Object
39 40 41 |
# File 'lib/deimos/active_record_consume/mass_updater.rb', line 39 def columns(klass) @columns[klass] ||= @col_proc&.call(klass) || self.default_cols(klass) end |
#default_cols(klass) ⇒ Object
14 15 16 |
# File 'lib/deimos/active_record_consume/mass_updater.rb', line 14 def default_cols(klass) klass.column_names - %w(created_at updated_at) end |
#default_keys(klass) ⇒ Object
9 10 11 |
# File 'lib/deimos/active_record_consume/mass_updater.rb', line 9 def default_keys(klass) [klass.primary_key] end |
#import_associations(record_list) ⇒ Object
Imports associated objects and import them to database table The base table is expected to contain bulk_import_id column for indexing associated objects with id
75 76 77 78 79 80 81 82 83 84 85 86 87 88 |
# File 'lib/deimos/active_record_consume/mass_updater.rb', line 75 def import_associations(record_list) record_list.fill_primary_keys! import_id = @replace_associations ? @bulk_import_id_generator&.call : nil record_list.associations.each do |assoc| sub_records = record_list.map { |r| r.sub_records(assoc.name, import_id) }.flatten next unless sub_records.any? sub_record_list = BatchRecordList.new(sub_records) save_records_to_database(sub_record_list) record_list.delete_old_records(assoc, import_id) if import_id end end |
#key_cols(klass) ⇒ Object
44 45 46 |
# File 'lib/deimos/active_record_consume/mass_updater.rb', line 44 def key_cols(klass) @key_cols[klass] ||= @key_col_proc&.call(klass) || self.default_keys(klass) end |
#mass_update(record_list) ⇒ Array<ActiveRecord::Base>
135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 |
# File 'lib/deimos/active_record_consume/mass_updater.rb', line 135 def mass_update(record_list) # The entire batch should be treated as one transaction so that if # any message fails, the whole thing is rolled back or retried # if there is deadlock if @save_associations_first associations_info = assign_associations(record_list) save_associations_first(record_list, associations_info) Deimos::Utils::DeadlockRetry.wrap(Deimos.config.tracer.active_span.get_tag('topic')) do save_records_to_database(record_list) end else Deimos::Utils::DeadlockRetry.wrap(Deimos.config.tracer.active_span.get_tag('topic')) do save_records_to_database(record_list) import_associations(record_list) if record_list.associations.any? end end record_list.records end |
#save_associations_first(record_list, associations_info) ⇒ Object
Save associated records and fill foreign keys on RecordList records
118 119 120 121 122 123 124 125 126 127 128 129 130 131 |
# File 'lib/deimos/active_record_consume/mass_updater.rb', line 118 def save_associations_first(record_list, associations_info) associations_info.each_value do |records| assoc_record_list = BatchRecordList.new(records) Deimos::Utils::DeadlockRetry.wrap(Deimos.config.tracer.active_span.get_tag('topic')) do save_records_to_database(assoc_record_list) end import_associations(assoc_record_list) end record_list.records.each do |record| associations_info.each_key do |assoc, _| record.assign_attributes({ assoc.foreign_key => record.send(assoc.name).id }) end end end |
#save_records_to_database(record_list) ⇒ Object
49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 |
# File 'lib/deimos/active_record_consume/mass_updater.rb', line 49 def save_records_to_database(record_list) columns = self.columns(record_list.klass) key_cols = self.key_cols(record_list.klass) record_list.records.each(&:validate!) = if @key_cols.empty? {} # Can't upsert with no key, just do regular insert elsif ActiveRecord::Base.connection.adapter_name.downcase =~ /mysql/ || ActiveRecord::Base.connection.adapter_name.downcase =~ /trilogy/ { on_duplicate_key_update: columns } else { on_duplicate_key_update: { conflict_target: key_cols, columns: columns } } end record_list.klass.import!(columns, record_list.records, ) end |