Module: Karafka::Pro::Routing::Features::ConsumerGroups::AdaptiveIterator::Topic

Defined in:
lib/karafka/pro/routing/features/consumer_groups/adaptive_iterator/topic.rb

Overview

Topic extension that allows enabling and configuring the adaptive iterator.

Instance Method Details

#adaptive_iterator(active: false, safety_margin: 10, marking_method: :mark_as_consumed, clean_after_yielding: true) ⇒ Object

Parameters:

  • active (Boolean) (defaults to: false)

    Whether to use the automatic adaptive iterator.

  • safety_margin (Integer) (defaults to: 10)

    The share of the available time, as a percentage, that we reserve so we can safely communicate back with Kafka, etc. With the default of 10, we stop and seek back once 90% of the time has been used. The remaining 10% is left for post-processing operations, so we have room before we hit max.poll.interval.ms.

  • marking_method (Symbol) (defaults to: :mark_as_consumed)

    Which marking method to use when messages are marked as consumed.

  • clean_after_yielding (Boolean) (defaults to: true)

    Whether to clean up after yielding, via the cleaner API.
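
The effect of safety_margin is easiest to see as arithmetic. The sketch below is illustrative only: `processing_budget_ms` is a hypothetical helper, not part of Karafka, and it assumes the margin is a percentage of max.poll.interval.ms.

```ruby
# Hypothetical helper (not a Karafka API): how many milliseconds remain for
# processing once the safety margin has been reserved.
# Assumption: safety_margin is a percentage of the whole poll interval.
def processing_budget_ms(max_poll_interval_ms, safety_margin)
  max_poll_interval_ms * (100 - safety_margin) / 100
end

# With a 5 minute max.poll.interval.ms:
processing_budget_ms(300_000, 10) # 270_000 ms for processing, 30_000 ms reserved
processing_budget_ms(300_000, 15) # 255_000 ms for processing, 45_000 ms reserved
```

A larger margin stops iteration earlier and leaves more time for offset marking and other post-processing work.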



# File 'lib/karafka/pro/routing/features/consumer_groups/adaptive_iterator/topic.rb', line 57

def adaptive_iterator(
  active: false,
  safety_margin: 10,
  marking_method: :mark_as_consumed,
  clean_after_yielding: true
)
  @adaptive_iterator ||= Config.new(
    active: active,
    safety_margin: safety_margin,
    marking_method: marking_method,
    clean_after_yielding: clean_after_yielding
  )
end
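
Because the configuration is memoized with `||=`, only the first call builds it. Later calls return the same object and ignore their arguments. The following is a minimal sketch of that behavior: `SketchConfig` and `SketchTopic` are hypothetical stand-ins, and the shape of the real `Config` class is an assumption.

```ruby
# Stand-in for the real Config class (assumed shape, mirroring the keyword
# arguments of #adaptive_iterator).
SketchConfig = Struct.new(
  :active, :safety_margin, :marking_method, :clean_after_yielding,
  keyword_init: true
) do
  def active?
    active
  end
end

# Hypothetical topic that reproduces only the memoization logic.
class SketchTopic
  def initialize
    @adaptive_iterator = nil
  end

  def adaptive_iterator(active: false, safety_margin: 10,
                        marking_method: :mark_as_consumed,
                        clean_after_yielding: true)
    # Built once; any later call returns the existing object unchanged
    @adaptive_iterator ||= SketchConfig.new(
      active: active,
      safety_margin: safety_margin,
      marking_method: marking_method,
      clean_after_yielding: clean_after_yielding
    )
  end

  def adaptive_iterator?
    adaptive_iterator.active?
  end
end

topic = SketchTopic.new
first = topic.adaptive_iterator(active: true, safety_margin: 15)
second = topic.adaptive_iterator(safety_margin: 30) # arguments are ignored here

first.equal?(second)     # true, the same memoized object
second.safety_margin     # 15, the value from the first call
topic.adaptive_iterator? # true
```

In this sketch, a topic that never has the method called with arguments ends up with the defaults, so its predicate returns false.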

#adaptive_iterator? ⇒ Boolean

Returns whether the adaptive iterator is active for this topic, as set by the `active` option.

Returns:

  • (Boolean)

    Whether the adaptive iterator is active for this topic, as set by the `active` option.



# File 'lib/karafka/pro/routing/features/consumer_groups/adaptive_iterator/topic.rb', line 73

def adaptive_iterator?
  adaptive_iterator.active?
end

#initialize ⇒ Object

This method sets up the extra instance variable to nil before calling the parent class initializer. The explicit initialization to nil is included as an optimization for Ruby’s object shapes system, which improves memory layout and access performance.



# File 'lib/karafka/pro/routing/features/consumer_groups/adaptive_iterator/topic.rb', line 43

def initialize(...)
  @adaptive_iterator = nil
  super
end
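
The pattern above can be tried in isolation. This sketch assumes the extension module is prepended to the class it extends, which the source does not state. `SketchBase` and `SketchFeature` are hypothetical names.

```ruby
# Hypothetical base class standing in for the extended topic class.
class SketchBase
  attr_reader :name

  def initialize(name)
    @name = name
  end
end

# Hypothetical feature module following the same pattern: assign the instance
# variable first, then forward all arguments to the parent initializer.
module SketchFeature
  def initialize(...)
    @feature_config = nil
    super
  end

  def feature_config_defined?
    instance_variable_defined?(:@feature_config)
  end
end

# Assumption: the module is prepended, so its initializer runs first.
SketchBase.prepend(SketchFeature)

obj = SketchBase.new("events")
obj.name                    # "events", arguments reached the parent initializer
obj.feature_config_defined? # true, the variable exists from construction on
```

Because every instance assigns the same variables in the same order, all instances follow the same sequence of object shapes.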

#to_h ⇒ Hash

Returns the topic with all its native configuration options plus the adaptive iterator configuration.

Returns:

  • (Hash)

    The topic with all its native configuration options plus the adaptive iterator configuration.



# File 'lib/karafka/pro/routing/features/consumer_groups/adaptive_iterator/topic.rb', line 79

def to_h
  super.merge(
    adaptive_iterator: adaptive_iterator.to_h
  ).freeze
end
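
The merge-and-freeze pattern can be sketched with plain classes. `SketchNativeTopic` and `SketchExtendedTopic` are hypothetical, and the hash contents are made-up sample values, not real topic settings.

```ruby
# Hypothetical parent whose to_h stands in for the native topic options.
class SketchNativeTopic
  def to_h
    { name: "events", max_messages: 100 }
  end
end

# Hypothetical extension that adds its own section and freezes the result.
class SketchExtendedTopic < SketchNativeTopic
  def to_h
    super.merge(
      adaptive_iterator: { active: true, safety_margin: 10 }
    ).freeze
  end
end

h = SketchExtendedTopic.new.to_h
h.frozen?                             # true
h[:name]                              # "events"
h[:adaptive_iterator][:safety_margin] # 10
```

Note that `freeze` is shallow in Ruby: the outer hash rejects modification, but nested objects are frozen only if they were frozen themselves.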