Class: ActiveJob::QueueAdapters::ShoryukenAdapter
- Inherits:
-
Object
- Object
- ActiveJob::QueueAdapters::ShoryukenAdapter
- Defined in:
- lib/active_job/queue_adapters/shoryuken_adapter.rb
Overview
Shoryuken queue adapter for ActiveJob integration. Provides methods for enqueueing jobs to SQS queues.
Direct Known Subclasses
Class Method Summary collapse
-
.enqueue(job) ⇒ Aws::SQS::Types::SendMessageResult
Enqueues a job for immediate processing.
-
.enqueue_at(job, timestamp) ⇒ Aws::SQS::Types::SendMessageResult
Enqueues a job for delayed processing.
-
.instance ⇒ ShoryukenAdapter
Returns the singleton adapter instance.
Instance Method Summary collapse
-
#enqueue(job, options = {}) ⇒ Aws::SQS::Types::SendMessageResult
Enqueues a job for immediate processing.
-
#enqueue_after_transaction_commit? ⇒ Boolean
Checks if jobs should be enqueued after transaction commit (Rails 7.2+).
-
#enqueue_all(jobs) ⇒ Integer
Bulk enqueue multiple jobs efficiently using SQS batch API.
-
#enqueue_at(job, timestamp) ⇒ Aws::SQS::Types::SendMessageResult
Enqueues a job for delayed processing.
-
#stopping? ⇒ Boolean
Indicates whether Shoryuken is in the process of shutting down.
Class Method Details
.enqueue(job) ⇒ Aws::SQS::Types::SendMessageResult
Enqueues a job for immediate processing
43 44 45 |
# File 'lib/active_job/queue_adapters/shoryuken_adapter.rb', line 43 def enqueue(job) instance.enqueue(job) end |
.enqueue_at(job, timestamp) ⇒ Aws::SQS::Types::SendMessageResult
Enqueues a job for delayed processing
52 53 54 |
# File 'lib/active_job/queue_adapters/shoryuken_adapter.rb', line 52 def enqueue_at(job, ) instance.enqueue_at(job, ) end |
.instance ⇒ ShoryukenAdapter
Returns the singleton adapter instance
34 35 36 37 |
# File 'lib/active_job/queue_adapters/shoryuken_adapter.rb', line 34 def instance # https://github.com/ruby-shoryuken/shoryuken/pull/174#issuecomment-174555657 @instance ||= new end |
Instance Method Details
#enqueue(job, options = {}) ⇒ Aws::SQS::Types::SendMessageResult
Enqueues a job for immediate processing
85 86 87 88 89 90 91 92 93 94 |
# File 'lib/active_job/queue_adapters/shoryuken_adapter.rb', line 85 def enqueue(job, = {}) # :nodoc: register_worker!(job) job..merge! queue = Shoryuken::Client.queues(job.queue_name) = queue, job job. = queue. end |
#enqueue_after_transaction_commit? ⇒ Boolean
Checks if jobs should be enqueued after transaction commit (Rails 7.2+)
60 61 62 |
# File 'lib/active_job/queue_adapters/shoryuken_adapter.rb', line 60 def enqueue_after_transaction_commit? true end |
#enqueue_all(jobs) ⇒ Integer
Bulk enqueue multiple jobs efficiently using SQS batch API. Called by ActiveJob.perform_all_later (Rails 7.1+).
127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 |
# File 'lib/active_job/queue_adapters/shoryuken_adapter.rb', line 127 def enqueue_all(jobs) # :nodoc: jobs.group_by(&:queue_name).each do |queue_name, queue_jobs| queue = Shoryuken::Client.queues(queue_name) queue_jobs.each_slice(10) do |batch| entries = batch.map.with_index do |job, idx| register_worker!(job) msg = (queue, job) job. = msg { id: idx.to_s }.merge(msg) end response = queue.(entries: entries) successful_ids = response.successful.map { |r| r.id.to_i }.to_set batch.each_with_index do |job, idx| job.successfully_enqueued = successful_ids.include?(idx) end end end jobs.count(&:successfully_enqueued?) end |
#enqueue_at(job, timestamp) ⇒ Aws::SQS::Types::SendMessageResult
Enqueues a job for delayed processing
102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 |
# File 'lib/active_job/queue_adapters/shoryuken_adapter.rb', line 102 def enqueue_at(job, ) # :nodoc: delay = calculate_delay() # FIFO queues do not support per-message delays # Check early to fail synchronously (before any async wrapping in subclasses) # Note: negative delays (past timestamps) don't need handling here - # SQS treats them as immediate delivery (delay_seconds=0) # See https://github.com/ruby-shoryuken/shoryuken/issues/924 if delay.positive? queue = Shoryuken::Client.queues(job.queue_name) if queue.fifo? raise Shoryuken::Errors::FifoDelayNotSupportedError, "FIFO queue '#{queue.name}' does not support per-message delays. " \ 'When using ActiveJob retry_on with FIFO queues, set `wait: 0`.' end end enqueue(job, delay_seconds: delay) end |
#stopping? ⇒ Boolean
Indicates whether Shoryuken is in the process of shutting down.
This method is required for ActiveJob Continuations support (Rails 8.1+). When true, it signals to jobs that they should checkpoint their progress and gracefully interrupt execution to allow for resumption after restart.
72 73 74 75 |
# File 'lib/active_job/queue_adapters/shoryuken_adapter.rb', line 72 def stopping? launcher = Shoryuken::Runner.instance.launcher launcher&.stopping? || false end |