Class: Rimless::ConsumerApp
- Inherits:
-
Karafka::App
- Object
- Karafka::App
- Rimless::ConsumerApp
- Defined in:
- lib/rimless/consumer.rb
Overview
The global rimless Apache Kafka consumer application based on the Karafka framework.
rubocop:disable Style/ClassVars – because we just work as a singleton
Constant Summary collapse
- @@rimless_initialized =
We track our own initialization with this class variable
false
Class Method Summary collapse
-
.configure(&block) ⇒ Rimless::ConsumerApp
Allows the user to re-configure the Karafka application if this is needed.
-
.initialize! ⇒ Rimless::ConsumerApp
Initialize the Karafka framework and our global consumer application with all our conventions/opinions.
-
.initialize_code_reload! ⇒ Object
Perform code hot-reloading when we are in Rails and in development mode.
-
.initialize_karafka! ⇒ Object
Configure the pure basics on the Karafka application.
-
.initialize_logger! ⇒ Object
When we run in development mode, we want to write the logs to file and to stdout.
-
.initialize_monitors! ⇒ Object
We like to listen to instrumentation and logging events to allow our users to handle them like they need.
-
.initialize_rails! ⇒ Object
Check if Rails is available and not already initialized, then initialize it.
-
.server? ⇒ Boolean
Check if we run as the Karafka server (consumer) process or not.
-
.topic_names(parts) ⇒ Array<String>
Build the conventional Apache Kafka topic names from the given parts.
-
.topics(topics = []) { ... } ⇒ Object
Configure the topics-consumer routing table in a lean way.
Class Method Details
.configure(&block) ⇒ Rimless::ConsumerApp
Allows the user to re-configure the Karafka application if this is needed. (eg. to set some ruby-kafka driver default settings, etc)
127 128 129 130 |
# File 'lib/rimless/consumer.rb', line 127 def configure(&block) setup(&block) self end |
.initialize! ⇒ Rimless::ConsumerApp
Initialize the Karafka framework and our global consumer application with all our conventions/opinions.
17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 |
# File 'lib/rimless/consumer.rb', line 17 def initialize! # When already initialized, skip it return self if @@rimless_initialized # Initialize all the parts one by one initialize_rails! initialize_monitors! initialize_karafka! initialize_logger! initialize_code_reload! # Load the custom Karafka boot file when it exists, it contains # custom configurations and the topic/consumer routing table require ::Karafka.boot_file if ::Karafka.boot_file.exist? # Set our custom initialization process as completed to # skip subsequent calls @@rimless_initialized = true self end |
.initialize_code_reload! ⇒ Object
Perform code hot-reloading when we are in Rails and in development mode.
115 116 117 118 119 120 121 |
# File 'lib/rimless/consumer.rb', line 115 def initialize_code_reload! return unless defined?(Rails) && Rails.env.development? ::Karafka.monitor.subscribe(::Karafka::CodeReloader.new( *Rails.application.reloaders )) end |
.initialize_karafka! ⇒ Object
Configure the pure basics on the Karafka application.
rubocop:disable Metrics/MethodLength – because of the various settings rubocop:disable Metrics/AbcSize – dito
81 82 83 84 85 86 87 88 89 90 91 92 93 94 |
# File 'lib/rimless/consumer.rb', line 81 def initialize_karafka! setup do |config| mapper = Rimless::Karafka::PassthroughMapper.new config.consumer_mapper = config.topic_mapper = mapper config.deserializer = Rimless::Karafka::AvroDeserializer.new config.kafka.seed_brokers = Rimless.configuration.kafka_brokers config.client_id = Rimless.configuration.client_id config.logger = Rimless.logger config.backend = :sidekiq config.batch_fetching = true config.batch_consuming = false config.shutdown_timeout = 10 end end |
.initialize_logger! ⇒ Object
When we run in development mode, we want to write the logs to file and to stdout.
100 101 102 103 104 105 106 107 108 109 110 111 |
# File 'lib/rimless/consumer.rb', line 100 def initialize_logger! # Skip when configured not to extend the logger return unless Rimless.configuration.extend_dev_logger # Skip when not in development environment or in the server process return unless Rimless.env.development? && server? $stdout.sync = true Rimless.logger.extend(ActiveSupport::Logger.broadcast( ActiveSupport::Logger.new($stdout) )) end |
.initialize_monitors! ⇒ Object
We like to listen to instrumentation and logging events to allow our users to handle them like they need.
67 68 69 70 71 72 73 74 75 |
# File 'lib/rimless/consumer.rb', line 67 def initialize_monitors! [ WaterDrop::Instrumentation::StdoutListener.new, ::Karafka::Instrumentation::StdoutListener.new, ::Karafka::Instrumentation::ProctitleListener.new ].each do |listener| ::Karafka.monitor.subscribe(listener) end end |
.initialize_rails! ⇒ Object
Check if Rails is available and not already initialized, then initialize it.
40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 |
# File 'lib/rimless/consumer.rb', line 40 def initialize_rails! rails_env = ::Karafka.root.join('config', 'environment.rb') # Stop, when Rails is already initialized return if defined? Rails # Stop, when there is no Rails at all return unless rails_env.exist? ENV['RAILS_ENV'] ||= 'development' ENV['KARAFKA_ENV'] = ENV.fetch('RAILS_ENV', nil) # This is relevant for Karafka server processes, as the +karafka.rb+ # root file just requires +rimless+ and then we require # +karafka-sidekiq-backend+ which in fact requires +sidekiq+ before # +rails+ was required. We cannot change the order here, but we can # explicitly load the Sidekiq/Rails integration as we know at this # point that we should load Rails and we're going to use Sidekiq, too. # See: https://bit.ly/3D8ZHj3 require 'sidekiq/rails' require rails_env Rails.application.eager_load! end |
.server? ⇒ Boolean
Check if we run as the Karafka server (consumer) process or not.
195 196 197 |
# File 'lib/rimless/consumer.rb', line 195 def server? $PROGRAM_NAME.end_with?('karafka') && ARGV.include?('server') end |
.topic_names(parts) ⇒ Array<String>
Build the conventional Apache Kafka topic names from the given parts. This allows various forms like single strings/symbols and a hash in the form of +{ app: [String, Symbol], name: [String, Symbol], names:
- Array<String, Symbol>
-
}+. This allows the maximum of flexibility.
180 181 182 183 184 185 186 187 188 189 190 |
# File 'lib/rimless/consumer.rb', line 180 def topic_names(parts) # We have a single app, but multiple names so we handle them if parts.is_a?(Hash) && parts.key?(:names) return parts[:names].map do |name| Rimless.topic(parts.merge(name: name)) end end # We cannot handle the given input [Rimless.topic(parts)] end |
.topics(topics = []) { ... } ⇒ Object
Configure the topics-consumer routing table in a lean way.
Examples:
topics({ app: :test_app, name: :admins } => YourConsumer)
topics({ app: :test_app, names: %i[users admins] } => YourConsumer)
Examples:
topics do
topic('name') do
consumer CustomConsumer
end
end
rubocop:disable Metrics/MethodLength – because of the Karafka DSL
151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 |
# File 'lib/rimless/consumer.rb', line 151 def topics(topics = [], &block) consumer_groups.draw do consumer_group(Rimless.configuration.client_id) do instance_exec(&block) if block_given? topics.each do |topic_parts, dest_consumer| Rimless.consumer.topic_names(topic_parts).each do |topic_name| topic(topic_name) do consumer dest_consumer worker Rimless::ConsumerJob interchanger Rimless::Karafka::Base64Interchanger.new end end end end end self end |