Module: Karafka::Routing::Features::ConsumerGroups::DeadLetterQueue::Topic

Defined in:
lib/karafka/routing/features/consumer_groups/dead_letter_queue/topic.rb

Overview

DLQ topic extensions

Instance Method Summary collapse

Instance Method Details

#dead_letter_queue(max_retries: DEFAULT_MAX_RETRIES, topic: nil, independent: false, transactional: true, dispatch_method: :produce_async, marking_method: :mark_as_consumed, mark_after_dispatch: nil) ⇒ Config

Returns defined config.

Parameters:

  • max_retries (Integer) (defaults to: DEFAULT_MAX_RETRIES)

    after how many retries should we move data to dlq

  • topic (String, false) (defaults to: nil)

    where the messages should be moved if failing or false if we do not want to move it anywhere and just skip

  • independent (Boolean) (defaults to: false)

    needs to be true in order for each marking as consumed in a retry flow to reset the errors counter

  • transactional (Boolean) (defaults to: true)

    if applicable, should transaction be used to move given message to the dead-letter topic and mark it as consumed.

  • dispatch_method (Symbol) (defaults to: :produce_async)

    ‘:produce_async` or `:produce_sync`. Describes whether dispatch on dlq should be sync or async (async by default)

  • marking_method (Symbol) (defaults to: :mark_as_consumed)

    ‘:mark_as_consumed` or `:mark_as_consumed!`. Describes whether marking on DLQ should be async or sync (async by default)

  • mark_after_dispatch (Boolean, nil) (defaults to: nil)

    Should we mark after dispatch. ‘nil` means that the default strategy approach to marking will be used. `true` or `false` overwrites the default

Returns:



39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
# File 'lib/karafka/routing/features/consumer_groups/dead_letter_queue/topic.rb', line 39

def dead_letter_queue(
  max_retries: DEFAULT_MAX_RETRIES,
  topic: nil,
  independent: false,
  transactional: true,
  dispatch_method: :produce_async,
  marking_method: :mark_as_consumed,
  mark_after_dispatch: nil
)
  @dead_letter_queue ||= Config.new(
    active: !topic.nil?,
    max_retries: max_retries,
    topic: topic,
    independent: independent,
    transactional: transactional,
    dispatch_method: dispatch_method,
    marking_method: marking_method,
    mark_after_dispatch: mark_after_dispatch
  )
end

#dead_letter_queue?Boolean

Returns is the dlq active or not.

Returns:

  • (Boolean)

    is the dlq active or not



61
62
63
# File 'lib/karafka/routing/features/consumer_groups/dead_letter_queue/topic.rb', line 61

def dead_letter_queue?
  dead_letter_queue.active?
end

#initializeObject

This method sets up the extra instance variable to nil before calling the parent class initializer. The explicit initialization to nil is included as an optimization for Ruby’s object shapes system, which improves memory layout and access performance.



19
20
21
22
# File 'lib/karafka/routing/features/consumer_groups/dead_letter_queue/topic.rb', line 19

def initialize(...)
  @dead_letter_queue = nil
  super
end

#to_hHash

Returns topic with all its native configuration options plus dlq settings.

Returns:

  • (Hash)

    topic with all its native configuration options plus dlq settings



66
67
68
69
70
# File 'lib/karafka/routing/features/consumer_groups/dead_letter_queue/topic.rb', line 66

def to_h
  super.merge(
    dead_letter_queue: dead_letter_queue.to_h
  ).freeze
end