Module: Karafka::Pro::Routing::Features::Pausing::Topic

Defined in:
lib/karafka/pro/routing/features/pausing/topic.rb

Overview

Expansion allowing for a per topic pause strategy definitions

Instance Method Summary collapse

Instance Method Details

#initializeObject

This method calls the parent class initializer and then sets up the extra instance variable to nil. The explicit initialization to nil is included as an optimization for Ruby’s object shapes system, which improves memory layout and access performance.



42
43
44
45
# File 'lib/karafka/pro/routing/features/pausing/topic.rb', line 42

def initialize(...)
  super
  @pausing = nil
end

#pause(timeout: nil, max_timeout: nil, with_exponential_backoff: nil) ⇒ Config

Allows for per-topic pausing strategy setting

Parameters:

  • timeout (Integer) (defaults to: nil)

    how long should we wait upon processing error (milliseconds)

  • max_timeout (Integer) (defaults to: nil)

    what is the max timeout in case of an exponential backoff (milliseconds)

  • with_exponential_backoff (Boolean) (defaults to: nil)

    should we use exponential backoff

Returns:

  • (Config)

    pausing config object



54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
# File 'lib/karafka/pro/routing/features/pausing/topic.rb', line 54

def pause(timeout: nil, max_timeout: nil, with_exponential_backoff: nil)
  # If no arguments provided, just return or initialize the config
  return pausing if timeout.nil? && max_timeout.nil? && with_exponential_backoff.nil?

  # Update instance variables for backwards compatibility
  # This ensures code reading @pause_timeout directly or via the inherited getter
  # will get the correct values
  @pause_timeout = timeout if timeout
  @pause_max_timeout = max_timeout if max_timeout

  unless with_exponential_backoff.nil?
    @pause_with_exponential_backoff = with_exponential_backoff
  end

  # Create or update the config
  @pausing ||= Config.new(
    active: false,
    timeout: @pause_timeout || Karafka::App.config.pause.timeout,
    max_timeout: @pause_max_timeout || Karafka::App.config.pause.max_timeout,
    with_exponential_backoff: if @pause_with_exponential_backoff.nil?
                                Karafka::App.config.pause.with_exponential_backoff
                              else
                                @pause_with_exponential_backoff
                              end
  )

  @pausing.timeout = timeout if timeout
  @pausing.max_timeout = max_timeout if max_timeout

  unless with_exponential_backoff.nil?
    @pausing.with_exponential_backoff = with_exponential_backoff
  end

  @pausing.active = true

  @pausing
end

#pausingConfig

Returns pausing configuration object.

Returns:

  • (Config)

    pausing configuration object



93
94
95
96
97
98
99
100
101
102
103
104
# File 'lib/karafka/pro/routing/features/pausing/topic.rb', line 93

def pausing
  @pausing ||= Config.new(
    active: false,
    timeout: @pause_timeout || Karafka::App.config.pause.timeout,
    max_timeout: @pause_max_timeout || Karafka::App.config.pause.max_timeout,
    with_exponential_backoff: if @pause_with_exponential_backoff.nil?
                                Karafka::App.config.pause.with_exponential_backoff
                              else
                                @pause_with_exponential_backoff
                              end
  )
end

#pausing?Boolean

Returns is pausing explicitly configured.

Returns:

  • (Boolean)

    is pausing explicitly configured



107
108
109
# File 'lib/karafka/pro/routing/features/pausing/topic.rb', line 107

def pausing?
  pausing.active?
end

#to_hHash

Returns topic with all its native configuration options plus pausing settings.

Returns:

  • (Hash)

    topic with all its native configuration options plus pausing settings



112
113
114
115
116
# File 'lib/karafka/pro/routing/features/pausing/topic.rb', line 112

def to_h
  super.merge(
    pausing: pausing.to_h
  ).freeze
end