Class: Deimos::ActiveRecordProducer

Inherits:
Producer
  • Object
show all
Defined in:
lib/deimos/active_record_producer.rb

Overview

Class which automatically produces a record when given an ActiveRecord instance or a list of them. Just call ‘send_events` on a list of records and they will be auto-published. You can override `generate_payload` to make changes to the payload before it’s published.

You can also call this with a list of hashes representing attributes. This is common when using activerecord-import.

Constant Summary

Constants inherited from Producer

Producer::MAX_BATCH_SIZE

Class Method Summary collapse

Methods inherited from Producer

determine_backend_class, karafka_config, partition_key, produce, produce_batch, publish, publish_list, topic

Class Method Details

.configObject



55
56
57
# File 'lib/deimos/active_record_producer.rb', line 55

def config
  Deimos.karafka_configs.find { |t| t.producer_class == self }
end

.encoderObject



59
60
61
62
# File 'lib/deimos/active_record_producer.rb', line 59

def encoder
  raise "No schema or namespace configured for #{self.name}" if config.nil?
  config.deserializers[:payload].backend
end

.generate_payload(attributes, _record) ⇒ Hash

Generate the payload, given a list of attributes or a record.. Can be overridden or added to by subclasses. is not set.

Parameters:

  • attributes (Hash)
  • _record (ActiveRecord::Base)

    May be nil if refetch_record

Returns:

  • (Hash)


70
71
72
73
74
75
76
77
78
79
# File 'lib/deimos/active_record_producer.rb', line 70

def generate_payload(attributes, _record)
  fields = self.encoder.schema_fields
  payload = attributes.stringify_keys
  payload.delete_if do |k, _|
    k.to_sym != :payload_key && !fields.map(&:name).include?(k)
  end
  return payload unless self.config.use_schema_classes

  Utils::SchemaClass.instance(payload, encoder.schema, encoder.namespace)
end

.poll_query(time_from:, time_to:, column_name: :updated_at, min_id:) ⇒ ActiveRecord::Relation

Query to use when polling the database with the DbPoller. Add includes, joins, or wheres as necessary, or replace entirely. than this value).

Parameters:

  • time_from (Time)

    the time to start the query from.

  • time_to (Time)

    the time to end the query.

  • column_name (Symbol) (defaults to: :updated_at)

    the column name to look for.

  • min_id (Numeric)

    the minimum ID (i.e. all IDs must be greater

Returns:

  • (ActiveRecord::Relation)


89
90
91
92
93
94
95
96
97
98
99
100
101
102
# File 'lib/deimos/active_record_producer.rb', line 89

def poll_query(time_from:, time_to:, column_name: :updated_at, min_id:)
  klass = @record_class
  table = ActiveRecord::Base.connection.quote_table_name(klass.table_name)
  column = ActiveRecord::Base.connection.quote_column_name(column_name)
  primary = ActiveRecord::Base.connection.quote_column_name(klass.primary_key)
  klass.where(
    "((#{table}.#{column} = ? AND #{table}.#{primary} > ?) \
     OR #{table}.#{column} > ?) AND #{table}.#{column} <= ?",
    time_from,
    min_id,
    time_from,
    time_to
  )
end

.post_process(_records) ⇒ Object

Post process records after publishing

Parameters:

  • records (Array<ActiveRecord::Base>)


106
107
# File 'lib/deimos/active_record_producer.rb', line 106

def post_process(_records)
end

.record_class(klass = nil, refetch: true) ⇒ void

This method returns an undefined value.

Indicate the class this producer is working on. a record object, refetch the record to pass into the ‘generate_payload` method.

Parameters:

  • klass (Class) (defaults to: nil)
  • refetch (Boolean) (defaults to: true)

    if true, and we are given a hash instead of



21
22
23
24
25
26
# File 'lib/deimos/active_record_producer.rb', line 21

def record_class(klass=nil, refetch: true)
  return @record_class if klass.nil?

  @record_class = klass
  @refetch_record = refetch
end

.send_event(record, force_send: false) ⇒ void

This method returns an undefined value.

Parameters:

  • record (ActiveRecord::Base)
  • force_send (Boolean) (defaults to: false)


31
32
33
# File 'lib/deimos/active_record_producer.rb', line 31

def send_event(record, force_send: false)
  send_events([record], force_send: force_send)
end

.send_events(records, force_send: false) ⇒ void

This method returns an undefined value.

Parameters:

  • records (Array<ActiveRecord::Base>)
  • force_send (Boolean) (defaults to: false)


38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
# File 'lib/deimos/active_record_producer.rb', line 38

def send_events(records, force_send: false)
  primary_key = @record_class&.primary_key
  messages = records.map do |record|
    if record.respond_to?(:attributes)
      attrs = record.attributes.with_indifferent_access
    else
      attrs = record.with_indifferent_access
      if @refetch_record && attrs[primary_key]
        record = @record_class.find(attrs[primary_key])
      end
    end
    generate_payload(attrs, record).with_indifferent_access
  end
  self.publish_list(messages, force_send: force_send)
  self.post_process(records)
end

.watched_attributes(_record) ⇒ Array<String>

Override this in active record producers to add non-schema fields to check for updates

Parameters:

  • _record (ActiveRecord::Base)

Returns:

  • (Array<String>)

    fields to check for updates



113
114
115
# File 'lib/deimos/active_record_producer.rb', line 113

def watched_attributes(_record)
  self.encoder.schema_fields.map(&:name)
end