Class: Deimos::ActiveRecordConsumer

Inherits:
Consumer
  • Object
Includes:
Deimos::ActiveRecordConsume::BatchConsumption, Deimos::ActiveRecordConsume::MessageConsumption
Defined in:
lib/deimos/active_record_consumer.rb

Overview

To configure batch vs. message mode, change the delivery mode of your Phobos listener:

  • Message-by-message: use `delivery: message` or `delivery: batch`
  • Batch: use `delivery: inline_batch`
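The mapping above can be summarized as a small lookup. This is an illustrative sketch only: `CONSUMPTION_MODE_BY_DELIVERY` and `consumption_mode_for` are hypothetical names and are not part of Deimos or Phobos.

```ruby
# Hypothetical lookup, for illustration only: which consumption mode
# each Phobos delivery setting selects, per the overview above.
CONSUMPTION_MODE_BY_DELIVERY = {
  message: :message_by_message,
  batch: :message_by_message,
  inline_batch: :batch
}.freeze

# Accepts a String or Symbol and raises on anything the overview does not list.
def consumption_mode_for(delivery)
  CONSUMPTION_MODE_BY_DELIVERY.fetch(delivery.to_sym) do
    raise ArgumentError, "unknown delivery mode: #{delivery.inspect}"
  end
end
```

Note that `delivery: batch` still results in message-by-message consumption; only `inline_batch` hands the whole batch to the consumer.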

Class Method Summary

Instance Method Summary

Methods included from Deimos::ActiveRecordConsume::BatchConsumption

#consume_batch, #record_key

Methods included from Deimos::ActiveRecordConsume::MessageConsumption

#assign_key, #consume, #destroy_record, #fetch_record, #save_record

Methods inherited from Consumer

#decode_key, #decode_message, decoder, key_decoder

Methods included from Consume::BatchConsumption

#around_consume_batch, #consume_batch

Methods included from Consume::MessageConsumption

#around_consume, #consume

Constructor Details

#initialize ⇒ ActiveRecordConsumer

Setup



# File 'lib/deimos/active_record_consumer.rb', line 40

def initialize
  @klass = self.class.config[:record_class]
  @converter = ActiveRecordConsume::SchemaModelConverter.new(self.class.decoder, @klass)

  if self.class.config[:key_schema]
    @key_converter = ActiveRecordConsume::SchemaModelConverter.new(self.class.key_decoder, @klass)
  end

  @compacted = self.class.config[:compacted] != false
end
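One detail of the constructor worth spelling out: because it compares against `false` rather than testing truthiness, compaction stays on unless the consumer explicitly turns it off. A minimal sketch of the same comparison on plain hashes (the helper name `compacted_setting` is hypothetical):

```ruby
# Same comparison as in #initialize, applied to a plain config hash.
# A missing or nil :compacted entry counts as "on".
def compacted_setting(config)
  config[:compacted] != false
end

default_on   = compacted_setting({})                  # key never set
explicit_off = compacted_setting({ compacted: false })
nil_is_on    = compacted_setting({ compacted: nil })
```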

Class Method Details

.compacted(val) ⇒ Object

Parameters:

  • val (Boolean)

    Turn pre-compaction of the batch on or off. If true, only the last message for each unique key in a batch is processed.



# File 'lib/deimos/active_record_consumer.rb', line 34

def compacted(val)
  config[:compacted] = val
end
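To illustrate what "only the last message for each unique key" means, here is a standalone sketch of pre-compaction. It is one way to get that behavior, not necessarily how Deimos implements it; `SketchMessage` and `compact_batch` are hypothetical names.

```ruby
# Hypothetical stand-in for a Kafka message; only key and payload matter here.
SketchMessage = Struct.new(:key, :payload)

# Keep only the last message seen for each key. Reversing first makes
# `uniq` retain the latest occurrence; reversing again restores the
# original relative order of the survivors.
def compact_batch(messages)
  messages.reverse.uniq(&:key).reverse
end

batch = [
  SketchMessage.new(1, { name: 'a' }),
  SketchMessage.new(2, { name: 'b' }),
  SketchMessage.new(1, { name: 'c' }) # supersedes the first message for key 1
]
compacted_batch = compact_batch(batch)
```

With compaction turned off, all three messages would be processed in order instead.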

.record_class(klass) ⇒ Object

Parameters:

  • klass (Class < ActiveRecord::Base)

    The class used to save to the database.



# File 'lib/deimos/active_record_consumer.rb', line 28

def record_class(klass)
  config[:record_class] = klass
end
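Both class methods are meant to be called in the body of a consumer subclass. The sketch below mimics that DSL with a stand-in base class so it runs without Deimos installed; `SketchConsumerBase`, `Widget`, and `WidgetConsumer` are hypothetical, and a real consumer would inherit from `Deimos::ActiveRecordConsumer` and pass an `ActiveRecord::Base` subclass.

```ruby
# Minimal stand-in for the class-level DSL shown above; not the real Deimos class.
class SketchConsumerBase
  # Per-class settings hash (an assumption made for this sketch).
  def self.config
    @config ||= {}
  end

  def self.record_class(klass)
    config[:record_class] = klass
  end

  def self.compacted(val)
    config[:compacted] = val
  end
end

# Hypothetical model class standing in for an ActiveRecord model.
class Widget; end

class WidgetConsumer < SketchConsumerBase
  record_class Widget
  compacted false
end
```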

Instance Method Details

#process_message?(_payload) ⇒ Boolean

Override this method to conditionally save records.

Parameters:

  • _payload

    The message payload.

Returns:

  • (Boolean)

    If true, the record is created or updated. If false, record processing is skipped but the message offset is still committed.



# File 'lib/deimos/active_record_consumer.rb', line 63

def process_message?(_payload)
  true
end
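A subclass can override this hook to filter messages. The following is a sketch under stated assumptions: `SketchBaseConsumer` stands in for the real base class, and the `'is_test'` payload field is hypothetical.

```ruby
# Stand-in base with the default shown above; not the real Deimos class.
class SketchBaseConsumer
  def process_message?(_payload)
    true
  end
end

# Hypothetical consumer that skips payloads flagged as test data.
class SkipTestDataConsumer < SketchBaseConsumer
  def process_message?(payload)
    # 'is_test' is an assumed field name; a missing field means "process it".
    !payload['is_test']
  end
end
```

Because skipped messages still have their offsets committed, returning false drops the message permanently rather than deferring it.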

#record_attributes(payload, _key = nil) ⇒ Object

Override this method (calling `super`) to add to or change the default attributes set on the new or existing record.

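Parameters:

  • payload

    The message payload.

  • _key (defaults to: nil)

    The message key; unused by the default implementation.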



# File 'lib/deimos/active_record_consumer.rb', line 55

def record_attributes(payload, _key=nil)
  @converter.convert(payload)
end
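An override typically calls `super` and then adjusts the result. The sketch below assumes the default implementation returns a Hash of attributes; `SketchAttributesBase`, `KeyedWidgetConsumer`, and the `'kafka_key'` and `'name'` attributes are hypothetical.

```ruby
# Stand-in base: the real method delegates to a schema-to-model converter.
# Here the payload is copied unchanged so the sketch runs on its own.
class SketchAttributesBase
  def record_attributes(payload, _key = nil)
    payload.dup
  end
end

class KeyedWidgetConsumer < SketchAttributesBase
  def record_attributes(payload, key = nil)
    # Bare `super` forwards the same arguments; the merge then adds one
    # attribute (the key) and normalizes another (the name).
    super.merge('kafka_key' => key, 'name' => payload['name'].to_s.strip)
  end
end
```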