Module: Karafka::Setup::DefaultsInjector

Defined in:
lib/karafka/setup/defaults_injector.rb

Overview

Injector that enriches each Kafka cluster configuration with the defaults it needs. Users may use more than one cluster and define them on a per-topic basis, so the injector is applied when the final config is built for each subscription group.

Class Method Details

.consumer(kafka_config) ⇒ Object

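Propagates the Kafka setting defaults for the consumer config unless they are already present. This makes it easier to set values that users usually don't change, while still allowing them to override any value by setting its key explicitly. Development-only defaults are additionally applied unless the app runs in the production environment.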

Parameters:

  • kafka_config (Hash)

    kafka scoped config



# File 'lib/karafka/setup/defaults_injector.rb', line 69

def consumer(kafka_config)
  CONSUMER_KAFKA_DEFAULTS.each do |key, value|
    next if kafka_config.key?(key)

    kafka_config[key] = value
  end

  return if Karafka::App.env.production?

  CONSUMER_KAFKA_DEV_DEFAULTS.each do |key, value|
    next if kafka_config.key?(key)

    kafka_config[key] = value
  end
end
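
The fill-only-missing-keys behaviour above can be sketched outside Karafka as follows. This is a minimal illustration, not the library's code: `SKETCH_DEFAULTS`, `SKETCH_DEV_DEFAULTS`, their values, and the `production:` keyword are hypothetical stand-ins for `CONSUMER_KAFKA_DEFAULTS`, `CONSUMER_KAFKA_DEV_DEFAULTS`, and `Karafka::App.env.production?`, whose actual contents are not shown on this page.

```ruby
# Hypothetical stand-ins for CONSUMER_KAFKA_DEFAULTS and
# CONSUMER_KAFKA_DEV_DEFAULTS; the real keys and values may differ.
SKETCH_DEFAULTS = { :"statistics.interval.ms" => 5_000 }.freeze
SKETCH_DEV_DEFAULTS = { :"allow.auto.create.topics" => "true" }.freeze

# Fills in missing keys only; values already set by the user are kept.
# The production: flag stands in for Karafka::App.env.production?.
def inject_consumer_defaults(kafka_config, production:)
  SKETCH_DEFAULTS.each do |key, value|
    kafka_config[key] = value unless kafka_config.key?(key)
  end

  # Development-only defaults are skipped in production.
  return kafka_config if production

  SKETCH_DEV_DEFAULTS.each do |key, value|
    kafka_config[key] = value unless kafka_config.key?(key)
  end

  kafka_config
end

user_config = {
  :"bootstrap.servers" => "localhost:9092",
  :"statistics.interval.ms" => 1_000 # explicit user value, must survive
}

p inject_consumer_defaults(user_config.dup, production: false)
p inject_consumer_defaults(user_config.dup, production: true)
```

Unlike this sketch, `.consumer` does not return the config: it mutates the hash it is given, so callers should read the result from the hash they passed in. The lookup uses `Hash#key?`, so a value stored under a string key would not be recognised as overriding a default stored under a symbol key.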

.managed_keys ⇒ Set<Symbol>

Kafka settings that are managed internally by Karafka and should not be set directly by users. Setting them manually may cause misbehaviours and other unexpected issues.

Returns:

  • (Set<Symbol>)

    set of managed kafka setting keys



# File 'lib/karafka/setup/defaults_injector.rb', line 59

def managed_keys
  @managed_keys ||= Set[
    :"statistics.unassigned.include"
  ]
end
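
One way such a set could be used is to detect managed keys that a user config sets directly. The sketch below is an assumption about usage, not Karafka's own validation: `MANAGED_KEYS_SKETCH` copies the single key shown above, and `user_set_managed_keys` is a hypothetical helper.

```ruby
require "set"

# Copied from the method body above; the real set may contain more keys.
MANAGED_KEYS_SKETCH = Set[:"statistics.unassigned.include"].freeze

# Hypothetical helper: lists the managed keys that a user config sets directly.
def user_set_managed_keys(kafka_config)
  kafka_config.keys.select { |key| MANAGED_KEYS_SKETCH.include?(key) }
end

offending = user_set_managed_keys(
  {
    :"bootstrap.servers" => "localhost:9092",
    :"statistics.unassigned.include" => false
  }
)

warn "Managed keys set manually: #{offending.inspect}" unless offending.empty?
```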

.producer(kafka_config) ⇒ Object

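Propagates the Kafka setting defaults for the producer config unless they are already present. This makes it easier to set values that users usually don't change, while still allowing them to override any value by setting its key explicitly. Only development defaults exist for producers, so nothing is injected when the app runs in the production environment.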

Parameters:

  • kafka_config (Hash)

    kafka scoped config



# File 'lib/karafka/setup/defaults_injector.rb', line 90

def producer(kafka_config)
  return if Karafka::App.env.production?

  PRODUCER_KAFKA_DEV_DEFAULTS.each do |key, value|
    next if kafka_config.key?(key)

    kafka_config[key] = value
  end
end
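
The environment guard can be tried in isolation with a stub environment object. This is a sketch under stated assumptions: `SKETCH_PRODUCER_DEV_DEFAULTS` and its value are hypothetical stand-ins for `PRODUCER_KAFKA_DEV_DEFAULTS`, and `SketchEnv` only mimics the `production?` predicate of `Karafka::App.env`.

```ruby
# Hypothetical stand-in for PRODUCER_KAFKA_DEV_DEFAULTS; real contents may differ.
SKETCH_PRODUCER_DEV_DEFAULTS = {
  :"topic.metadata.refresh.interval.ms" => 5_000
}.freeze

# Minimal stand-in for the object returned by Karafka::App.env.
SketchEnv = Struct.new(:name) do
  def production?
    name == "production"
  end
end

# Mirrors the shape of .producer: mutates the hash in place and does
# nothing at all in production.
def inject_producer_defaults(kafka_config, env)
  return if env.production?

  SKETCH_PRODUCER_DEV_DEFAULTS.each do |key, value|
    next if kafka_config.key?(key)

    kafka_config[key] = value
  end
end

dev_config = { :"bootstrap.servers" => "localhost:9092" }
inject_producer_defaults(dev_config, SketchEnv.new("development"))
p dev_config

prod_config = { :"bootstrap.servers" => "localhost:9092" }
inject_producer_defaults(prod_config, SketchEnv.new("production"))
p prod_config
```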