Class: Deimos::ActiveRecordProducer
- Defined in:
- lib/deimos/active_record_producer.rb
Overview
Class which automatically produces a record when given an ActiveRecord instance or a list of them. Just call `send_events` on a list of records and they will be auto-published. You can override `generate_payload` to make changes to the payload before it’s published.
You can also call this with a list of hashes representing attributes. This is common when using activerecord-import.
Constant Summary
Constants inherited from Producer
Class Method Summary
- .encoder ⇒ Object
- .generate_deletion_payload(record) ⇒ Hash
  Returns the deletion payload for a record; by default, delegates to the model’s deletion_payload.
- .generate_payload(attributes, _record) ⇒ Hash
  Generate the payload, given a list of attributes or a record.
- .poll_query(time_from:, time_to:, min_id:, column_name: :updated_at) ⇒ ActiveRecord::Relation
  Query to use when polling the database with the DbPoller.
- .post_process(_records) ⇒ Object
  Post-process records after publishing.
- .record_class(klass = nil, refetch: true) ⇒ void
  Indicate the class this producer is working on.
- .record_klass ⇒ Class?
- .send_event(record, force_send: false) ⇒ void
- .send_events(records, force_send: false) ⇒ void
- .watched_attributes(_record) ⇒ Array<String>
  Override this in active record producers to add non-schema fields to check for updates.
Methods inherited from Producer
determine_backend_class, karafka_config, partition_key, produce, produce_batch, publish, publish_list, topic
Class Method Details
.encoder ⇒ Object
    # File 'lib/deimos/active_record_producer.rb', line 63
    def encoder
      raise "No schema or namespace configured for #{self.name}" if karafka_config.nil?

      karafka_config.deserializers[:payload].backend
    end
.generate_deletion_payload(record) ⇒ Hash
Returns the deletion payload for a record. By default, this delegates to the model’s deletion_payload. Producers may override this method to customize the deletion key/payload per producer.
    # File 'lib/deimos/active_record_producer.rb', line 92
    def generate_deletion_payload(record)
      record.deletion_payload
    end
.generate_payload(attributes, _record) ⇒ Hash
Generate the payload, given a list of attributes or a record. Can be overridden or added to by subclasses.
    # File 'lib/deimos/active_record_producer.rb', line 75
    def generate_payload(attributes, _record)
      fields = self.encoder.schema_fields
      payload = attributes.stringify_keys
      payload.delete_if do |k, _|
        k.to_sym != :payload_key && !fields.map(&:name).include?(k)
      end
      return payload if self.karafka_config.use_schema_classes.nil? && !Deimos.config.schema.use_schema_classes

      Utils::SchemaClass.instance(payload, encoder.schema, encoder.namespace)
    end
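The filtering step in `generate_payload` can be illustrated without Deimos or ActiveSupport. The following is a minimal sketch in plain Ruby, not the library's implementation: `SCHEMA_FIELD_NAMES` and `filter_payload` are hypothetical names, the field list stands in for `encoder.schema_fields.map(&:name)`, and `transform_keys(&:to_s)` stands in for `stringify_keys`.

```ruby
# Hypothetical stand-in for encoder.schema_fields.map(&:name).
SCHEMA_FIELD_NAMES = %w[id name updated_at].freeze

# Mirrors the delete_if step of generate_payload using plain Ruby only:
# keep a key if it is a schema field name or the special "payload_key".
def filter_payload(attributes, field_names = SCHEMA_FIELD_NAMES)
  payload = attributes.transform_keys(&:to_s) # plain-Ruby stand-in for stringify_keys
  payload.delete_if do |key, _|
    key.to_sym != :payload_key && !field_names.include?(key)
  end
end

# "internal_notes" is not a schema field, so it is dropped.
puts filter_payload(id: 1, name: 'Widget', internal_notes: 'secret', payload_key: 1).keys.join(', ')
```

Under these assumptions, attributes that are not part of the schema are dropped before publishing, while `payload_key` always survives the filter.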
.poll_query(time_from:, time_to:, min_id:, column_name: :updated_at) ⇒ ActiveRecord::Relation
Query to use when polling the database with the DbPoller. Add includes, joins, or wheres as necessary, or replace entirely. Rows whose column_name value equals time_from are returned only if their primary key is greater than min_id.
    # File 'lib/deimos/active_record_producer.rb', line 104
    def poll_query(time_from:, time_to:, min_id:, column_name: :updated_at)
      klass = record_klass
      table = ActiveRecord::Base.connection.quote_table_name(klass.table_name)
      column = ActiveRecord::Base.connection.quote_column_name(column_name)
      primary = ActiveRecord::Base.connection.quote_column_name(klass.primary_key)
      klass.where(
        "((#{table}.#{column} = ? AND #{table}.#{primary} > ?) \
         OR #{table}.#{column} > ?) AND #{table}.#{column} <= ?",
        time_from,
        min_id,
        time_from,
        time_to
      )
    end
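To make the WHERE clause in `poll_query` easier to reason about, here is a minimal in-memory sketch of the same predicate. It is an illustration only: `Row` and `in_poll_window?` are hypothetical names, and integers stand in for timestamps.

```ruby
# Hypothetical row type; integers stand in for updated_at timestamps.
Row = Struct.new(:id, :updated_at)

# In-memory version of the condition built by poll_query:
#   ((column = time_from AND primary > min_id) OR column > time_from)
#   AND column <= time_to
def in_poll_window?(row, time_from:, time_to:, min_id:)
  after_cursor = (row.updated_at == time_from && row.id > min_id) ||
                 row.updated_at > time_from
  after_cursor && row.updated_at <= time_to
end

rows = [Row.new(5, 100), Row.new(6, 100), Row.new(1, 150), Row.new(2, 250)]
selected = rows.select { |r| in_poll_window?(r, time_from: 100, time_to: 200, min_id: 5) }
puts selected.map(&:id).join(', ')
```

The pair (time_from, min_id) acts as a cursor: rows sharing the exact time_from value are only picked up if their primary key is beyond min_id, so a row at the boundary is not selected twice.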
.post_process(_records) ⇒ Object
Post-process records after publishing. The default implementation does nothing.
    # File 'lib/deimos/active_record_producer.rb', line 121
    def post_process(_records)
    end
.record_class(klass = nil, refetch: true) ⇒ void
This method returns an undefined value.
Indicate the class this producer is working on. If refetch is true and a hash is given instead of a record object, the record is refetched to pass into the `generate_payload` method.
    # File 'lib/deimos/active_record_producer.rb', line 21
    def record_class(klass=nil, refetch: true)
      return record_klass if klass.nil?

      @record_class = klass
      @refetch_record = refetch
    end
.record_klass ⇒ Class?
    # File 'lib/deimos/active_record_producer.rb', line 29
    def record_klass
      @record_class = @record_class.constantize if @record_class.is_a?(String)
      @record_class
    end
.send_event(record, force_send: false) ⇒ void
This method returns an undefined value.
    # File 'lib/deimos/active_record_producer.rb', line 37
    def send_event(record, force_send: false)
      send_events([record], force_send: force_send)
    end
.send_events(records, force_send: false) ⇒ void
This method returns an undefined value.
    # File 'lib/deimos/active_record_producer.rb', line 44
    def send_events(records, force_send: false)
      return if Deimos.producers_disabled?(self)

      primary_key = record_klass&.primary_key
      messages = records.map do |record|
        if record.respond_to?(:attributes)
          attrs = record.attributes.with_indifferent_access
        else
          attrs = record.with_indifferent_access
          if @refetch_record && attrs[primary_key]
            record = record_klass.find(attrs[primary_key])
          end
        end
        generate_payload(attrs, record).with_indifferent_access
      end
      self.publish_list(messages, force_send: force_send)
      self.post_process(records)
    end
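The branching in `send_events` between record objects and attribute hashes can be sketched in plain Ruby. This is a simplified illustration under stated assumptions, not Deimos code: `FakeRecord`, `STORE`, and `resolve` are hypothetical, `STORE.fetch` stands in for `record_klass.find`, and string keys replace `with_indifferent_access`.

```ruby
# Hypothetical record type exposing #attributes like an ActiveRecord instance.
FakeRecord = Struct.new(:id, :name) do
  def attributes
    { 'id' => id, 'name' => name }
  end
end

# Hypothetical in-memory "table" standing in for record_klass.find.
STORE = { 1 => FakeRecord.new(1, 'from database') }.freeze

# Returns the [attrs, record] pair that would be handed to generate_payload.
def resolve(record, refetch:, primary_key: 'id')
  if record.respond_to?(:attributes)
    attrs = record.attributes
  else
    attrs = record.transform_keys(&:to_s)
    # Only hashes carrying a primary key are refetched, and only when refetch is on.
    record = STORE.fetch(attrs[primary_key]) if refetch && attrs[primary_key]
  end
  [attrs, record]
end

attrs, record = resolve({ id: 1, name: 'from hash' }, refetch: true)
puts "#{attrs['name']} / #{record.name}"
```

In this sketch the attributes always come from what the caller passed in, while the second value is the refetched record only when a hash with a primary key was given and refetching is enabled.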
.watched_attributes(_record) ⇒ Array<String>
Override this in active record producers to add non-schema fields to check for updates. By default, it returns the names of the schema fields.
    # File 'lib/deimos/active_record_producer.rb', line 128
    def watched_attributes(_record)
      self.encoder.schema_fields.map(&:name)
    end
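An override of `watched_attributes` might look like the following sketch. The classes here are plain-Ruby stand-ins so the example runs on its own: `BaseProducerSketch` plays the role of `Deimos::ActiveRecordProducer`, and all field names are hypothetical.

```ruby
# Stand-in for Deimos::ActiveRecordProducer; the real method returns
# encoder.schema_fields.map(&:name) rather than a fixed list.
class BaseProducerSketch
  def self.watched_attributes(_record)
    %w[id name]
  end
end

# Hypothetical producer that also watches a column outside the schema.
class WidgetProducerSketch < BaseProducerSketch
  def self.watched_attributes(record)
    super + ['internal_status']
  end
end

puts WidgetProducerSketch.watched_attributes(nil).join(', ')
```

Calling `super` keeps the schema-derived fields and appends the extra ones, so the override stays correct if the schema changes.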