Class: Rimless::Consumer::AvroDeserializer
- Inherits:
-
Object
- Object
- Rimless::Consumer::AvroDeserializer
- Defined in:
- lib/rimless/consumer/avro_deserializer.rb
Overview
A custom Apache Avro compatible message deserializer.
Constant Summary collapse
- ISO_TIME_FORMAT =
The ISO8601 date/time format
/\A\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}(\.\d+)?/m- ISO_DATE_FORMAT =
The ISO8601 date format
/\A\d{4}-\d{2}-\d{2}\Z/m
Instance Method Summary collapse
-
#call(message) ⇒ Hash{Symbol => Mixed}?
Deserialize an Apache Avro encoded Apache Kafka message.
-
#parse_timestamps!(obj) ⇒ Mixed
Search recursively through the given object for ISO date/time string values and replace them with their parsed date representation.
Instance Method Details
#call(message) ⇒ Hash{Symbol => Mixed}?
Deserialize an Apache Avro encoded Apache Kafka message.
19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 |
# File 'lib/rimless/consumer/avro_deserializer.rb', line 19 def call() # When the Kafka message does not have a payload, we won't fail. # This is for Kafka users which use log compaction with a nil payload. return if .raw_payload.nil? # We use sparsed hashes inside of Apache Avro messages for schema-less # blobs of data, such as loosely structured metadata blobs. That's a # somewhat bad idea on strictly typed and defined messages, but their # occurrence should be rare. Rimless .decode(.raw_payload) .then { |data| Sparsify(data, sparse_array: true) } .then { |data| data.transform_keys { |key| key.delete('\\') } } .then { |data| Unsparsify(data, sparse_array: true) } .deep_symbolize_keys .then do |obj| # When the configuration says we should not parse datetimes, # we skip further processing next obj \ unless Rimless.configuration.avro_deserializer_parse_datetimes # Otherwise we parse them (obj) end end |
#parse_timestamps!(obj) ⇒ Mixed
Search recursively through the given object for ISO date/time string values and replace them with their parsed date representation. This works on hashes, arrays, and combinations of this.
51 52 53 54 55 56 57 58 59 60 61 62 63 64 |
# File 'lib/rimless/consumer/avro_deserializer.rb', line 51 def (obj) case obj when Hash obj.each { |key, val| obj[key] = (val) } when Array obj.each_with_index { |cur, idx| obj[idx] = (cur) } when ISO_TIME_FORMAT Time.zone.parse(obj) when ISO_DATE_FORMAT Date.parse(obj) else obj end end |