Module: Karafka::Connection::ApiAdapter
- Defined in:
- lib/karafka/connection/api_adapter.rb
Overview
Note: A convenient property of the Kafka.new method is that it ignores any options that have no effect, so injecting our internal settings into the client will not break anything.
Mapper that converts our internal settings into ruby-kafka settings according to that library's API requirements. ruby-kafka keeps gaining options, and they have to be applied at several different "levels" (even though in Karafka all of them are configured in one place), so we have to remap them into the form the ruby-kafka driver requires.
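The remapping described above can be sketched in a few lines. This is a minimal illustration, not Karafka's implementation: LEVEL_MAP and settings_for are hypothetical names, and the option lists are a small, assumed subset of what ruby-kafka accepts at each level.

```ruby
# Hypothetical mapping of ruby-kafka "levels" to the setting names each one accepts.
# The real mapping lives in Karafka's AttributesMap; these entries are illustrative only.
LEVEL_MAP = {
  consumer: %i[session_timeout offset_commit_interval],
  consumption: %i[min_bytes max_wait_time],
  subscribe: %i[start_from_beginning]
}.freeze

# Picks only the settings that belong to the given level out of one flat config hash.
def settings_for(level, flat_config)
  flat_config.slice(*LEVEL_MAP.fetch(level))
end

# Everything is configured in one place...
flat_config = {
  session_timeout: 30,
  offset_commit_interval: 10,
  min_bytes: 1,
  max_wait_time: 1,
  start_from_beginning: true
}

# ...and split per level only when a driver method is about to be called.
consumer_settings = settings_for(:consumer, flat_config)
consumption_settings = settings_for(:consumption, flat_config)
subscribe_settings = settings_for(:subscribe, flat_config)
```

Keeping the map in one constant means a new driver option only needs to be registered once to reach the right call.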
Class Method Summary
- .client(consumer_group) ⇒ Array<Hash>
  Builds all the configuration settings for Kafka.new method.
- .consumer(consumer_group) ⇒ Hash
  Builds all the configuration settings for kafka#consumer method.
- .consumption(consumer_group) ⇒ Hash
  Builds all the configuration settings for kafka consumer consume_each_batch and consume_each_message methods.
- .mark_message_as_processed(params) ⇒ Array
  Remaps topic details taking the topic mapper feature into consideration.
- .pause(topic, partition, consumer_group) ⇒ Hash
  Builds all the configuration settings required by kafka consumer#pause method.
- .subscribe(topic) ⇒ Hash
  Builds all the configuration settings for kafka consumer#subscribe method.
Class Method Details
.client(consumer_group) ⇒ Array<Hash>
Note: We return an array so that we can inject any arguments we want if the raw driver changes.
Builds all the configuration settings for the Kafka.new method.
    # File 'lib/karafka/connection/api_adapter.rb', line 22

    def client(consumer_group)
      # This one is a default that takes all the settings except special
      # cases defined in the map
      settings = {
        logger: ::Karafka.logger,
        client_id: ::Karafka::App.config.client_id
      }

      kafka_configs.each_key do |setting_name|
        # All options for config adapter should be ignored as we're just interested
        # in what is left, as we want to pass all the options that are "typical"
        # and not listed in the api_adapter special cases mapping. All the values
        # from the api_adapter mapping go somewhere else, not to the client directly
        next if AttributesMap.api_adapter.values.flatten.include?(setting_name)

        # Settings for each consumer group are either defined per consumer group or are
        # inherited from the global/general settings level, thus we don't have to fetch them
        # from the kafka settings as they are already on a consumer group level
        settings[setting_name] = consumer_group.public_send(setting_name)
      end

      settings_hash = sanitize(settings)

      # Normalization for the way Kafka::Client accepts arguments from 0.5.3
      [settings_hash.delete(:seed_brokers), settings_hash]
    end
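The two-element return value is meant to be unpacked into a positional seed broker list plus keyword options. The sketch below shows one plausible way a caller might do that. FakeClient and client_arguments are hypothetical stand-ins so the example runs without ruby-kafka or Karafka loaded, and the setting values are made up.

```ruby
# Stand-in for Kafka::Client, used only so the sketch runs without ruby-kafka installed.
class FakeClient
  attr_reader :seed_brokers, :options

  def initialize(seed_brokers, **options)
    @seed_brokers = seed_brokers
    @options = options
  end
end

# Mirrors the last line of .client: pull :seed_brokers out, keep the rest as options.
# Works on a copy so the caller's hash is left untouched.
def client_arguments(settings_hash)
  settings_hash = settings_hash.dup
  [settings_hash.delete(:seed_brokers), settings_hash]
end

args = client_arguments(
  seed_brokers: ['kafka://localhost:9092'],
  client_id: 'example_app',
  connect_timeout: 10
)

# First element goes in positionally, second one is splatted as keyword arguments.
seed_brokers, options = args
client = FakeClient.new(seed_brokers, **options)
```

Returning an array rather than a single hash leaves room for further positional arguments if the driver's constructor changes again.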
.consumer(consumer_group) ⇒ Hash
Builds all the configuration settings for kafka#consumer method
    # File 'lib/karafka/connection/api_adapter.rb', line 53

    def consumer(consumer_group)
      settings = { group_id: consumer_group.id }
      settings = fetch_for(:consumer, consumer_group, settings)
      sanitize(settings)
    end
.consumption(consumer_group) ⇒ Hash
Builds all the configuration settings for the kafka consumer consume_each_batch and consume_each_message methods.
    # File 'lib/karafka/connection/api_adapter.rb', line 65

    def consumption(consumer_group)
      sanitize(
        fetch_for(
          :consumption,
          consumer_group,
          automatically_mark_as_processed: consumer_group.automatically_mark_as_consumed
        )
      )
    end
.mark_message_as_processed(params) ⇒ Array
Note: When the default (empty) topic mapper is used, no conversion is needed, because the internal and external formats are exactly the same.
Remaps topic details taking the topic mapper feature into consideration.
    # File 'lib/karafka/connection/api_adapter.rb', line 105

    def mark_message_as_processed(params)
      # Majority of users don't use custom topic mappers. No need to change anything when it
      # is a default mapper that does not change anything. Only some cloud providers require
      # topics to be remapped
      return [params.] if Karafka::App.config.topic_mapper.is_a?(
        Karafka::Routing::TopicMapper
      )

      # @note We don't use tap as it is around 13% slower than non-dup version
      dupped = params..dup
      dupped['topic'] = Karafka::App.config.topic_mapper.outgoing(params..topic)
      [dupped]
    end
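The fast path for the default mapper versus the copy-and-remap path for a custom one can be illustrated in isolation. This is a sketch under stated assumptions: DefaultMapper, PrefixMapper and remap_details are hypothetical names, and a plain Hash stands in for the topic details object that the real method works on.

```ruby
# Stand-in for the default mapper, which leaves topic names unchanged (hypothetical).
class DefaultMapper
  def outgoing(topic)
    topic
  end
end

# Stand-in for a custom mapper, e.g. one that adds a provider-required prefix (hypothetical).
class PrefixMapper
  def outgoing(topic)
    "tenant1.#{topic}"
  end
end

# Returns the details untouched for the default mapper, a remapped copy otherwise.
def remap_details(details, mapper)
  return [details] if mapper.is_a?(DefaultMapper)

  # Copy first so the caller's object keeps its internal topic name
  dupped = details.dup
  dupped['topic'] = mapper.outgoing(details['topic'])
  [dupped]
end

details = { 'topic' => 'events', 'partition' => 0, 'offset' => 42 }

same = remap_details(details, DefaultMapper.new)
remapped = remap_details(details, PrefixMapper.new)
```

With the default mapper no copy is made at all, which is the point of the early return: most setups never pay for the duplication.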
.pause(topic, partition, consumer_group) ⇒ Hash
Builds all the configuration settings required by kafka consumer#pause method
    # File 'lib/karafka/connection/api_adapter.rb', line 88

    def pause(topic, partition, consumer_group)
      {
        args: [Karafka::App.config.topic_mapper.outgoing(topic), partition],
        kwargs: {
          timeout: consumer_group.pause_timeout,
          max_timeout: consumer_group.pause_max_timeout,
          exponential_backoff: consumer_group.pause_exponential_backoff
        }
      }
    end
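The hash returned here separates positional arguments (:args) from keyword arguments (:kwargs), so a caller can splat both into the driver's pause call. The sketch below assumes a consumer whose pause method takes a topic, a partition and the three keywords shown. FakeConsumer is a hypothetical stand-in and all values are made up.

```ruby
# Stand-in for a ruby-kafka consumer; it only records what #pause receives.
class FakeConsumer
  attr_reader :calls

  def initialize
    @calls = []
  end

  def pause(topic, partition, timeout: nil, max_timeout: nil, exponential_backoff: false)
    @calls << [topic, partition, timeout, max_timeout, exponential_backoff]
  end
end

# Shape of the hash returned by .pause, filled with made-up values.
pause_settings = {
  args: ['events', 0],
  kwargs: { timeout: 10, max_timeout: 60, exponential_backoff: true }
}

consumer = FakeConsumer.new

# Positional part is splatted with *, keyword part with **
consumer.pause(*pause_settings[:args], **pause_settings[:kwargs])
```

Keeping the two parts under separate keys avoids relying on implicit hash-to-keyword conversion at the call site.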
.subscribe(topic) ⇒ Hash
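Builds all the configuration settings for the kafka consumer#subscribe method. Note that the method body returns a two-element array (the mapped topic name followed by the sanitized settings hash) rather than a bare Hash.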
    # File 'lib/karafka/connection/api_adapter.rb', line 78

    def subscribe(topic)
      settings = fetch_for(:subscribe, topic)
      [Karafka::App.config.topic_mapper.outgoing(topic.name), sanitize(settings)]
    end