Module: Deimos::ActiveRecordConsume::BatchConsumption

Includes:
Consume::BatchConsumption
Included in:
Deimos::ActiveRecordConsumer
Defined in:
lib/deimos/active_record_consume/batch_consumption.rb

Overview

Methods for consuming batches of messages and saving them to the database in bulk ActiveRecord operations.

Instance Method Summary collapse

Instance Method Details

#consume_batchvoid

This method returns an undefined value.

Handle a batch of Kafka messages. Batches are split into “slices”, which are groups of independent messages that can be processed together in a single database operation. If two messages in a batch have the same key, we cannot process them in the same operation as they would interfere with each other. Thus they are split



27
28
29
30
31
32
33
34
35
36
37
38
39
40
# File 'lib/deimos/active_record_consume/batch_consumption.rb', line 27

def consume_batch
  deimos_messages = messages.map { |p| Deimos::Message.new(p.payload, key: p.key) }

  tag = topic.name
  Deimos.config.tracer.active_span.set_tag('topic', tag)

  Karafka.monitor.instrument('deimos.ar_consumer.consume_batch', {topic: tag}) do
    if @compacted && deimos_messages.map(&:key).compact.any?
      update_database(compact_messages(deimos_messages))
    else
      uncompacted_update(deimos_messages)
    end
  end
end