Class: Shoryuken::Worker::DefaultExecutor

Inherits:
Object
  • Object
show all
Defined in:
lib/shoryuken/worker/default_executor.rb

Overview

Default executor that sends messages to SQS for asynchronous processing. This is the standard executor used in production environments.

Class Method Summary collapse

Class Method Details

.perform_async(worker_class, body, options = {}) ⇒ Aws::SQS::Types::SendMessageResult

Enqueues a job for asynchronous processing via SQS

Parameters:

  • worker_class (Class)

    the worker class that will process the message

  • body (Object)

    the message body

  • options (Hash) (defaults to: {})

    additional SQS message options

Options Hash (options):

  • :message_attributes (Hash)

    custom message attributes

  • :queue (String)

    override the default queue

Returns:

  • (Aws::SQS::Types::SendMessageResult)

    the send result



17
18
19
20
21
22
23
24
25
26
27
28
29
# File 'lib/shoryuken/worker/default_executor.rb', line 17

def perform_async(worker_class, body, options = {})
  options[:message_attributes] ||= {}
  options[:message_attributes]['shoryuken_class'] = {
    string_value: worker_class.to_s,
    data_type: 'String'
  }

  options[:message_body] = body

  queue = options.delete(:queue) || worker_class.get_shoryuken_options['queue']

  Shoryuken::Client.queues(queue).send_message(options)
end

.perform_in(worker_class, interval, body, options = {}) ⇒ Aws::SQS::Types::SendMessageResult

Enqueues a job for delayed processing via SQS

Parameters:

  • worker_class (Class)

    the worker class that will process the message

  • interval (Integer, Float)

    delay in seconds or timestamp

  • body (Object)

    the message body

  • options (Hash) (defaults to: {})

    SQS message options for the delayed job

Options Hash (options):

  • :message_attributes (Hash)

    custom message attributes

  • :queue (String)

    override the default queue

Returns:

  • (Aws::SQS::Types::SendMessageResult)

    the send result

Raises:



41
42
43
44
45
46
47
48
49
50
51
# File 'lib/shoryuken/worker/default_executor.rb', line 41

def perform_in(worker_class, interval, body, options = {})
  interval = interval.to_f
  now = Time.now.to_f
  ts = (interval < 1_000_000_000 ? (now + interval).to_f : interval)

  delay = (ts - now).ceil

  raise Errors::InvalidDelayError, 'The maximum allowed delay is 15 minutes' if delay > 15 * 60

  worker_class.perform_async(body, options.merge(delay_seconds: delay))
end