Class: Deimos::ActiveRecordProducer
- Defined in:
- lib/deimos/active_record_producer.rb
Overview
Class which automatically produces a record when given an ActiveRecord instance or a list of them. Just call ‘send_events` on a list of records and they will be auto-published. You can override `generate_payload` to make changes to the payload before it’s published.
You can also call this with a list of hashes representing attributes. This is common when using activerecord-import.
Constant Summary
Constants inherited from Producer
Class Method Summary collapse
- .config ⇒ Object
- .encoder ⇒ Object
-
.generate_payload(attributes, _record) ⇒ Hash
Generate the payload, given a list of attributes or a record..
-
.poll_query(time_from:, time_to:, column_name: :updated_at, min_id:) ⇒ ActiveRecord::Relation
Query to use when polling the database with the DbPoller.
-
.post_process(_records) ⇒ Object
Post process records after publishing.
-
.record_class(klass = nil, refetch: true) ⇒ void
Indicate the class this producer is working on.
- .send_event(record, force_send: false) ⇒ void
- .send_events(records, force_send: false) ⇒ void
-
.watched_attributes(_record) ⇒ Array<String>
Override this in active record producers to add non-schema fields to check for updates.
Methods inherited from Producer
determine_backend_class, karafka_config, partition_key, produce, produce_batch, publish, publish_list, topic
Class Method Details
.config ⇒ Object
55 56 57 |
# File 'lib/deimos/active_record_producer.rb', line 55 def config Deimos.karafka_configs.find { |t| t.producer_class == self } end |
.encoder ⇒ Object
59 60 61 62 |
# File 'lib/deimos/active_record_producer.rb', line 59 def encoder raise "No schema or namespace configured for #{self.name}" if config.nil? config.deserializers[:payload].backend end |
.generate_payload(attributes, _record) ⇒ Hash
Generate the payload, given a list of attributes or a record.. Can be overridden or added to by subclasses. is not set.
70 71 72 73 74 75 76 77 78 79 |
# File 'lib/deimos/active_record_producer.rb', line 70 def generate_payload(attributes, _record) fields = self.encoder.schema_fields payload = attributes.stringify_keys payload.delete_if do |k, _| k.to_sym != :payload_key && !fields.map(&:name).include?(k) end return payload unless self.config.use_schema_classes Utils::SchemaClass.instance(payload, encoder.schema, encoder.namespace) end |
.poll_query(time_from:, time_to:, column_name: :updated_at, min_id:) ⇒ ActiveRecord::Relation
Query to use when polling the database with the DbPoller. Add includes, joins, or wheres as necessary, or replace entirely. than this value).
89 90 91 92 93 94 95 96 97 98 99 100 101 102 |
# File 'lib/deimos/active_record_producer.rb', line 89 def poll_query(time_from:, time_to:, column_name: :updated_at, min_id:) klass = @record_class table = ActiveRecord::Base.connection.quote_table_name(klass.table_name) column = ActiveRecord::Base.connection.quote_column_name(column_name) primary = ActiveRecord::Base.connection.quote_column_name(klass.primary_key) klass.where( "((#{table}.#{column} = ? AND #{table}.#{primary} > ?) \ OR #{table}.#{column} > ?) AND #{table}.#{column} <= ?", time_from, min_id, time_from, time_to ) end |
.post_process(_records) ⇒ Object
Post process records after publishing
106 107 |
# File 'lib/deimos/active_record_producer.rb', line 106 def post_process(_records) end |
.record_class(klass = nil, refetch: true) ⇒ void
This method returns an undefined value.
Indicate the class this producer is working on. a record object, refetch the record to pass into the ‘generate_payload` method.
21 22 23 24 25 26 |
# File 'lib/deimos/active_record_producer.rb', line 21 def record_class(klass=nil, refetch: true) return @record_class if klass.nil? @record_class = klass @refetch_record = refetch end |
.send_event(record, force_send: false) ⇒ void
This method returns an undefined value.
31 32 33 |
# File 'lib/deimos/active_record_producer.rb', line 31 def send_event(record, force_send: false) send_events([record], force_send: force_send) end |
.send_events(records, force_send: false) ⇒ void
This method returns an undefined value.
38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 |
# File 'lib/deimos/active_record_producer.rb', line 38 def send_events(records, force_send: false) primary_key = @record_class&.primary_key = records.map do |record| if record.respond_to?(:attributes) attrs = record.attributes.with_indifferent_access else attrs = record.with_indifferent_access if @refetch_record && attrs[primary_key] record = @record_class.find(attrs[primary_key]) end end generate_payload(attrs, record).with_indifferent_access end self.publish_list(, force_send: force_send) self.post_process(records) end |
.watched_attributes(_record) ⇒ Array<String>
Override this in active record producers to add non-schema fields to check for updates
113 114 115 |
# File 'lib/deimos/active_record_producer.rb', line 113 def watched_attributes(_record) self.encoder.schema_fields.map(&:name) end |