Class: Rimless::Consumer::AvroDeserializer

Inherits:
Object
Defined in:
lib/rimless/consumer/avro_deserializer.rb

Overview

A custom Apache Avro-compatible message deserializer.

Constant Summary

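ISO_TIME_FORMAT = /\A\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}(\.\d+)?/m

The ISO 8601 date/time format. The pattern is anchored at the start of the string only, so fractional seconds and any trailing zone designator or offset are accepted.

ISO_DATE_FORMAT = /\A\d{4}-\d{2}-\d{2}\Z/m

The ISO 8601 date format (a calendar date without a time component).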
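To see which strings the two constants pick up, the sketch below runs them against a few sample values. `classify` is a hypothetical helper for illustration only, and it checks the date/time pattern before the date pattern, in the same order as `#parse_timestamps!`.

```ruby
# Copies of the two constants, so the snippet runs without the rimless gem.
ISO_TIME_FORMAT = /\A\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}(\.\d+)?/m
ISO_DATE_FORMAT = /\A\d{4}-\d{2}-\d{2}\Z/m

# Hypothetical helper: reports which pattern (if any) a string matches.
def classify(value)
  case value
  when ISO_TIME_FORMAT then :time
  when ISO_DATE_FORMAT then :date
  else :untouched
  end
end

[
  '2024-03-01T12:30:45Z',          # UTC designator after the seconds
  '2024-03-01T12:30:45.123+02:00', # fractional seconds and an offset
  '2024-03-01T12:30:45 and more',  # no end anchor, so this still matches
  '2024-03-01',                    # date only
  '2024-03-01 12:30:45',           # space instead of "T": no match
  'due 2024-03-01'                 # date not at the start: no match
].each { |value| puts "#{value.inspect} => #{classify(value)}" }
```

Because the time pattern has no end anchor, any string that merely starts with a date/time is handed to the parser.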

Instance Method Summary

Instance Method Details

#call(message) ⇒ Hash{Symbol => Mixed}?

Deserialize an Apache Avro encoded Apache Kafka message.

Parameters:

  • message (Karafka::Messages::Message)

    the Karafka message to deserialize

Returns:

  • (Hash{Symbol => Mixed}, nil)

    the deserialized Apache Avro message, or nil when the message is a tombstone (its raw payload is nil)
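Since `#call` returns nil for tombstones, callers have to branch on the result. The sketch below shows that contract in isolation. `FakeMessage` and `TombstoneAwareDeserializer` are hypothetical stand-ins (not Karafka or Rimless classes), and the injected decoder lambda replaces `Rimless.decode` so the snippet runs without a schema registry.

```ruby
# Hypothetical stand-in for Karafka::Messages::Message; only the
# raw_payload reader used by #call is modeled.
FakeMessage = Struct.new(:raw_payload)

# Hypothetical deserializer mirroring only the nil guard at the top of #call.
class TombstoneAwareDeserializer
  def initialize(decoder)
    @decoder = decoder # assumption: stands in for Rimless.decode
  end

  def call(message)
    # A nil payload is a tombstone (a log compaction delete marker), so
    # return nil instead of handing it to the Avro decoder.
    return if message.raw_payload.nil?

    @decoder.call(message.raw_payload)
  end
end

deserializer = TombstoneAwareDeserializer.new(->(raw) { { raw: raw } })

[FakeMessage.new('bytes'), FakeMessage.new(nil)].each do |message|
  payload = deserializer.call(message)
  if payload.nil?
    puts 'tombstone: delete the record for this key'
  else
    puts "upsert: #{payload.inspect}"
  end
end
```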



# File 'lib/rimless/consumer/avro_deserializer.rb', line 19

def call(message)
  # When the Kafka message has no payload, we do not fail. This supports
  # Kafka users who rely on log compaction with nil payloads (tombstones).
  return if message.raw_payload.nil?

  # We use sparsified hashes inside of Apache Avro messages for schema-less
  # blobs of data, such as loosely structured metadata blobs. That's a
  # somewhat bad idea for strictly typed and defined messages, but such
  # blobs should be rare.
  Rimless
    .decode(message.raw_payload)
    .then { |data| Sparsify(data, sparse_array: true) }
    .then { |data| data.transform_keys { |key| key.delete('\\') } }
    .then { |data| Unsparsify(data, sparse_array: true) }
    .deep_symbolize_keys
    .then do |obj|
      # When the configuration says we should not parse datetimes,
      # we skip further processing
      next obj \
        unless Rimless.configuration.avro_deserializer_parse_datetimes

      # Otherwise we parse them
      parse_timestamps!(obj)
    end
end
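The Sparsify, backslash-stripping, Unsparsify chain is easiest to understand by its net effect. As we read it (assuming the sparsify gem's default `.` separator, which Sparsify escapes as `\.` when it occurs inside a key), stripping the backslashes turns literal dots in keys into real separators, so Unsparsify expands a key such as `"a.b"` into a nested hash. The sketch below approximates that effect in plain Ruby without the sparsify gem; `dotted_paths` and `expand_dotted_keys` are hypothetical names, arrays are only recursed into, and keys are assumed to be non-empty.

```ruby
# Hypothetical approximation of
#   Sparsify -> key.delete('\\') -> Unsparsify -> deep_symbolize_keys
# for hashes. It does NOT use the sparsify gem.

# Collect [path, value] pairs; every key is split on "." so that a key
# such as "a.b" contributes two path segments.
def dotted_paths(hash, prefix = [], out = [])
  hash.each do |key, value|
    path = prefix + key.to_s.split('.')
    if value.is_a?(Hash) && !value.empty?
      dotted_paths(value, path, out)
    else
      out << [path, value]
    end
  end
  out
end

# Rebuild nested, symbol-keyed hashes from the collected paths, so keys
# sharing a prefix ("a.b", "a.c") are merged under one nested hash.
def expand_dotted_keys(obj)
  case obj
  when Hash
    dotted_paths(obj).each_with_object({}) do |(path, value), root|
      *parents, leaf = path
      node = parents.reduce(root) { |acc, part| acc[part.to_sym] ||= {} }
      node[leaf.to_sym] = expand_dotted_keys(value)
    end
  when Array
    obj.map { |item| expand_dotted_keys(item) }
  else
    obj
  end
end
```

For example, `expand_dotted_keys({ 'meta' => { 'a.b' => 1, 'a.c' => 2 } })` yields `{ meta: { a: { b: 1, c: 2 } } }`.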

#parse_timestamps!(obj) ⇒ Mixed

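Search recursively through the given object for ISO 8601 date/time and date string values and replace them with their parsed representation: date/time strings are parsed with Time.zone.parse, date-only strings with Date.parse. This works on hashes, arrays, and any nesting of these; hashes and arrays are modified in place.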

Parameters:

  • obj (Mixed)

    the input to process

Returns:

  • (Mixed)

    the processed input



# File 'lib/rimless/consumer/avro_deserializer.rb', line 51

def parse_timestamps!(obj)
  case obj
  when Hash
    obj.each { |key, val| obj[key] = parse_timestamps!(val) }
  when Array
    obj.each_with_index { |cur, idx| obj[idx] = parse_timestamps!(cur) }
  when ISO_TIME_FORMAT
    Time.zone.parse(obj)
  when ISO_DATE_FORMAT
    Date.parse(obj)
  else
    obj
  end
end
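The recursive walk can be tried outside a Rails/ActiveSupport process. The sketch below is a copy of `#parse_timestamps!` that swaps `Time.zone.parse` (ActiveSupport, which honours the configured `Time.zone`) for the standard library's `Time.parse` (which falls back to the system's local zone when the string carries no offset); the method name `parse_timestamps_stdlib!` is hypothetical.

```ruby
require 'date'
require 'time'

ISO_TIME_FORMAT = /\A\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}(\.\d+)?/m
ISO_DATE_FORMAT = /\A\d{4}-\d{2}-\d{2}\Z/m

# Same recursive walk as #parse_timestamps!, using stdlib Time.parse.
def parse_timestamps_stdlib!(obj)
  case obj
  when Hash
    # Reassigning existing keys while iterating is allowed in Ruby.
    obj.each { |key, val| obj[key] = parse_timestamps_stdlib!(val) }
  when Array
    obj.each_with_index { |cur, idx| obj[idx] = parse_timestamps_stdlib!(cur) }
  when ISO_TIME_FORMAT
    Time.parse(obj)
  when ISO_DATE_FORMAT
    Date.parse(obj)
  else
    obj
  end
end

data = { 'at' => '2024-03-01T12:30:45Z', 'tags' => ['x', '2024-03-02'] }
parse_timestamps_stdlib!(data)
```

Hashes and arrays come back as the same (mutated) objects, while a bare string argument is replaced by the returned Time or Date.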