Module: Deimos::ActiveRecordConsume::BatchConsumption
- Included in: Deimos::ActiveRecordConsumer
- Defined in: lib/deimos/active_record_consume/batch_consumption.rb
Overview
Methods for consuming batches of messages and saving them to the database in bulk ActiveRecord operations.
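For context, these methods are mixed into consumers that inherit from Deimos::ActiveRecordConsumer. Below is a minimal sketch of such a consumer; the Widget model name is an assumption for illustration, not part of the library:

  class WidgetConsumer < Deimos::ActiveRecordConsumer
    # Tell the consumer which ActiveRecord model each payload is saved into.
    record_class Widget
  end

The framework then calls the batch methods documented below with the decoded payloads and their metadata.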
Instance Method Summary
- #consume_batch(payloads, metadata) ⇒ Object
  Handle a batch of Kafka messages.
- #record_key(key) ⇒ Hash
  Get unique key for the ActiveRecord instance from the incoming key.
Instance Method Details
#consume_batch(payloads, metadata) ⇒ Object
Handle a batch of Kafka messages. Batches are split into “slices”, which are groups of independent messages that can be processed together in a single database operation. If two messages in a batch have the same key, they cannot be processed in the same operation because they would interfere with each other; they are therefore split into separate slices.
# File 'lib/deimos/active_record_consume/batch_consumption.rb', line 20

def consume_batch(payloads, metadata)
  messages = payloads.
    zip(metadata[:keys]).
    map { |p, k| Deimos::Message.new(p, nil, key: k) }

  tags = %W(topic:#{metadata[:topic]})

  Deimos.instrument('ar_consumer.consume_batch', tags) do
    # The entire batch should be treated as one transaction so that if
    # any message fails, the whole thing is rolled back or retried
    # if there is deadlock
    Deimos::Utils::DeadlockRetry.wrap(tags) do
      if @compacted || self.class.config[:no_keys]
        update_database(compact_messages(messages))
      else
        uncompacted_update(messages)
      end
    end
  end
end
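To make the slicing behaviour concrete, here is a minimal sketch (not the gem's internal implementation) of splitting an ordered batch into slices in which each key appears at most once, so each slice can be written in a single database operation:

  # Split an ordered batch into slices with at most one message per key.
  def slice_batch(messages)
    slices = [[]]
    messages.each do |message|
      # Start a new slice once the current slice already contains this key.
      slices << [] if slices.last.any? { |m| m.key == message.key }
      slices.last << message
    end
    slices
  end

Applying the slices in order preserves per-key ordering: a later message for the same key always lands in a later slice than an earlier one.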
#record_key(key) ⇒ Hash
Get unique key for the ActiveRecord instance from the incoming key. Override this method (with super) to customize the set of attributes that uniquely identifies each record in the database.
# File 'lib/deimos/active_record_consume/batch_consumption.rb', line 46

def record_key(key)
  decoded_key = decode_key(key)

  if decoded_key.nil?
    {}
  elsif decoded_key.is_a?(Hash)
    @key_converter.convert(decoded_key)
  else
    { @klass.primary_key => decoded_key }
  end
end
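As an illustration of the override described above, a consumer could map the decoded key onto a different unique column. The Widget model and the external_id/widget_id names are assumptions for this sketch, not part of the library:

  class WidgetConsumer < Deimos::ActiveRecordConsumer
    record_class Widget

    # Identify rows by a natural key column instead of the default conversion.
    def record_key(key)
      decoded = super # default decoding/conversion of the Kafka key
      { 'external_id' => decoded['widget_id'] }
    end
  end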