Module: Rdkafka::Consumer::Headers

Defined in:
lib/rdkafka/consumer/headers.rb

Overview

Interface to return headers for a consumer message

Constant Summary collapse

EMPTY_HEADERS =

Empty frozen hash used when there are no headers

{}.freeze

Class Method Summary collapse

Class Method Details

.from_native(native_message) ⇒ Hash{String => String, Array<String>}

Reads a librdkafka native message’s headers and returns them as a Ruby Hash where each key maps to either a String (single value) or Array<String> (multiple values) to support duplicate headers per KIP-82

Parameters:

Returns:

  • (Hash{String => String, Array<String>})

    headers Hash for the native_message

Raises:



19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
# File 'lib/rdkafka/consumer/headers.rb', line 19

def self.from_native(native_message)
  headers_ptrptr = FFI::MemoryPointer.new(:pointer)
  err = Rdkafka::Bindings.rd_kafka_message_headers(native_message, headers_ptrptr)

  if err == Rdkafka::Bindings::RD_KAFKA_RESP_ERR__NOENT
    return EMPTY_HEADERS
  elsif err != Rdkafka::Bindings::RD_KAFKA_RESP_ERR_NO_ERROR
    raise Rdkafka::RdkafkaError.new(err, "Error reading message headers")
  end

  headers_ptr = headers_ptrptr.read_pointer

  name_ptrptr = FFI::MemoryPointer.new(:pointer)
  value_ptrptr = FFI::MemoryPointer.new(:pointer)
  size_ptr = Rdkafka::Bindings::SizePtr.new

  headers = {}

  idx = 0
  loop do
    err = Rdkafka::Bindings.rd_kafka_header_get_all(
      headers_ptr,
      idx,
      name_ptrptr,
      value_ptrptr,
      size_ptr
    )

    if err == Rdkafka::Bindings::RD_KAFKA_RESP_ERR__NOENT
      break
    elsif err != Rdkafka::Bindings::RD_KAFKA_RESP_ERR_NO_ERROR
      raise Rdkafka::RdkafkaError.new(err, "Error reading a message header at index #{idx}")
    end

    name_ptr = name_ptrptr.read_pointer
    name = name_ptr.respond_to?(:read_string_to_null) ? name_ptr.read_string_to_null : name_ptr.read_string

    size = size_ptr[:value]

    value_ptr = value_ptrptr.read_pointer
    value = value_ptr.read_string(size)

    if headers.key?(name)
      # If we've seen this header before, convert to array if needed and append
      if headers[name].is_a?(Array)
        headers[name] << value
      else
        headers[name] = [headers[name], value]
      end
    else
      # First occurrence - store as single value
      headers[name] = value
    end

    idx += 1
  end

  headers.freeze
end