Class: GoodJob::Batch
- Inherits:
-
Object
- Object
- GoodJob::Batch
- Includes:
- GlobalID::Identification
- Defined in:
- app/models/good_job/batch.rb
Overview
NOTE: This class delegates to BatchRecord and is intended to be the public interface for Batches.
Constant Summary collapse
- PROTECTED_PROPERTIES =
%i[ on_finish on_success on_discard callback_queue_name callback_priority description properties ].freeze
Class Method Summary collapse
-
.enqueue(active_jobs = [], **properties, &block) ⇒ GoodJob::Batch
Create a new batch and enqueue it.
-
.enqueue_all(batch_job_pairs) ⇒ Array<GoodJob::Batch>
Bulk-enqueue multiple batches with their jobs in minimal DB round-trips.
- .find(id) ⇒ Object
- .primary_key ⇒ Object
-
.within_thread(batch_id: nil, batch_callback_id: nil) ⇒ Object
Helper method to enqueue jobs and assign them to a batch.
Instance Method Summary collapse
- #_record ⇒ Object
- #active_jobs ⇒ Object
-
#add(active_jobs = nil, &block) ⇒ Array<ActiveJob::Base>
Enqueue jobs and add them to the batch.
- #assign_properties(properties) ⇒ Object
- #callback_active_jobs ⇒ Object
-
#enqueue(active_jobs = [], **properties, &block) ⇒ Array<ActiveJob::Base>
Active jobs added to the batch.
-
#initialize(_record: nil, **properties) ⇒ Batch
constructor
Wraps the given BatchRecord, or a new one when none is given, and assigns the given properties.
- #retry ⇒ Object
Constructor Details
#initialize(_record: nil, **properties) ⇒ Batch
Wraps the given BatchRecord, or a new one when none is given, and assigns the given properties.
214 215 216 217 |
# File 'app/models/good_job/batch.rb', line 214
#
# Builds a batch around a backing record and applies the given properties.
#
# @param _record [GoodJob::BatchRecord, nil] existing record to wrap; a new
#   BatchRecord is created when this is nil (or otherwise falsy).
# @param properties [Hash] forwarded unchanged to #assign_properties.
def initialize(_record: nil, **properties) # rubocop:disable Lint/UnderscorePrefixedVariableName
  backing_record = _record
  backing_record ||= BatchRecord.new
  self.record = backing_record

  assign_properties(properties)
end
Class Method Details
.enqueue(active_jobs = [], **properties, &block) ⇒ GoodJob::Batch
Create a new batch and enqueue it
58 59 60 61 62 |
# File 'app/models/good_job/batch.rb', line 58
#
# Create a new batch and enqueue it.
#
# @param active_jobs [Array] jobs handed to the new batch's #enqueue.
# @param properties [Hash] properties handed to the new batch's #enqueue.
# @yield optional block, forwarded to the new batch's #enqueue.
# @return [GoodJob::Batch] the newly created batch (not the value returned
#   by the instance-level #enqueue).
def self.enqueue(active_jobs = [], **properties, &block)
  batch = new
  batch.enqueue(active_jobs, **properties, &block)
  batch
end
.enqueue_all(batch_job_pairs) ⇒ Array<GoodJob::Batch>
Bulk-enqueue multiple batches with their jobs in minimal DB round-trips.
Instead of creating batches one-at-a-time (~7 queries per batch), this method inserts all batch records and jobs in a fixed number of queries:
1. INSERT all BatchRecords (insert_all!)
2. INSERT all Jobs (insert_all)
3. NOTIFY per distinct queue/scheduled_at
76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 |
# File 'app/models/good_job/batch.rb', line 76
#
# Bulk-enqueue multiple batches with their jobs in minimal DB round-trips.
#
# Batch records are inserted with a single insert_all!, bulkable jobs are
# inserted inside one nested transaction, and jobs reported as "unbulkable"
# by _build_and_partition_jobs are enqueued one at a time afterwards.
#
# @param batch_job_pairs [Array] pairs whose first element is a batch; the
#   second element is not read in this method (presumably the jobs for that
#   batch, consumed by the _build_* helpers — confirm against those helpers).
#   The argument is coerced with Array(), and an empty list returns [].
# @return [Array] the first element of every pair, i.e. the batches.
# @raise [ArgumentError] if any batch reports persisted?.
def self.enqueue_all(batch_job_pairs)
  batch_job_pairs = Array(batch_job_pairs)
  return [] if batch_job_pairs.empty?

  # Reject the whole call up front: nothing has been written at this point.
  batch_job_pairs.each do |(batch, _)|
    raise ArgumentError, "All batches must be new (not persisted)" if batch.persisted?
  end

  Rails.application.executor.wrap do
    # One timestamp is shared by every phase below.
    current_time = Time.current
    adapter = ActiveJob::Base.queue_adapter
    execute_inline = adapter.respond_to?(:execute_inline?) && adapter.execute_inline?

    # Phase 1: Insert all batch records
    batch_rows = _build_batch_rows(batch_job_pairs, current_time)
    BatchRecord.insert_all!(batch_rows) # rubocop:disable Rails/SkipsModelValidations
    _mark_batches_persisted(batch_job_pairs, batch_rows, current_time)

    # Phase 2: Build and partition jobs by concurrency limits
    build_result = _build_and_partition_jobs(batch_job_pairs, current_time)

    # Phase 3–6: Insert, claim, and execute inline jobs
    lock_strategy = Job.effective_lock_strategy
    # Tracks whether the tracker registration below still has to be undone.
    tracker_registered = false
    lock_id = nil
    persisted_jobs = []
    inline_jobs = []

    if execute_inline
      GoodJob.capsule.tracker.register
      tracker_registered = true
      lock_id = GoodJob.capsule.tracker.id_for_lock
    end

    begin
      if build_result[:bulkable].any?
        Job.transaction(requires_new: true, joinable: false) do
          persisted_jobs = _insert_jobs(build_result[:bulkable], build_result[:active_jobs_by_job_id])

          if execute_inline
            # Only jobs that are due now (no scheduled_at, or not in the future) run inline.
            inline_jobs = persisted_jobs.select { |job| job.scheduled_at.nil? || job.scheduled_at <= current_time }

            # For every strategy except :advisory, claim the rows in the database
            # and mirror the same values onto the in-memory job objects.
            if lock_strategy != :advisory && lock_id && inline_jobs.any?
              Job.where(id: inline_jobs.map(&:id)).update_all( # rubocop:disable Rails/SkipsModelValidations
                locked_by_id: lock_id,
                locked_at: current_time,
                lock_type: Job.lock_types[lock_strategy.to_s]
              )
              inline_jobs.each { |j| j.assign_attributes(locked_by_id: lock_id, locked_at: current_time, lock_type: lock_strategy) }
            end

            case lock_strategy
            when :advisory, :hybrid
              inline_jobs.each(&:advisory_lock!)
            end
          end
        end
      end

      # Phase 4: Handle empty batches — they need _continue_discard_or_finish
      # to trigger on_success/on_finish callbacks (batch_record.rb:77).
      batches_with_jobs = Set.new
      build_result[:bulkable].each { |entry| batches_with_jobs.add(entry[:batch]) }
      build_result[:unbulkable].each { |entry| batches_with_jobs.add(entry[:batch]) }
      empty_batches = batch_job_pairs.map(&:first).reject { |batch| batches_with_jobs.include?(batch) }

      if empty_batches.any?
        buffer = GoodJob::Adapter::InlineBuffer.capture do
          empty_batches.each do |batch|
            batch._record.reload
            batch._record._continue_discard_or_finish(lock: true)
          end
        end
        buffer.call
      end

      # Phase 5: Enqueue concurrency-limited jobs individually
      build_result[:unbulkable].each do |entry|
        within_thread(batch_id: entry[:batch].id) do
          entry[:active_job].enqueue
        end
      rescue GoodJob::ActiveJobExtensions::Concurrency::ConcurrencyExceededError
        # ignore — matches Bulk::Buffer behavior (bulk.rb:107-109)
      end

      # Phase 6: Execute inline jobs
      if inline_jobs.any?
        deferred = GoodJob::Adapter::InlineBuffer.defer?
        GoodJob::Adapter::InlineBuffer.perform_now_or_defer do
          # Jobs are shifted off the list as they run, so the ensure below
          # only unlocks jobs that were never started.
          until inline_jobs.empty?
            inline_job = inline_jobs.shift
            active_job = build_result[:active_jobs_by_job_id][inline_job.active_job_id]
            adapter.send(:perform_inline, inline_job, notify: deferred ? adapter.send(:send_notify?, active_job) : false, already_claimed: lock_strategy != :advisory, advisory_unlock: lock_strategy != :skiplocked)
          end
        ensure
          inline_jobs.each(&:advisory_unlock)
          GoodJob.capsule.tracker.unregister if tracker_registered
          tracker_registered = false
        end
      elsif tracker_registered
        GoodJob.capsule.tracker.unregister
        tracker_registered = false
      end
    rescue StandardError
      # Undo the tracker registration if it is still outstanding, then re-raise.
      if tracker_registered
        GoodJob.capsule.tracker.unregister
        tracker_registered = false
      end
      raise
    end

    # Phase 7: Send NOTIFY for non-inline jobs
    # NOTE(review): Phase 6 empties inline_jobs via shift whenever its block has
    # already run by this point, so the subtraction and the `inline_jobs.any?`
    # guard below then see an empty list — confirm this is the intended
    # behavior for the non-deferred case.
    non_inline_jobs = persisted_jobs - inline_jobs
    non_inline_jobs = non_inline_jobs.reject(&:finished_at) if inline_jobs.any?
    _send_notifications(non_inline_jobs, build_result[:active_jobs_by_job_id], adapter) if non_inline_jobs.any?

    batch_job_pairs.map(&:first)
  end
end
.find(id) ⇒ Object
196 197 198 |
# File 'app/models/good_job/batch.rb', line 196
#
# Looks up a BatchRecord by id and wraps it in a new instance of this class.
#
# @param id the identifier passed straight to BatchRecord.find.
# @return [GoodJob::Batch] a batch backed by the record that was found.
def self.find(id)
  batch_record = BatchRecord.find(id)
  new(_record: batch_record)
end
.primary_key ⇒ Object
192 193 194 |
# File 'app/models/good_job/batch.rb', line 192 def self.primary_key :id end |
.within_thread(batch_id: nil, batch_callback_id: nil) ⇒ Object
Helper method to enqueue jobs and assign them to a batch
201 202 203 204 205 206 207 208 209 210 211 212 |
# File 'app/models/good_job/batch.rb', line 201
#
# Helper method to enqueue jobs and assign them to a batch.
#
# Sets current_batch_id and current_batch_callback_id for the duration of the
# block, then puts back whatever values they held before, even when the
# block raises.
#
# @param batch_id value assigned to current_batch_id while the block runs.
# @param batch_callback_id value assigned to current_batch_callback_id while
#   the block runs.
# @yield the work to perform with the ids in place.
# @return the value of the block.
def self.within_thread(batch_id: nil, batch_callback_id: nil)
  previous_batch_id = current_batch_id
  previous_batch_callback_id = current_batch_callback_id

  self.current_batch_id = batch_id
  self.current_batch_callback_id = batch_callback_id

  yield
ensure
  # Restore in the same order the values were captured.
  self.current_batch_id = previous_batch_id
  self.current_batch_callback_id = previous_batch_callback_id
end
Instance Method Details
#_record ⇒ Object
310 311 312 |
# File 'app/models/good_job/batch.rb', line 310 def _record record end |
#active_jobs ⇒ Object
295 296 297 |
# File 'app/models/good_job/batch.rb', line 295
#
# Maps every job of the backing record to its +active_job+.
#
# @return [Array] one entry per element of +record.jobs+.
def active_jobs
  record.jobs.map { |job| job.active_job }
end
#add(active_jobs = nil, &block) ⇒ Array<ActiveJob::Base>
Enqueue jobs and add them to the batch
260 261 262 263 264 265 266 267 268 269 270 271 272 |
# File 'app/models/good_job/batch.rb', line 260
#
# Enqueue jobs and add them to the batch.
#
# A new backing record is saved first with the non-bang +save+, so a failed
# save is not raised from here.
#
# @param active_jobs jobs handed to the bulk buffer (may be nil).
# @yield optional block; when given it is passed to the buffer's +capture+.
# @return [Array<ActiveJob::Base>] the buffer's +active_jobs+ after enqueueing.
def add(active_jobs = nil, &block)
  record.save if record.new_record?

  bulk_buffer = Bulk::Buffer.new
  bulk_buffer.add(active_jobs)
  bulk_buffer.capture(&block) if block

  # Enqueue with this batch's id set for the duration of the call.
  self.class.within_thread(batch_id: id) { bulk_buffer.enqueue }

  bulk_buffer.active_jobs
end
#assign_properties(properties) ⇒ Object
303 304 305 306 307 308 |
# File 'app/models/good_job/batch.rb', line 303
#
# Splits the given hash into record attributes and free-form properties.
#
# Keys listed in PROTECTED_PROPERTIES are removed from a copy of the hash and,
# when their value is not nil, assigned to the backing record. Everything
# left over is merged into +self.properties+. The caller's hash is not mutated.
#
# @param properties [Hash] mixed record attributes and custom properties.
# @return the result of merging the remaining entries into +self.properties+.
def assign_properties(properties)
  remaining = properties.dup

  batch_attrs = {}
  PROTECTED_PROPERTIES.each do |key|
    value = remaining.delete(key)
    batch_attrs[key] = value unless value.nil?
  end

  record.assign_attributes(batch_attrs)
  self.properties.merge!(remaining)
end
#callback_active_jobs ⇒ Object
299 300 301 |
# File 'app/models/good_job/batch.rb', line 299
#
# Maps every callback job of the backing record to its +active_job+.
#
# @return [Array] one entry per element of +record.callback_jobs+.
def callback_active_jobs
  record.callback_jobs.map { |callback_job| callback_job.active_job }
end
#enqueue(active_jobs = [], **properties, &block) ⇒ Array<ActiveJob::Base>
Returns Active jobs added to the batch.
220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 |
# File 'app/models/good_job/batch.rb', line 220
#
# Assigns the given properties, persists the batch, adds the given jobs and
# then stamps +enqueued_at+ and re-evaluates the batch state.
#
# For a record that is already saved, +discarded_at+ and +finished_at+ (and
# +jobs_finished_at+ when that column is migrated) are reset to nil inside a
# transaction holding a pg_advisory_xact_lock advisory lock.
#
# @param active_jobs [Array] jobs forwarded to #add.
# @param properties [Hash] forwarded to #assign_properties.
# @yield optional block forwarded to #add.
# @return [Array<ActiveJob::Base>] Active jobs added to the batch.
def enqueue(active_jobs = [], **properties, &block)
  assign_properties(properties)

  if record.new_record?
    record.save!
  else
    record.transaction do
      record.with_advisory_lock(function: "pg_advisory_xact_lock") do
        # Flag the timestamps as changed before the update below.
        record.enqueued_at_will_change!
        record.jobs_finished_at_will_change! if GoodJob::BatchRecord.jobs_finished_at_migrated?
        record.finished_at_will_change!

        reset_attributes = { discarded_at: nil, finished_at: nil }
        reset_attributes[:jobs_finished_at] = nil if GoodJob::BatchRecord.jobs_finished_at_migrated?
        record.update!(**reset_attributes)
      end
    end
  end

  added_jobs = add(active_jobs, &block)

  Rails.application.executor.wrap do
    inline_buffer = GoodJob::Adapter::InlineBuffer.capture do
      record.transaction do
        record.with_advisory_lock(function: "pg_advisory_xact_lock") do
          record.update!(enqueued_at: Time.current)

          # During inline execution, this could enqueue and execute further jobs
          record._continue_discard_or_finish(lock: false)
        end
      end
    end
    inline_buffer.call
  end

  added_jobs
end
#retry ⇒ Object
274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 |
# File 'app/models/good_job/batch.rb', line 274
#
# Retries the batch's discarded jobs.
#
# Inside a transaction holding a pg_advisory_xact_lock advisory lock, resets
# +discarded_at+ and +finished_at+ (and +jobs_finished_at+ when that column is
# migrated) to nil. It then calls +retry_job+ on every discarded job and
# re-evaluates the batch state, both through Job.defer_after_commit_maybe.
#
# @return the result of calling the captured inline buffer.
def retry
  Rails.application.executor.wrap do
    inline_buffer = GoodJob::Adapter::InlineBuffer.capture do
      record.transaction do
        record.with_advisory_lock(function: "pg_advisory_xact_lock") do
          reset_attributes = { discarded_at: nil, finished_at: nil }
          reset_attributes[:jobs_finished_at] = nil if GoodJob::BatchRecord.jobs_finished_at_migrated?
          record.update!(reset_attributes)

          jobs_to_retry = record.jobs.discarded
          Job.defer_after_commit_maybe(jobs_to_retry) do
            jobs_to_retry.each { |job| job.retry_job }
            record._continue_discard_or_finish(lock: false)
          end
        end
      end
    end
    inline_buffer.call
  end
end