Class: Deimos::ActiveRecordProducer
- Defined in:
- lib/deimos/active_record_producer.rb
Overview
Class which automatically produces a record when given an ActiveRecord instance or a list of them. Just call `send_events` on a list of records and they will be auto-published. You can override `generate_payload` to make changes to the payload before it's published.
You can also call this with a list of hashes representing attributes. This is common when using activerecord-import.
Constant Summary
Constants inherited from Producer
Class Method Summary collapse
-
.generate_payload(attributes, _record) ⇒ Hash
Generate the payload, given a list of attributes or a record..
-
.poll_query(time_from:, time_to:, column_name: :updated_at, min_id:) ⇒ ActiveRecord::Relation
Query to use when polling the database with the DbPoller.
-
.post_process(_records) ⇒ Object
Post process records after publishing.
-
.record_class(klass, refetch: true) ⇒ void
Indicate the class this producer is working on.
- .send_event(record, force_send: false) ⇒ void
- .send_events(records, force_send: false) ⇒ void
Methods inherited from Producer
config, determine_backend_class, encoder, key_encoder, partition_key, produce_batch, publish, publish_list, topic, watched_attributes
Class Method Details
.generate_payload(attributes, _record) ⇒ Hash
Generate the payload, given a list of attributes or a record.. Can be overridden or added to by subclasses. is not set.
59 60 61 62 63 64 65 66 67 68 |
# File 'lib/deimos/active_record_producer.rb', line 59 def generate_payload(attributes, _record) fields = self.encoder.schema_fields payload = attributes.stringify_keys payload.delete_if do |k, _| k.to_sym != :payload_key && !fields.map(&:name).include?(k) end return payload unless Utils::SchemaClass.use?(config.to_h) Utils::SchemaClass.instance(payload, config[:schema], config[:namespace]) end |
.poll_query(time_from:, time_to:, column_name: :updated_at, min_id:) ⇒ ActiveRecord::Relation
Query to use when polling the database with the DbPoller. Add includes, joins, or wheres as necessary, or replace entirely. than this value).
78 79 80 81 82 83 84 85 86 87 88 89 90 91 |
# File 'lib/deimos/active_record_producer.rb', line 78 def poll_query(time_from:, time_to:, column_name: :updated_at, min_id:) klass = config[:record_class] table = ActiveRecord::Base.connection.quote_table_name(klass.table_name) column = ActiveRecord::Base.connection.quote_column_name(column_name) primary = ActiveRecord::Base.connection.quote_column_name(klass.primary_key) klass.where( "((#{table}.#{column} = ? AND #{table}.#{primary} > ?) \ OR #{table}.#{column} > ?) AND #{table}.#{column} <= ?", time_from, min_id, time_from, time_to ) end |
.post_process(_records) ⇒ Object
Post process records after publishing
95 96 |
# File 'lib/deimos/active_record_producer.rb', line 95 def post_process(_records) end |
.record_class(klass, refetch: true) ⇒ void
This method returns an undefined value.
Indicate the class this producer is working on. a record object, refetch the record to pass into the `generate_payload` method.
21 22 23 24 |
# File 'lib/deimos/active_record_producer.rb', line 21 def record_class(klass, refetch: true) config[:record_class] = klass config[:refetch_record] = refetch end |
.send_event(record, force_send: false) ⇒ void
This method returns an undefined value.
29 30 31 |
# File 'lib/deimos/active_record_producer.rb', line 29 def send_event(record, force_send: false) send_events([record], force_send: force_send) end |
.send_events(records, force_send: false) ⇒ void
This method returns an undefined value.
36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 |
# File 'lib/deimos/active_record_producer.rb', line 36 def send_events(records, force_send: false) primary_key = config[:record_class]&.primary_key = records.map do |record| if record.respond_to?(:attributes) attrs = record.attributes.with_indifferent_access else attrs = record.with_indifferent_access if config[:refetch_record] && attrs[primary_key] record = config[:record_class].find(attrs[primary_key]) end end generate_payload(attrs, record).with_indifferent_access end self.publish_list(, force_send: force_send) self.post_process(records) end |