Class: Sbmt::KafkaConsumer::ClientConfigurer

Inherits:
Object
  • Object
show all
Defined in:
lib/sbmt/kafka_consumer/client_configurer.rb

Class Method Summary collapse

Class Method Details

.configure!(**opts) ⇒ Object



4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
# File 'lib/sbmt/kafka_consumer/client_configurer.rb', line 4

def self.configure!(**opts)
  config = Sbmt::KafkaConsumer::Config.new
  Sbmt::KafkaConsumer::Routing::Wildcards::Feature.activate

  Karafka::App.setup do |karafka_config|
    karafka_config.monitor = config.monitor_class.classify.constantize.new
    karafka_config.logger = Sbmt::KafkaConsumer.logger

    karafka_config.client_id = config.client_id
    karafka_config.kafka = config.to_kafka_options

    karafka_config.pause_timeout = config.pause_timeout * 1_000 if config.pause_timeout.present?
    karafka_config.pause_max_timeout = config.pause_max_timeout * 1_000 if config.pause_max_timeout.present?
    karafka_config.max_wait_time = config.max_wait_time * 1_000 if config.max_wait_time.present?
    karafka_config.shutdown_timeout = config.shutdown_timeout * 1_000 if config.shutdown_timeout.present?

    karafka_config.pause_with_exponential_backoff = config.pause_with_exponential_backoff if config.pause_with_exponential_backoff.present?

    karafka_config.concurrency = opts[:concurrency] || config.concurrency

    # Do not validate topics naming consistency
    # see https://github.com/karafka/karafka/wiki/FAQ#why-am-i-seeing-a-needs-to-be-consistent-namespacing-style-error
    karafka_config.strict_topics_namespacing = false

    # Recreate consumers with each batch. This will allow Rails code reload to work in the
    # development mode. Otherwise Karafka process would not be aware of code changes
    karafka_config.consumer_persistence = !Rails.env.development?
  end

  Karafka.monitor.subscribe(config.logger_listener_class.classify.constantize.new)
  Karafka.monitor.subscribe(config.metrics_listener_class.classify.constantize.new)

  target_consumer_groups = if opts[:consumer_groups].blank?
    config.consumer_groups
  else
    config.consumer_groups.select do |group|
      opts[:consumer_groups].include?(group.id)
    end
  end

  raise "No configured consumer groups found, exiting" if target_consumer_groups.blank?

  consumer_mapper = config.consumer_mapper_class.classify.constantize.new

  # clear routes in case CLI runner tries to reconfigure them
  # but railtie initializer had already executed and did the same
  # otherwise we'll get duplicate routes error from sbmt-karafka internal config validation process
  Karafka::App.routes.clear
  Karafka::App.routes.draw do
    target_consumer_groups.each do |cg|
      next if opts[:skip_regexp_consumers_init] && cg.topics.all? { |topic| topic.regexp.present? }

      group_id = consumer_mapper.call(cg.name)
      consumer_group group_id do
        cg.topics.each do |t|
          if t.regexp.present?
            next if opts[:skip_regexp_consumers_init]

            wildcard t.regexp do
              active t.active
              manual_offset_management t.manual_offset_management
              consumer t.consumer.consumer_klass
              deserializer t.deserializer.instantiate if t.deserializer.klass.present?
              kafka t.kafka_options.merge(inherit: true) if t.kafka_options.present?
            end
          else
            topic t.name do
              active t.active
              manual_offset_management t.manual_offset_management
              consumer t.consumer.consumer_klass
              deserializer t.deserializer.instantiate if t.deserializer.klass.present?
              kafka t.kafka_options.merge(inherit: true) if t.kafka_options.present?
            end
          end
        end
      end
    end
  end
end

.routesObject



84
85
86
87
88
89
# File 'lib/sbmt/kafka_consumer/client_configurer.rb', line 84

def self.routes
  Karafka::App.routes.map do |cg|
    topics = cg.topics.map { |t| {name: t.name, deserializer: t.deserializers.payload} }
    {group: cg.id, topics: topics}
  end
end