Class: Deimos::KafkaMessage
- Inherits:
- 
      ActiveRecord::Base
      
        - Object
- ActiveRecord::Base
- Deimos::KafkaMessage
 
- Defined in:
- lib/deimos/kafka_message.rb
Overview
Store Kafka messages into the database.
Class Method Summary collapse
- 
  
    
      .decoded(messages = [])  ⇒ Array<Hash> 
    
    
  
  
  
  
  
  
  
  
  
    Decoded payloads for a list of messages. 
- 
  
    
      .decoder(topic)  ⇒ Deimos::Consumer 
    
    
  
  
  
  
  
  
  
  
  
    Get a decoder to decode a set of messages on the given topic. 
Instance Method Summary collapse
- 
  
    
      #decoded_message  ⇒ Hash 
    
    
  
  
  
  
  
  
  
  
  
    Decoded payload for this message. 
- 
  
    
      #message=(mess)  ⇒ void 
    
    
  
  
  
  
  
  
  
  
  
    Ensure it gets turned into a string, e.g. 
- #phobos_message ⇒ Hash
Class Method Details
.decoded(messages = []) ⇒ Array<Hash>
Decoded payloads for a list of messages.
| 39 40 41 42 43 44 45 46 47 48 49 | # File 'lib/deimos/kafka_message.rb', line 39 def self.decoded(=[]) return [] if .empty? decoder = self.decoder(.first.topic)&.new .map do |m| { key: m.key.present? ? decoder&.decode_key(m.key) || m.key : nil, payload: decoder&.decoder&.decode(m.) || m. } end end | 
.decoder(topic) ⇒ Deimos::Consumer
Get a decoder to decode a set of messages on the given topic.
| 27 28 29 30 31 32 33 34 | # File 'lib/deimos/kafka_message.rb', line 27 def self.decoder(topic) producer = Deimos::Producer.descendants.find { |c| c.topic == topic } return nil unless producer consumer = Class.new(Deimos::Consumer) consumer.config.merge!(producer.config) consumer end | 
Instance Method Details
#decoded_message ⇒ Hash
Decoded payload for this message.
| 20 21 22 | # File 'lib/deimos/kafka_message.rb', line 20 def self.class.decoded([self]).first end | 
#message=(mess) ⇒ void
This method returns an undefined value.
Ensure it gets turned into a string, e.g. for testing purposes. It should already be a string.
| 14 15 16 | # File 'lib/deimos/kafka_message.rb', line 14 def (mess) write_attribute(:message, mess ? mess.to_s : nil) end | 
#phobos_message ⇒ Hash
| 52 53 54 55 56 57 58 59 | # File 'lib/deimos/kafka_message.rb', line 52 def { payload: self., partition_key: self.partition_key, key: self.key, topic: self.topic } end |