Class: Deimos::ActiveRecordProducer
- Defined in:
- lib/deimos/active_record_producer.rb
Overview
Class which automatically produces a record when given an ActiveRecord instance or a list of them. Just call `send_events` on a list of records and they will be auto-published. You can override `generate_payload` to make changes to the payload before it's published.
You can also call this with a list of hashes representing attributes. This is common when using activerecord-import.
Constant Summary
Constants inherited from Producer
Class Method Summary collapse
- 
  
    
      .generate_payload(attributes, _record)  ⇒ Hash 
    
    
  
  
  
  
  
  
  
  
  
    Generate the payload, given a list of attributes or a record.. 
- 
  
    
      .poll_query(time_from:, time_to:, column_name: :updated_at, min_id:)  ⇒ ActiveRecord::Relation 
    
    
  
  
  
  
  
  
  
  
  
    Query to use when polling the database with the DbPoller. 
- 
  
    
      .post_process(_records)  ⇒ Object 
    
    
  
  
  
  
  
  
  
  
  
    Post process records after publishing. 
- 
  
    
      .record_class(klass, refetch: true)  ⇒ void 
    
    
  
  
  
  
  
  
  
  
  
    Indicate the class this producer is working on. 
- .send_event(record, force_send: false) ⇒ void
- .send_events(records, force_send: false) ⇒ void
Methods inherited from Producer
config, determine_backend_class, encoder, key_encoder, partition_key, produce_batch, publish, publish_list, topic, watched_attributes
Class Method Details
.generate_payload(attributes, _record) ⇒ Hash
Generate the payload, given a list of attributes or a record.. Can be overridden or added to by subclasses. is not set.
| 59 60 61 62 63 64 65 66 67 68 | # File 'lib/deimos/active_record_producer.rb', line 59 def generate_payload(attributes, _record) fields = self.encoder.schema_fields payload = attributes.stringify_keys payload.delete_if do |k, _| k.to_sym != :payload_key && !fields.map(&:name).include?(k) end return payload unless Utils::SchemaClass.use?(config.to_h) Utils::SchemaClass.instance(payload, config[:schema], config[:namespace]) end | 
.poll_query(time_from:, time_to:, column_name: :updated_at, min_id:) ⇒ ActiveRecord::Relation
Query to use when polling the database with the DbPoller. Add includes, joins, or wheres as necessary, or replace entirely. than this value).
| 78 79 80 81 82 83 84 85 86 87 88 89 90 91 | # File 'lib/deimos/active_record_producer.rb', line 78 def poll_query(time_from:, time_to:, column_name: :updated_at, min_id:) klass = config[:record_class] table = ActiveRecord::Base.connection.quote_table_name(klass.table_name) column = ActiveRecord::Base.connection.quote_column_name(column_name) primary = ActiveRecord::Base.connection.quote_column_name(klass.primary_key) klass.where( "((#{table}.#{column} = ? AND #{table}.#{primary} > ?) \ OR #{table}.#{column} > ?) AND #{table}.#{column} <= ?", time_from, min_id, time_from, time_to ) end | 
.post_process(_records) ⇒ Object
Post process records after publishing
| 95 96 | # File 'lib/deimos/active_record_producer.rb', line 95 def post_process(_records) end | 
.record_class(klass, refetch: true) ⇒ void
This method returns an undefined value.
Indicate the class this producer is working on. a record object, refetch the record to pass into the `generate_payload` method.
| 21 22 23 24 | # File 'lib/deimos/active_record_producer.rb', line 21 def record_class(klass, refetch: true) config[:record_class] = klass config[:refetch_record] = refetch end | 
.send_event(record, force_send: false) ⇒ void
This method returns an undefined value.
| 29 30 31 | # File 'lib/deimos/active_record_producer.rb', line 29 def send_event(record, force_send: false) send_events([record], force_send: force_send) end | 
.send_events(records, force_send: false) ⇒ void
This method returns an undefined value.
| 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 | # File 'lib/deimos/active_record_producer.rb', line 36 def send_events(records, force_send: false) primary_key = config[:record_class]&.primary_key = records.map do |record| if record.respond_to?(:attributes) attrs = record.attributes.with_indifferent_access else attrs = record.with_indifferent_access if config[:refetch_record] && attrs[primary_key] record = config[:record_class].find(attrs[primary_key]) end end generate_payload(attrs, record).with_indifferent_access end self.publish_list(, force_send: force_send) self.post_process(records) end |