Class: Karafka::Pro::ScheduledMessages::Deserializers::Headers

Inherits:
Object
  • Object
Defined in:
lib/karafka/pro/scheduled_messages/deserializers/headers.rb

Overview

Converts selected header values of scheduled messages into their integer form.

Instance Method Summary

Instance Method Details

#call(metadata) ⇒ Hash

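Returns headers, with the target epoch and (when present) the target partition cast to integers for workable message types.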

Parameters:

  • metadata

    message metadata that responds to raw_headers

Returns:

  • (Hash)

    headers



# File 'lib/karafka/pro/scheduled_messages/deserializers/headers.rb', line 46

def call(metadata)
  raw_headers = metadata.raw_headers

  type = raw_headers.fetch("schedule_source_type")

  # tombstone and cancellation events are not operable, thus we do not have to cast any
  # of the headers pieces
  return raw_headers unless WORKABLE_TYPES.include?(type)

  headers = raw_headers.dup
  headers["schedule_target_epoch"] = headers["schedule_target_epoch"].to_i

  # This attribute is optional, this is why we have to check for its existence
  if headers.key?("schedule_target_partition")
    headers["schedule_target_partition"] = headers["schedule_target_partition"].to_i
  end

  headers
end