Class: Deimos::KafkaMessage

Inherits:
ActiveRecord::Base
  • Object
show all
Defined in:
lib/deimos/kafka_message.rb

Overview

Store Kafka messages into the database.

Class Method Summary collapse

Instance Method Summary collapse

Class Method Details

.decoded(messages = []) ⇒ Array<Hash>

Decoded payloads for a list of messages.

Parameters:

Returns:

  • (Array<Hash>)


38
39
40
41
42
43
44
45
46
47
48
# File 'lib/deimos/kafka_message.rb', line 38

def self.decoded(messages=[])
  return [] if messages.empty?

  decoder = self.decoder(messages.first.topic)&.new
  messages.map do |m|
    {
      key: m.key.present? ? decoder&.decode_key(m.key) || m.key : nil,
      payload: decoder&.decoder&.decode(m.message) || m.message
    }
  end
end

.decoder(topic) ⇒ Deimos::Consumer

Get a decoder to decode a set of messages on the given topic.

Parameters:

  • topic (String)

Returns:



26
27
28
29
30
31
32
33
# File 'lib/deimos/kafka_message.rb', line 26

def self.decoder(topic)
  producer = Deimos::Producer.descendants.find { |c| c.topic == topic }
  return nil unless producer

  consumer = Class.new(Deimos::Consumer)
  consumer.config.merge!(producer.config)
  consumer
end

Instance Method Details

#decoded_messageHash

Decoded payload for this message.

Returns:

  • (Hash)


19
20
21
# File 'lib/deimos/kafka_message.rb', line 19

def decoded_message
  self.class.decoded([self]).first
end

#message=(mess) ⇒ Object

Ensure it gets turned into a string, e.g. for testing purposes. It should already be a string.

Parameters:

  • mess (Object)


13
14
15
# File 'lib/deimos/kafka_message.rb', line 13

def message=(mess)
  write_attribute(:message, mess ? mess.to_s : nil)
end

#phobos_messageHash

Returns:

  • (Hash)


51
52
53
54
55
56
57
58
# File 'lib/deimos/kafka_message.rb', line 51

def phobos_message
  {
    payload: self.message,
    partition_key: self.partition_key,
    key: self.key,
    topic: self.topic
  }
end