Module: Deimos::ActiveRecordConsume::BatchConsumption

Included in:
Deimos::ActiveRecordConsumer
Defined in:
lib/deimos/active_record_consume/batch_consumption.rb

Overview

Methods for consuming batches of messages and saving them to the database in bulk ActiveRecord operations.

Instance Method Summary collapse

Instance Method Details

#consume_batch(payloads, metadata) ⇒ void

This method returns an undefined value.

Handle a batch of Kafka messages. Batches are split into “slices”, which are groups of independent messages that can be processed together in a single database operation. If two messages in a batch have the same key, we cannot process them in the same operation as they would interfere with each other. Thus they are split

Parameters:



26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
# File 'lib/deimos/active_record_consume/batch_consumption.rb', line 26

def consume_batch(payloads, )
  messages = payloads.
    zip([:keys]).
    map { |p, k| Deimos::Message.new(p, nil, key: k) }

  tags = %W(topic:#{[:topic]})

  Deimos.instrument('ar_consumer.consume_batch', tags) do
    # The entire batch should be treated as one transaction so that if
    # any message fails, the whole thing is rolled back or retried
    # if there is deadlock
    Deimos::Utils::DeadlockRetry.wrap(tags) do
      if @compacted || self.class.config[:no_keys]
        update_database(compact_messages(messages))
      else
        uncompacted_update(messages)
      end
    end
  end
end