Class: Karafka::Connection::MessagesBuffer

Inherits:
Object
Defined in:
lib/karafka/connection/messages_buffer.rb

Overview

Note:

This buffer is NOT thread-safe. That is acceptable because it is never used outside of the main listener loop. It can be cleared once jobs have been scheduled with the messages it stores: the message arrays themselves are never emptied in place, so existing references to them stay valid.

Buffer used to build and store Karafka messages built from raw librdkafka messages.

Why do we have two buffers? `RawMessagesBuffer` is used to store raw messages and to handle cases related to partition revocation and reconnections. It is "internal" to the listening process. `MessagesBuffer`, on the other hand, is used to "translate" the raw messages that we know are ok into Karafka messages and to simplify further work with them.

While it adds a bit of overhead, it makes things conceptually much easier, and it adds only two simple hash iterations over the messages batch.


Constructor Details

#initialize(subscription_group) ⇒ MessagesBuffer

Returns a new instance of MessagesBuffer.

Parameters:

  • subscription_group

    subscription group whose topics are looked up when building messages



# File 'lib/karafka/connection/messages_buffer.rb', line 23

def initialize(subscription_group)
  @subscription_group = subscription_group
  @size = 0
  @groups = Hash.new do |topic_groups, topic|
    topic_groups[topic] = Hash.new do |partition_groups, partition|
      partition_groups[partition] = []
    end
  end
end
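
The nested default blocks in the constructor can be tried in isolation. The following is a minimal sketch, not part of Karafka: `build_nested_groups` is a hypothetical helper that reproduces only the `@groups` structure, and the symbols stand in for messages.

```ruby
# Hypothetical helper (not a Karafka API): builds the same two-level
# structure as the constructor, topic => partition => array of messages.
def build_nested_groups
  Hash.new do |topic_groups, topic|
    topic_groups[topic] = Hash.new do |partition_groups, partition|
      partition_groups[partition] = []
    end
  end
end

groups = build_nested_groups

# Missing topics and partitions are created on first access,
# so no explicit initialization is needed before appending.
groups['events'][0] << :first
groups['events'][0] << :second
groups['visits'][2] << :third

p groups
```

Because the default blocks assign into the hash, even a plain read such as `groups['other'][5]` creates both keys.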

Instance Attribute Details

#size ⇒ Object (readonly)

Returns the value of attribute size.



# File 'lib/karafka/connection/messages_buffer.rb', line 20

def size
  @size
end

Instance Method Details

#each {|topic, partition, messages| ... } ⇒ Object

Iterates over the messages of every topic partition stored in the buffer.

Yield Parameters:

  • topic (String)

    topic name

  • partition (Integer)

    partition number

  • messages (Array<Karafka::Messages::Message>)

    from a given topic partition



# File 'lib/karafka/connection/messages_buffer.rb', line 62

def each
  @groups.each do |topic, partitions|
    partitions.each do |partition, messages|
      yield(topic, partition, messages)
    end
  end
end
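
The iteration above flattens the two-level hash into `(topic, partition, messages)` triples. A minimal sketch of the same pattern outside the class follows; `each_triple` and `summarize` are hypothetical names, and plain integers stand in for messages.

```ruby
# Hypothetical stand-in for MessagesBuffer#each, operating on a plain
# nested hash of the shape topic => partition => messages.
def each_triple(groups)
  groups.each do |topic, partitions|
    partitions.each do |partition, messages|
      yield(topic, partition, messages)
    end
  end
end

# Collects one summary line per topic partition.
def summarize(groups)
  summary = []
  each_triple(groups) do |topic, partition, messages|
    summary << "#{topic}/#{partition}: #{messages.size}"
  end
  summary
end

puts summarize('events' => { 0 => [1, 2], 1 => [3] }, 'visits' => { 0 => [] })
```

Like the original method, `each_triple` calls `yield` unconditionally, so it has to be invoked with a block.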

#empty? ⇒ Boolean

Returns whether the buffer is empty, that is, whether it contains no messages.

Returns:

  • (Boolean)

    true if the buffer contains no messages, otherwise false



# File 'lib/karafka/connection/messages_buffer.rb', line 83

def empty?
  @size.zero?
end

#present?(topic, partition) ⇒ Boolean

Checks if there are any messages from a given topic partition in the buffer

Parameters:

  • topic (String)

    topic name

  • partition (Integer)

    partition number

Returns:

  • (Boolean)

    true if there is at least one message from this topic partition, otherwise false



# File 'lib/karafka/connection/messages_buffer.rb', line 75

def present?(topic, partition)
  return false unless @groups.include?(topic)
  return false unless @groups[topic].include?(partition)

  true
end
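
The two `include?` guards matter because `@groups` is built with default blocks: a plain `@groups[topic][partition]` lookup would create the missing keys as a side effect. The sketch below illustrates this on a stand-alone hash; `lookup_groups` and `partition_present?` are hypothetical names, not Karafka APIs.

```ruby
# Hypothetical helper: same auto-creating structure as the constructor.
def lookup_groups
  Hash.new do |topic_groups, topic|
    topic_groups[topic] = Hash.new do |partition_groups, partition|
      partition_groups[partition] = []
    end
  end
end

# Mirrors the guard logic of #present? on a plain nested hash.
# Hash#include? never triggers the default block, so checking
# for a missing topic or partition leaves the hash untouched.
def partition_present?(groups, topic, partition)
  return false unless groups.include?(topic)
  return false unless groups[topic].include?(partition)

  true
end

groups = lookup_groups
groups['events'][0] << :message

p partition_present?(groups, 'events', 0)
p partition_present?(groups, 'missing', 0)
p groups.keys
```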

#remap(raw_messages_buffer) ⇒ Object

Remaps raw messages from the raw messages buffer to Karafka messages

Parameters:

  • raw_messages_buffer (RawMessagesBuffer)

    buffer with raw messages



# File 'lib/karafka/connection/messages_buffer.rb', line 35

def remap(raw_messages_buffer)
  clear unless @size.zero?

  # Since this happens "right after" we've received the messages, it is close enough in time
  # to be used as the moment we received messages.
  received_at = Time.now

  raw_messages_buffer.each do |topic, partition, messages|
    @size += messages.count

    ktopic = @subscription_group.topics.find(topic)

    @groups[topic][partition] = messages.map do |message|
      Messages::Builders::Message.call(
        message,
        ktopic,
        received_at
      )
    end
  end
end
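
The remapping flow can be sketched without Karafka or librdkafka. Everything in the example below is an assumption made for illustration: `SketchBuffer` and `SketchMessage` are hypothetical stand-ins, the raw buffer is modelled as a plain nested hash, a `Struct` replaces `Messages::Builders::Message`, and the body of `clear` is a guess, since the real method is not shown on this page.

```ruby
# Hypothetical stand-in for a built Karafka message.
SketchMessage = Struct.new(:raw, :topic, :received_at)

# Hypothetical, simplified buffer illustrating the remap flow only.
class SketchBuffer
  attr_reader :size, :groups

  def initialize
    @size = 0
    @groups = Hash.new do |topic_groups, topic|
      topic_groups[topic] = Hash.new do |partition_groups, partition|
        partition_groups[partition] = []
      end
    end
  end

  # raw_groups is a plain hash: topic => partition => raw messages
  # (an assumed stand-in for RawMessagesBuffer).
  def remap(raw_groups)
    clear unless @size.zero?

    # One timestamp is shared by every message of the batch.
    received_at = Time.now

    raw_groups.each do |topic, partitions|
      partitions.each do |partition, raws|
        @size += raws.count
        @groups[topic][partition] = raws.map do |raw|
          SketchMessage.new(raw, topic, received_at)
        end
      end
    end
  end

  private

  # Assumption: drops the groups and resets the counter without
  # emptying the message arrays themselves.
  def clear
    @size = 0
    @groups.clear
  end
end

buffer = SketchBuffer.new
buffer.remap('events' => { 0 => %w[a b] })
first_batch = buffer.groups['events'][0]

buffer.remap('visits' => { 1 => %w[c] })
puts buffer.size
puts first_batch.size
```

Under these assumptions, the second `remap` replaces the buffer contents while `first_batch` still holds both messages, which matches the note that the buffer can be cleared after jobs are scheduled.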