Module: Karafka::Processing::Strategies::Dlq
Overview
When the dead letter queue is enabled, processing does not stop after the defined number of retries when non-critical errors occur. Instead, the messages that keep failing are moved to a separate topic with their payload and metadata, so they can be handled differently.
Constant Summary
- FEATURES = %i[ dead_letter_queue ].freeze
  Applies this strategy when only the dead letter queue is turned on.
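The `FEATURES` constant reads as a match key: this strategy applies when the set of enabled topic features is exactly `[:dead_letter_queue]`. A minimal sketch of that kind of exact-match selection, assuming a selector that compares sorted feature lists; `DefaultStrategy`, `DlqStrategy`, `DlqMomStrategy`, `STRATEGIES` and `select_strategy` are all hypothetical names, not Karafka's API.

```ruby
# Hypothetical strategy modules; only the FEATURES convention mirrors the documented constant
module DefaultStrategy
  FEATURES = %i[].freeze
end

module DlqStrategy
  FEATURES = %i[dead_letter_queue].freeze
end

module DlqMomStrategy
  FEATURES = %i[dead_letter_queue manual_offset_management].freeze
end

STRATEGIES = [DefaultStrategy, DlqStrategy, DlqMomStrategy].freeze

# Picks the strategy whose FEATURES match the enabled features exactly, so
# DlqStrategy is chosen only when the DLQ is the sole feature turned on
def select_strategy(enabled_features)
  wanted = enabled_features.sort
  STRATEGIES.find { |strategy| strategy::FEATURES.sort == wanted } ||
    raise(ArgumentError, "no strategy for #{enabled_features.inspect}")
end

puts select_strategy(%i[dead_letter_queue])                          # prints DlqStrategy
puts select_strategy(%i[manual_offset_management dead_letter_queue]) # prints DlqMomStrategy
```

Exact matching keeps combinations apart: enabling a second feature selects a different module instead of silently falling back to the DLQ-only one.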
Instance Method Summary
- #dispatch_to_dlq(skippable_message) ⇒ Object
  Moves the broken message into a separate queue defined via the settings.
- #find_skippable_message ⇒ Array<Karafka::Messages::Message, Boolean>
  Finds the message we may want to skip: the one at the coordinator's seek offset when an offset has been marked, otherwise the first message.
- #handle_after_consume ⇒ Object
  When manual offset management is on, we do not mark anything as consumed automatically and we rely on the user to figure things out.
- #mark_as_consumed(message) ⇒ Object
  Override of the standard `#mark_as_consumed` that handles the pause tracker reset when the DLQ is marked as fully independent.
- #mark_as_consumed!(message) ⇒ Object
  Override of the standard `#mark_as_consumed!`.
- #mark_dispatched_to_dlq(skippable_message) ⇒ Object
  Marks the message that went to the DLQ (if applicable) based on the requested marking method.
Methods included from Default
#commit_offsets, #commit_offsets!, #handle_before_consume, #handle_consume, #handle_idle, #handle_revoked, #handle_shutdown
Methods included from Base
#handle_before_consume, #handle_consume, #handle_idle, #handle_revoked, #handle_shutdown
Instance Method Details
#dispatch_to_dlq(skippable_message) ⇒ Object
Moves the broken message into a separate queue defined via the settings.
    # File 'lib/karafka/processing/strategies/dlq.rb', line 108
    def dispatch_to_dlq(skippable_message)
      producer.public_send(
        topic.dead_letter_queue.dispatch_method,
        topic: topic.dead_letter_queue.topic,
        payload: skippable_message.raw_payload
      )

      # Notify about dispatch on the events bus
      Karafka.monitor.instrument(
        'dead_letter_queue.dispatched',
        caller: self,
        message: skippable_message
      )
    end
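Because the producer call goes through `public_send` with `topic.dead_letter_queue.dispatch_method`, the method used for the DLQ write is chosen by configuration. The plain-Ruby sketch below reproduces that shape; `FakeProducer`, `FakeMonitor`, `DlqSettings` and `DlqMessage` are hypothetical stand-ins, and the `:produce_async`/`:produce_sync` names are assumed to mirror WaterDrop's producer methods. Nothing here talks to Kafka.

```ruby
# Hypothetical stand-ins for a message and for the DLQ settings of a topic
DlqMessage = Struct.new(:offset, :raw_payload, keyword_init: true)
DlqSettings = Struct.new(:topic, :dispatch_method, keyword_init: true)

# Records produced messages instead of sending them
class FakeProducer
  attr_reader :sent

  def initialize
    @sent = []
  end

  def produce_async(topic:, payload:)
    @sent << [:async, topic, payload]
  end

  def produce_sync(topic:, payload:)
    @sent << [:sync, topic, payload]
  end
end

# Records instrumentation events instead of notifying subscribers
class FakeMonitor
  attr_reader :events

  def initialize
    @events = []
  end

  def instrument(name, **payload)
    @events << [name, payload]
  end
end

def dispatch_to_dlq(skippable_message, producer:, monitor:, settings:)
  # The configured method name (:produce_async or :produce_sync) selects the call
  producer.public_send(
    settings.dispatch_method,
    topic: settings.topic,
    payload: skippable_message.raw_payload
  )
  # Publish the dispatch event, as the documented method does on the events bus
  monitor.instrument('dead_letter_queue.dispatched', message: skippable_message)
end

producer = FakeProducer.new
monitor = FakeMonitor.new
settings = DlqSettings.new(topic: 'dead_messages', dispatch_method: :produce_sync)
broken = DlqMessage.new(offset: 7, raw_payload: 'broken-payload')
dispatch_to_dlq(broken, producer: producer, monitor: monitor, settings: settings)
p producer.sent # prints [[:sync, "dead_messages", "broken-payload"]]
```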
#find_skippable_message ⇒ Array<Karafka::Messages::Message, Boolean>
Finds the message we may want to skip: the one at the coordinator's seek offset when an offset has been marked, otherwise the first message.
    # File 'lib/karafka/processing/strategies/dlq.rb', line 92
    def find_skippable_message
       = .find do |msg|
        coordinator.marked? && msg.offset == coordinator.seek_offset
      end

      # If we don't have the message matching the last committed offset, it means that
      # user operates with manual offsets and we're beyond the batch in which things
      # broke for the first time. Then we skip the first (as no markings) and we
      # move on one by one.
       ? [, true] : [.first, false]
    end
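The lookup prefers the message sitting at the coordinator's seek offset and otherwise falls back to the first message, with the boolean telling the two cases apart. A pure-Ruby sketch of that rule; here the batch and the coordinator are passed in explicitly, and `BatchMessage` and `SketchCoordinator` are hypothetical stand-ins.

```ruby
# Hypothetical stand-ins: a message with an offset and a coordinator with marking state
BatchMessage = Struct.new(:offset)
SketchCoordinator = Struct.new(:marked, :seek_offset) do
  def marked?
    marked
  end
end

# Returns [message, found_at_seek_offset]
def find_skippable_message(batch, coordinator)
  found = batch.find do |msg|
    coordinator.marked? && msg.offset == coordinator.seek_offset
  end

  # No marked offset inside this batch: fall back to the first message
  found ? [found, true] : [batch.first, false]
end

batch = [10, 11, 12].map { |offset| BatchMessage.new(offset) }
message, from_seek = find_skippable_message(batch, SketchCoordinator.new(true, 11))
p [message.offset, from_seek] # prints [11, true]
message, from_seek = find_skippable_message(batch, SketchCoordinator.new(false, nil))
p [message.offset, from_seek] # prints [10, false]
```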
#handle_after_consume ⇒ Object
When manual offset management is on, we do not mark anything as consumed automatically and we rely on the user to figure things out.
    # File 'lib/karafka/processing/strategies/dlq.rb', line 55
    def handle_after_consume
      return if revoked?

      if coordinator.success?
        coordinator.pause_tracker.reset

        return if coordinator.manual_pause?

        mark_as_consumed(.last)
      elsif coordinator.pause_tracker.attempt <= topic.dead_letter_queue.max_retries
        retry_after_pause
      # If we've reached number of retries that we could, we need to skip the first message
      # that was not marked as consumed, pause and continue, while also moving this message
      # to the dead topic
      else
        # We reset the pause to indicate we will now consider it as "ok".
        coordinator.pause_tracker.reset

        skippable_message, = find_skippable_message

        # Send skippable message to the DLQ topic
        dispatch_to_dlq(skippable_message)

        # We mark the broken message as consumed and move on
        mark_dispatched_to_dlq(skippable_message)

        return if revoked?

        # We pause to backoff once just in case.
        pause(coordinator.seek_offset, nil, false)
      end
    end
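The three branches of `#handle_after_consume` come down to one decision per batch. The sketch below models only that decision; `after_consume_action` and its symbolic return values are hypothetical, and it assumes the attempt counter is 1 on the first failed run, so with `max_retries: 2` a message that keeps failing is retried twice before it goes to the DLQ.

```ruby
# Hypothetical model of the branch taken after a batch has been consumed
def after_consume_action(success:, attempt:, max_retries:, manual_pause: false)
  if success
    # Pause tracker is reset; the last message is marked unless a manual pause is active
    manual_pause ? :reset_only : :mark_last_as_consumed
  elsif attempt <= max_retries
    # Still within the retry budget: pause and process the same offsets again
    :retry_after_pause
  else
    # Budget exhausted: dispatch the skippable message, mark it, pause once and move on
    :dispatch_to_dlq_and_skip
  end
end

actions = (1..3).map do |attempt|
  after_consume_action(success: false, attempt: attempt, max_retries: 2)
end
p actions # prints [:retry_after_pause, :retry_after_pause, :dispatch_to_dlq_and_skip]
```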
#mark_as_consumed(message) ⇒ Object
Override of the standard `#mark_as_consumed` that handles the pause tracker reset when the DLQ is marked as fully independent. With an independent DLQ, any successful offset marking made while retrying resets the pause count tracker. This is useful when errors are not caused by the collective state of the batch operations but are intermediate "crawling" errors that move along with the processed offsets.
    # File 'lib/karafka/processing/strategies/dlq.rb', line 25
    def mark_as_consumed(message)
      # If we are not retrying pause count is already 0, no need to try to reset the state
      return super unless
      # If we do not use independent marking on DLQ, we just mark as consumed
      return super unless topic.dead_letter_queue.independent?
      # If we were not able to mark no need to reset
      return false unless super

      coordinator.pause_tracker.reset

      true
    end
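The override pattern shared by `#mark_as_consumed` and `#mark_as_consumed!` can be modelled with two modules, where the DLQ module calls `super` into the default marking. In this minimal sketch `PauseTracker`, `DefaultMarking`, `DlqMarking`, `SketchConsumer` and the `retrying?` predicate (taken here to mean at least one recorded attempt) are all hypothetical; only the order of the three guards follows the method above.

```ruby
# Hypothetical pause tracker: only the attempt counter and its reset are modelled
class PauseTracker
  attr_reader :attempt

  def initialize(attempt = 0)
    @attempt = attempt
  end

  def reset
    @attempt = 0
  end
end

# Stand-in for the standard marking: returns whether the offset could be stored
module DefaultMarking
  def mark_as_consumed(_message)
    @commit_ok
  end
end

module DlqMarking
  include DefaultMarking

  def mark_as_consumed(message)
    # Not retrying: nothing to reset, plain marking
    return super unless retrying?
    # DLQ not independent: plain marking
    return super unless independent
    # Marking failed: keep the retry count
    return false unless super

    pause_tracker.reset
    true
  end
end

class SketchConsumer
  include DlqMarking

  attr_reader :pause_tracker, :independent

  def initialize(attempt:, independent:, commit_ok: true)
    @pause_tracker = PauseTracker.new(attempt)
    @independent = independent
    @commit_ok = commit_ok
  end

  # Assumption: "retrying" means at least one failed attempt has been recorded
  def retrying?
    pause_tracker.attempt.positive?
  end
end

consumer = SketchConsumer.new(attempt: 2, independent: true)
consumer.mark_as_consumed(:message)
p consumer.pause_tracker.attempt # prints 0
```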
#mark_as_consumed!(message) ⇒ Object
Override of the standard `#mark_as_consumed!`. Resets the pause tracker count when the DLQ was configured with the `independent` flag.
    # File 'lib/karafka/processing/strategies/dlq.rb', line 43
    def mark_as_consumed!(message)
      return super unless
      return super unless topic.dead_letter_queue.independent?
      return false unless super

      coordinator.pause_tracker.reset

      true
    end
#mark_dispatched_to_dlq(skippable_message) ⇒ Object
Marks the message that went to the DLQ (if applicable) based on the requested marking method.
    # File 'lib/karafka/processing/strategies/dlq.rb', line 125
    def mark_dispatched_to_dlq(skippable_message)
      case topic.dead_letter_queue.marking_method
      when :mark_as_consumed
        mark_as_consumed(skippable_message)
      when :mark_as_consumed!
        mark_as_consumed!(skippable_message)
      else
        # This should never happen. Bug if encountered. Please report
        raise Karafka::Errors::UnsupportedCaseError
      end
    end
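Using an explicit `case` here, unlike the `public_send` in `#dispatch_to_dlq`, means only the two known marking methods can be invoked and any other setting raises. A small sketch of that dispatch, with a hypothetical `SketchMarker` class that records calls instead of storing offsets and a local `UnsupportedCaseError` standing in for `Karafka::Errors::UnsupportedCaseError`.

```ruby
# Hypothetical stand-in for Karafka::Errors::UnsupportedCaseError
class UnsupportedCaseError < StandardError; end

# Hypothetical consumer that records marking calls instead of storing offsets
class SketchMarker
  attr_reader :calls

  def initialize(marking_method)
    @marking_method = marking_method
    @calls = []
  end

  def mark_as_consumed(message)
    @calls << [:mark_as_consumed, message]
    true
  end

  def mark_as_consumed!(message)
    @calls << [:mark_as_consumed!, message]
    true
  end

  # Same branching as the documented method, driven by the constructor argument
  def mark_dispatched_to_dlq(skippable_message)
    case @marking_method
    when :mark_as_consumed
      mark_as_consumed(skippable_message)
    when :mark_as_consumed!
      mark_as_consumed!(skippable_message)
    else
      raise UnsupportedCaseError, "unsupported marking method: #{@marking_method.inspect}"
    end
  end
end

marker = SketchMarker.new(:mark_as_consumed!)
marker.mark_dispatched_to_dlq(:broken)
p marker.calls # prints [[:mark_as_consumed!, :broken]]
```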