Class: Sbmt::KafkaConsumer::BaseConsumer
- Inherits:
-
Karafka::BaseConsumer
- Object
- Karafka::BaseConsumer
- Sbmt::KafkaConsumer::BaseConsumer
show all
- Defined in:
- lib/sbmt/kafka_consumer/base_consumer.rb
Class Method Summary
collapse
Instance Method Summary
collapse
Class Method Details
.consumer_klass(skip_on_error: nil, middlewares: nil, batch_middlewares: nil) ⇒ Object
10
11
12
13
14
15
16
17
18
19
20
21
22
|
# File 'lib/sbmt/kafka_consumer/base_consumer.rb', line 10
def self.consumer_klass(skip_on_error: nil, middlewares: nil, batch_middlewares: nil)
klass = Class.new(self) do
def self.name
superclass.name
end
end
klass.skip_on_error = skip_on_error if skip_on_error
klass.middlewares = middlewares.map(&:constantize) if middlewares
klass.batch_middlewares = batch_middlewares.map(&:constantize) if batch_middlewares
klass
end
|
Instance Method Details
#consume ⇒ Object
24
25
26
27
28
29
30
31
32
33
34
35
36
37
|
# File 'lib/sbmt/kafka_consumer/base_consumer.rb', line 24
def consume
::Rails.application.executor.wrap do
if process_batch?
with_batch_instrumentation(messages) do
do_process_batch(messages)
mark_message(messages.last)
end
else
messages.each do |message|
with_instrumentation(message) { do_consume(message) }
end
end
end
end
|
#process_batch? ⇒ Boolean
39
40
41
42
43
44
|
# File 'lib/sbmt/kafka_consumer/base_consumer.rb', line 39
def process_batch?
if @process_batch_memoized.nil?
@process_batch_memoized = respond_to?(:process_batch)
end
@process_batch_memoized
end
|