Module: Deimos::KafkaSource::ClassMethods

Defined in:
lib/deimos/kafka_source.rb

Overview

:nodoc:

Instance Method Summary collapse

Instance Method Details

#import_without_validations_or_callbacks(column_names, array_of_attributes, options = {}) ⇒ Object

This is an internal method, part of the activerecord-import gem. It's the one that actually does the importing, having already normalized the inputs (arrays, hashes, records, etc.). We want to first do the import, then reload the records and send them to Kafka.



83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
# File 'lib/deimos/kafka_source.rb', line 83

# Runs the parent import first, then hands every imported row to each producer
# returned by +kafka_producers+.
#
# Nothing is sent when +kafka_config[:import]+ is falsy or when
# +array_of_attributes+ is empty; the parent's result is returned either way.
#
# @param column_names [Array] column names, in the same order as the values
#   inside each entry of +array_of_attributes+
# @param array_of_attributes [Array<Array>] one array of column values per row
# @param options [Hash] import options; only +:on_duplicate_key_update+ is
#   read here
# @return [Object] whatever the parent implementation returned
def import_without_validations_or_callbacks(column_names,
                                            array_of_attributes,
                                            options={})
  results = super
  return results unless self.kafka_config[:import]
  return results if array_of_attributes.empty?

  # Rebuild, for every row, the attribute hash it was imported from.
  rows = array_of_attributes.map do |values|
    column_names.zip(values).to_h.with_indifferent_access
  end
  with_id, without_id = rows.partition { |row| row[:id].present? }

  # Rows that already carry an ID are published exactly as given.
  self.kafka_producers.each { |producer| producer.send_events(with_id) }
  return results if without_id.empty?

  update_columns = options[:on_duplicate_key_update]
  if update_columns.present? && update_columns != [:updated_at]
    # Look each row up again using the columns that are neither listed in
    # :on_duplicate_key_update nor id/created_at, and publish what was found.
    key_columns = column_names.map(&:to_s)
    key_columns -= update_columns.map(&:to_s)
    key_columns -= %w(id created_at)
    records = without_id.map do |row|
      conditions = key_columns.each_with_object({}) do |column, lookup|
        lookup[column] = row[column]
      end
      self.where(conditions).first
    end
    self.kafka_producers.each { |producer| producer.send_events(records) }
  else
    # Fill in sequential IDs starting from the first ID of this batch.
    # The sqlite value is treated as the ID of the last inserted row, so the
    # batch size is subtracted; the other query's value (the original code
    # labels this branch "mysql") is used directly as the first ID.
    sqlite = self.connection.adapter_name.downcase.include?('sqlite')
    first_id =
      if sqlite
        self.connection.select_value('select last_insert_rowid()') -
          without_id.size + 1
      else
        self.connection.select_value('select LAST_INSERT_ID()')
      end
    without_id.each_with_index { |row, offset| row[:id] = first_id + offset }
    self.kafka_producers.each { |producer| producer.send_events(without_id) }
  end
  results
end

#kafka_config ⇒ Hash

Returns:

  • (Hash)


59
60
61
62
63
64
65
66
# File 'lib/deimos/kafka_source.rb', line 59

# Builds a fresh hash of flags keyed by :update, :delete, :import and :create,
# every one of them set to true. import_without_validations_or_callbacks reads
# the :import flag to decide whether imported rows are sent to the producers;
# the other flags are presumably consulted by the matching callbacks (not
# visible here).
#
# @return [Hash]
def kafka_config
  %i[update delete import create].map { |action| [action, true] }.to_h
end

#kafka_producers ⇒ Array<Deimos::ActiveRecordProducer>

Returns the producers to run.

Returns:

  • (Array<Deimos::ActiveRecordProducer>)

Raises:

  • (NotImplementedError)


69
70
71
72
73
74
75
76
# File 'lib/deimos/kafka_source.rb', line 69

# Returns the producers to run.
#
# When the receiver responds to +kafka_producer+ (singular), a deprecation
# warning is logged and that single producer is returned wrapped in an array.
# Otherwise this raises.
#
# @return [Array] one-element array holding the result of +kafka_producer+
# @raise [NotImplementedError] if the receiver does not respond to
#   +kafka_producer+
def kafka_producers
  raise NotImplementedError unless self.respond_to?(:kafka_producer)

  Deimos.config.logger.warn(message: DEPRECATION_WARNING)
  [self.kafka_producer]
end