Class: ActiveJob::QueueAdapters::ShoryukenAdapter

Inherits:
Object
  • Object
show all
Defined in:
lib/active_job/queue_adapters/shoryuken_adapter.rb

Overview

Shoryuken queue adapter for ActiveJob integration. Provides methods for enqueueing jobs to SQS queues.

Direct Known Subclasses

ShoryukenConcurrentSendAdapter

Class Method Summary collapse

Instance Method Summary collapse

Class Method Details

.enqueue(job) ⇒ Aws::SQS::Types::SendMessageResult

Enqueues a job for immediate processing

Parameters:

  • job (ActiveJob::Base)

    the job to enqueue

Returns:

  • (Aws::SQS::Types::SendMessageResult)

    the send result



43
44
45
# File 'lib/active_job/queue_adapters/shoryuken_adapter.rb', line 43

def enqueue(job)
  instance.enqueue(job)
end

.enqueue_at(job, timestamp) ⇒ Aws::SQS::Types::SendMessageResult

Enqueues a job for delayed processing

Parameters:

  • job (ActiveJob::Base)

    the job to enqueue

  • timestamp (Float)

    Unix timestamp when the job should be processed

Returns:

  • (Aws::SQS::Types::SendMessageResult)

    the send result



52
53
54
# File 'lib/active_job/queue_adapters/shoryuken_adapter.rb', line 52

def enqueue_at(job, timestamp)
  instance.enqueue_at(job, timestamp)
end

.instanceShoryukenAdapter

Returns the singleton adapter instance

Returns:



34
35
36
37
# File 'lib/active_job/queue_adapters/shoryuken_adapter.rb', line 34

def instance
  # https://github.com/ruby-shoryuken/shoryuken/pull/174#issuecomment-174555657
  @instance ||= new
end

Instance Method Details

#enqueue(job, options = {}) ⇒ Aws::SQS::Types::SendMessageResult

Enqueues a job for immediate processing

Parameters:

  • job (ActiveJob::Base)

    the job to enqueue

  • options (Hash) (defaults to: {})

    SQS message configuration

Options Hash (options):

  • :delay_seconds (Integer)

    delay before the message becomes visible

  • :message_group_id (String)

    FIFO queue group ID

  • :message_deduplication_id (String)

    FIFO queue deduplication ID

Returns:

  • (Aws::SQS::Types::SendMessageResult)

    the send result



85
86
87
88
89
90
91
92
93
94
# File 'lib/active_job/queue_adapters/shoryuken_adapter.rb', line 85

def enqueue(job, options = {}) # :nodoc:
  register_worker!(job)

  job.sqs_send_message_parameters.merge! options

  queue = Shoryuken::Client.queues(job.queue_name)
  send_message_params = message queue, job
  job.sqs_send_message_parameters = send_message_params
  queue.send_message send_message_params
end

#enqueue_after_transaction_commit?Boolean

Checks if jobs should be enqueued after transaction commit (Rails 7.2+)

Returns:

  • (Boolean)

    always returns true



60
61
62
# File 'lib/active_job/queue_adapters/shoryuken_adapter.rb', line 60

def enqueue_after_transaction_commit?
  true
end

#enqueue_all(jobs) ⇒ Integer

Bulk enqueue multiple jobs efficiently using SQS batch API. Called by ActiveJob.perform_all_later (Rails 7.1+).

Parameters:

  • jobs (Array<ActiveJob::Base>)

    array of ActiveJob instances to be enqueued

Returns:

  • (Integer)

    number of jobs successfully enqueued



127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
# File 'lib/active_job/queue_adapters/shoryuken_adapter.rb', line 127

def enqueue_all(jobs) # :nodoc:
  jobs.group_by(&:queue_name).each do |queue_name, queue_jobs|
    queue = Shoryuken::Client.queues(queue_name)

    queue_jobs.each_slice(10) do |batch|
      entries = batch.map.with_index do |job, idx|
        register_worker!(job)
        msg = message(queue, job)
        job.sqs_send_message_parameters = msg
        { id: idx.to_s }.merge(msg)
      end

      response = queue.send_messages(entries: entries)
      successful_ids = response.successful.map { |r| r.id.to_i }.to_set
      batch.each_with_index do |job, idx|
        job.successfully_enqueued = successful_ids.include?(idx)
      end
    end
  end

  jobs.count(&:successfully_enqueued?)
end

#enqueue_at(job, timestamp) ⇒ Aws::SQS::Types::SendMessageResult

Enqueues a job for delayed processing

Parameters:

  • job (ActiveJob::Base)

    the job to enqueue

  • timestamp (Float)

    Unix timestamp when the job should be processed

Returns:

  • (Aws::SQS::Types::SendMessageResult)

    the send result

Raises:



102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
# File 'lib/active_job/queue_adapters/shoryuken_adapter.rb', line 102

def enqueue_at(job, timestamp) # :nodoc:
  delay = calculate_delay(timestamp)

  # FIFO queues do not support per-message delays
  # Check early to fail synchronously (before any async wrapping in subclasses)
  # Note: negative delays (past timestamps) don't need handling here -
  # SQS treats them as immediate delivery (delay_seconds=0)
  # See https://github.com/ruby-shoryuken/shoryuken/issues/924
  if delay.positive?
    queue = Shoryuken::Client.queues(job.queue_name)
    if queue.fifo?
      raise Shoryuken::Errors::FifoDelayNotSupportedError,
            "FIFO queue '#{queue.name}' does not support per-message delays. " \
            'When using ActiveJob retry_on with FIFO queues, set `wait: 0`.'
    end
  end

  enqueue(job, delay_seconds: delay)
end

#stopping?Boolean

Indicates whether Shoryuken is in the process of shutting down.

This method is required for ActiveJob Continuations support (Rails 8.1+). When true, it signals to jobs that they should checkpoint their progress and gracefully interrupt execution to allow for resumption after restart.

Returns:

  • (Boolean)

    true if Shoryuken is shutting down, false otherwise

See Also:



72
73
74
75
# File 'lib/active_job/queue_adapters/shoryuken_adapter.rb', line 72

def stopping?
  launcher = Shoryuken::Runner.instance.launcher
  launcher&.stopping? || false
end