Class: Shoryuken::Middleware::Server::NonRetryableException
- Inherits:
-
Object
- Object
- Shoryuken::Middleware::Server::NonRetryableException
- Includes:
- Util
- Defined in:
- lib/shoryuken/middleware/server/non_retryable_exception.rb
Overview
Middleware that handles non-retryable exceptions by deleting messages immediately. When a configured exception occurs, the message is deleted instead of being retried.
Configure non-retryable exceptions per worker:
class MyWorker
include Shoryuken::Worker
# Using exception classes
queue: 'my_queue',
non_retryable_exceptions: [InvalidInputError, RecordNotFoundError]
# Or using a lambda for dynamic classification
queue: 'my_queue',
non_retryable_exceptions: ->(error) {
error.is_a?(StandardError) && error..include?('permanent')
}
def perform(sqs_msg, body)
# ...
end
end
Instance Method Summary collapse
-
#call(worker, queue, sqs_msg, _body) { ... } ⇒ void
Processes a message and handles non-retryable exceptions.
Methods included from Util
#elapsed, #fire_event, #logger, #unparse_queues, #worker_name
Instance Method Details
#call(worker, queue, sqs_msg, _body) { ... } ⇒ void
This method returns an undefined value.
Processes a message and handles non-retryable exceptions
39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 |
# File 'lib/shoryuken/middleware/server/non_retryable_exception.rb', line 39 def call(worker, queue, sqs_msg, _body) yield rescue => e non_retryable_exceptions = worker.class.['non_retryable_exceptions'] return raise unless non_retryable_exceptions if non_retryable_exceptions.respond_to?(:call) return raise unless non_retryable_exceptions.call(e) else exception_classes = Array(non_retryable_exceptions) return raise unless exception_classes.any? { |klass| e.is_a?(klass) } end # Handle batch messages = sqs_msg.is_a?(Array) ? sqs_msg : [sqs_msg] logger.warn do "Non-retryable exception #{e.class} occurred for message(s) #{.map(&:message_id).join(', ')}. " \ "Deleting message(s) immediately. Error: #{e.}" end logger.debug { e.backtrace.join("\n") } if e.backtrace # Delete the message(s) immediately entries = .map.with_index { |, i| { id: i.to_s, receipt_handle: .receipt_handle } } begin queue_client = Shoryuken::Client.queues(queue) delete_failed = queue_client.(entries: entries) # Check if deletion reported failures (returns true if any failed) if delete_failed logger.warn do 'Failed to delete some messages for non-retryable exception on queue ' \ "'#{queue}'. " \ "Entries: #{entries.map { |e| { id: e[:id] } }.inspect}. " \ 'Some messages may remain in the queue and could be reprocessed.' end end rescue => delete_error logger.error do 'Error deleting messages for non-retryable exception on queue ' \ "'#{queue}': #{delete_error.class} - #{delete_error.}. " \ "Entries: #{entries.map { |e| { id: e[:id] } }.inspect}. " \ 'Messages may remain in the queue and could be reprocessed.' end logger.debug { delete_error.backtrace.join("\n") } if delete_error.backtrace end # Don't re-raise - the exception has been handled by deleting the message end |