Class: Deimos::ActiveRecordConsumer
- Includes:
- Deimos::ActiveRecordConsume::BatchConsumption, Deimos::ActiveRecordConsume::MessageConsumption
- Defined in:
- lib/deimos/active_record_consumer.rb,
sig/defs.rbs
Overview
To configure batch vs. message mode, change the delivery mode of your
Phobos listener.
Message-by-message -> use delivery: message or delivery: batch
Batch -> use delivery: inline_batch
Class Method Summary collapse
-
.compacted(val) ⇒ void
only the last message for each unique key in a batch is processed.
- .max_db_batch_size(limit) ⇒ void
-
.record_class(klass) ⇒ void
database.
Instance Method Summary collapse
-
#assign_key ⇒ void
Assign a key to a new record.
- #bulk_import_id_column ⇒ String?
- #bulk_import_id_generator ⇒ Proc
-
#compact_messages ⇒ ::Array[Message]
Compact a batch of messages, taking only the last message for each unique key.
-
#consume ⇒ void
@param
payload— Decoded payloads. -
#consume_batch ⇒ void
Handle a batch of Kafka messages.
- #converter ⇒ Object
-
#delete_record?(message) ⇒ Boolean
Indicates whether to delete the given message.
-
#deleted_query ⇒ ActiveRecord::Relation
Create an ActiveRecord relation that matches all of the passed records.
-
#destroy_record ⇒ void
Destroy a record that received a null payload.
-
#fetch_record ⇒ ActiveRecord::Base
Find the record specified by the given payload and key.
-
#initialize ⇒ ActiveRecordConsumer
constructor
Setup.
-
#key_columns ⇒ ::Array[String]
Get the set of attribute names that uniquely identify messages in the batch.
- #key_converter ⇒ Object
- #key_decoder ⇒ Object
-
#process_message?(_message) ⇒ Boolean
Override this message to conditionally save records.
-
#record_attributes(payload, _key = nil) ⇒ ::Hash[untyped, untyped]
Override this method (with
super) if you want to add/change the default attributes set to the new/existing record. -
#record_key ⇒ ::Hash[untyped, untyped]
Get unique key for the ActiveRecord instance from the incoming key.
-
#remove_records ⇒ void
Delete any records with a tombstone.
- #replace_associations ⇒ Boolean
- #save_associations_first ⇒ Boolean
-
#save_record ⇒ void
@param
record. -
#uncompacted_update ⇒ void
Perform database operations for a batch of messages without compaction.
-
#update_database ⇒ void
Perform database operations for a group of messages.
-
#upsert_records ⇒ void
Upsert any non-deleted records records to either be updated or inserted.
Methods included from Consume::BatchConsumption
Methods included from Deimos::ActiveRecordConsume::MessageConsumption
Methods included from Consume::MessageConsumption
#around_consume, #consume_message
Methods inherited from Consumer
#around_consume, #around_consume_batch, #decode_key, #decode_message, decoder, key_decoder
Constructor Details
#initialize ⇒ ActiveRecordConsumer
Setup
73 74 75 76 77 |
# File 'lib/deimos/active_record_consumer.rb', line 73 def initialize @klass = self.class.config[:record_class] @klass = @klass.constantize if @klass.is_a?(String) @compacted = self.class.config[:compacted] != false end |
Class Method Details
.compacted(val) ⇒ void
This method returns an undefined value.
only the last message for each unique key in a batch is processed.
@param val — Turn pre-compaction of the batch on or off. If true,
36 37 38 |
# File 'lib/deimos/active_record_consumer.rb', line 36 def compacted(val) config[:compacted] = val end |
.max_db_batch_size(limit) ⇒ void
This method returns an undefined value.
42 43 44 |
# File 'lib/deimos/active_record_consumer.rb', line 42 def max_db_batch_size(limit) config[:max_db_batch_size] = limit end |
.record_class(klass) ⇒ void
This method returns an undefined value.
database.
@param klass — the class used to save to the
29 30 31 |
# File 'lib/deimos/active_record_consumer.rb', line 29 def record_class(klass) config[:record_class] = klass end |
Instance Method Details
#assign_key ⇒ void
This method returns an undefined value.
Assign a key to a new record.
@param record
@param _payload
@param key
1584 |
# File 'sig/defs.rbs', line 1584
def assign_key: (ActiveRecord::Base record, (::Hash[untyped, untyped] | Deimos::SchemaClass::Record) _payload, Object key) -> void
|
#bulk_import_id_column ⇒ String?
54 55 56 |
# File 'lib/deimos/active_record_consumer.rb', line 54 def bulk_import_id_column self.topic.bulk_import_id_column end |
#bulk_import_id_generator ⇒ Proc
59 60 61 |
# File 'lib/deimos/active_record_consumer.rb', line 59 def bulk_import_id_generator topic.bulk_import_id_generator end |
#compact_messages ⇒ ::Array[Message]
Compact a batch of messages, taking only the last message for each unique key.
@param batch — Batch of messages.
@return — Compacted batch.
1564 |
# File 'sig/defs.rbs', line 1564
def compact_messages: (::Array[Message] batch) -> ::Array[Message]
|
#consume ⇒ void
This method returns an undefined value.
@param payload — Decoded payloads
@param metadata — Information about batch, including keys.
1589 |
# File 'sig/defs.rbs', line 1589
def consume: ((::Hash[untyped, untyped] | Deimos::SchemaClass::Record) payload, ::Hash[untyped, untyped] metadata) -> void
|
#consume_batch ⇒ void
This method returns an undefined value.
Handle a batch of Kafka messages. Batches are split into "slices", which are groups of independent messages that can be processed together in a single database operation. If two messages in a batch have the same key, we cannot process them in the same operation as they would interfere with each other. Thus they are split
@param payloads — Decoded payloads
@param metadata — Information about batch, including keys.
1505 |
# File 'sig/defs.rbs', line 1505
def consume_batch: (::Array[(::Hash[untyped, untyped] | Deimos::SchemaClass::Record)] payloads, ::Hash[untyped, untyped] metadata) -> void
|
#converter ⇒ Object
79 80 81 82 |
# File 'lib/deimos/active_record_consumer.rb', line 79 def converter decoder = self.topic.deserializers[:payload].backend @converter ||= ActiveRecordConsume::SchemaModelConverter.new(decoder, @klass) end |
#delete_record?(message) ⇒ Boolean
Indicates whether to delete the given message. Defaults to checking if the message payload is nil.
104 105 106 |
# File 'lib/deimos/active_record_consumer.rb', line 104 def delete_record?() .payload.nil? end |
#deleted_query ⇒ ActiveRecord::Relation
Create an ActiveRecord relation that matches all of the passed records. Used for bulk deletion.
@param records — List of messages.
@return — Matching relation.
1548 |
# File 'sig/defs.rbs', line 1548
def deleted_query: (::Array[Message] records) -> ActiveRecord::Relation
|
#destroy_record ⇒ void
This method returns an undefined value.
Destroy a record that received a null payload. Override if you need to do something other than a straight destroy (e.g. mark as archived).
@param record
1598 |
# File 'sig/defs.rbs', line 1598
def destroy_record: (ActiveRecord::Base record) -> void
|
#fetch_record ⇒ ActiveRecord::Base
Find the record specified by the given payload and key. Default is to use the primary key column and the value of the first field in the key.
@param klass
@param _payload
@param key
1575 |
# File 'sig/defs.rbs', line 1575
def fetch_record: (singleton(ActiveRecord::Base) klass, (::Hash[untyped, untyped] | Deimos::SchemaClass::Record) _payload, Object key) -> ActiveRecord::Base
|
#key_columns ⇒ ::Array[String]
Get the set of attribute names that uniquely identify messages in the batch. Requires at least one record.
@param records — Non-empty list of messages.
@return — List of attribute names.
1556 |
# File 'sig/defs.rbs', line 1556
def key_columns: (::Array[Message] records) -> ::Array[String]
|
#key_converter ⇒ Object
84 85 86 87 88 89 |
# File 'lib/deimos/active_record_consumer.rb', line 84 def key_converter decoder = self.topic.deserializers[:key]&.backend return nil if decoder.nil? @key_converter ||= ActiveRecordConsume::SchemaModelConverter.new(decoder, @klass) end |
#key_decoder ⇒ Object
68 69 70 |
# File 'lib/deimos/active_record_consumer.rb', line 68 def key_decoder self.topic.serializers[:key]&.backend end |
#process_message?(_message) ⇒ Boolean
Override this message to conditionally save records
@param _payload — The kafka message
@return — if true, record is created/update. If false, record processing is skipped but message offset is still committed.
115 116 117 |
# File 'lib/deimos/active_record_consumer.rb', line 115 def () true end |
#record_attributes(payload, _key = nil) ⇒ ::Hash[untyped, untyped]
Override this method (with super) if you want to add/change the default
attributes set to the new/existing record.
@param payload
@param _key
96 97 98 |
# File 'lib/deimos/active_record_consumer.rb', line 96 def record_attributes(payload, _key=nil) self.converter.convert(payload) end |
#record_key ⇒ ::Hash[untyped, untyped]
Get unique key for the ActiveRecord instance from the incoming key. Override this method (with super) to customize the set of attributes that uniquely identifies each record in the database.
@param key — The encoded key.
@return — The key attributes.
1514 |
# File 'sig/defs.rbs', line 1514
def record_key: (String key) -> ::Hash[untyped, untyped]
|
#remove_records ⇒ void
This method returns an undefined value.
Delete any records with a tombstone. deleted records.
@param messages — List of messages for a group of
1540 |
# File 'sig/defs.rbs', line 1540
def remove_records: (::Array[Message] messages) -> void
|
#replace_associations ⇒ Boolean
49 50 51 |
# File 'lib/deimos/active_record_consumer.rb', line 49 def replace_associations self.topic.replace_associations end |
#save_associations_first ⇒ Boolean
64 65 66 |
# File 'lib/deimos/active_record_consumer.rb', line 64 def save_associations_first topic.save_associations_first end |
#save_record ⇒ void
This method returns an undefined value.
@param record
1592 |
# File 'sig/defs.rbs', line 1592
def save_record: (ActiveRecord::Base record) -> void
|
#uncompacted_update ⇒ void
This method returns an undefined value.
Perform database operations for a batch of messages without compaction. All messages are split into slices containing only unique keys, and each slice is handles as its own batch.
@param messages — List of messages.
1521 |
# File 'sig/defs.rbs', line 1521
def uncompacted_update: (::Array[Message] messages) -> void
|
#update_database ⇒ void
This method returns an undefined value.
Perform database operations for a group of messages. All messages with payloads are passed to upsert_records. All tombstones messages are passed to remove_records.
@param messages — List of messages.
1528 |
# File 'sig/defs.rbs', line 1528
def update_database: (::Array[Message] messages) -> void
|
#upsert_records ⇒ void
This method returns an undefined value.
Upsert any non-deleted records records to either be updated or inserted.
@param messages — List of messages for a group of
1534 |
# File 'sig/defs.rbs', line 1534
def upsert_records: (::Array[Message] messages) -> void
|