Module: Karafka::Pro::Processing::Strategies::Dlq::Default
- Defined in:
- lib/karafka/pro/processing/strategies/dlq/default.rb
Overview
Strategy used when only the dead letter queue feature is enabled.
Constant Summary
- FEATURES =
Features for this strategy
%i[ dead_letter_queue ].freeze
Instance Method Summary
- #apply_dlq_flow ⇒ Object
  Runs the DLQ strategy and, based on its result, performs certain operations.
- #build_dlq_message(skippable_message) ⇒ Hash
  Builds the message to be dispatched to the DLQ.
- #dispatch_if_needed_and_mark_as_consumed ⇒ Object
  Dispatches the message to the DLQ (when needed and when applicable based on settings) and marks this message as consumed for non-MOM flows.
- #dispatch_in_a_transaction? ⇒ Boolean
  Should we use a transaction to move the data to the DLQ.
- #dispatch_to_dlq(skippable_message) ⇒ Object
  Moves the broken message into a separate queue defined via the settings.
- #dispatch_to_dlq? ⇒ Boolean
  Should we dispatch the message to the DLQ or not.
- #find_skippable_message ⇒ Array<Karafka::Messages::Message, Boolean>
  Finds the message we may want to skip (all, starting from first).
- #handle_after_consume ⇒ Object
  When we encounter a non-recoverable message, we skip it and go on with our lives.
- #mark_as_consumed(message, offset_metadata = @_current_offset_metadata) ⇒ Object
  Override of the standard `#mark_as_consumed` in order to handle the pause tracker reset in case the DLQ is marked as fully independent.
- #mark_as_consumed!(message, offset_metadata = @_current_offset_metadata) ⇒ Object
  Override of the standard `#mark_as_consumed!`.
- #mark_dispatched_to_dlq(skippable_message) ⇒ Object
  Marks the message that went to the DLQ (if applicable) based on the requested method.
Methods included from Karafka::Pro::Processing::Strategies::Default
#handle_before_consume, #handle_before_schedule_consume, #handle_before_schedule_tick, #handle_consume, #handle_revoked, #handle_tick, #mark_in_transaction, #store_offset_metadata, #transaction
Methods included from Karafka::Processing::Strategies::Default
#commit_offsets, #commit_offsets!, #handle_before_consume, #handle_consume, #handle_idle, #handle_revoked, #handle_shutdown
Methods included from Karafka::Processing::Strategies::Base
#handle_before_consume, #handle_consume, #handle_idle, #handle_revoked, #handle_shutdown
Instance Method Details
#apply_dlq_flow ⇒ Object
Runs the DLQ strategy and, based on its result, performs certain operations.
In case of `:skip` and `:dispatch`, it runs the exact flow provided in the block. In case of `:retry`, `#retry_after_pause` is always applied and the block is not run.
# File 'lib/karafka/pro/processing/strategies/dlq/default.rb', line 200

def apply_dlq_flow
  flow = topic.dead_letter_queue.strategy.call(errors_tracker, attempt)

  case flow
  when :retry
    retry_after_pause

    return
  when :skip
    @_dispatch_to_dlq = false
  when :dispatch
    @_dispatch_to_dlq = true
  else
    raise Karafka::UnsupportedCaseError, flow
  end

  yield

  # We reset the pause to indicate we will now consider it as "ok".
  coordinator.pause_tracker.reset

  # Always backoff after DLQ dispatch even on skip to prevent overloads on errors
  pause(coordinator.seek_offset, nil, false)
end
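The strategy consulted at the top of `#apply_dlq_flow` is any callable that receives the errors tracker and the attempt number and returns `:retry`, `:skip` or `:dispatch`. The following is a minimal sketch of such a callable, not the library's own implementation: the class name `ExampleDlqStrategy`, the `ParseError` class and the retry limit are hypothetical, and a plain array stands in for the real errors tracker (assumed here only to respond to `#last`).

```ruby
# Hypothetical error class used only for this illustration
ParseError = Class.new(StandardError)

# Hypothetical strategy: any object responding to #call(errors_tracker, attempt)
class ExampleDlqStrategy
  MAX_ATTEMPTS = 3

  # @param errors_tracker [#last] errors seen so far (stand-in here: Array)
  # @param attempt [Integer] current processing attempt
  # @return [Symbol] :retry, :skip or :dispatch
  def call(errors_tracker, attempt)
    # Unparsable data will never succeed, so send it to the DLQ right away
    return :dispatch if errors_tracker.last.is_a?(ParseError)
    # Give other errors a few more chances
    return :retry if attempt < MAX_ATTEMPTS

    # Out of attempts: move on without dispatching
    :skip
  end
end

strategy = ExampleDlqStrategy.new
strategy.call([ParseError.new], 1)    # :dispatch
strategy.call([StandardError.new], 1) # :retry
strategy.call([StandardError.new], 3) # :skip
```

Any other return value would reach the `else` branch of the `case` statement and raise.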
#build_dlq_message(skippable_message) ⇒ Hash
Returns the message hash to be dispatched to the DLQ.
# File 'lib/karafka/pro/processing/strategies/dlq/default.rb', line 150

def build_dlq_message(skippable_message)
  original_partition = skippable_message.partition.to_s

  dlq_message = {
    topic: topic.dead_letter_queue.topic,
    key: original_partition,
    payload: skippable_message.raw_payload,
    headers: skippable_message.raw_headers.merge(
      'original_topic' => topic.name,
      'original_partition' => original_partition,
      'original_offset' => skippable_message.offset.to_s,
      'original_consumer_group' => topic.consumer_group.id,
      'original_key' => skippable_message.raw_key.to_s,
      'original_attempts' => attempt.to_s
    )
  }

  # Optional method user can define in consumer to enhance the dlq message hash with
  # some extra details if needed or to replace payload, etc
  if respond_to?(:enhance_dlq_message, true)
    enhance_dlq_message(
      dlq_message,
      skippable_message
    )
  end

  dlq_message
end
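The optional `#enhance_dlq_message` hook mentioned in the listing receives the DLQ message hash together with the original message. The sketch below shows what such a hook could look like, assuming it adjusts the hash in place. `FakeMessage`, `FakeConsumer`, `DlqEnhancements`, the `failed_at_offset` header and the `orders_dlq` topic name are all hypothetical stand-ins so the example runs without Karafka.

```ruby
# Stand-in for Karafka::Messages::Message, limited to what this sketch needs
FakeMessage = Struct.new(:raw_payload, :raw_headers, :offset, keyword_init: true)

# Hypothetical hook; in a real application it would be an instance method
# defined directly on the consumer class
module DlqEnhancements
  # @param dlq_message [Hash] hash about to be dispatched to the DLQ topic
  # @param skippable_message [#offset] message that failed processing
  def enhance_dlq_message(dlq_message, skippable_message)
    # Header values are kept as strings, like the 'original_*' headers
    dlq_message[:headers]['failed_at_offset'] = skippable_message.offset.to_s
    # Replace the default key (the original partition) with a custom one
    dlq_message[:key] = "failed-#{skippable_message.offset}"
  end
end

class FakeConsumer
  include DlqEnhancements
end

message = FakeMessage.new(raw_payload: '{"broken":', raw_headers: {}, offset: 42)

# Shape of the hash before the hook runs (shortened for the example)
dlq_message = {
  topic: 'orders_dlq', # hypothetical DLQ topic name
  key: '0',
  payload: message.raw_payload,
  headers: message.raw_headers.merge('original_offset' => message.offset.to_s)
}

FakeConsumer.new.enhance_dlq_message(dlq_message, message)
dlq_message[:key]                         # "failed-42"
dlq_message[:headers]['failed_at_offset'] # "42"
```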
#dispatch_if_needed_and_mark_as_consumed ⇒ Object
Dispatches the message to the DLQ (when needed and when applicable based on settings) and marks this message as consumed for non-MOM flows.
If the producer is transactional and the configuration allows it, both steps run inside a transaction.
# File 'lib/karafka/pro/processing/strategies/dlq/default.rb', line 133

def dispatch_if_needed_and_mark_as_consumed
  skippable_message, = find_skippable_message

  dispatch = lambda do
    dispatch_to_dlq(skippable_message) if dispatch_to_dlq?
    mark_dispatched_to_dlq(skippable_message)
  end

  if dispatch_in_a_transaction?
    transaction { dispatch.call }
  else
    dispatch.call
  end
end
#dispatch_in_a_transaction? ⇒ Boolean
Returns whether we should use a transaction to move the data to the DLQ. This is possible only when the producer is transactional and transactional DLQ dispatches have not been set to false in the configuration.
# File 'lib/karafka/pro/processing/strategies/dlq/default.rb', line 192

def dispatch_in_a_transaction?
  producer.transactional? && topic.dead_letter_queue.transactional?
end
#dispatch_to_dlq(skippable_message) ⇒ Object
Moves the broken message into a separate queue defined via the settings.
# File 'lib/karafka/pro/processing/strategies/dlq/default.rb', line 107

def dispatch_to_dlq(skippable_message)
  # DLQ should never try to dispatch a message that was cleaned. If message was
  # cleaned, we will not have all the needed data. If you see this error, it means
  # that your processing flow is not as expected and you have cleaned message that
  # should not be cleaned as it should go to the DLQ
  raise(Cleaner::Errors::MessageCleanedError) if skippable_message.cleaned?

  producer.public_send(
    topic.dead_letter_queue.dispatch_method,
    build_dlq_message(
      skippable_message
    )
  )

  # Notify about dispatch on the events bus
  Karafka.monitor.instrument(
    'dead_letter_queue.dispatched',
    caller: self,
    message: skippable_message
  )
end
#dispatch_to_dlq? ⇒ Boolean
Returns whether we should dispatch the message to the DLQ. When the dispatch topic is set to false, the dispatch is skipped, effectively ignoring the broken message without taking any action.
# File 'lib/karafka/pro/processing/strategies/dlq/default.rb', line 182

def dispatch_to_dlq?
  return false unless topic.dead_letter_queue.topic
  return false unless @_dispatch_to_dlq

  true
end
#find_skippable_message ⇒ Array<Karafka::Messages::Message, Boolean>
Finds the message we may want to skip (all, starting from first).
# File 'lib/karafka/pro/processing/strategies/dlq/default.rb', line 91

def find_skippable_message
  skippable_message = messages.find do |msg|
    coordinator.marked? && msg.offset == coordinator.seek_offset
  end

  # If we don't have the message matching the last committed offset, it means that
  # user operates with manual offsets and we're beyond the batch in which things
  # broke for the first time. Then we skip the first (as no markings) and we
  # move on one by one.
  skippable_message ? [skippable_message, true] : [messages.first, false]
end
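The lookup can be tried in isolation. The sketch below is a simplified, standalone version of the same selection logic: `Msg` and `pick_skippable` are hypothetical names, and the `marked` and `seek_offset` arguments replace `coordinator.marked?` and `coordinator.seek_offset`.

```ruby
# Stand-in for a Kafka message; only the offset matters here
Msg = Struct.new(:offset)

# Returns the message to skip plus a flag telling whether it was located
# through a previously marked offset.
def pick_skippable(messages, marked:, seek_offset:)
  found = messages.find { |msg| marked && msg.offset == seek_offset }
  found ? [found, true] : [messages.first, false]
end

batch = [Msg.new(10), Msg.new(11), Msg.new(12)]

# Offsets were marked up to 10, so the seek offset (11) points at the
# message on which processing broke
pick_skippable(batch, marked: true, seek_offset: 11)
# => [#<struct Msg offset=11>, true]

# Nothing was marked: fall back to the first message of the batch
pick_skippable(batch, marked: false, seek_offset: 11)
# => [#<struct Msg offset=10>, false]
```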
#handle_after_consume ⇒ Object
When we encounter a non-recoverable message, we skip it and go on with our lives.
# File 'lib/karafka/pro/processing/strategies/dlq/default.rb', line 69

def handle_after_consume
  coordinator.on_finished do |last_group_message|
    return if revoked?

    if coordinator.success?
      coordinator.pause_tracker.reset

      return if coordinator.manual_pause?

      mark_as_consumed(last_group_message)
    else
      apply_dlq_flow do
        dispatch_if_needed_and_mark_as_consumed
      end
    end
  end
end
#mark_as_consumed(message, offset_metadata = @_current_offset_metadata) ⇒ Object
Override of the standard `#mark_as_consumed` that handles the pause tracker reset when the DLQ is marked as fully independent. When the DLQ is marked as independent, any offset marking causes the pause count tracker to reset. This is useful when the error is not due to the collective batch operations state but due to intermediate “crawling” errors that move with it.
# File 'lib/karafka/pro/processing/strategies/dlq/default.rb', line 38

def mark_as_consumed(message, offset_metadata = @_current_offset_metadata)
  return super unless retrying?
  return super unless topic.dead_letter_queue.independent?
  return false unless super

  coordinator.pause_tracker.reset

  true
ensure
  @_current_offset_metadata = nil
end
#mark_as_consumed!(message, offset_metadata = @_current_offset_metadata) ⇒ Object
Override of the standard `#mark_as_consumed!`. Resets the pause tracker count when the DLQ is configured with the `independent` flag.
# File 'lib/karafka/pro/processing/strategies/dlq/default.rb', line 56

def mark_as_consumed!(message, offset_metadata = @_current_offset_metadata)
  return super unless retrying?
  return super unless topic.dead_letter_queue.independent?
  return false unless super

  coordinator.pause_tracker.reset

  true
ensure
  @_current_offset_metadata = nil
end
#mark_dispatched_to_dlq(skippable_message) ⇒ Object
Marks the message that went to the DLQ (if applicable) using the configured marking method.
# File 'lib/karafka/pro/processing/strategies/dlq/default.rb', line 227

def mark_dispatched_to_dlq(skippable_message)
  case topic.dead_letter_queue.marking_method
  when :mark_as_consumed
    mark_as_consumed(skippable_message)
  when :mark_as_consumed!
    mark_as_consumed!(skippable_message)
  else
    # This should never happen. Bug if encountered. Please report
    raise Karafka::Errors::UnsupportedCaseError
  end
end