Module: Deimos::ActiveRecordConsume::MessageConsumption

Included in:
Deimos::ActiveRecordConsumer
Defined in:
lib/deimos/active_record_consume/message_consumption.rb

Overview

Methods for consuming individual messages and saving them to the database as ActiveRecord instances.

Instance Method Summary

Instance Method Details

#assign_key(record, _payload, key) ⇒ Object

Assign a key to a new record.

Parameters:

  • record (ActiveRecord::Base)
  • _payload (Hash)
  • key (Object)



24
25
26
# File 'lib/deimos/active_record_consume/message_consumption.rb', line 24

# Store the message key in the primary key column of a freshly built record.
#
# NOTE(review): unlike fetch_record, a single-entry Hash key is written as-is
# rather than unwrapped to its value - confirm that this is intended.
#
# @param record [ActiveRecord::Base] record to receive the key
# @param _payload [Object] unused by this implementation
# @param key [Object] value written to the primary key column
# @return [Object] the key that was assigned
def assign_key(record, _payload, key)
  primary_key_column = record.class.primary_key
  record[primary_key_column] = key
end

#consume(payload, metadata) ⇒ Object

Parameters:

  • payload (Hash, nil)
  • metadata (Hash)



30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
# File 'lib/deimos/active_record_consume/message_consumption.rb', line 30

# Consume a single message and persist it through the configured record class.
#
# The message is skipped (with a debug log entry) unless process_message?
# accepts the payload. A nil payload hands whatever fetch_record found to
# destroy_record. Otherwise the found record - or a new instance of the
# configured :record_class with its key assigned - is populated from
# record_attributes and passed to save_record.
#
# @param payload [Hash, nil] message payload; nil means the record is destroyed
# @param metadata [Hash] message metadata; must respond to
#   with_indifferent_access, and its :key entry is used as the message key
# @return [Object, nil] nil when the message is skipped or the record is
#   destroyed, otherwise whatever save_record returns
def consume(payload, metadata)
  unless self.process_message?(payload)
    Deimos.config.logger.debug(
      message: 'Skipping processing of message',
      payload: payload,
      metadata: metadata
    )
    return
  end

  key = metadata.with_indifferent_access[:key]
  klass = self.class.config[:record_class]
  # A nil payload is replaced with an empty hash so fetch_record always gets a hash.
  record = fetch_record(klass, (payload || {}).with_indifferent_access, key)
  if payload.nil?
    destroy_record(record)
    return
  end
  if record.blank?
    record = klass.new
    assign_key(record, payload, key)
  end

  # for backwards compatibility
  # TODO next major release we should deprecate this
  attrs = if self.method(:record_attributes).parameters.size == 2
            record_attributes(payload.with_indifferent_access, key)
          else
            record_attributes(payload.with_indifferent_access)
          end
  # don't use attributes= - bypass Rails < 5 attr_protected
  attrs.each do |k, v|
    record.send("#{k}=", v)
  end
  save_record(record)
end

#destroy_record(record) ⇒ Object

Destroy a record that received a null payload. Override if you need to do something other than a straight destroy (e.g. mark as archived).

Parameters:

  • record (ActiveRecord::Base)


76
77
78
# File 'lib/deimos/active_record_consume/message_consumption.rb', line 76

# Remove the record matching a message whose payload was null. Does nothing
# when no record was found. Override to replace the plain destroy with
# something else (e.g. marking the record as archived).
#
# @param record [ActiveRecord::Base, nil] record to destroy, if any
# @return [Object, nil] result of record.destroy, or nil when record is nil
def destroy_record(record)
  return if record.nil?

  record.destroy
end

#fetch_record(klass, _payload, key) ⇒ ActiveRecord::Base

Find the record specified by the given payload and key. Default is to use the primary key column and the value of the first field in the key.

Parameters:

  • klass (Class<ActiveRecord::Base>)
  • _payload (Hash)
  • key (Object)

Returns:

  • (ActiveRecord::Base)


15
16
17
18
# File 'lib/deimos/active_record_consume/message_consumption.rb', line 15

# Look up the record a message refers to, ignoring any default scope.
#
# A Hash key with exactly one entry is reduced to that entry's value; any
# other key is used unchanged. The lookup matches that value against the
# primary key column of klass.
#
# @param klass [Class] model class; must respond to unscoped and primary_key
# @param _payload [Object] unused by this implementation
# @param key [Object] message key
# @return [ActiveRecord::Base, nil] first matching record, if any
def fetch_record(klass, _payload, key)
  single_field_key = key.is_a?(Hash) && key.size == 1
  lookup_value = if single_field_key
                   key.values.first
                 else
                   key
                 end
  scope = klass.unscoped
  scope.where(klass.primary_key => lookup_value).first
end

#save_record(record) ⇒ Object

Parameters:

  • record (ActiveRecord::Base)


67
68
69
70
71
# File 'lib/deimos/active_record_consume/message_consumption.rb', line 67

# Stamp the record's timestamps and persist it, raising if the save fails.
#
# created_at is only filled in when it is currently unset; updated_at is
# always refreshed. Each attribute is touched only when the record responds
# to its reader.
#
# @param record [ActiveRecord::Base]
# @return [Object] result of record.save!
def save_record(record)
  if record.respond_to?(:created_at)
    record.created_at ||= Time.zone.now
  end
  if record.respond_to?(:updated_at)
    record.updated_at = Time.zone.now
  end
  record.save!
end