Class: Deimos::ActiveRecordProducer
- Defined in:
- lib/deimos/active_record_producer.rb
Overview
Class which automatically produces a record when given an ActiveRecord instance or a list of them. Just call `send_events` on a list of records and they will be auto-published. You can override `generate_payload` to make changes to the payload before it's published.
You can also call this with a list of hashes representing attributes. This is common when using activerecord-import.
Constant Summary
Constants inherited from Producer
Class Method Summary collapse
-
.generate_payload(attributes, _record) ⇒ Hash
Generate the payload, given a list of attributes or a record..
-
.poll_query(time_from:, time_to:, column_name: :updated_at, min_id:) ⇒ ActiveRecord::Relation
Query to use when polling the database with the DbPoller.
-
.post_process(_records) ⇒ Object
Post process records after publishing.
-
.record_class(klass, refetch: true) ⇒ Object
Indicate the class this producer is working on.
- .send_event(record, force_send: false) ⇒ Object
- .send_events(records, force_send: false) ⇒ Object
Methods inherited from Producer
config, determine_backend_class, encoder, key_encoder, partition_key, produce_batch, publish, publish_list, topic, watched_attributes
Class Method Details
.generate_payload(attributes, _record) ⇒ Hash
Generate the payload, given a list of attributes or a record.. Can be overridden or added to by subclasses. is not set.
56 57 58 59 60 61 62 63 64 65 |
# File 'lib/deimos/active_record_producer.rb', line 56 def generate_payload(attributes, _record) fields = self.encoder.schema_fields payload = attributes.stringify_keys payload.delete_if do |k, _| k.to_sym != :payload_key && !fields.map(&:name).include?(k) end return payload unless Utils::SchemaClass.use?(config.to_h) Utils::SchemaClass.instance(payload, config[:schema], config[:namespace]) end |
.poll_query(time_from:, time_to:, column_name: :updated_at, min_id:) ⇒ ActiveRecord::Relation
Query to use when polling the database with the DbPoller. Add includes, joins, or wheres as necessary, or replace entirely. than this value).
75 76 77 78 79 80 81 82 83 84 85 86 87 88 |
# File 'lib/deimos/active_record_producer.rb', line 75 def poll_query(time_from:, time_to:, column_name: :updated_at, min_id:) klass = config[:record_class] table = ActiveRecord::Base.connection.quote_table_name(klass.table_name) column = ActiveRecord::Base.connection.quote_column_name(column_name) primary = ActiveRecord::Base.connection.quote_column_name(klass.primary_key) klass.where( "((#{table}.#{column} = ? AND #{table}.#{primary} > ?) \ OR #{table}.#{column} > ?) AND #{table}.#{column} <= ?", time_from, min_id, time_from, time_to ) end |
.post_process(_records) ⇒ Object
Post process records after publishing
92 93 |
# File 'lib/deimos/active_record_producer.rb', line 92 def post_process(_records) end |
.record_class(klass, refetch: true) ⇒ Object
Indicate the class this producer is working on. a record object, refetch the record to pass into the `generate_payload` method.
20 21 22 23 |
# File 'lib/deimos/active_record_producer.rb', line 20 def record_class(klass, refetch: true) config[:record_class] = klass config[:refetch_record] = refetch end |
.send_event(record, force_send: false) ⇒ Object
27 28 29 |
# File 'lib/deimos/active_record_producer.rb', line 27 def send_event(record, force_send: false) send_events([record], force_send: force_send) end |
.send_events(records, force_send: false) ⇒ Object
33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 |
# File 'lib/deimos/active_record_producer.rb', line 33 def send_events(records, force_send: false) primary_key = config[:record_class]&.primary_key = records.map do |record| if record.respond_to?(:attributes) attrs = record.attributes.with_indifferent_access else attrs = record.with_indifferent_access if config[:refetch_record] && attrs[primary_key] record = config[:record_class].find(attrs[primary_key]) end end generate_payload(attrs, record).with_indifferent_access end self.publish_list(, force_send: force_send) self.post_process(records) end |