Module: Shoryuken::Worker::ClassMethods

Defined in:
lib/shoryuken/worker.rb

Overview

Class methods added to classes that include Shoryuken::Worker. Provides methods for configuring the worker, enqueueing jobs, and managing middleware.

Instance Method Summary collapse

Instance Method Details

#auto_delete?Boolean

Checks if automatic message deletion is enabled for this worker. When enabled, successfully processed messages are automatically deleted from the SQS queue. When disabled, you must manually delete messages or they will become visible again after the visibility timeout.

Examples:

Manual message deletion when auto_delete is false

def perform(sqs_msg, body)
  process_message(body)
  # Manually delete the message after successful processing
  sqs_msg.delete
end

Returns:

  • (Boolean)

    true if auto delete is enabled

See Also:



249
250
251
# File 'lib/shoryuken/worker.rb', line 249

def auto_delete?
  !!(get_shoryuken_options['delete'] || get_shoryuken_options['auto_delete'])
end

#auto_visibility_timeout?Boolean

Checks if automatic visibility timeout extension is enabled for this worker. When enabled, Shoryuken automatically extends the message visibility timeout during processing to prevent the message from becoming visible to other consumers.

Returns:

  • (Boolean)

    true if auto visibility timeout is enabled

See Also:



215
216
217
# File 'lib/shoryuken/worker.rb', line 215

def auto_visibility_timeout?
  !!get_shoryuken_options['auto_visibility_timeout']
end

#exponential_backoff?Boolean

Checks if exponential backoff retry is configured for this worker. When retry intervals are specified, failed jobs will be retried with increasing delays between attempts.

Examples:

Configuring exponential backoff

shoryuken_options retry_intervals: [1, 5, 25, 125, 625]
# Will retry after 1s, 5s, 25s, 125s, then 625s before giving up

Returns:

  • (Boolean)

    true if retry intervals are configured

See Also:



230
231
232
# File 'lib/shoryuken/worker.rb', line 230

def exponential_backoff?
  !!get_shoryuken_options['retry_intervals']
end

#get_shoryuken_optionsHash

Returns the shoryuken options for this worker class

Returns:

  • (Hash)

    the options hash



255
256
257
# File 'lib/shoryuken/worker.rb', line 255

def get_shoryuken_options # :nodoc:
  shoryuken_options_hash || Shoryuken.default_worker_options
end

#perform_async(body, options = {}) ⇒ String

Enqueues a job to be processed asynchronously by a Shoryuken worker.

Examples:

Basic job enqueueing

MyWorker.perform_async({ user_id: 123, action: 'send_email' })

FIFO queue with ordering

MyWorker.perform_async(data, message_group_id: 'user_123')

Parameters:

  • body (Object)

    The job payload that will be passed to the worker’s perform method

  • options (Hash) (defaults to: {})

    Additional options for job enqueueing

Options Hash (options):

  • :message_group_id (String)

    FIFO queue group ID for message ordering

  • :message_deduplication_id (String)

    FIFO queue deduplication ID

  • :message_attributes (Hash)

    Custom SQS message attributes

Returns:

  • (String)

    The message ID of the enqueued job



71
72
73
# File 'lib/shoryuken/worker.rb', line 71

def perform_async(body, options = {})
  Shoryuken.worker_executor.perform_async(self, body, options)
end

#perform_in(interval, body, options = {}) ⇒ String Also known as: perform_at

Enqueues a job to be processed after a specified time interval.

Examples:

Delay job by 5 minutes

MyWorker.perform_in(5.minutes, { user_id: 123 })

Delay job by specific number of seconds

MyWorker.perform_in(300, { user_id: 123 })

Parameters:

  • interval (Integer, ActiveSupport::Duration)

    Delay in seconds, or duration object

  • body (Object)

    The job payload that will be passed to the worker’s perform method

  • options (Hash) (defaults to: {})

    SQS message options for the delayed job

Options Hash (options):

  • :message_group_id (String)

    FIFO queue group ID for message ordering

  • :message_deduplication_id (String)

    FIFO queue deduplication ID

  • :message_attributes (Hash)

    Custom SQS message attributes

Returns:

  • (String)

    The message ID of the enqueued job



90
91
92
# File 'lib/shoryuken/worker.rb', line 90

def perform_in(interval, body, options = {})
  Shoryuken.worker_executor.perform_in(self, interval, body, options)
end

#server_middleware {|Shoryuken::Middleware::Chain| ... } ⇒ Shoryuken::Middleware::Chain

Configures server-side middleware chain for this worker class. Middleware runs before and after job processing, similar to Rack middleware.

Examples:

Adding custom middleware

class MyWorker
  include Shoryuken::Worker

  server_middleware do |chain|
    chain.add MyCustomMiddleware
    chain.remove Shoryuken::Middleware::Server::ActiveRecord
  end
end

Yields:

Returns:



111
112
113
114
115
# File 'lib/shoryuken/worker.rb', line 111

def server_middleware
  @_server_chain ||= Shoryuken.server_middleware.dup
  yield @_server_chain if block_given?
  @_server_chain
end

#shoryuken_class_attribute(*attrs) ⇒ void

This method returns an undefined value.

Defines inheritable class attributes for workers

Parameters:

  • attrs (Array<Symbol>)

    attribute names to define



271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
# File 'lib/shoryuken/worker.rb', line 271

def shoryuken_class_attribute(*attrs) # :nodoc:
  attrs.each do |name|
    singleton_class.instance_eval do
      undef_method(name) if method_defined?(name) || private_method_defined?(name)
    end
    define_singleton_method(name) { nil }

    ivar = "@#{name}"

    singleton_class.instance_eval do
      m = "#{name}="
      undef_method(m) if method_defined?(m) || private_method_defined?(m)
    end

    define_singleton_method("#{name}=") do |val|
      singleton_class.class_eval do
        undef_method(name) if method_defined?(name) || private_method_defined?(name)
        define_method(name) { val }
      end

      # singleton? backwards compatibility for ruby < 2.1
      singleton_klass = respond_to?(:singleton?) ? singleton? : self != ancestors.first

      if singleton_klass
        class_eval do
          undef_method(name) if method_defined?(name) || private_method_defined?(name)
          define_method(name) do
            if instance_variable_defined? ivar
              instance_variable_get ivar
            else
              singleton_class.send name
            end
          end
        end
      end
      val
    end

    # instance reader
    undef_method(name) if method_defined?(name) || private_method_defined?(name)
    define_method(name) do
      if instance_variable_defined?(ivar)
        instance_variable_get ivar
      else
        self.class.public_send name
      end
    end

    # instance writer
    m = "#{name}="
    undef_method(m) if method_defined?(m) || private_method_defined?(m)
    attr_writer name
  end
end

#shoryuken_options(opts = {}) ⇒ Object

Configures worker options including queue assignment, processing behavior, and SQS-specific settings. This is the main configuration method for workers.

Examples:

Basic worker configuration

class MyWorker
  include Shoryuken::Worker
  shoryuken_options queue: 'my_queue'

  def perform(sqs_msg, body)
    # Process the message
  end
end

Worker with auto-delete and retries

class ReliableWorker
  include Shoryuken::Worker
  shoryuken_options queue: 'important_queue',
                    auto_delete: true,
                    retry_intervals: [1, 5, 25, 125]
end

Batch processing worker

class BatchWorker
  include Shoryuken::Worker
  shoryuken_options queue: 'batch_queue', batch: true

  def perform(sqs_msgs, bodies)
    # Process array of up to 10 messages
    bodies.each { |body| process_item(body) }
  end
end

Multiple queues with priorities

class MultiQueueWorker
  include Shoryuken::Worker
  shoryuken_options queue: ['high_priority', 'low_priority']
end

Auto-extending visibility timeout for long-running jobs

class LongRunningWorker
  include Shoryuken::Worker
  shoryuken_options queue: 'slow_queue',
                    auto_visibility_timeout: true

  def perform(sqs_msg, body)
    # Long processing that might exceed visibility timeout
    complex_processing(body)
  end
end

Worker with non-retryable exceptions

class ValidationWorker
  include Shoryuken::Worker
  shoryuken_options queue: 'validation_queue',
                    non_retryable_exceptions: [InvalidInputError, RecordNotFoundError]

  def perform(sqs_msg, body)
    # If InvalidInputError or RecordNotFoundError is raised,
    # the message will be deleted immediately instead of retrying
    validate_and_process(body)
  end
end

Worker with lambda for dynamic exception classification

class SmartWorker
  include Shoryuken::Worker
  shoryuken_options queue: 'smart_queue',
                    non_retryable_exceptions: ->(error) {
                      error.is_a?(ValidationError) || 
                      (error.is_a?(NetworkError) && error.message.include?('permanent'))
                    }

  def perform(sqs_msg, body)
    # Lambda receives the exception and returns true if non-retryable
    process_with_validation(body)
  end
end

Parameters:

  • opts (Hash) (defaults to: {})

    Configuration options for the worker

Options Hash (opts):

  • :queue (String, Array<String>)

    Queue name(s) this worker processes

  • :batch (Boolean) — default: false

    Process messages in batches of up to 10

  • :auto_delete (Boolean) — default: false

    Automatically delete messages after processing

  • :auto_visibility_timeout (Boolean) — default: false

    Automatically extend message visibility

  • :retry_intervals (Array<Integer>)

    Exponential backoff retry intervals in seconds

  • :non_retryable_exceptions (Array<Class>, Proc)

    Exception classes or lambda that should skip retries and delete message immediately

  • :sqs (Hash)

    Additional SQS client options



203
204
205
206
# File 'lib/shoryuken/worker.rb', line 203

def shoryuken_options(opts = {})
  self.shoryuken_options_hash = get_shoryuken_options.merge(stringify_keys(opts || {}))
  normalize_worker_queue!
end

#stringify_keys(hash) ⇒ Hash

Converts hash keys to strings

Parameters:

  • hash (Hash)

    the hash to convert

Returns:

  • (Hash)

    hash with string keys



262
263
264
265
266
# File 'lib/shoryuken/worker.rb', line 262

def stringify_keys(hash) # :nodoc:
  new_hash = {}
  hash.each { |key, value| new_hash[key.to_s] = value }
  new_hash
end