4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
|
# File 'lib/sbmt/kafka_consumer/client_configurer.rb', line 4
def self.configure!(**opts)
config = Sbmt::KafkaConsumer::Config.new
Sbmt::KafkaConsumer::Routing::Wildcards::Feature.activate
Karafka::App.setup do |karafka_config|
karafka_config.monitor = config.monitor_class.classify.constantize.new
karafka_config.logger = Sbmt::KafkaConsumer.logger
karafka_config.client_id = config.client_id
karafka_config.kafka = config.to_kafka_options
karafka_config.pause_timeout = config.pause_timeout * 1_000 if config.pause_timeout.present?
karafka_config.pause_max_timeout = config.pause_max_timeout * 1_000 if config.pause_max_timeout.present?
karafka_config.max_wait_time = config.max_wait_time * 1_000 if config.max_wait_time.present?
karafka_config.shutdown_timeout = config.shutdown_timeout * 1_000 if config.shutdown_timeout.present?
karafka_config.pause_with_exponential_backoff = config.pause_with_exponential_backoff if config.pause_with_exponential_backoff.present?
karafka_config.concurrency = opts[:concurrency] || config.concurrency
karafka_config.strict_topics_namespacing = false
karafka_config.consumer_persistence = !Rails.env.development?
end
Karafka.monitor.subscribe(config.logger_listener_class.classify.constantize.new)
Karafka.monitor.subscribe(config.metrics_listener_class.classify.constantize.new)
target_consumer_groups = if opts[:consumer_groups].blank?
config.consumer_groups
else
config.consumer_groups.select do |group|
opts[:consumer_groups].include?(group.id)
end
end
raise "No configured consumer groups found, exiting" if target_consumer_groups.blank?
consumer_mapper = config.consumer_mapper_class.classify.constantize.new
Karafka::App.routes.clear
Karafka::App.routes.draw do
target_consumer_groups.each do |cg|
next if opts[:skip_regexp_consumers_init] && cg.topics.all? { |topic| topic.regexp.present? }
group_id = consumer_mapper.call(cg.name)
consumer_group group_id do
cg.topics.each do |t|
if t.regexp.present?
next if opts[:skip_regexp_consumers_init]
wildcard t.regexp do
active t.active
manual_offset_management t.manual_offset_management
consumer t.consumer.consumer_klass
deserializer t.deserializer.instantiate if t.deserializer.klass.present?
kafka t.kafka_options.merge(inherit: true) if t.kafka_options.present?
end
else
topic t.name do
active t.active
manual_offset_management t.manual_offset_management
consumer t.consumer.consumer_klass
deserializer t.deserializer.instantiate if t.deserializer.klass.present?
kafka t.kafka_options.merge(inherit: true) if t.kafka_options.present?
end
end
end
end
end
end
end
|