Class: Deimos::KafkaMessage
- Inherits: ActiveRecord::Base
  - Object
  - ActiveRecord::Base
  - Deimos::KafkaMessage
- Defined in: lib/deimos/kafka_message.rb
Overview
Store Kafka messages into the database.
Class Method Summary
- .decoded(messages = []) ⇒ Array<Hash>
  Decoded payloads for a list of messages.
- .decoder(topic) ⇒ Deimos::Consumer
  Get a decoder to decode a set of messages on the given topic.
Instance Method Summary
- #decoded_message ⇒ Hash
  Decoded payload for this message.
- #karafka_message ⇒ Object
- #message=(mess) ⇒ void
  Ensure it gets turned into a string, e.g. for testing purposes.
Class Method Details
.decoded(messages = []) ⇒ Array<Hash>
Decoded payloads for a list of messages.
# File 'lib/deimos/kafka_message.rb', line 39
def self.decoded(messages=[])
  return [] if messages.empty?

  decoder_class = self.decoder(messages.first.topic)
  decoder = decoder_class&.new
  messages.map do |m|
    {
      key: m.key.present? ? decoder&.decode_key(m.key) || m.key : nil,
      payload: decoder_class&.decoder&.decode(m.message) || m.message
    }
  end
end
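The fallback behaviour of `.decoded` can be illustrated without a database or Kafka. The following is a minimal sketch in plain Ruby, not the Deimos implementation: `SketchMessage`, `SketchDecoder`, and `sketch_decoded` are hypothetical stand-ins, and the blank check is only a rough approximation of ActiveSupport's `present?`. It shows that a missing decoder leaves keys and payloads untouched, and that a blank key always maps to `nil`.

```ruby
# Hypothetical stand-in for a stored message row (not Deimos::KafkaMessage).
SketchMessage = Struct.new(:key, :message)

# Hypothetical decoder; a real one would apply a schema.
class SketchDecoder
  def decode_key(key)
    "decoded-#{key}"
  end

  def decode(payload)
    payload.upcase
  end
end

# Mirrors the shape of the listing above: decode when a decoder is
# available, otherwise fall back to the raw value.
def sketch_decoded(messages, decoder)
  return [] if messages.empty?

  messages.map do |m|
    # Plain-Ruby approximation of ActiveSupport's `present?`.
    key_present = !m.key.to_s.strip.empty?
    {
      key: key_present ? decoder&.decode_key(m.key) || m.key : nil,
      payload: decoder&.decode(m.message) || m.message
    }
  end
end

rows = [SketchMessage.new('k1', 'raw'), SketchMessage.new(nil, 'other')]
with_decoder = sketch_decoded(rows, SketchDecoder.new)
without_decoder = sketch_decoded(rows, nil)
```

Here `with_decoder` holds the transformed keys and payloads, while `without_decoder` holds the raw values, because the safe-navigation calls return `nil` and the `||` falls through.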
.decoder(topic) ⇒ Deimos::Consumer
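Get a decoder to decode a set of messages on the given topic. Returns nil when no Deimos::Producer subclass is registered for the topic; otherwise returns an anonymous Deimos::Consumer subclass carrying the producer's config.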
# File 'lib/deimos/kafka_message.rb', line 27
def self.decoder(topic)
  producer = Deimos::Producer.descendants.find { |c| c.topic == topic }
  return nil unless producer

  consumer = Class.new(Deimos::Consumer)
  consumer.config.merge!(producer.config)
  consumer
end
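The lookup-then-clone pattern in `.decoder` can be sketched in isolation. This is an illustration under stated assumptions, not Deimos code: `SketchProducer`, `SketchConsumer`, `OrdersProducer`, and `sketch_decoder` are hypothetical names, the config is modelled as a plain Hash, and an `inherited` hook stands in for ActiveSupport's `descendants`.

```ruby
# Hypothetical producer base class that records its subclasses.
class SketchProducer
  @registry = []

  class << self
    attr_accessor :topic
    attr_reader :registry

    # Stand-in for ActiveSupport's `descendants`.
    def inherited(subclass)
      super
      SketchProducer.registry << subclass
    end

    def config
      @config ||= {}
    end
  end
end

# Hypothetical consumer base class with per-class config.
class SketchConsumer
  def self.config
    @config ||= {}
  end
end

class OrdersProducer < SketchProducer
  self.topic = 'orders'
  config[:schema] = 'Order'
end

# Find the producer for a topic and build an anonymous consumer class
# that copies the producer's config; nil when no producer matches.
def sketch_decoder(topic)
  producer = SketchProducer.registry.find { |c| c.topic == topic }
  return nil unless producer

  consumer = Class.new(SketchConsumer)
  consumer.config.merge!(producer.config)
  consumer
end

orders_decoder = sketch_decoder('orders')
missing_decoder = sketch_decoder('unknown')
```

Because each call builds a fresh anonymous class, the merged config lives on that class only and the shared base class stays unmodified.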
Instance Method Details
#decoded_message ⇒ Hash
Decoded payload for this message.
# File 'lib/deimos/kafka_message.rb', line 20
def decoded_message
  self.class.decoded([self]).first
end
#karafka_message ⇒ Object
# File 'lib/deimos/kafka_message.rb', line 52
def karafka_message
  {
    payload: self.message,
    partition_key: self.partition_key,
    key: self.key,
    topic: self.topic
  }
end
#message=(mess) ⇒ void
This method returns an undefined value.
Ensure it gets turned into a string, e.g. for testing purposes. It should already be a string.
# File 'lib/deimos/kafka_message.rb', line 14
def message=(mess)
  write_attribute(:message, mess ? mess.to_s : nil)
end
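The coercion rule in `#message=` can be tried without ActiveRecord. The sketch below uses a hypothetical `SketchRecord` class with an instance variable in place of `write_attribute`; it only illustrates the `mess ? mess.to_s : nil` rule and is not the model itself.

```ruby
# Hypothetical stand-in for the model; stores the value in an ivar
# instead of calling ActiveRecord's write_attribute.
class SketchRecord
  attr_reader :message

  # Truthy values are stringified; nil and false are stored as nil.
  def message=(mess)
    @message = mess ? mess.to_s : nil
  end
end

record = SketchRecord.new

record.message = 42
int_result = record.message

record.message = :abc
sym_result = record.message

record.message = nil
nil_result = record.message
```

Non-string inputs such as integers or symbols end up as strings, while `nil` is preserved rather than becoming an empty string.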