Module: Deimos::ActiveRecordConsume::MessageConsumption

Includes:
Consume::MessageConsumption
Included in:
Deimos::ActiveRecordConsumer
Defined in:
lib/deimos/active_record_consume/message_consumption.rb

Overview

Methods for consuming individual messages and saving them to the database as ActiveRecord instances.

Instance Method Summary collapse

Instance Method Details

#assign_key(record, _payload, key) ⇒ void

This method returns an undefined value.

Assign a key to a new record.

Parameters:



28
29
30
# File 'lib/deimos/active_record_consume/message_consumption.rb', line 28

def assign_key(record, _payload, key)
  record[record.class.primary_key] = key
end

#consume_message(message) ⇒ Object

Parameters:

  • message (Karafka::Messages::Message)


33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
# File 'lib/deimos/active_record_consume/message_consumption.rb', line 33

def consume_message(message)
  unless self.process_message?(message)
    Deimos::Logging.log_debug(
        message: 'Skipping processing of message',
        payload: message.payload.to_h,
        metadata: Deimos::Logging.(message.)
      )
    return
  end

  klass = self.class.config[:record_class]
  record = fetch_record(klass, message.payload.to_h.with_indifferent_access, message.key)
  if message.payload.nil?
    destroy_record(record)
    return
  end
  if record.blank?
    record = klass.new
    assign_key(record, message.payload, message.key)
  end

  attrs = record_attributes((message.payload || {}).with_indifferent_access, message.key)
  # don't use attributes= - bypass Rails < 5 attr_protected
  attrs.each do |k, v|
    record.send("#{k}=", v)
  end
  save_record(record)
end

#destroy_record(record) ⇒ void

This method returns an undefined value.

Destroy a record that received a null payload. Override if you need to do something other than a straight destroy (e.g. mark as archived).

Parameters:

  • record (ActiveRecord::Base)


74
75
76
# File 'lib/deimos/active_record_consume/message_consumption.rb', line 74

def destroy_record(record)
  record&.destroy
end

#fetch_record(klass, _payload, key) ⇒ ActiveRecord::Base

Find the record specified by the given payload and key. Default is to use the primary key column and the value of the first field in the key.

Parameters:

Returns:

  • (ActiveRecord::Base)


18
19
20
21
# File 'lib/deimos/active_record_consume/message_consumption.rb', line 18

def fetch_record(klass, _payload, key)
  fetch_key = key.is_a?(Hash) && key.size == 1 ? key.values.first : key
  klass.unscoped.where(klass.primary_key => fetch_key).first
end

#save_record(record) ⇒ void

This method returns an undefined value.

Parameters:

  • record (ActiveRecord::Base)


64
65
66
67
68
# File 'lib/deimos/active_record_consume/message_consumption.rb', line 64

def save_record(record)
  record.created_at ||= Time.zone.now if record.respond_to?(:created_at)
  record.updated_at = Time.zone.now if record.respond_to?(:updated_at)
  record.save!
end