Module: Deimos::KafkaSource
- Extended by:
- ActiveSupport::Concern
- Defined in:
- lib/deimos/kafka_source.rb
Overview
Represents an object which needs to inform Kafka when it is saved or bulk imported.
Defined Under Namespace
Modules: ClassMethods
Constant Summary collapse
- DEPRECATION_WARNING =
'The kafka_producer interface will be deprecated ' \ 'in future releases. Please use kafka_producers instead.'
Instance Method Summary collapse
-
#deletion_payload ⇒ Hash
Payload to send after we are destroyed.
-
#send_kafka_event_on_create ⇒ Object
Send the newly created model to Kafka.
-
#send_kafka_event_on_destroy ⇒ Object
Send a deletion (null payload) event to Kafka.
-
#send_kafka_event_on_update ⇒ Object
Send the newly updated model to Kafka.
Instance Method Details
#deletion_payload ⇒ Hash
Payload to send after we are destroyed.
52 53 54 |
# File 'lib/deimos/kafka_source.rb', line 52 def deletion_payload { payload_key: self[self.class.primary_key] } end |
#send_kafka_event_on_create ⇒ Object
Send the newly created model to Kafka.
19 20 21 22 23 24 |
# File 'lib/deimos/kafka_source.rb', line 19 def send_kafka_event_on_create return unless self.persisted? return unless self.class.kafka_config[:create] self.class.kafka_producers.each { |p| p.send_event(self) } end |
#send_kafka_event_on_destroy ⇒ Object
Send a deletion (null payload) event to Kafka.
44 45 46 47 48 |
# File 'lib/deimos/kafka_source.rb', line 44 def send_kafka_event_on_destroy return unless self.class.kafka_config[:delete] self.class.kafka_producers.each { |p| p.publish_list([self.deletion_payload]) } end |
#send_kafka_event_on_update ⇒ Object
Send the newly updated model to Kafka.
27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 |
# File 'lib/deimos/kafka_source.rb', line 27 def send_kafka_event_on_update return unless self.class.kafka_config[:update] producers = self.class.kafka_producers fields = producers.flat_map(&:watched_attributes).uniq fields -= ['updated_at'] # Only send an event if a field we care about was changed. any_changes = fields.any? do |field| field_change = self.previous_changes[field] field_change.present? && field_change[0] != field_change[1] end return unless any_changes producers.each { |p| p.send_event(self) } end |