Module: Deimos::ActiveRecordConsume::BatchConsumption
- Includes: Consume::BatchConsumption
- Included in: Deimos::ActiveRecordConsumer
- Defined in: lib/deimos/active_record_consume/batch_consumption.rb, sig/defs.rbs
Overview
Methods for consuming batches of messages and saving them to the database in bulk ActiveRecord operations.
Instance Method Summary
- #compact_messages(batch) ⇒ ::Array[Message]
  Compact a batch of messages, taking only the last message for each unique key.
- #consume_batch ⇒ void
  Handle a batch of Kafka messages.
- #deleted_query(records) ⇒ ActiveRecord::Relation
  Create an ActiveRecord relation that matches all of the passed records.
- #key_columns(_klass) ⇒ ::Array[String]
  Get the set of attribute names that uniquely identify messages in the batch.
- #record_key(key) ⇒ ::Hash[untyped, untyped]
  Get the unique key for the ActiveRecord instance from the incoming key.
- #remove_records(messages) ⇒ void
  Delete any records with a tombstone.
- #uncompacted_update(messages) ⇒ void
  Perform database operations for a batch of messages without compaction.
- #update_database(messages) ⇒ void
  Perform database operations for a group of messages.
- #upsert_records(messages) ⇒ void
  Upsert any non-deleted records, either updating or inserting them.
Methods included from Consume::BatchConsumption
Instance Method Details
#compact_messages(batch) ⇒ ::Array[Message]
Compact a batch of messages, taking only the last message for each unique key.
@param batch — Batch of messages.
@return — Compacted batch.
```ruby
# File 'lib/deimos/active_record_consume/batch_consumption.rb', line 121
def compact_messages(batch)
  return batch unless batch.first&.key.present?

  batch.reverse.uniq(&:key).reverse!
end
```
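The reverse/uniq/reverse idiom above keeps the last message for each key while preserving the order of the surviving messages. A minimal standalone sketch, assuming a plain `Struct` in place of `Deimos::Message` and a `nil?` check in place of ActiveSupport's `present?` so that it runs without Rails (`CompactMsg` and `compact` are hypothetical names):

```ruby
# Hypothetical stand-in for a consumed message; the real Deimos::Message has more fields.
CompactMsg = Struct.new(:key, :payload)

# Keep only the last message per key, preserving the relative order of the survivors.
def compact(batch)
  # Like the original, return the batch untouched if it is empty or its first
  # message has no key (nil? here replaces ActiveSupport's present?).
  return batch if batch.empty? || batch.first.key.nil?

  # uniq keeps the *first* occurrence, so reverse first to keep the last one,
  # then reverse again to restore arrival order.
  batch.reverse.uniq(&:key).reverse
end

batch = [CompactMsg.new(1, 'a'), CompactMsg.new(2, 'b'), CompactMsg.new(1, 'c')]
compact(batch).map(&:payload) # => ["b", "c"]
```

Message `'a'` is dropped because a later message with the same key (`'c'`) supersedes it.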
#consume_batch ⇒ void
This method returns an undefined value.
Handle a batch of Kafka messages. Batches are split into "slices", which are groups of independent messages that can be processed together in a single database operation. If two messages in a batch have the same key, they cannot be processed in the same operation because they would interfere with each other, so they are placed in separate slices.
@param payloads — Decoded payloads
@param metadata — Information about batch, including keys.
```ruby
# File 'lib/deimos/active_record_consume/batch_consumption.rb', line 27
def consume_batch
  filtered = .select { || () }
  skipped_count = .size - filtered.size
  if skipped_count.positive?
    Deimos::Logging.log_debug(
      message: 'Skipping processing of messages in batch',
      skipped_count: skipped_count
    )
  end

   = filtered.map { |p| Deimos::Message.new(p.payload, key: p.key) }

  tag = topic.name
  Deimos.config.tracer.active_span.set_tag('topic', tag)

  Karafka.monitor.instrument('deimos.ar_consumer.consume_batch', { topic: tag }) do
    if @compacted && .map(&:key).compact.any?
      update_database(())
    else
      uncompacted_update()
    end
  end
  post_process_batch()
end
```
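The control flow of `consume_batch` comes down to two decisions: which messages survive the per-message filter, and whether the batch takes the compacted or the uncompacted path. A rough sketch of just those decisions, with Karafka, tracing and the database stripped out; `plan_batch`, `BatchMsg` and the `filter` lambda are hypothetical, and the lambda stands in for the filter predicate whose name is missing from the listing above:

```ruby
# Hypothetical stand-in for an incoming Kafka message.
BatchMsg = Struct.new(:key, :payload)

# Returns how many messages were skipped and which update path would run.
def plan_batch(messages, compacted:, filter: ->(_m) { true })
  filtered = messages.select { |m| filter.call(m) }
  skipped_count = messages.size - filtered.size
  # Compaction is only used when enabled AND at least one message has a key.
  path = if compacted && filtered.map(&:key).compact.any?
           :compacted
         else
           :uncompacted
         end
  { skipped_count: skipped_count, path: path, messages: filtered }
end

msgs = [BatchMsg.new(1, { 'a' => 1 }), BatchMsg.new(2, { 'b' => 2 }), BatchMsg.new(3, nil)]
plan = plan_batch(msgs, compacted: true, filter: ->(m) { m.key != 2 })
# plan[:skipped_count] is 1 and plan[:path] is :compacted
```

Note that a keyless batch falls back to the uncompacted path even when compaction is enabled, which matches the `@compacted && ....map(&:key).compact.any?` check.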
#deleted_query(records) ⇒ ActiveRecord::Relation
Create an ActiveRecord relation that matches all of the passed records. Used for bulk deletion.
@param records — List of messages.
@return — Matching relation.
```ruby
# File 'lib/deimos/active_record_consume/batch_consumption.rb', line 92
def deleted_query(records)
  keys = records.
    map { |m| record_key(m.key)[@klass.primary_key] }.
    compact

  @klass.unscoped.where(@klass.primary_key => keys)
end
```
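`deleted_query` reduces each tombstone to its primary-key value and drops messages whose key yields none. A dependency-free sketch of that key extraction, assuming the simplest `record_key` behaviour of wrapping a scalar key under the primary key (`TombstoneMsg`, `pk_record_key` and `deleted_keys` are hypothetical):

```ruby
# Hypothetical stand-in for a tombstone message (key only, no payload).
TombstoneMsg = Struct.new(:key)

# Simplified record_key: nil keys map to {}, scalars map to { primary_key => key }.
def pk_record_key(key, primary_key)
  key.nil? ? {} : { primary_key => key }
end

# Look up the primary-key value for each message; {}[primary_key] is nil,
# so compact removes messages that produced no key.
def deleted_keys(records, primary_key = 'id')
  records.map { |m| pk_record_key(m.key, primary_key)[primary_key] }.compact
end

keys = deleted_keys([TombstoneMsg.new(5), TombstoneMsg.new(nil), TombstoneMsg.new(9)])
# keys == [5, 9]
```

In the real method the resulting array is passed to `@klass.unscoped.where(@klass.primary_key => keys)`, which ActiveRecord turns into a single `WHERE id IN (5, 9)` condition with default scopes removed.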
#key_columns(_klass) ⇒ ::Array[String]
Get the set of attribute names that uniquely identify messages in the batch. Requires at least one record.
@param records — Non-empty list of messages.
@return — List of attribute names.
```ruby
# File 'lib/deimos/active_record_consume/batch_consumption.rb', line 60
def key_columns(_klass)
  nil
end
```
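The default implementation returns `nil`, leaving the decision to its caller (`upsert_records` hands this method to `MassUpdater` as `key_col_proc`). A consumer with a composite natural key could override it. The sketch below uses a stand-in base class rather than the real module, and the class and column names are hypothetical:

```ruby
# Stand-in for the default behaviour shown above; not the real Deimos module.
class BaseBatchConsumer
  def key_columns(_klass)
    nil
  end
end

# Hypothetical consumer whose rows are identified by tenant and widget IDs.
class WidgetConsumer < BaseBatchConsumer
  def key_columns(_klass)
    %w[tenant_id widget_id]
  end
end

WidgetConsumer.new.key_columns(Object) # => ["tenant_id", "widget_id"]
```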
#record_key(key) ⇒ ::Hash[untyped, untyped]
Get unique key for the ActiveRecord instance from the incoming key. Override this method (with super) to customize the set of attributes that uniquely identifies each record in the database.
@param key — The encoded key.
@return — The key attributes.
```ruby
# File 'lib/deimos/active_record_consume/batch_consumption.rb', line 76
def record_key(key)
  if key.nil?
    {}
  elsif key.is_a?(Hash) || key.is_a?(AvroGen::SchemaClass::Record)
    self.key_converter.convert(key)
  elsif self.topic.key_config[:field].nil?
    { @klass.primary_key => key }
  else
    { self.topic.key_config[:field].to_s => key }
  end
end
```
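`record_key` has four branches: no key, a structured key, a scalar key with no configured key field, and a scalar key with a configured field. A dependency-free mirror of those branches, where `key_field` stands in for `topic.key_config[:field]` and the `converter` lambda stands in for `key_converter.convert`; stringifying hash keys is an assumption for illustration, not the documented behaviour of the real converter:

```ruby
# Hypothetical mirror of record_key's branching, with config passed in explicitly.
def sketch_record_key(key, primary_key:, key_field: nil,
                      converter: ->(h) { h.transform_keys(&:to_s) })
  if key.nil?
    {}                              # tombstone-style message with no key
  elsif key.is_a?(Hash)
    converter.call(key)             # structured key: delegate to the converter
  elsif key_field.nil?
    { primary_key => key }          # scalar key maps onto the primary key
  else
    { key_field.to_s => key }       # scalar key maps onto the configured field
  end
end

sketch_record_key(42, primary_key: 'id')                          # => {"id"=>42}
sketch_record_key(42, primary_key: 'id', key_field: :external_id) # => {"external_id"=>42}
```

Overriding `record_key` and calling `super`, as the description suggests, lets a consumer adjust this hash after the default branch has run.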
#remove_records(messages) ⇒ void
This method returns an undefined value.
Delete any records with a tombstone.
@param messages — List of messages for a group of deleted records.
```ruby
# File 'lib/deimos/active_record_consume/batch_consumption.rb', line 241
def remove_records(messages)
  Deimos::Utils::DeadlockRetry.wrap(Deimos.config.tracer.active_span.get_tag('topic')) do
    clause = deleted_query(messages)

    clause.delete_all
  end
end
```
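The bulk delete runs inside `Deimos::Utils::DeadlockRetry.wrap`, so a deadlock does not immediately fail the batch. Its retry count, backoff and exception matching are not shown on this page; the sketch below only illustrates the general bounded-retry pattern, assuming the block is re-run when a deadlock error is raised. `FakeDeadlock` and `with_deadlock_retry` are hypothetical:

```ruby
# Hypothetical error standing in for a database deadlock exception.
class FakeDeadlock < StandardError; end

# Re-run the block up to max_retries extra times on a deadlock, then re-raise.
def with_deadlock_retry(max_retries: 2)
  attempts = 0
  begin
    attempts += 1
    yield
  rescue FakeDeadlock
    retry if attempts <= max_retries
    raise
  end
end

calls = 0
result = with_deadlock_retry do
  calls += 1
  raise FakeDeadlock if calls < 3 # deadlock on the first two attempts
  :deleted
end
# calls == 3, result == :deleted
```

Because `delete_all` issues a single statement for the whole group, retrying the block re-runs only that statement.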
#uncompacted_update(messages) ⇒ void
This method returns an undefined value.
Perform database operations for a batch of messages without compaction. All messages are split into slices containing only unique keys, and each slice is handled as its own batch.
@param messages — List of messages.
```ruby
# File 'lib/deimos/active_record_consume/batch_consumption.rb', line 132
def uncompacted_update(messages)
  BatchSlicer.
    slice(messages).
    each(&method(:update_database))
end
```
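`BatchSlicer`'s algorithm is not shown on this page. One way to satisfy the contract described above (each slice holds at most one message per key, and later messages for a key land in later slices) is to place the n-th occurrence of a key into slice n. A sketch under that assumption, with hypothetical names:

```ruby
# Hypothetical stand-in for an incoming message.
SliceMsg = Struct.new(:key, :payload)

# The n-th message seen for a key goes into slice n, so no slice repeats a key
# and processing slices in order preserves per-key ordering.
def slice_unique_keys(messages)
  slices = []
  seen_count = Hash.new(0)
  messages.each do |m|
    index = seen_count[m.key]
    (slices[index] ||= []) << m
    seen_count[m.key] += 1
  end
  slices
end

msgs = %w[a1 b1 a2 c1 a3].map { |s| SliceMsg.new(s[0], s) }
slice_unique_keys(msgs).map { |slice| slice.map(&:payload) }
# => [["a1", "b1", "c1"], ["a2"], ["a3"]]
```

Each resulting slice would then be handed to `update_database`; `each(&method(:update_database))` in the original turns the method into a block so every slice is passed to it in turn.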
#update_database(messages) ⇒ void
This method returns an undefined value.
Perform database operations for a group of messages. All messages with payloads are passed to upsert_records. All tombstone messages are passed to remove_records.
@param messages — List of messages.
```ruby
# File 'lib/deimos/active_record_consume/batch_consumption.rb', line 143
def update_database(messages)
  # Find all upserted records (i.e. that have a payload) and all
  # deleted records (no payload)
  removed, upserted = messages.partition { |m| delete_record?(m) }

  max_db_batch_size = self.class.config[:max_db_batch_size]
  if upserted.any?
    if max_db_batch_size
      upserted.each_slice(max_db_batch_size) { |group| upsert_records(group) }
    else
      upsert_records(upserted)
    end
  end

  return if removed.empty?

  if max_db_batch_size
    removed.each_slice(max_db_batch_size) { |group| remove_records(group) }
  else
    remove_records(removed)
  end
end
```
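The effect of `max_db_batch_size` is easiest to see by listing the calls `update_database` would make. This sketch returns those calls instead of touching a database, and assumes a message is a deletion exactly when its payload is `nil` (the real method delegates that decision to `delete_record?`). `DbMsg` and `plan_db_ops` are hypothetical:

```ruby
# Hypothetical stand-in for an incoming message.
DbMsg = Struct.new(:key, :payload)

# Lists [operation, keys] pairs in the order update_database would issue them.
def plan_db_ops(messages, max_db_batch_size: nil)
  removed, upserted = messages.partition { |m| m.payload.nil? }
  # Split a list into groups of max_db_batch_size, or keep it whole when unset.
  groups = lambda do |list|
    max_db_batch_size ? list.each_slice(max_db_batch_size).to_a : [list]
  end

  ops = []
  groups.call(upserted).each { |g| ops << [:upsert, g.map(&:key)] } if upserted.any?
  groups.call(removed).each { |g| ops << [:remove, g.map(&:key)] } if removed.any?
  ops
end

msgs = [
  DbMsg.new(1, { 'x' => 1 }), DbMsg.new(2, nil), DbMsg.new(3, { 'x' => 3 }),
  DbMsg.new(4, { 'x' => 4 }), DbMsg.new(5, nil)
]
plan_db_ops(msgs, max_db_batch_size: 2)
# => [[:upsert, [1, 3]], [:upsert, [4]], [:remove, [2, 5]]]
```

All upserts are issued before any deletes, and `partition` keeps the original relative order within each group.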
#upsert_records(messages) ⇒ void
This method returns an undefined value.
Upsert any non-deleted records, either updating or inserting them.
@param messages — List of messages for a group of non-deleted records.
```ruby
# File 'lib/deimos/active_record_consume/batch_consumption.rb', line 170
def upsert_records(messages)
  record_list = build_records(messages)
  invalid = filter_records(record_list)
  if invalid.any?
    Karafka.monitor.instrument('deimos.batch_consumption.invalid_records', {
      records: invalid,
      consumer: self.class
    })
  end
  return if record_list.empty?

  key_col_proc = self.method(:key_columns).to_proc
  col_proc = self.method(:columns).to_proc

  updater = MassUpdater.new(@klass,
                            key_col_proc: key_col_proc,
                            col_proc: col_proc,
                            replace_associations: self.replace_associations,
                            bulk_import_id_generator: self.bulk_import_id_generator,
                            save_associations_first: self.save_associations_first,
                            bulk_import_id_column: self.bulk_import_id_column)
  Karafka.monitor.instrument('deimos.batch_consumption.valid_records', {
    records: updater.mass_update(record_list),
    consumer: self.class
  })
end
```
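"Upsert" here means that each incoming record updates an existing row with the same key columns, or is inserted when no such row exists. `MassUpdater`'s actual SQL strategy is not shown on this page; the in-memory sketch below only illustrates the semantics, with an array of hashes standing in for the table (`upsert_rows` is hypothetical):

```ruby
# Update rows whose key-column values match, insert the rest. Mutates table.
def upsert_rows(table, rows, key_columns)
  # Index existing rows by the tuple of their key-column values.
  index = {}
  table.each_with_index { |row, i| index[row.values_at(*key_columns)] = i }

  rows.each do |row|
    key = row.values_at(*key_columns)
    if (i = index[key])
      table[i] = table[i].merge(row) # existing key: update in place
    else
      index[key] = table.size        # new key: remember its position
      table << row
    end
  end
  table
end

table = [{ 'tenant_id' => 1, 'sku' => 'a', 'qty' => 1 }]
incoming = [
  { 'tenant_id' => 1, 'sku' => 'a', 'qty' => 5 }, # updates the existing row
  { 'tenant_id' => 2, 'sku' => 'a', 'qty' => 7 }  # inserted as a new row
]
upsert_rows(table, incoming, %w[tenant_id sku])
```

In the real method the key columns are not passed as an array; `self.method(:key_columns).to_proc` converts the consumer's `key_columns` method into a callable so `MassUpdater` can ask for them itself.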