Class: Karafka::Messages::Messages
- Inherits:
- Object
- Includes:
- Enumerable
- Defined in:
- lib/karafka/messages/messages.rb
Overview
A messages batch represents a set of messages received from Kafka for a single topic partition.
Instance Attribute Summary
- #metadata ⇒ Object
  readonly
  Returns the value of attribute metadata.
Instance Method Summary
- #deserialize! ⇒ Array<Karafka::Messages::Message>
  Runs deserialization of all the messages and returns them.
- #each ⇒ Object
- #empty? ⇒ Boolean
  Whether the messages batch is empty.
- #first ⇒ Karafka::Messages::Message
  First message.
- #initialize(messages_array, metadata) ⇒ Karafka::Messages::Messages
  constructor
  Lazily evaluated messages batch object.
- #last ⇒ Karafka::Messages::Message
  Last message.
- #payloads ⇒ Array<Object>
  Array with deserialized payloads.
- #raw ⇒ Array<Karafka::Messages::Message>
  Returns the underlying messages array directly without duplication.
- #raw_payloads ⇒ Array<String>
  Array with raw, not deserialized payloads.
- #size ⇒ Integer
  (also: #count)
  Number of messages in the batch.
- #to_a ⇒ Array<Karafka::Messages::Message>
  Copy of the plain array with messages.
Constructor Details
#initialize(messages_array, metadata) ⇒ Karafka::Messages::Messages
Returns a lazily evaluated messages batch object.
# File 'lib/karafka/messages/messages.rb', line 14
def initialize(messages_array, metadata)
  @messages_array = messages_array
  @metadata = metadata
end
Instance Attribute Details
#metadata ⇒ Object (readonly)
Returns the value of attribute metadata.
# File 'lib/karafka/messages/messages.rb', line 9
def metadata
  @metadata
end
Instance Method Details
#deserialize! ⇒ Array<Karafka::Messages::Message>
Runs deserialization of all the messages and returns them.
# File 'lib/karafka/messages/messages.rb', line 26
def deserialize!
  each(&:payload)
end
#each ⇒ Object
Invocation of this method will not cause loading and deserializing of messages.
# File 'lib/karafka/messages/messages.rb', line 20
def each(&)
  @messages_array.each(&)
end
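The difference between plain iteration and `#deserialize!` can be illustrated with a minimal, self-contained sketch. It does not load Karafka: `SketchMessage` and `SketchBatch` are hypothetical stand-ins that mirror the `#each` and `#deserialize!` bodies shown above, and the `deserialized?` flag and JSON parsing are assumptions made only for this illustration.

```ruby
require "json"

# Hypothetical stand-in for a message: the payload is parsed on first access.
class SketchMessage
  attr_reader :raw_payload

  def initialize(raw_payload)
    @raw_payload = raw_payload
    @deserialized = false
  end

  # Assumed helper for the sketch, used only to observe laziness.
  def deserialized?
    @deserialized
  end

  def payload
    return @payload if @deserialized

    @payload = JSON.parse(@raw_payload)
    @deserialized = true
    @payload
  end
end

# Hypothetical stand-in mirroring the #each and #deserialize! bodies above.
class SketchBatch
  include Enumerable

  def initialize(messages_array)
    @messages_array = messages_array
  end

  def each(&block)
    @messages_array.each(&block)
  end

  def deserialize!
    each(&:payload)
  end
end

batch = SketchBatch.new(
  [SketchMessage.new('{"id":1}'), SketchMessage.new('{"id":2}')]
)

# Iterating and reading raw data does not trigger deserialization.
batch.each { |message| message.raw_payload }
lazy_state = batch.map(&:deserialized?)

# deserialize! touches every payload, forcing deserialization.
batch.deserialize!
eager_state = batch.map(&:deserialized?)
```

In this sketch `lazy_state` holds only `false` values and `eager_state` only `true` values, which is the behavior the description of `#each` implies.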
#empty? ⇒ Boolean
Returns whether the messages batch is empty.
# File 'lib/karafka/messages/messages.rb', line 43
def empty?
  @messages_array.empty?
end
#first ⇒ Karafka::Messages::Message
Returns first message.
# File 'lib/karafka/messages/messages.rb', line 48
def first
  @messages_array.first
end
#last ⇒ Karafka::Messages::Message
Returns last message.
# File 'lib/karafka/messages/messages.rb', line 53
def last
  @messages_array.last
end
#payloads ⇒ Array<Object>
Returns array with deserialized payloads. This method can be useful when we don’t care about metadata and just want to extract all the data payloads from the batch.
# File 'lib/karafka/messages/messages.rb', line 33
def payloads
  map(&:payload)
end
#raw ⇒ Array<Karafka::Messages::Message>
This returns the actual internal array, not a copy. Do not modify it.
Returns the underlying messages array directly without duplication.
This method exists to provide Karafka internals with direct access to the messages array, bypassing any monkey patches that external libraries may apply to enumerable methods.
## Why this method exists
External instrumentation libraries like DataDog’s `dd-trace-rb` patch the `#each` method on this class to create tracing spans around message iteration. While this is desirable for user code (to trace message processing), it causes problems when Karafka’s internal infrastructure iterates over messages for housekeeping tasks (offset tracking, deserialization, etc.), because that iteration creates empty or unwanted spans.
By using `raw.map` or `raw.each` instead of `map` or `each` directly, internal code bypasses the patched `#each` method, since it operates on the raw Array rather than on this class.
## Usage
This method should ONLY be used by Karafka internals. User-facing code (consumers, ActiveJob processors, etc.) should use regular `#each`/`#map` so that instrumentation libraries can properly trace message processing.
# File 'lib/karafka/messages/messages.rb', line 95
def raw
  @messages_array
end
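The bypass described above can be sketched without Karafka or any tracing library. `SketchTracedBatch` is a hypothetical stand-in that copies the `#each` and `#raw` bodies from this page, and `SketchTracing` is an assumed, deliberately simplistic patch applied with `Module#prepend`; it is not how any particular instrumentation library is implemented.

```ruby
# Hypothetical stand-in mirroring the #each and #raw bodies shown above.
class SketchTracedBatch
  include Enumerable

  def initialize(messages_array)
    @messages_array = messages_array
  end

  def each(&block)
    @messages_array.each(&block)
  end

  def raw
    @messages_array
  end
end

# Assumed tracing patch for illustration: records one "span" per #each call.
module SketchTracing
  SPANS = []

  def each(&block)
    SPANS << :each_span
    super
  end
end

SketchTracedBatch.prepend(SketchTracing)

batch = SketchTracedBatch.new(%w[a b c])

# User-style iteration: Enumerable#map dispatches to the patched #each.
user_result = batch.map(&:upcase)
spans_after_user = SketchTracing::SPANS.size

# Internal-style iteration: Array#map on the raw array never reaches the patch.
internal_result = batch.raw.map(&:upcase)
spans_after_internal = SketchTracing::SPANS.size
```

Both calls produce the same mapped array, but only the first one records a span, so the span count does not change after the `raw.map` call.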
#raw_payloads ⇒ Array<String>
Returns array with raw, not deserialized payloads.
# File 'lib/karafka/messages/messages.rb', line 38
def raw_payloads
  map(&:raw_payload)
end
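As a rough illustration of how `#payloads` and `#raw_payloads` differ, the following sketch uses hypothetical stand-ins (`SketchRecord`, `SketchPayloadBatch`) instead of the real Karafka classes. JSON is assumed as the payload format purely for the example; the actual deserializer depends on the application's configuration.

```ruby
require "json"

# Hypothetical message stand-in: raw_payload is the stored string,
# payload is its parsed form (JSON is an assumption of this sketch).
SketchRecord = Struct.new(:raw_payload) do
  def payload
    JSON.parse(raw_payload)
  end
end

# Hypothetical batch stand-in mirroring the method bodies shown above.
class SketchPayloadBatch
  include Enumerable

  def initialize(messages_array)
    @messages_array = messages_array
  end

  def each(&block)
    @messages_array.each(&block)
  end

  def payloads
    map(&:payload)
  end

  def raw_payloads
    map(&:raw_payload)
  end
end

batch = SketchPayloadBatch.new(
  [SketchRecord.new('{"total":10}'), SketchRecord.new('{"total":32}')]
)

# Untouched strings, exactly as they were stored.
raw_strings = batch.raw_payloads

# Deserialized objects, convenient when only the data matters.
grand_total = batch.payloads.sum { |payload| payload["total"] }
```

Here `raw_strings` contains the two original JSON strings, while `grand_total` is computed from the parsed hashes.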
#size ⇒ Integer Also known as: count
Returns number of messages in the batch.
# File 'lib/karafka/messages/messages.rb', line 58
def size
  @messages_array.size
end
#to_a ⇒ Array<Karafka::Messages::Message>
Returns a copy of the plain array with messages.
# File 'lib/karafka/messages/messages.rb', line 63
def to_a
  @messages_array.dup
end
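The practical difference between `#to_a` (a copy) and `#raw` (the internal array itself) can be shown with a small sketch. `SketchCopyBatch` is a hypothetical stand-in that reuses the method bodies from this page; plain strings stand in for messages.

```ruby
# Hypothetical stand-in mirroring the #raw, #to_a and #size bodies shown above.
class SketchCopyBatch
  def initialize(messages_array)
    @messages_array = messages_array
  end

  def raw
    @messages_array
  end

  def to_a
    @messages_array.dup
  end

  def size
    @messages_array.size
  end
end

source = %w[first second]
batch = SketchCopyBatch.new(source)

# Appending to the copy leaves the batch untouched.
copy = batch.to_a
copy << "third"
size_after_copy_mutation = batch.size

# raw hands back the very same Array object the batch holds.
raw_is_same_object = batch.raw.equal?(source)
copy_is_same_object = batch.to_a.equal?(source)
```

Note that `dup` is a shallow copy: the returned array is new, but its elements are the same message objects, so changes made to an individual message remain visible through the batch.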