Module: Karafka::Pro::Processing::Strategies::Aj::DlqMom
- Includes: Dlq::Mom
- Included in: DlqFtrLrjMom
- Defined in: lib/karafka/pro/processing/strategies/aj/dlq_mom.rb
Overview
- ActiveJob enabled
- DLQ enabled
- Manual offset management enabled
ActiveJob (AJ) has manual offset management on by default, and offset management is delegated to the AJ consumer. This means the strategy cannot always mark messages as consumed: it can do so only when it skips a given job after errors. In all other scenarios, marking as consumed has to happen in the AJ consumer on a per-job basis.
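The division of responsibility described above can be illustrated with a small, self-contained toy model. This is only a sketch and does not use Karafka: `ToyOffsets`, `run_jobs`, the `by:` labels, and the `max_retries` default are hypothetical names invented for illustration, and the retry loop is a simplification of how a real consumer would pause and retry.

```ruby
# Toy model only: nothing here is a Karafka class or API.
# It records which party marked each offset as consumed.
class ToyOffsets
  attr_reader :marked

  def initialize
    @marked = []
  end

  def mark(offset, by:)
    @marked << [offset, by]
  end
end

# Runs jobs one by one. A successful job is marked by the (simulated)
# AJ consumer. A job that keeps failing is skipped: it is pushed to the
# (simulated) DLQ and only then marked by the (simulated) strategy.
def run_jobs(jobs, offsets, dlq, max_retries: 2)
  jobs.each_with_index do |job, offset|
    attempts = 0

    begin
      attempts += 1
      job.call
      offsets.mark(offset, by: :aj_consumer)
    rescue StandardError
      # Re-run the begin block until the retries are exhausted
      retry if attempts <= max_retries

      dlq << offset
      offsets.mark(offset, by: :strategy)
    end
  end
end

offsets = ToyOffsets.new
dlq = []
jobs = [
  -> { :ok },
  -> { raise 'boom' },
  -> { :ok }
]

run_jobs(jobs, offsets, dlq)

p offsets.marked # [[0, :aj_consumer], [1, :strategy], [2, :aj_consumer]]
p dlq            # [1]
```

In this model the strategy marks an offset only for the job it gives up on, which mirrors the rule stated above: every other offset is marked per job by the consumer itself.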
Constant Summary
- FEATURES = %i[active_job dead_letter_queue manual_offset_management].freeze
Features for this strategy
Instance Method Summary
- #handle_after_consume ⇒ Object
Defines how consumption is finalized after processing.
Methods included from Dlq::Default
#apply_dlq_flow, #build_dlq_message, #dispatch_if_needed_and_mark_as_consumed, #dispatch_in_a_transaction?, #dispatch_to_dlq, #dispatch_to_dlq?, #find_skippable_message, #mark_as_consumed, #mark_as_consumed!, #mark_dispatched_to_dlq
Methods included from Default
#handle_before_consume, #handle_before_schedule_consume, #handle_before_schedule_tick, #handle_consume, #handle_revoked, #handle_tick, #mark_as_consumed, #mark_as_consumed!, #mark_in_transaction, #store_offset_metadata, #transaction
Methods included from Karafka::Processing::Strategies::Default
#commit_offsets, #commit_offsets!, #handle_before_consume, #handle_consume, #handle_idle, #handle_revoked, #handle_shutdown, #mark_as_consumed, #mark_as_consumed!
Methods included from Karafka::Processing::Strategies::Base
#handle_before_consume, #handle_consume, #handle_idle, #handle_revoked, #handle_shutdown
Instance Method Details
#handle_after_consume ⇒ Object
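Defines how consumption is finalized after processing. On success, no offsets are committed here and only the pause tracker is reset. On failure, the DLQ flow runs and the skipped message is marked as dispatched to the DLQ.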
# File 'lib/karafka/pro/processing/strategies/aj/dlq_mom.rb', line 38

def handle_after_consume
  coordinator.on_finished do
    return if revoked?

    if coordinator.success?
      # Do NOT commit offsets, they are committed after each job in the AJ consumer.
      coordinator.pause_tracker.reset
    else
      apply_dlq_flow do
        skippable_message, = find_skippable_message
        dispatch_to_dlq(skippable_message) if dispatch_to_dlq?
        # We can commit the offset here because we know that we skip it "forever" and
        # since AJ consumer commits the offset after each job, we also know that the
        # previous job was successful
        mark_dispatched_to_dlq(skippable_message)
      end
    end
  end
end