Class: Deimos::KafkaMessage

Inherits:
ActiveRecord::Base
  • Object
Defined in:
lib/deimos/kafka_message.rb

Overview

Stores Kafka messages in the database.

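Class Method Summary

  • .decoded(messages = []) ⇒ Array<Hash>
    Decoded payloads for a list of messages.
  • .decoder(topic) ⇒ Deimos::Consumer
    Get a decoder to decode a set of messages on the given topic.

Instance Method Summary

  • #decoded_message ⇒ Hash
    Decoded payload for this message.
  • #karafka_message ⇒ Object
  • #message=(mess) ⇒ void
    Ensure it gets turned into a string, e.g. for testing purposes.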

Class Method Details

.decoded(messages = []) ⇒ Array<Hash>

Decoded payloads for a list of messages.

Parameters:

  • messages (Array<Deimos::KafkaMessage>) (defaults to: [])

Returns:

  • (Array<Hash>)


# File 'lib/deimos/kafka_message.rb', line 39

def self.decoded(messages=[])
  return [] if messages.empty?

  decoder_class = self.decoder(messages.first.topic)
  decoder = decoder_class&.new
  messages.map do |m|
    {
      key: m.key.present? ? decoder&.decode_key(m.key) || m.key : nil,
      payload: decoder_class&.decoder&.decode(m.message) || m.message
    }
  end
end
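
The fallback behaviour above can be illustrated with a minimal, self-contained sketch. Everything in it is a stand-in for illustration: `FakeMessage`, `FakeDecoder`, and `decode_all` are hypothetical names, not part of Deimos, and the blank check only approximates ActiveSupport's `present?`. The point is that a key or payload is returned undecoded whenever no decoder exists or decoding yields nil, and that a blank key becomes nil.

```ruby
require 'json'

# Stand-in for a stored message row (hypothetical, not a Deimos class).
FakeMessage = Struct.new(:topic, :key, :message)

# Hypothetical decoder: upcases keys and parses payloads as JSON,
# returning nil when the payload cannot be parsed.
class FakeDecoder
  def decode_key(key)
    key.to_s.upcase
  end

  def decode(payload)
    JSON.parse(payload)
  rescue JSON::ParserError
    nil
  end
end

# Mirrors the shape of the method above, with the decoder passed in explicitly.
def decode_all(messages, decoder)
  return [] if messages.empty?

  messages.map do |m|
    # Rough approximation of ActiveSupport's `present?` for strings and nil.
    key_present = !m.key.nil? && !m.key.to_s.strip.empty?
    {
      key: key_present ? (decoder&.decode_key(m.key) || m.key) : nil,
      payload: decoder&.decode(m.message) || m.message
    }
  end
end

messages = [
  FakeMessage.new('my-topic', 'abc', '{"id":1}'),
  FakeMessage.new('my-topic', nil, 'not json')
]

with_decoder = decode_all(messages, FakeDecoder.new)
without_decoder = decode_all(messages, nil)
```

With a decoder, the first message is decoded and the second falls back to its raw payload; without one, both messages come back as stored.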

.decoder(topic) ⇒ Deimos::Consumer

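Get a decoder to decode a set of messages on the given topic. The decoder is an anonymous Deimos::Consumer subclass whose config is merged from the first Deimos::Producer descendant publishing to that topic; the method returns nil when no such producer exists.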

Parameters:

  • topic (String)

Returns:

  • (Deimos::Consumer)


# File 'lib/deimos/kafka_message.rb', line 27

def self.decoder(topic)
  producer = Deimos::Producer.descendants.find { |c| c.topic == topic }
  return nil unless producer

  consumer = Class.new(Deimos::Consumer)
  consumer.config.merge!(producer.config)
  consumer
end
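
The lookup pattern used here (find a producer class by topic, then build a throwaway consumer class that copies its config) can be sketched without Deimos or ActiveSupport. `SketchProducer`, `SketchConsumer`, `OrdersProducer`, and `consumer_class_for` are hypothetical names, and the `inherited` hook is only a stand-in for ActiveSupport's `descendants`.

```ruby
# Hypothetical producer base class that records its subclasses.
class SketchProducer
  @registry = []

  class << self
    attr_reader :registry
    attr_accessor :topic

    # Stand-in for ActiveSupport's `descendants`: track every subclass.
    def inherited(subclass)
      super
      SketchProducer.registry << subclass
    end

    # Per-class config hash.
    def config
      @config ||= {}
    end
  end
end

# Hypothetical consumer base class with its own per-class config.
class SketchConsumer
  def self.config
    @config ||= {}
  end
end

class OrdersProducer < SketchProducer
  self.topic = 'orders'
  config.merge!(schema: 'Order', namespace: 'com.example')
end

# Same shape as the method above: nil when nothing matches, otherwise an
# anonymous consumer subclass carrying a copy of the producer's config.
def consumer_class_for(topic)
  producer = SketchProducer.registry.find { |c| c.topic == topic }
  return nil unless producer

  consumer = Class.new(SketchConsumer)
  consumer.config.merge!(producer.config)
  consumer
end

orders_consumer = consumer_class_for('orders')
missing = consumer_class_for('unknown')
```

Because each call builds a fresh anonymous class, the merged config never leaks into the base consumer class.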

Instance Method Details

#decoded_message ⇒ Hash

Decoded payload for this message.

Returns:

  • (Hash)


# File 'lib/deimos/kafka_message.rb', line 20

def decoded_message
  self.class.decoded([self]).first
end

#karafka_message ⇒ Object



# File 'lib/deimos/kafka_message.rb', line 52

def karafka_message
  {
    payload: self.message,
    partition_key: self.partition_key,
    key: self.key,
    topic: self.topic
  }
end
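
As the source shows, this method returns a plain Hash in which the `message` column is exposed under the `payload` key, while `partition_key`, `key`, and `topic` keep their names. A minimal sketch of that mapping, using a hypothetical `StoredMessage` struct in place of the ActiveRecord model:

```ruby
# Hypothetical stand-in for the model; only the four attributes used above.
StoredMessage = Struct.new(:message, :partition_key, :key, :topic) do
  # Same mapping as the method above: `message` is renamed to `payload`.
  def karafka_message
    {
      payload: message,
      partition_key: partition_key,
      key: key,
      topic: topic
    }
  end
end

row = StoredMessage.new('{"id":1}', 'p1', 'abc', 'orders')
hash = row.karafka_message
```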

#message=(mess) ⇒ void

This method returns an undefined value.

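Coerces the assigned value to a string with to_s before writing the attribute; nil (or any other falsy value) is stored as nil. The value should already be a string, so this mainly guards against non-string input, e.g. in tests.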

Parameters:

  • mess (Object)


# File 'lib/deimos/kafka_message.rb', line 14

def message=(mess)
  write_attribute(:message, mess ? mess.to_s : nil)
end
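
The coercion rule can be tried in isolation with a plain Ruby object. `SketchRecord` is a hypothetical stand-in that stores the value in an instance variable instead of calling ActiveRecord's `write_attribute`.

```ruby
# Hypothetical stand-in for the model, without ActiveRecord.
class SketchRecord
  attr_reader :message

  # Same rule as the setter above: truthy values go through to_s,
  # nil and false are stored as nil.
  def message=(mess)
    @message = mess ? mess.to_s : nil
  end
end

record = SketchRecord.new

record.message = 42
from_integer = record.message

record.message = :hello
from_symbol = record.message

record.message = nil
from_nil = record.message

record.message = false
from_false = record.message
```

Note that `false` is stored as nil rather than as the string "false", because the guard tests truthiness, not just nil.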