Class: Karafka::Messages::Messages

Inherits:
Object
  • Object
show all
Includes:
Enumerable
Defined in:
lib/karafka/messages/messages.rb

Overview

Messages batch represents a set of messages received from Kafka of a single topic partition.

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(messages_array, metadata) ⇒ Karafka::Messages::Messages

Returns lazy evaluated messages batch object.

Parameters:



14
15
16
17
# File 'lib/karafka/messages/messages.rb', line 14

def initialize(messages_array, )
  @messages_array = messages_array
  @metadata = 
end

Instance Attribute Details

#metadataObject (readonly)

Returns the value of attribute metadata.



9
10
11
# File 'lib/karafka/messages/messages.rb', line 9

def 
  @metadata
end

Instance Method Details

#deserialize!Array<Karafka::Messages::Message>

Runs deserialization of all the messages and returns them

Returns:



26
27
28
# File 'lib/karafka/messages/messages.rb', line 26

def deserialize!
  each(&:payload)
end

#eachObject

Note:

Invocation of this method will not cause loading and deserializing of messages.



20
21
22
# File 'lib/karafka/messages/messages.rb', line 20

def each(&)
  @messages_array.each(&)
end

#empty?Boolean

Returns is the messages batch empty.

Returns:

  • (Boolean)

    is the messages batch empty



43
44
45
# File 'lib/karafka/messages/messages.rb', line 43

def empty?
  @messages_array.empty?
end

#firstKarafka::Messages::Message

Returns first message.

Returns:



48
49
50
# File 'lib/karafka/messages/messages.rb', line 48

def first
  @messages_array.first
end

#lastKarafka::Messages::Message

Returns last message.

Returns:



53
54
55
# File 'lib/karafka/messages/messages.rb', line 53

def last
  @messages_array.last
end

#payloadsArray<Object>

Returns array with deserialized payloads. This method can be useful when we don’t care about metadata and just want to extract all the data payloads from the batch.

Returns:

  • (Array<Object>)

    array with deserialized payloads. This method can be useful when we don’t care about metadata and just want to extract all the data payloads from the batch



33
34
35
# File 'lib/karafka/messages/messages.rb', line 33

def payloads
  map(&:payload)
end

#rawArray<Karafka::Messages::Message>

Note:

This returns the actual internal array, not a copy. Do not modify it.

Returns the underlying messages array directly without duplication.

This method exists to provide Karafka internals with direct access to the messages array, bypassing any monkey patches that external libraries may apply to enumerable methods.

## Why this method exists

External instrumentation libraries like DataDog’s ‘dd-trace-rb` patch the `#each` method on this class to create tracing spans around message iteration. While this is desirable for user code (to trace message processing), it causes problems when Karafka’s internal infrastructure iterates over messages for housekeeping tasks (offset tracking, deserialization, etc.) - creating empty/unwanted spans.

By using ‘raw.map` or `raw.each` instead of `map` or `each` directly, internal code bypasses the patched `#each` method since it operates on the raw Array, not this class.

## Usage

This method should ONLY be used by Karafka internals. User-facing code (consumers, ActiveJob processors, etc.) should use regular ‘#each`/`#map` so that instrumentation libraries can properly trace message processing.

Returns:

See Also:



95
96
97
# File 'lib/karafka/messages/messages.rb', line 95

def raw
  @messages_array
end

#raw_payloadsArray<String>

Returns array with raw, not deserialized payloads.

Returns:

  • (Array<String>)

    array with raw, not deserialized payloads



38
39
40
# File 'lib/karafka/messages/messages.rb', line 38

def raw_payloads
  map(&:raw_payload)
end

#sizeInteger Also known as: count

Returns number of messages in the batch.

Returns:

  • (Integer)

    number of messages in the batch



58
59
60
# File 'lib/karafka/messages/messages.rb', line 58

def size
  @messages_array.size
end

#to_aArray<Karafka::Messages::Message>

Returns copy of the pure array with messages.

Returns:



63
64
65
# File 'lib/karafka/messages/messages.rb', line 63

def to_a
  @messages_array.dup
end