Module: Karafka::Routing::Features::ConsumerGroups::ManualOffsetManagement::Topic

Defined in:
lib/karafka/routing/features/consumer_groups/manual_offset_management/topic.rb

Overview

Topic extensions for managing manual offset management settings.

Instance Method Details

#initialize ⇒ Object

Sets the @manual_offset_management instance variable to nil before calling the parent class initializer. The explicit nil assignment is an optimization for Ruby's object shapes system: defining the variable up front gives instances a consistent shape, which improves memory layout and access performance.



# File 'lib/karafka/routing/features/consumer_groups/manual_offset_management/topic.rb', line 14

def initialize(...)
  @manual_offset_management = nil
  super
end
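
The pattern above can be tried outside Karafka with a minimal sketch. The names `PlainTopicSketch` and `ShapeFriendlyExtension` are hypothetical, and the use of `prepend` to put the extension in front of the base class is an assumption made for illustration, not a statement about how Karafka wires this module in.

```ruby
# Hypothetical stand-in for a base topic class (not Karafka's real class).
class PlainTopicSketch
  attr_reader :name

  def initialize(name)
    @name = name
  end
end

# Hypothetical extension that follows the same pattern as the method above.
module ShapeFriendlyExtension
  def initialize(...)
    # Define the ivar before anything else so every instance sets it
    # at the same point during construction.
    @manual_offset_management = nil
    # Bare super forwards all arguments received through (...).
    super
  end
end

# Assumption for this sketch: the extension is prepended to the base class.
PlainTopicSketch.prepend(ShapeFriendlyExtension)

topic = PlainTopicSketch.new("events")
puts topic.instance_variable_defined?(:@manual_offset_management)
puts topic.name
```

Because the variable already exists when the object is built, a later lazy assignment only changes its value and does not add a new instance variable to the object.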

#manual_offset_management(active = false) ⇒ Config

Note:

Since this feature supports only one setting (active), we can use the old API where the boolean would be an argument.

Parameters:

  • active (Boolean) (defaults to: false)

    whether Karafka should stop managing the offset and leave the user responsible for marking messages as consumed.

Returns:

  • (Config)

    the manual offset management config for this topic


# File 'lib/karafka/routing/features/consumer_groups/manual_offset_management/topic.rb', line 25

def manual_offset_management(active = false)
  @manual_offset_management ||= Config.new(active: active)
end
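
One consequence of the `||=` memoization is that the first call decides the setting, and later calls with a different argument return the already built object. The sketch below illustrates this with hypothetical stand-ins (`OffsetConfigSketch`, `MemoTopicSketch`); the real `Config` class is not shown in this page, so its shape here is an assumption.

```ruby
# Hypothetical stand-in for the feature's Config class.
OffsetConfigSketch = Struct.new(:active, keyword_init: true) do
  def active?
    active == true
  end
end

# Hypothetical topic class reproducing only the memoized accessor.
class MemoTopicSketch
  def initialize
    @manual_offset_management = nil
  end

  def manual_offset_management(active = false)
    # Builds the config once; subsequent calls reuse the same object.
    @manual_offset_management ||= OffsetConfigSketch.new(active: active)
  end
end

configured = MemoTopicSketch.new
first = configured.manual_offset_management(true)
# The argument is ignored here because the config already exists.
second = configured.manual_offset_management(false)

puts first.equal?(second)
puts second.active?

# A topic that is only ever read falls back to the default (false).
untouched = MemoTopicSketch.new
puts untouched.manual_offset_management.active?
```

In this sketch, reading the setting before configuring it would also lock in the default, which is why the configuring call needs to come first.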

#manual_offset_management? ⇒ Boolean

Returns whether manual offset management is enabled for a given topic.

Returns:

  • (Boolean)

    whether manual offset management is enabled for a given topic



# File 'lib/karafka/routing/features/consumer_groups/manual_offset_management/topic.rb', line 30

def manual_offset_management?
  manual_offset_management.active?
end

#to_h ⇒ Hash

Returns topic with all its native configuration options plus manual offset management namespace settings.

Returns:

  • (Hash)

    topic with all its native configuration options plus manual offset management namespace settings



# File 'lib/karafka/routing/features/consumer_groups/manual_offset_management/topic.rb', line 36

def to_h
  super.merge(
    manual_offset_management: manual_offset_management.to_h
  ).freeze
end
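
The merge-and-freeze pattern can be sketched in isolation as follows. `BaseTopicSketch`, `ToHExtensionSketch`, `MomConfigSketch` and the hash keys `name` and `max_messages` are hypothetical placeholders, not Karafka's actual classes or option names.

```ruby
# Hypothetical stand-in for the feature's Config class.
MomConfigSketch = Struct.new(:active, keyword_init: true)

# Hypothetical base class exposing some native options.
class BaseTopicSketch
  def to_h
    { name: "events", max_messages: 100 }
  end
end

# Hypothetical extension that adds its own namespace to the hash.
module ToHExtensionSketch
  def manual_offset_management
    @manual_offset_management ||= MomConfigSketch.new(active: true)
  end

  def to_h
    # Take the parent's hash, add the feature namespace, and freeze the result.
    super.merge(
      manual_offset_management: manual_offset_management.to_h
    ).freeze
  end
end

# Assumption for this sketch: the extension is prepended to the base class.
BaseTopicSketch.prepend(ToHExtensionSketch)

h = BaseTopicSketch.new.to_h
puts h.frozen?
puts h[:manual_offset_management] == { active: true }
puts h[:name]
```

Note that `freeze` on a Hash is shallow in Ruby: in this sketch the outer hash is frozen, while the nested hash returned by the struct's `to_h` is not.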