Class: ActiveJob::QueueAdapters::ShoryukenConcurrentSendAdapter

Inherits:
ShoryukenAdapter
  • Object
Defined in:
lib/active_job/queue_adapters/shoryuken_concurrent_send_adapter.rb

Overview

Shoryuken concurrent adapter for Active Job.

This adapter sends messages asynchronously (i.e., without blocking the caller) and lets the caller register handlers for both success and failure.

Examples:

Setting up the adapter

success_handler = ->(response, job, options) { StatsD.increment("#{job.class.name}.success") }
error_handler = ->(err, job, options) { StatsD.increment("#{job.class.name}.failure") }

adapter = ActiveJob::QueueAdapters::ShoryukenConcurrentSendAdapter.new(success_handler, error_handler)

config.active_job.queue_adapter = adapter

Instance Method Summary

Methods inherited from ShoryukenAdapter

enqueue, #enqueue_after_transaction_commit?, #enqueue_all, enqueue_at, #enqueue_at, instance, #stopping?

Constructor Details

#initialize(success_handler = nil, error_handler = nil) ⇒ ShoryukenConcurrentSendAdapter

Initializes a new concurrent send adapter

Parameters:

  • success_handler (Proc, nil) (defaults to: nil)

    callback for successful enqueues

  • error_handler (Proc, nil) (defaults to: nil)

    callback for failed enqueues



# File 'lib/active_job/queue_adapters/shoryuken_concurrent_send_adapter.rb', line 26

def initialize(success_handler = nil, error_handler = nil)
  super() if defined?(super)
  @success_handler = success_handler
  @error_handler = error_handler
end

Instance Method Details

#enqueue(job, options = {}) ⇒ Concurrent::Promises::Future

Enqueues a job asynchronously

Parameters:

  • job (ActiveJob::Base)

    the job to enqueue

  • options (Hash) (defaults to: {})

    SQS message configuration

Options Hash (options):

  • :delay_seconds (Integer)

    delay before the message becomes visible

  • :message_group_id (String)

    FIFO queue group ID

  • :message_deduplication_id (String)

    FIFO queue deduplication ID

Returns:

  • (Concurrent::Promises::Future)

    the future representing the async operation



# File 'lib/active_job/queue_adapters/shoryuken_concurrent_send_adapter.rb', line 40

def enqueue(job, options = {})
  send_concurrently(job, options) { |f_job, f_options| super(f_job, f_options) }
end
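
The non-blocking pattern described above can be sketched with Ruby's standard library alone. This is an illustrative stand-in, not the adapter's actual implementation: `send_in_background` and `FakeJob` are hypothetical names, and a plain `Thread` takes the place of the `Concurrent::Promises::Future` that the real method returns. It assumes the handlers receive the same three arguments as in the setup example.

```ruby
# Hypothetical stand-in for a job; the real adapter receives an ActiveJob::Base.
FakeJob = Struct.new(:name)

# Runs the given block off the calling thread and routes the outcome to one of
# the two handlers. A Thread is used here only for illustration.
def send_in_background(job, options, success_handler, error_handler)
  Thread.new do
    response = yield(job, options)
    success_handler.call(response, job, options)
    response
  rescue StandardError => e
    error_handler.call(e, job, options)
    nil
  end
end

results = Queue.new # thread-safe collector for handler outcomes
on_success = ->(response, job, _options) { results << [:ok, job.name, response] }
on_error   = ->(error, job, _options) { results << [:failed, job.name, error.message] }

ok  = send_in_background(FakeJob.new("a"), {}, on_success, on_error) { |_j, _o| "sent" }
bad = send_in_background(FakeJob.new("b"), {}, on_success, on_error) { |_j, _o| raise "boom" }

# The caller is not blocked until it chooses to wait for the outcome.
[ok, bad].each(&:join)
```

In this sketch an exception raised by the success handler is also routed to the error handler; whether the real adapter behaves that way is not stated here.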

#error_handler ⇒ Proc

Returns the error handler, falling back to a default that logs a warning through Shoryuken.logger if none was set

Returns:

  • (Proc)

    the error handler



# File 'lib/active_job/queue_adapters/shoryuken_concurrent_send_adapter.rb', line 54

def error_handler
  @error_handler ||= lambda { |error, job, _options|
    Shoryuken.logger.warn("Failed to enqueue job: #{job.inspect} due to error: #{error}")
  }
end
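
When a warning in the log is not enough, a caller can supply its own error handler through the constructor. A minimal sketch, assuming the handler is invoked with `(error, job, options)` as the default above is; `FailedEnqueue` and `failures` are hypothetical names, and the final call only simulates the adapter reporting a failure.

```ruby
# Hypothetical record of a failed enqueue, kept for later inspection or retry.
FailedEnqueue = Struct.new(:job, :error_class, :message, :options)

# Queue is thread-safe, so the handler can be called from a background thread.
failures = Queue.new

error_handler = lambda do |error, job, options|
  failures << FailedEnqueue.new(job, error.class.name, error.message, options)
end

# Simulated failure report; :some_job stands in for a real job instance.
error_handler.call(ArgumentError.new("queue not found"), :some_job, { delay_seconds: 5 })
```

Collecting failures in a thread-safe structure keeps the handler itself cheap, which matters because it may run on a thread other than the one that enqueued the job.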

#success_handler ⇒ Proc

Returns the success handler, using a default no-op if not set

Returns:

  • (Proc)

    the success handler



# File 'lib/active_job/queue_adapters/shoryuken_concurrent_send_adapter.rb', line 47

def success_handler
  @success_handler ||= ->(_send_message_response, _job, _options) { nil }
end