Class: Karafka::Pro::ActiveJob::Consumer

Inherits:
ActiveJob::Consumer show all
Defined in:
lib/karafka/pro/active_job/consumer.rb

Overview

Pro ActiveJob consumer that is suppose to handle long-running jobs as well as short running jobs

When in LRJ, it will pause a given partition forever and will resume its processing only when all the jobs are done processing.

It contains slightly better revocation warranties than the regular blocking consumer as it can stop processing batch of jobs in the middle after the revocation.

Instance Attribute Summary

Attributes inherited from BaseConsumer

#client, #coordinator, #id, #messages, #producer

Instance Method Summary collapse

Methods inherited from BaseConsumer

#initialize, #on_after_consume, #on_before_consume, #on_before_schedule_consume, #on_before_schedule_idle, #on_before_schedule_revoked, #on_before_schedule_shutdown, #on_consume, #on_idle, #on_revoked, #on_shutdown

Constructor Details

This class inherits a constructor from Karafka::BaseConsumer

Instance Method Details

#consumeObject

Runs ActiveJob jobs processing and handles lrj if needed



27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
# File 'lib/karafka/pro/active_job/consumer.rb', line 27

def consume
  messages.each(clean: true) do |message|
    # If for any reason we've lost this partition, not worth iterating over new messages
    # as they are no longer ours
    break if revoked?

    # We cannot early stop when running virtual partitions because the intermediate state
    # would force us not to commit the offsets. This would cause extensive
    # double-processing
    break if Karafka::App.stopping? && !topic.virtual_partitions?

    consume_job(message)

    # We can always mark because of the virtual offset management that we have in VPs
    mark_as_consumed(message)
  end
end