Module: Deimos::KafkaSource::ClassMethods
- Defined in:
- lib/deimos/kafka_source.rb
Overview
:nodoc:
Instance Method Summary collapse
-
#import_without_validations_or_callbacks(column_names, array_of_attributes, options = {}) ⇒ Object
This is an internal method, part of the activerecord_import gem.
- #kafka_config ⇒ Hash
-
#kafka_producers ⇒ Array<Deimos::ActiveRecordProducer>
The producers to run.
Instance Method Details
#import_without_validations_or_callbacks(column_names, array_of_attributes, options = {}) ⇒ Object
This is an internal method, part of the activerecord_import gem. It's the one that actually does the importing, having already normalized the inputs (arrays, hashes, records etc.) Basically we want to first do the import, then reload the records and send them to Kafka.
83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 |
# File 'lib/deimos/kafka_source.rb', line 83 def import_without_validations_or_callbacks(column_names, array_of_attributes, ={}) results = super if !self.kafka_config[:import] || array_of_attributes.empty? return results end # This will contain an array of hashes, where each hash is the actual # attribute hash that created the object. array_of_hashes = [] array_of_attributes.each do |array| array_of_hashes << column_names.zip(array).to_h.with_indifferent_access end hashes_with_id, hashes_without_id = array_of_hashes.partition { |arr| arr[:id].present? } self.kafka_producers.each { |p| p.send_events(hashes_with_id) } if hashes_without_id.any? if [:on_duplicate_key_update].present? && [:on_duplicate_key_update] != [:updated_at] unique_columns = column_names.map(&:to_s) - [:on_duplicate_key_update].map(&:to_s) - %w(id created_at) records = hashes_without_id.map do |hash| self.where(unique_columns.map { |c| [c, hash[c]] }.to_h).first end self.kafka_producers.each { |p| p.send_events(records) } else # re-fill IDs based on what was just entered into the DB. last_id = if self.connection.adapter_name.downcase =~ /sqlite/ self.connection.select_value('select last_insert_rowid()') - hashes_without_id.size + 1 else # mysql self.connection.select_value('select LAST_INSERT_ID()') end hashes_without_id.each_with_index do |attrs, i| attrs[:id] = last_id + i end self.kafka_producers.each { |p| p.send_events(hashes_without_id) } end end results end |
#kafka_config ⇒ Hash
59 60 61 62 63 64 65 66 |
# File 'lib/deimos/kafka_source.rb', line 59 def kafka_config { update: true, delete: true, import: true, create: true } end |
#kafka_producers ⇒ Array<Deimos::ActiveRecordProducer>
Returns the producers to run.
69 70 71 72 73 74 75 76 |
# File 'lib/deimos/kafka_source.rb', line 69 def kafka_producers if self.respond_to?(:kafka_producer) Deimos.config.logger.warn(message: DEPRECATION_WARNING) return [self.kafka_producer] end raise NotImplementedError end |