Class: Rimless::Consumer::App
- Inherits:
-
Karafka::App
- Object
- Karafka::App
- Rimless::Consumer::App
- Defined in:
- lib/rimless/consumer/app.rb
Overview
The consumer application which adds some convenience helpers and library-related configurations.
Instance Method Summary collapse
-
#configure {|Karafka::Setup::ConfigProxy| ... } ⇒ Rimless::Consumer::App
Allows the user to re-configure the Karafka application if this is needed.
-
#initialize ⇒ Rimless::Consumer::App
constructor
Creates a new Rimless/Karafka application instance while configuring our library defaults.
-
#server? ⇒ Boolean
Check if we run as the Karafka server (consumer) process or not.
-
#topic_names(parts) ⇒ Array<String>
Build the conventional Apache Kafka topic names from the given parts.
-
#topics(topics = []) { ... } ⇒ Rimless::Consumer::App
Configure the topics-consumer routing table in a lean way.
Constructor Details
#initialize ⇒ Rimless::Consumer::App
Creates a new Rimless/Karafka application instance while configuring our library defaults.
rubocop:disable Metrics/MethodLength – because of the Karafka
configuration
18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 |
# File 'lib/rimless/consumer/app.rb', line 18 def initialize # Run the parent class initialization super setup do |config| # See: https://bit.ly/3OtIfeu (+config.kafka+ settings) # An optional identifier of a Kafka consumer (in a consumer group) # that is passed to a Kafka broker with every request. A logical # application name to be included in Kafka logs and monitoring # aggregates. config.kafka[:'client.id'] = Rimless.configuration.client_id # All the known brokers, at least one. The ruby-kafka driver will # discover the whole cluster structure once and when issues occur # to dynamically adjust scaling operations. config.kafka[:'bootstrap.servers'] = Rimless.configuration.kafka_brokers # All brokers MUST acknowledge a new message by default config.kafka[:'request.required.acks'] = -1 # See: https://bit.ly/3MAF6Jk (+config.*+ settings) # Used to uniquely identify given client instance - for logging only config.client_id = [ Rimless.configuration.client_id, Process.pid, Socket.gethostname ].join('-') # Should be unique per application to properly track message # consumption. See: Kafka Consumer Groups. config.group_id = Rimless.configuration.client_id # We use dots (parts separation) and underscores for topic names, by # convention. config.strict_topics_namespacing = false # Number of milliseconds after which Karafka no longer waits for the # consumers to stop gracefully but instead we force terminate # everything. config.shutdown_timeout = 10.seconds.in_milliseconds # Recreate consumers with each batch. This will allow Rails code # reload to work in the development mode. Otherwise Karafka process # would not be aware of code changes. config.consumer_persistence = Rimless.env.production? # Use our logger instead config.logger = Rimless.logger end # Add the logging listener to Karafka in order to facilitate our gem # logger. When the user configuration results in an falsy value (eg. # +nil+ or +false+), we skip it. listener = Rimless.configuration.consumer_logger_listener Karafka.monitor.subscribe(listener) if listener # Configure some routing defaults routes.draw do defaults do deserializers( payload: Rimless.configuration.avro_deserializer_class.new ) end end # We configure synchronous logging to stdout as supervising/monitoring # processes await logging outputs to detect the Karafka server process # is up and running properly $stdout.sync = true # Call the user-configurable block with our configuration # for customizations setup(&Rimless.configuration.consumer_configure) end |
Instance Method Details
#configure {|Karafka::Setup::ConfigProxy| ... } ⇒ Rimless::Consumer::App
Allows the user to re-configure the Karafka application if this is needed. (eg. to set some kafka driver settings, etc)
172 173 174 175 |
# File 'lib/rimless/consumer/app.rb', line 172 def configure(&) setup(&) self end |
#server? ⇒ Boolean
Check if we run as the Karafka server (consumer) process or not. Unfortunately Karafka still does not offer a solution for it like Sidekiq.server?. (Last tested with Karafka version 2.5.7)
182 183 184 |
# File 'lib/rimless/consumer/app.rb', line 182 def server? $PROGRAM_NAME.end_with?('karafka') && ARGV.include?('server') end |
#topic_names(parts) ⇒ Array<String>
Build the conventional Apache Kafka topic names from the given parts. This allows various forms like single strings/symbols and a hash in the form of +{ app: [String, Symbol], name: [String, Symbol], names:
- Array<String, Symbol>
-
}+. This allows the maximum of flexibility.
154 155 156 157 158 159 160 161 162 163 164 |
# File 'lib/rimless/consumer/app.rb', line 154 def topic_names(parts) # We have a single app, but multiple names so we handle them if parts.is_a?(Hash) && parts.key?(:names) return parts[:names].map do |name| Rimless.topic(parts.merge(name: name)) end end # We cannot handle the given input [Rimless.topic(parts)] end |
#topics(topics = []) { ... } ⇒ Rimless::Consumer::App
Configure the topics-consumer routing table in a lean way.
Examples:
topics({ app: :test_app, name: :admins } => YourConsumer)
topics({ app: :test_app, names: %i[users admins] } => YourConsumer)
Examples:
topics(
{ app: :test_app, name: :admins } => lambda { |topic|
consumer Rimless::Consumer::JobBridge.build(dest_consumer)
}
)
Examples:
topics do
topic('name') do
consumer CustomConsumer
end
end
121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 |
# File 'lib/rimless/consumer/app.rb', line 121 def topics(topics = [], &block) routes.draw do consumer_group(Rimless.configuration.client_id) do instance_exec(&block) if block_given? topics.each do |topic_parts, dest_consumer| Rimless.consumer.topic_names(topic_parts).each do |topic_name| configure = proc do consumer( Rimless.configuration.job_bridge_class.build(dest_consumer) ) deserializers( payload: Rimless.configuration.avro_deserializer_class.new ) end configure = dest_consumer if dest_consumer.is_a? Proc topic(topic_name, &configure) end end end end self end |