Class: Sbmt::KafkaConsumer::BaseConsumer

Inherits:
Karafka::BaseConsumer
  • Object
show all
Defined in:
lib/sbmt/kafka_consumer/base_consumer.rb

Direct Known Subclasses

InboxConsumer, SimpleLoggingConsumer

Class Method Summary collapse

Instance Method Summary collapse

Class Method Details

.consumer_klass(skip_on_error: nil, middlewares: nil, batch_middlewares: nil) ⇒ Object



10
11
12
13
14
15
16
17
18
19
20
21
22
# File 'lib/sbmt/kafka_consumer/base_consumer.rb', line 10

# Builds an anonymous subclass of the receiver and applies per-consumer
# overrides to it, leaving the receiver's own settings untouched.
#
# @param skip_on_error [Boolean, nil] overrides the inherited +skip_on_error+
#   setting; +nil+ means "not provided" and keeps the inherited value
# @param middlewares [Array<#constantize>, nil] entries are resolved with
#   +constantize+ (presumably class-name strings — confirm against callers);
#   +nil+ keeps the inherited value
# @param batch_middlewares [Array<#constantize>, nil] same as +middlewares+,
#   assigned to +batch_middlewares+
# @return [Class] the newly created subclass
def self.consumer_klass(skip_on_error: nil, middlewares: nil, batch_middlewares: nil)
  klass = Class.new(self) do
    # An anonymous class has no name of its own; report the parent's instead.
    def self.name
      superclass.name
    end
  end

  # defaults are set in class_attribute definition
  # Compare against nil rather than truthiness so that an explicit `false`
  # is applied and can switch off a value inherited from the parent class.
  klass.skip_on_error = skip_on_error unless skip_on_error.nil?
  klass.middlewares = middlewares.map(&:constantize) if middlewares
  klass.batch_middlewares = batch_middlewares.map(&:constantize) if batch_middlewares
  klass
end

Instance Method Details

#consume ⇒ Object



24
25
26
27
28
29
30
31
32
33
34
35
36
37
# File 'lib/sbmt/kafka_consumer/base_consumer.rb', line 24

# Processes the current +messages+ inside the Rails application executor.
#
# When +process_batch?+ is false, every message is passed to +do_consume+
# individually, each wrapped in +with_instrumentation+. Otherwise the whole
# collection is passed to +do_process_batch+ under +with_batch_instrumentation+
# and the last message is handed to +mark_message+ afterwards.
def consume
  ::Rails.application.executor.wrap do
    unless process_batch?
      # Per-message mode: hand the result of the iteration back to the executor.
      next messages.each { |msg| with_instrumentation(msg) { do_consume(msg) } }
    end

    with_batch_instrumentation(messages) do
      do_process_batch(messages)
      mark_message(messages.last)
    end
  end
end

#process_batch? ⇒ Boolean

Returns:

  • (Boolean)


39
40
41
42
43
44
# File 'lib/sbmt/kafka_consumer/base_consumer.rb', line 39

# Reports whether this object responds to +process_batch+.
#
# The answer is computed once and cached in +@process_batch_memoized+. The
# cache is checked against +nil+ (not truthiness) so a cached +false+ is
# reused instead of being recomputed.
#
# @return [Boolean]
def process_batch?
  return @process_batch_memoized unless @process_batch_memoized.nil?

  @process_batch_memoized = respond_to?(:process_batch)
end