Class: Karafka::Processing::ConsumerGroups::Jobs::Consume

Inherits:
Jobs::Base
  • Object
Defined in:
lib/karafka/processing/consumer_groups/jobs/consume.rb

Overview

The main job type. It runs the executor, which triggers processing of a given topic partition's messages in the underlying consumer instance.

Instance Attribute Summary

Attributes inherited from Jobs::Base

#executor

Instance Method Summary

Methods inherited from Jobs::Base

#finish!, #finished?, #non_blocking?, #wrap

Constructor Details

#initialize(executor, messages) ⇒ Consume

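Parameters:

  • executor

    executor that this job runs

  • messages

    messages handed to the executor when the job is scheduled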



# File 'lib/karafka/processing/consumer_groups/jobs/consume.rb', line 22

def initialize(executor, messages)
  @executor = executor
  @messages = messages
  super()
end

Instance Attribute Details

#messages ⇒ Array<Rdkafka::Consumer::Message> (readonly)

Returns array with messages.

Returns:

  • (Array<Rdkafka::Consumer::Message>)

    array with messages



# File 'lib/karafka/processing/consumer_groups/jobs/consume.rb', line 14

def messages
  @messages
end

Instance Method Details

#after_call ⇒ Object

Runs error handling and any other post-consumption work on the executor.



# File 'lib/karafka/processing/consumer_groups/jobs/consume.rb', line 45

def after_call
  executor.after_consume
end

#before_call ⇒ Object

Runs the pre-consumption preparations on the executor.



# File 'lib/karafka/processing/consumer_groups/jobs/consume.rb', line 35

def before_call
  executor.before_consume
end

#before_schedule ⇒ Object

Runs all the preparation code on the executor that needs to happen before the job is scheduled.



# File 'lib/karafka/processing/consumer_groups/jobs/consume.rb', line 30

def before_schedule
  executor.before_schedule_consume(@messages)
end

#call ⇒ Object

Runs the given executor



# File 'lib/karafka/processing/consumer_groups/jobs/consume.rb', line 40

def call
  executor.consume
end
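
Example

The four methods above each delegate to one hook on the executor. The following is a minimal sketch of that delegation, not Karafka's own code: `RecordingExecutor` and `SketchConsumeJob` are hypothetical stand-ins (the sketch does not load Karafka or inherit from Jobs::Base), the messages are plain strings, and the call order shown (before_schedule, before_call, call, after_call) is an assumption about how a scheduler and worker would drive the job.

```ruby
# Hypothetical stand-in for the executor: it records every hook it receives.
class RecordingExecutor
  attr_reader :events

  def initialize
    @events = []
  end

  def before_schedule_consume(messages)
    @events << [:before_schedule_consume, messages.size]
  end

  def before_consume
    @events << [:before_consume]
  end

  def consume
    @events << [:consume]
  end

  def after_consume
    @events << [:after_consume]
  end
end

# Simplified copy of the job documented above, without Jobs::Base.
class SketchConsumeJob
  attr_reader :executor, :messages

  def initialize(executor, messages)
    @executor = executor
    @messages = messages
  end

  # Invoked before the job is placed on the queue.
  def before_schedule
    executor.before_schedule_consume(@messages)
  end

  # Invoked right before the consumption itself.
  def before_call
    executor.before_consume
  end

  # The consumption itself.
  def call
    executor.consume
  end

  # Invoked after the consumption finished.
  def after_call
    executor.after_consume
  end
end

executor = RecordingExecutor.new
job = SketchConsumeJob.new(executor, %w[m1 m2 m3])

# Assumed lifecycle order; in a real setup the framework drives these calls.
job.before_schedule
job.before_call
job.call
job.after_call

executor.events.each { |event| p event }
```

Only before_schedule receives the messages; the remaining hooks take no arguments, so the executor has to hold on to whatever it needs from the scheduling step.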