Class: Deimos::ActiveRecordConsumer

Inherits:
Consumer
  • Object
Includes:
Deimos::ActiveRecordConsume::BatchConsumption, Deimos::ActiveRecordConsume::MessageConsumption
Defined in:
lib/deimos/active_record_consumer.rb

Overview

To configure batch vs. message mode, change the delivery mode of your Phobos listener.

  • Message-by-message: use `delivery: message` or `delivery: batch`

  • Batch: use `delivery: inline_batch`

Class Method Summary

Instance Method Summary

Methods included from Deimos::ActiveRecordConsume::BatchConsumption

#consume_batch

Methods included from Consume::BatchConsumption

#consume_batch

Methods included from Deimos::ActiveRecordConsume::MessageConsumption

#assign_key, #consume_message, #destroy_record, #fetch_record, #save_record

Methods included from Consume::MessageConsumption

#consume_message

Methods inherited from Consumer

#consume

Constructor Details

#initialize ⇒ ActiveRecordConsumer

Setup



# File 'lib/deimos/active_record_consumer.rb', line 73

def initialize
  @klass = self.class.config[:record_class]
  @compacted = self.class.config[:compacted] != false
end
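
The `!= false` comparison means compaction stays enabled unless a subclass explicitly passes `false`. The following is a minimal sketch of that default, using a hypothetical stand-in class (`SketchConsumer`, with a plain Hash behind `config`) rather than the real Deimos classes:

```ruby
# Stand-in for illustration only; this is not the Deimos implementation.
class SketchConsumer
  class << self
    # Per-class settings hash (assumed shape).
    def config
      @config ||= {}
    end

    def compacted(val)
      config[:compacted] = val
    end
  end

  attr_reader :compacted

  def initialize
    # nil (never set) and true both enable compaction; only false disables it.
    @compacted = self.class.config[:compacted] != false
  end
end

# Never calls `compacted`, so the setting is nil.
class DefaultConsumer < SketchConsumer; end

# Explicitly opts out of compaction.
class UncompactedConsumer < SketchConsumer
  compacted false
end

default_flag = DefaultConsumer.new.compacted
opt_out_flag = UncompactedConsumer.new.compacted
```

Here `default_flag` is `true` and `opt_out_flag` is `false`.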

Class Method Details

.compacted(val) ⇒ void

This method returns an undefined value.

Parameters:

  • val (Boolean)

    Turn pre-compaction of the batch on or off. If true, only the last message for each unique key in a batch is processed.



# File 'lib/deimos/active_record_consumer.rb', line 36

def compacted(val)
  config[:compacted] = val
end
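
To make "only the last message for each unique key" concrete, here is a small sketch of pre-compaction in plain Ruby. The `Message` struct and the `compact_batch` helper are hypothetical names for illustration and may not match how Deimos implements compaction internally:

```ruby
# Hypothetical message shape for illustration.
Message = Struct.new(:key, :payload)

# Keep only the last message seen for each key. Reversing first makes
# Array#uniq retain the latest occurrence; reversing again restores order.
def compact_batch(messages)
  messages.reverse.uniq(&:key).reverse
end

batch = [
  Message.new(1, { name: 'a' }),
  Message.new(2, { name: 'b' }),
  Message.new(1, { name: 'c' })
]

compacted_batch = compact_batch(batch)
```

In this sketch the first message for key `1` is dropped, leaving the message for key `2` followed by the later message for key `1`.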

.max_db_batch_size(limit) ⇒ void

This method returns an undefined value.

Parameters:

  • limit (Integer)

    Maximum number of transactions in a single database call.



# File 'lib/deimos/active_record_consumer.rb', line 42

def max_db_batch_size(limit)
  config[:max_db_batch_size] = limit
end
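
As a rough illustration, assuming the limit caps how many records go into each bulk database call, the records could be split into slices like this. The `db_batches` helper is a hypothetical name, and treating `nil` as "no limit" is an assumption of this sketch:

```ruby
# Hypothetical helper: split records into groups of at most `limit` items.
# A nil limit is treated here as "no limit" (an assumption of this sketch).
def db_batches(records, limit)
  return [records] if limit.nil?

  records.each_slice(limit).to_a
end

limited = db_batches((1..7).to_a, 3)
unlimited = db_batches([1, 2], nil)
```

With a limit of 3, seven records yield three groups, the last holding a single record.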

.record_class(klass) ⇒ void

This method returns an undefined value.

Parameters:

  • klass (Class<ActiveRecord::Base>)

    The class used to save to the database.



# File 'lib/deimos/active_record_consumer.rb', line 29

def record_class(klass)
  config[:record_class] = klass
end

Instance Method Details

#bulk_import_id_column ⇒ String?

Returns:

  • (String, nil)


# File 'lib/deimos/active_record_consumer.rb', line 54

def bulk_import_id_column
  self.topic.bulk_import_id_column
end

#bulk_import_id_generator ⇒ Proc

Returns:

  • (Proc)


# File 'lib/deimos/active_record_consumer.rb', line 59

def bulk_import_id_generator
  topic.bulk_import_id_generator
end

#converter ⇒ Object



# File 'lib/deimos/active_record_consumer.rb', line 78

def converter
  decoder = self.topic.deserializers[:payload].backend
  @converter ||= ActiveRecordConsume::SchemaModelConverter.new(decoder, @klass)
end

#key_converter ⇒ Object



# File 'lib/deimos/active_record_consumer.rb', line 83

def key_converter
  decoder = self.topic.deserializers[:key]&.backend
  return nil if decoder.nil?
  @key_converter ||= ActiveRecordConsume::SchemaModelConverter.new(decoder, @klass)
end

#key_decoder ⇒ Object



# File 'lib/deimos/active_record_consumer.rb', line 68

def key_decoder
  self.topic.serializers[:key]&.backend
end

#process_message?(_payload) ⇒ Boolean

Override this method to conditionally save records.

Parameters:

Returns:

  • (Boolean)

    If true, the record is created or updated. If false, record processing is skipped but the message offset is still committed.



# File 'lib/deimos/active_record_consumer.rb', line 102

def process_message?(_payload)
  true
end
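
An override might look like the following sketch. To keep it runnable on its own it uses a hypothetical stand-in base class (`SketchFilterBase`) instead of `Deimos::ActiveRecordConsumer`; the `payloads_to_save` driver and the `:draft` payload field are also assumptions made for illustration:

```ruby
# Stand-in base class for illustration; the real base is
# Deimos::ActiveRecordConsumer.
class SketchFilterBase
  # Default mirrors the documented behavior: process everything.
  def process_message?(_payload)
    true
  end

  # Hypothetical driver: returns the payloads that would be saved.
  def payloads_to_save(payloads)
    payloads.select { |payload| process_message?(payload) }
  end
end

class PublishedWidgetConsumer < SketchFilterBase
  # Skip payloads flagged as drafts (the :draft field is an assumption).
  def process_message?(payload)
    !payload[:draft]
  end
end

saved = PublishedWidgetConsumer.new.payloads_to_save(
  [
    { id: 1, draft: false },
    { id: 2, draft: true },
    { id: 3 }
  ]
)
```

Only the payload marked as a draft is filtered out; the one without a `:draft` field is still processed.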

#record_attributes(payload, _key = nil) ⇒ Hash

Override this method (with `super`) if you want to add/change the default attributes set to the new/existing record.

Parameters:

Returns:

  • (Hash)


# File 'lib/deimos/active_record_consumer.rb', line 94

def record_attributes(payload, _key = nil)
  self.converter.convert(payload)
end
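
A sketch of the override-with-`super` pattern follows. The base class here (`SketchAttributesBase`) is a hypothetical stand-in that simply copies the payload where the real method runs it through the converter, and the `:external_id` and `:name` attributes are assumptions made for illustration:

```ruby
# Stand-in base class for illustration; the real method delegates to
# the schema converter instead of copying the payload.
class SketchAttributesBase
  def record_attributes(payload, _key = nil)
    payload.dup
  end
end

class WidgetConsumer < SketchAttributesBase
  # Start from the default attributes, then add or adjust fields.
  def record_attributes(payload, key = nil)
    super.merge(external_id: key, name: payload[:name].to_s.strip)
  end
end

attrs = WidgetConsumer.new.record_attributes({ name: '  Gear ' }, 42)
```

Calling `super` without arguments forwards both `payload` and `key`, so the default attributes are built first and the subclass only layers its changes on top.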

#replace_associations ⇒ Boolean

Returns:

  • (Boolean)


# File 'lib/deimos/active_record_consumer.rb', line 49

def replace_associations
  self.topic.replace_associations
end

#save_associations_first ⇒ Boolean

Returns:

  • (Boolean)


# File 'lib/deimos/active_record_consumer.rb', line 64

def save_associations_first
  topic.save_associations_first
end