Class: Deimos::ActiveRecordConsumer

Inherits:
Consumer
  • Object
Includes:
Deimos::ActiveRecordConsume::BatchConsumption, Deimos::ActiveRecordConsume::MessageConsumption
Defined in:
lib/deimos/active_record_consumer.rb,
sig/defs.rbs

Overview

To configure batch vs. message mode, change the delivery mode of your Phobos listener.

  • Message-by-message: use delivery: message or delivery: batch
  • Batch: use delivery: inline_batch


Methods included from Consume::BatchConsumption

#around_consume_batch

Methods included from Deimos::ActiveRecordConsume::MessageConsumption

#consume_message

Methods included from Consume::MessageConsumption

#around_consume, #consume_message

Methods inherited from Consumer

#around_consume, #around_consume_batch, #decode_key, #decode_message, decoder, key_decoder

Constructor Details

#initialize ⇒ ActiveRecordConsumer

Setup



# File 'lib/deimos/active_record_consumer.rb', line 73

def initialize
  @klass = self.class.config[:record_class]
  @klass = @klass.constantize if @klass.is_a?(String)
  @compacted = self.class.config[:compacted] != false
end
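
The `!= false` comparison in the constructor means compaction stays on unless it is explicitly disabled. The following is a minimal stand-alone sketch of that behavior, not the Deimos implementation: `ConfigSketch`, `DefaultConsumer`, and `UncompactedConsumer` are hypothetical stand-ins, and the per-class `config` hash is an assumption about how the settings are stored.

```ruby
# Hypothetical stand-in that mimics the class-level settings on this page.
class ConfigSketch
  # Assumption: each class keeps its own settings hash.
  def self.config
    @config ||= {}
  end

  def self.compacted(val)
    config[:compacted] = val
  end

  attr_reader :compacted

  def initialize
    # Same comparison as the constructor above: nil (unset) counts as true.
    @compacted = self.class.config[:compacted] != false
  end
end

# No explicit setting, so compaction is enabled.
class DefaultConsumer < ConfigSketch; end

# Explicitly opts out of compaction.
class UncompactedConsumer < ConfigSketch
  compacted false
end

default_flag = DefaultConsumer.new.compacted
uncompacted_flag = UncompactedConsumer.new.compacted
```

Only an explicit `compacted false` turns the flag off; leaving the setting out behaves the same as `compacted true`.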

Class Method Details

.compacted(val) ⇒ void

This method returns an undefined value.

@param val — Turn pre-compaction of the batch on or off. If true, only the last message for each unique key in a batch is processed.

Parameters:

  • val (Boolean)


# File 'lib/deimos/active_record_consumer.rb', line 36

def compacted(val)
  config[:compacted] = val
end

.max_db_batch_size(limit) ⇒ void

This method returns an undefined value.

Parameters:

  • limit (Integer)

    Maximum number of transactions in a single database call.



# File 'lib/deimos/active_record_consumer.rb', line 42

def max_db_batch_size(limit)
  config[:max_db_batch_size] = limit
end

.record_class(klass) ⇒ void

This method returns an undefined value.

@param klass — The class used to save to the database.

Parameters:

  • klass (singleton(ActiveRecord::Base))


# File 'lib/deimos/active_record_consumer.rb', line 29

def record_class(klass)
  config[:record_class] = klass
end

Instance Method Details

#assign_key(record, _payload, key) ⇒ void

This method returns an undefined value.

Assign a key to a new record.

@param record

@param _payload

@param key

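Parameters:

  • record (ActiveRecord::Base)
  • _payload (::Hash[untyped, untyped], Deimos::SchemaClass::Record)
  • key (Object)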



# File 'sig/defs.rbs', line 1584

def assign_key: (ActiveRecord::Base record, (::Hash[untyped, untyped] | Deimos::SchemaClass::Record) _payload, Object key) -> void

#bulk_import_id_column ⇒ String?

Returns:

  • (String, nil)


# File 'lib/deimos/active_record_consumer.rb', line 54

def bulk_import_id_column
  self.topic.bulk_import_id_column
end

#bulk_import_id_generator ⇒ Proc

Returns:

  • (Proc)


# File 'lib/deimos/active_record_consumer.rb', line 59

def bulk_import_id_generator
  topic.bulk_import_id_generator
end

#compact_messages(batch) ⇒ ::Array[Message]

Compact a batch of messages, taking only the last message for each unique key.

@param batch — Batch of messages.

@return — Compacted batch.

Parameters:

  • batch (::Array[Message])

Returns:

  • (::Array[Message])



# File 'sig/defs.rbs', line 1564

def compact_messages: (::Array[Message] batch) -> ::Array[Message]
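
Compaction as described above can be sketched in a few lines of plain Ruby. This is an illustration under stated assumptions, not necessarily how Deimos implements it: `CompactMsg` and `compact_sketch` are hypothetical names, and messages are assumed to expose `key` and `payload`.

```ruby
# Hypothetical message type with just the fields this sketch needs.
CompactMsg = Struct.new(:key, :payload)

# Keep only the last message for each key, preserving the relative order
# of the surviving messages.
def compact_sketch(batch)
  # Array#uniq keeps the first occurrence, so reverse first to keep the last.
  batch.reverse.uniq(&:key).reverse
end

compact_input = [
  CompactMsg.new(1, { 'name' => 'a' }),
  CompactMsg.new(2, { 'name' => 'b' }),
  CompactMsg.new(1, { 'name' => 'c' })
]
compacted = compact_sketch(compact_input)
```

Here the first message for key 1 is dropped, because a later message with the same key supersedes it.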

#consume(payload, metadata) ⇒ void

This method returns an undefined value.

@param payload — Decoded payloads

@param metadata — Information about batch, including keys.

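Parameters:

  • payload (::Hash[untyped, untyped], Deimos::SchemaClass::Record)
  • metadata (::Hash[untyped, untyped])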



# File 'sig/defs.rbs', line 1589

def consume: ((::Hash[untyped, untyped] | Deimos::SchemaClass::Record) payload, ::Hash[untyped, untyped] metadata) -> void

#consume_batch(payloads, metadata) ⇒ void

This method returns an undefined value.

Handle a batch of Kafka messages. Batches are split into "slices", which are groups of independent messages that can be processed together in a single database operation. If two messages in a batch have the same key, we cannot process them in the same operation as they would interfere with each other. Thus they are split into separate slices.

@param payloads — Decoded payloads

@param metadata — Information about batch, including keys.

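Parameters:

  • payloads (::Array[(::Hash[untyped, untyped] | Deimos::SchemaClass::Record)])
  • metadata (::Hash[untyped, untyped])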



# File 'sig/defs.rbs', line 1505

def consume_batch: (::Array[(::Hash[untyped, untyped] | Deimos::SchemaClass::Record)] payloads, ::Hash[untyped, untyped] metadata) -> void
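
One way to picture the slicing described above is the sketch below. It is an assumption about the general technique, not the slicing code Deimos ships: `SliceMsg` and `slice_by_key` are hypothetical names.

```ruby
# Hypothetical message type with just the fields this sketch needs.
SliceMsg = Struct.new(:key, :payload)

# Split a batch into slices in which every key appears at most once.
# The n-th message for a given key lands in the n-th slice, so per-key
# ordering is preserved when the slices are processed in sequence.
def slice_by_key(messages)
  slices = []
  messages.group_by(&:key).each_value do |group|
    group.each_with_index do |message, index|
      (slices[index] ||= []) << message
    end
  end
  slices
end

slice_input = [
  SliceMsg.new(1, 'a'),
  SliceMsg.new(2, 'b'),
  SliceMsg.new(1, 'c'),
  SliceMsg.new(1, nil)
]
slices = slice_by_key(slice_input)
```

Each resulting slice can then be handed to a single database operation, since no two messages inside it share a key.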

#converter ⇒ Object



# File 'lib/deimos/active_record_consumer.rb', line 79

def converter
  decoder = self.topic.deserializers[:payload].backend
  @converter ||= ActiveRecordConsume::SchemaModelConverter.new(decoder, @klass)
end

#delete_record?(message) ⇒ Boolean

Indicates whether to delete the given message. Defaults to checking if the message payload is nil.

Parameters:

Returns:

  • (Boolean)


# File 'lib/deimos/active_record_consumer.rb', line 104

def delete_record?(message)
  message.payload.nil?
end

#deleted_query(records) ⇒ ActiveRecord::Relation

Create an ActiveRecord relation that matches all of the passed records. Used for bulk deletion.

@param records — List of messages.

@return — Matching relation.

Parameters:

  • records (::Array[Message])

Returns:

  • (ActiveRecord::Relation)


# File 'sig/defs.rbs', line 1548

def deleted_query: (::Array[Message] records) -> ActiveRecord::Relation

#destroy_record(record) ⇒ void

This method returns an undefined value.

Destroy a record that received a null payload. Override if you need to do something other than a straight destroy (e.g. mark as archived).

@param record

Parameters:

  • record (ActiveRecord::Base)


# File 'sig/defs.rbs', line 1598

def destroy_record: (ActiveRecord::Base record) -> void

#fetch_record(klass, _payload, key) ⇒ ActiveRecord::Base

Find the record specified by the given payload and key. Default is to use the primary key column and the value of the first field in the key.

@param klass

@param _payload

@param key

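Parameters:

  • klass (singleton(ActiveRecord::Base))
  • _payload (::Hash[untyped, untyped], Deimos::SchemaClass::Record)
  • key (Object)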

Returns:

  • (ActiveRecord::Base)


# File 'sig/defs.rbs', line 1575

def fetch_record: (singleton(ActiveRecord::Base) klass, (::Hash[untyped, untyped] | Deimos::SchemaClass::Record) _payload, Object key) -> ActiveRecord::Base

#key_columns(records) ⇒ ::Array[String]

Get the set of attribute names that uniquely identify messages in the batch. Requires at least one record.

@param records — Non-empty list of messages.

@return — List of attribute names.

Parameters:

  • records (::Array[Message])

Returns:

  • (::Array[String])


# File 'sig/defs.rbs', line 1556

def key_columns: (::Array[Message] records) -> ::Array[String]

#key_converter ⇒ Object



# File 'lib/deimos/active_record_consumer.rb', line 84

def key_converter
  decoder = self.topic.deserializers[:key]&.backend
  return nil if decoder.nil?

  @key_converter ||= ActiveRecordConsume::SchemaModelConverter.new(decoder, @klass)
end

#key_decoder ⇒ Object



# File 'lib/deimos/active_record_consumer.rb', line 68

def key_decoder
  self.topic.serializers[:key]&.backend
end

#process_message?(_message) ⇒ Boolean

Override this method to conditionally save records.

@param _message — The Kafka message.

@return — If true, the record is created or updated. If false, record processing is skipped but the message offset is still committed.

Parameters:

Returns:

  • (Boolean)


# File 'lib/deimos/active_record_consumer.rb', line 115

def process_message?(_message)
  true
end
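
An override of this hook might look like the sketch below. It runs without Deimos: `HookSketch`, `PublishedOnlySketch`, `HookMsg`, the `filter` helper, and the `'status'` field are all hypothetical, and the assumption that rejected messages are dropped before any database work is for illustration only.

```ruby
# Hypothetical message type with just the fields this sketch needs.
HookMsg = Struct.new(:key, :payload)

# Hypothetical stand-in for the consumer; only the hook is modelled.
class HookSketch
  # Default shown on this page: every message is processed.
  def process_message?(_message)
    true
  end

  # Assumption: messages rejected by the hook are dropped before any
  # database work happens.
  def filter(messages)
    messages.select { |message| process_message?(message) }
  end
end

# Saves only published records, but still lets tombstones through.
class PublishedOnlySketch < HookSketch
  def process_message?(message)
    message.payload.nil? || message.payload['status'] == 'published'
  end
end

hook_input = [
  HookMsg.new(1, { 'status' => 'draft' }),
  HookMsg.new(2, { 'status' => 'published' }),
  HookMsg.new(3, nil)
]
kept = PublishedOnlySketch.new.filter(hook_input)
```

In a real consumer the subclass would inherit from Deimos::ActiveRecordConsumer and only the `process_message?` override would be needed.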

#record_attributes(payload, _key = nil) ⇒ ::Hash[untyped, untyped]

Override this method (with super) if you want to add/change the default attributes set to the new/existing record.

@param payload

@param _key

Parameters:

Returns:

  • (::Hash[untyped, untyped])


# File 'lib/deimos/active_record_consumer.rb', line 96

def record_attributes(payload, _key=nil)
  self.converter.convert(payload)
end

#record_key(key) ⇒ ::Hash[untyped, untyped]

Get unique key for the ActiveRecord instance from the incoming key. Override this method (with super) to customize the set of attributes that uniquely identifies each record in the database.

@param key — The encoded key.

@return — The key attributes.

Parameters:

  • key (String)

Returns:

  • (::Hash[untyped, untyped])


# File 'sig/defs.rbs', line 1514

def record_key: (String key) -> ::Hash[untyped, untyped]

#remove_records(messages) ⇒ void

This method returns an undefined value.

Delete any records with a tombstone.

@param messages — List of messages for a group of deleted records.

Parameters:

  • messages (::Array[Message])



# File 'sig/defs.rbs', line 1540

def remove_records: (::Array[Message] messages) -> void

#replace_associations ⇒ Boolean

Returns:

  • (Boolean)


# File 'lib/deimos/active_record_consumer.rb', line 49

def replace_associations
  self.topic.replace_associations
end

#save_associations_first ⇒ Boolean

Returns:

  • (Boolean)


# File 'lib/deimos/active_record_consumer.rb', line 64

def save_associations_first
  topic.save_associations_first
end

#save_record(record) ⇒ void

This method returns an undefined value.

@param record

Parameters:

  • record (ActiveRecord::Base)


# File 'sig/defs.rbs', line 1592

def save_record: (ActiveRecord::Base record) -> void

#uncompacted_update(messages) ⇒ void

This method returns an undefined value.

Perform database operations for a batch of messages without compaction. All messages are split into slices containing only unique keys, and each slice is handled as its own batch.

@param messages — List of messages.

Parameters:

  • messages (::Array[Message])



# File 'sig/defs.rbs', line 1521

def uncompacted_update: (::Array[Message] messages) -> void

#update_database(messages) ⇒ void

This method returns an undefined value.

Perform database operations for a group of messages. All messages with payloads are passed to upsert_records. All tombstone messages are passed to remove_records.

@param messages — List of messages.

Parameters:

  • messages (::Array[Message])



# File 'sig/defs.rbs', line 1528

def update_database: (::Array[Message] messages) -> void
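
The split between upserts and removals described above can be sketched as a simple partition. This is an illustration only: `DbMsg` and `split_for_database` are hypothetical names, and a tombstone is assumed to be a message whose payload is nil, matching the default of delete_record?.

```ruby
# Hypothetical message type with just the fields this sketch needs.
DbMsg = Struct.new(:key, :payload)

# Separate tombstones (nil payload) from messages that carry data.
def split_for_database(messages)
  tombstones, upserts = messages.partition { |message| message.payload.nil? }
  { upserts: upserts, removals: tombstones }
end

db_input = [
  DbMsg.new(1, { 'name' => 'a' }),
  DbMsg.new(2, nil),
  DbMsg.new(3, { 'name' => 'c' })
]
plan = split_for_database(db_input)
```

In the consumer itself, the first group would go to upsert_records and the second to remove_records.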

#upsert_records(messages) ⇒ void

This method returns an undefined value.

Upsert any non-deleted records.

@param messages — List of messages for a group of records to either be updated or inserted.

Parameters:

  • messages (::Array[Message])



# File 'sig/defs.rbs', line 1534

def upsert_records: (::Array[Message] messages) -> void