Module: Deimos::ActiveRecordConsume::BatchConsumption

Includes:
Consume::BatchConsumption
Included in:
Deimos::ActiveRecordConsumer
Defined in:
lib/deimos/active_record_consume/batch_consumption.rb,
sig/defs.rbs

Overview

Methods for consuming batches of messages and saving them to the database in bulk ActiveRecord operations.

Instance Method Summary

Methods included from Consume::BatchConsumption

#around_consume_batch

Instance Method Details

#compact_messages(batch) ⇒ ::Array[Message]

Compact a batch of messages, taking only the last message for each unique key.

@param batch — Batch of messages.

@return — Compacted batch.

# File 'lib/deimos/active_record_consume/batch_consumption.rb', line 121

def compact_messages(batch)
  return batch unless batch.first&.key.present?

  batch.reverse.uniq(&:key).reverse!
end
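Here is a minimal plain-Ruby sketch of the same compaction idea that runs without Deimos. The Msg struct is a hypothetical stand-in for Deimos::Message. The blank-key check only approximates ActiveSupport's present?, treating nil and empty values as blank.

```ruby
# Hypothetical stand-in for Deimos::Message (only key and payload).
Msg = Struct.new(:key, :payload)

# Keep only the last message for each key. Like compact_messages, this
# only inspects the first message's key before deciding to compact.
def compact_batch(batch)
  first_key = batch.first&.key
  blank = first_key.nil? || (first_key.respond_to?(:empty?) && first_key.empty?)
  return batch if blank

  # Reversing first makes uniq keep the *last* occurrence of each key;
  # reversing again restores the original relative order.
  batch.reverse.uniq(&:key).reverse
end

batch = [Msg.new('a', 1), Msg.new('b', 1), Msg.new('a', 2)]
result = compact_batch(batch).map { |m| [m.key, m.payload] }
# result == [["b", 1], ["a", 2]]
```

In the result, "b" comes before "a" because the surviving "a" message is the later one in the batch.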

#consume_batch ⇒ void

This method returns an undefined value.

Handle a batch of Kafka messages. Batches are split into "slices", which are groups of independent messages that can be processed together in a single database operation. If two messages in a batch have the same key, we cannot process them in the same operation because they would interfere with each other, so they are placed in separate slices.

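This method takes no arguments. It reads the current batch from messages and skips any message for which process_message? returns false. When the consumer is compacted and at least one message has a key, only the last message per key is written. Otherwise, the batch is processed slice by slice via uncompacted_update. After that, post_process_batch is called with the processed messages.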

# File 'lib/deimos/active_record_consume/batch_consumption.rb', line 27

def consume_batch
  filtered = messages.select { |message| process_message?(message) }
  skipped_count = messages.size - filtered.size
  if skipped_count.positive?
    Deimos::Logging.log_debug(
      message: 'Skipping processing of messages in batch',
      skipped_count: skipped_count
    )
  end
  deimos_messages = filtered.map { |p| Deimos::Message.new(p.payload, key: p.key) }

  tag = topic.name
  Deimos.config.tracer.active_span.set_tag('topic', tag)

  Karafka.monitor.instrument('deimos.ar_consumer.consume_batch', { topic: tag }) do
    if @compacted && deimos_messages.map(&:key).compact.any?
      update_database(compact_messages(deimos_messages))
    else
      uncompacted_update(deimos_messages)
    end
  end

  post_process_batch(deimos_messages)
end
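The control flow of consume_batch can be sketched in plain Ruby as follows. This is an illustration only. KafkaMsg and route_batch are hypothetical names, and the process_message lambda stands in for process_message?. The returned symbols stand in for the calls to update_database(compact_messages(...)) and uncompacted_update(...).

```ruby
# Hypothetical stand-in for a Karafka message (only key and payload).
KafkaMsg = Struct.new(:key, :payload)

# Filter the batch, count skipped messages, and decide how it would be
# written. Returns [route, kept_messages, skipped_count].
def route_batch(messages, compacted:, process_message: ->(_m) { true })
  kept = messages.select { |m| process_message.call(m) }
  skipped = messages.size - kept.size

  route =
    if compacted && kept.map(&:key).compact.any?
      :compacted   # keep the last message per key, then write once
    else
      :uncompacted # split into key-unique slices, write each slice
    end

  [route, kept, skipped]
end

msgs = [KafkaMsg.new('a', { 'v' => 1 }), KafkaMsg.new(nil, { 'v' => 2 })]
route, kept, skipped = route_batch(msgs, compacted: true,
                                   process_message: ->(m) { !m.key.nil? })
# route is :compacted, kept holds only the 'a' message, skipped is 1
```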

#deleted_query(records) ⇒ ActiveRecord::Relation

Create an ActiveRecord relation that matches all of the passed records. Used for bulk deletion.

@param records — List of messages.

@return — Matching relation.

Returns:

  • (ActiveRecord::Relation)

# File 'lib/deimos/active_record_consume/batch_consumption.rb', line 92

def deleted_query(records)
  keys = records.
    map { |m| record_key(m.key)[@klass.primary_key] }.
    compact

  @klass.unscoped.where(@klass.primary_key => keys)
end
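The key-collection step can be illustrated without a database. This sketch assumes the primary key column is id. Tombstone is a hypothetical message struct, and simple_record_key is a simplified stand-in for record_key that handles only scalar and nil keys. In ActiveRecord, where translates a hash condition with an array value into an SQL IN clause.

```ruby
# Hypothetical stand-in for a tombstone message: only the key matters.
Tombstone = Struct.new(:key)

PRIMARY_KEY = 'id' # assumption: the model's primary key column

# Simplified record_key: nil keys map to {}, scalar keys to { 'id' => key }.
def simple_record_key(key)
  key.nil? ? {} : { PRIMARY_KEY => key }
end

# Collect primary-key values (dropping messages without one) into the
# conditions hash that deleted_query hands to unscoped.where(...).
def delete_conditions(records)
  keys = records.map { |m| simple_record_key(m.key)[PRIMARY_KEY] }.compact
  { PRIMARY_KEY => keys }
end

conditions = delete_conditions([Tombstone.new(1), Tombstone.new(nil), Tombstone.new(3)])
# conditions == { 'id' => [1, 3] }
```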

#key_columns(_klass) ⇒ ::Array[String]

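Get the set of attribute names that uniquely identify messages in the batch. The default implementation returns nil. Override it to supply the key columns for a given model class.

@param _klass — The ActiveRecord class being updated (ignored by the default implementation).

@return — List of attribute names (nil in the default implementation).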

Returns:

  • (::Array[String])

# File 'lib/deimos/active_record_consume/batch_consumption.rb', line 60

def key_columns(_klass)
  nil
end

#record_key(key) ⇒ ::Hash[untyped, untyped]

Get the unique key for the ActiveRecord instance from the incoming key. Override this method (with super) to customize the set of attributes that uniquely identifies each record in the database.

@param key — The encoded key.

@return — The key attributes.

Parameters:

  • key (String)

Returns:

  • (::Hash[untyped, untyped])

# File 'lib/deimos/active_record_consume/batch_consumption.rb', line 76

def record_key(key)
  if key.nil?
    {}
  elsif key.is_a?(Hash) || key.is_a?(AvroGen::SchemaClass::Record)
    self.key_converter.convert(key)
  elsif self.topic.key_config[:field].nil?
    { @klass.primary_key => key }
  else
    { self.topic.key_config[:field].to_s => key }
  end
end
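Here is a plain-Ruby sketch of the branching in record_key, plus an override that calls super as the description suggests. KeySketch, TenantKeySketch, and the region column are hypothetical. key_field plays the role of topic.key_config[:field]. Where the real method delegates Hash keys to key_converter.convert, this sketch simply stringifies them, since the converter's exact output is not shown on this page. The AvroGen::SchemaClass::Record branch is omitted.

```ruby
# Hypothetical class mirroring record_key's decision order.
class KeySketch
  def initialize(primary_key: 'id', key_field: nil)
    @primary_key = primary_key
    @key_field = key_field # stands in for topic.key_config[:field]
  end

  def record_key(key)
    if key.nil?
      {}                         # no key: nothing to match on
    elsif key.is_a?(Hash)
      key.transform_keys(&:to_s) # assumption: stand-in for key_converter
    elsif @key_field.nil?
      { @primary_key => key }    # plain key maps to the primary key
    else
      { @key_field.to_s => key } # plain key maps to the configured field
    end
  end
end

# Override with super to add an extra identifying attribute (hypothetical).
class TenantKeySketch < KeySketch
  def record_key(key)
    super.merge('region' => 'eu')
  end
end

KeySketch.new.record_key(5)                       # returns { 'id' => 5 }
KeySketch.new(key_field: :order_id).record_key(5) # returns { 'order_id' => 5 }
TenantKeySketch.new.record_key(5)                 # returns { 'id' => 5, 'region' => 'eu' }
```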

#remove_records(messages) ⇒ void

This method returns an undefined value.

Delete any records with a tombstone.

@param messages — List of messages for a group of deleted records.

# File 'lib/deimos/active_record_consume/batch_consumption.rb', line 241

def remove_records(messages)
  Deimos::Utils::DeadlockRetry.wrap(Deimos.config.tracer.active_span.get_tag('topic')) do
    clause = deleted_query(messages)

    clause.delete_all
  end
end

#uncompacted_update(messages) ⇒ void

This method returns an undefined value.

Perform database operations for a batch of messages without compaction. All messages are split into slices containing only unique keys, and each slice is handled as its own batch.

@param messages — List of messages.

# File 'lib/deimos/active_record_consume/batch_consumption.rb', line 132

def uncompacted_update(messages)
  BatchSlicer.
    slice(messages).
    each(&method(:update_database))
end
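The slicing step can be sketched as follows. This is one plausible approach, assumed for illustration rather than taken from BatchSlicer. It groups messages by key, then lets slice i hold the i-th message for every key. As a result, no slice repeats a key, and each key's messages stay in arrival order across slices. SliceMsg and slice_by_key are hypothetical names.

```ruby
# Hypothetical stand-in for Deimos::Message.
SliceMsg = Struct.new(:key, :payload)

# Split messages into slices that never contain the same key twice.
def slice_by_key(messages)
  by_key = messages.group_by(&:key)            # key => messages in arrival order
  depth = by_key.values.map(&:length).max || 0 # slices needed for the busiest key
  Array.new(depth) { |i| by_key.values.map { |group| group[i] }.compact }
end

# Each slice is then handled as its own batch, as uncompacted_update does
# with each(&method(:update_database)).
handled = []
batch = [SliceMsg.new('a', 1), SliceMsg.new('b', 1), SliceMsg.new('a', 2),
         SliceMsg.new('c', 1), SliceMsg.new('a', 3)]
slice_by_key(batch).each { |slice| handled << slice.map { |m| [m.key, m.payload] } }
# handled == [[["a", 1], ["b", 1], ["c", 1]], [["a", 2]], [["a", 3]]]
```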

#update_database(messages) ⇒ void

This method returns an undefined value.

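Perform database operations for a group of messages. All messages with payloads are passed to upsert_records, and all tombstone messages (those for which delete_record? returns true) are passed to remove_records. Upserts are performed before deletions. If max_db_batch_size is set in the consumer config, each group is split into chunks of at most that many messages before being passed on.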

@param messages — List of messages.

# File 'lib/deimos/active_record_consume/batch_consumption.rb', line 143

def update_database(messages)
  # Find all upserted records (i.e. that have a payload) and all
  # deleted records (no payload)
  removed, upserted = messages.partition { |m| delete_record?(m) }

  max_db_batch_size = self.class.config[:max_db_batch_size]
  if upserted.any?
    if max_db_batch_size
      upserted.each_slice(max_db_batch_size) { |group| upsert_records(group) }
    else
      upsert_records(upserted)
    end
  end

  return if removed.empty?

  if max_db_batch_size
    removed.each_slice(max_db_batch_size) { |group| remove_records(group) }
  else
    remove_records(removed)
  end
end
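The partition-and-chunk logic can be exercised with plain callables in place of upsert_records and remove_records. DbMsg, each_chunk, and apply_batch are hypothetical names. The sketch assumes a message is a delete when its payload is nil, whereas the real method asks delete_record?.

```ruby
# Hypothetical stand-in for Deimos::Message; a nil payload marks a tombstone.
DbMsg = Struct.new(:key, :payload)

# Yield the list in chunks of at most `size`, or whole when size is nil.
def each_chunk(list, size, &block)
  size ? list.each_slice(size, &block) : block.call(list)
end

# Same order of operations as update_database: upserts first, then deletes.
def apply_batch(messages, max_db_batch_size:, upsert:, remove:)
  removed, upserted = messages.partition { |m| m.payload.nil? }
  each_chunk(upserted, max_db_batch_size, &upsert) if upserted.any?
  each_chunk(removed, max_db_batch_size, &remove) if removed.any?
end

calls = []
msgs = [DbMsg.new(1, { 'n' => 1 }), DbMsg.new(2, nil),
        DbMsg.new(3, { 'n' => 3 }), DbMsg.new(4, { 'n' => 4 })]
apply_batch(msgs, max_db_batch_size: 2,
                  upsert: ->(group) { calls << [:upsert, group.map(&:key)] },
                  remove: ->(group) { calls << [:remove, group.map(&:key)] })
# calls == [[:upsert, [1, 3]], [:upsert, [4]], [:remove, [2]]]
```

The each_chunk helper folds the two if/else branches of the original into one place without changing the order or size of the calls.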

#upsert_records(messages) ⇒ void

This method returns an undefined value.

Upsert any non-deleted records.

@param messages — List of messages for a group of records to either be updated or inserted.

# File 'lib/deimos/active_record_consume/batch_consumption.rb', line 170

def upsert_records(messages)
  record_list = build_records(messages)
  invalid = filter_records(record_list)
  if invalid.any?
    Karafka.monitor.instrument('deimos.batch_consumption.invalid_records', {
                                 records: invalid,
                                 consumer: self.class
                               })
  end
  return if record_list.empty?

  key_col_proc = self.method(:key_columns).to_proc
  col_proc = self.method(:columns).to_proc

  updater = MassUpdater.new(@klass,
                            key_col_proc: key_col_proc,
                            col_proc: col_proc,
                            replace_associations: self.replace_associations,
                            bulk_import_id_generator: self.bulk_import_id_generator,
                            save_associations_first: self.save_associations_first,
                            bulk_import_id_column: self.bulk_import_id_column)
  Karafka.monitor.instrument('deimos.batch_consumption.valid_records', {
                               records: updater.mass_update(record_list),
                               consumer: self.class
                             })
end
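The validation and instrumentation flow of upsert_records can be sketched without Karafka or MassUpdater. Everything here is hypothetical. The valid lambda stands in for the checks in filter_records, the events array replaces Karafka.monitor.instrument, and the block replaces updater.mass_update. The sketch also assumes that filter_records removes invalid entries from the list in place and returns them. The later record_list.empty? check suggests this, but this page does not confirm it.

```ruby
# Appends [event_name, data] pairs to `events` instead of instrumenting.
def upsert_sketch(records, valid:, events:, &mass_update)
  invalid = records.reject(&valid)
  records.select!(&valid) # drop invalid records from the caller's list
  events << [:invalid_records, invalid] if invalid.any?
  return if records.empty?

  events << [:valid_records, mass_update.call(records)]
end

events = []
rows = [{ 'id' => 1, 'name' => 'a' }, { 'id' => 2, 'name' => nil }]
upsert_sketch(rows, valid: ->(r) { !r['name'].nil? }, events: events) do |valid_rows|
  valid_rows.map { |r| r['id'] } # pretend these ids were written
end
# events == [[:invalid_records, [{ 'id' => 2, 'name' => nil }]], [:valid_records, [1]]]
```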