Class: Karafka::Pro::ActiveJob::Consumer
- Inherits:
-
ActiveJob::Consumer
- Object
- BaseConsumer
- ActiveJob::Consumer
- Karafka::Pro::ActiveJob::Consumer
- Defined in:
- lib/karafka/pro/active_job/consumer.rb
Overview
Pro ActiveJob consumer that is suppose to handle long-running jobs as well as short running jobs
When in LRJ, it will pause a given partition forever and will resume its processing only when all the jobs are done processing.
It contains slightly better revocation warranties than the regular blocking consumer as it can stop processing batch of jobs in the middle after the revocation.
Instance Attribute Summary
Attributes inherited from BaseConsumer
#client, #coordinator, #id, #messages, #producer
Instance Method Summary collapse
-
#consume ⇒ Object
Runs ActiveJob jobs processing and handles lrj if needed.
Methods inherited from BaseConsumer
#initialize, #on_after_consume, #on_before_consume, #on_before_schedule_consume, #on_before_schedule_idle, #on_before_schedule_revoked, #on_before_schedule_shutdown, #on_consume, #on_idle, #on_revoked, #on_shutdown
Constructor Details
This class inherits a constructor from Karafka::BaseConsumer
Instance Method Details
#consume ⇒ Object
Runs ActiveJob jobs processing and handles lrj if needed
27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 |
# File 'lib/karafka/pro/active_job/consumer.rb', line 27 def consume .each(clean: true) do || # If for any reason we've lost this partition, not worth iterating over new messages # as they are no longer ours break if revoked? # We cannot early stop when running virtual partitions because the intermediate state # would force us not to commit the offsets. This would cause extensive # double-processing break if Karafka::App.stopping? && !topic.virtual_partitions? consume_job() # We can always mark because of the virtual offset management that we have in VPs mark_as_consumed() end end |