Class: GoodJob::Batch
- Inherits: Object
- Inheritance chain: Object → GoodJob::Batch
- Includes:
- GlobalID::Identification
- Defined in:
- app/models/good_job/batch.rb
Overview
NOTE: This class delegates to BatchRecord and is intended to be the public interface for Batches.
Constant Summary
- PROTECTED_PROPERTIES = %i[ on_finish on_success on_discard callback_queue_name callback_priority description properties ].freeze
Class Method Summary
-
.enqueue(active_jobs = [], **properties, &block) ⇒ GoodJob::Batch
Create a new batch and enqueue it.
- .find(id) ⇒ Object
- .primary_key ⇒ Object
-
.within_thread(batch_id: nil, batch_callback_id: nil) ⇒ Object
Helper method to enqueue jobs and assign them to a batch.
Instance Method Summary
- #_record ⇒ Object
- #active_jobs ⇒ Object
-
#add(active_jobs = nil, &block) ⇒ Array<ActiveJob::Base>
Enqueue jobs and add them to the batch.
- #assign_properties(properties) ⇒ Object
- #callback_active_jobs ⇒ Object
-
#enqueue(active_jobs = [], **properties, &block) ⇒ Array<ActiveJob::Base>
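Assigns the given properties, adds the given jobs to the batch, and marks the batch as enqueued. Returns the active jobs added to the batch.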
-
#initialize(_record: nil, **properties) ⇒ Batch
constructor
Wraps the given record, or a new BatchRecord when none is given, and assigns the given properties.
- #retry ⇒ Object
Constructor Details
#initialize(_record: nil, **properties) ⇒ Batch
Wraps the given record, or a new BatchRecord when none is given, and assigns the given properties.
84 85 86 87 |
# File 'app/models/good_job/batch.rb', line 84 def initialize(_record: nil, **properties) # rubocop:disable Lint/UnderscorePrefixedVariableName self.record = _record || BatchRecord.new assign_properties(properties) end |
Class Method Details
.enqueue(active_jobs = [], **properties, &block) ⇒ GoodJob::Batch
Create a new batch and enqueue it
56 57 58 59 60 |
# File 'app/models/good_job/batch.rb', line 56 def self.enqueue(active_jobs = [], **properties, &block) new.tap do |batch| batch.enqueue(active_jobs, **properties, &block) end end |
.find(id) ⇒ Object
66 67 68 |
# File 'app/models/good_job/batch.rb', line 66 def self.find(id) new _record: BatchRecord.find(id) end |
.primary_key ⇒ Object
62 63 64 |
# File 'app/models/good_job/batch.rb', line 62 def self.primary_key :id end |
.within_thread(batch_id: nil, batch_callback_id: nil) ⇒ Object
Helper method to enqueue jobs and assign them to a batch
71 72 73 74 75 76 77 78 79 80 81 82 |
# File 'app/models/good_job/batch.rb', line 71 def self.within_thread(batch_id: nil, batch_callback_id: nil) original_batch_id = current_batch_id original_batch_callback_id = current_batch_callback_id self.current_batch_id = batch_id self.current_batch_callback_id = batch_callback_id yield ensure self.current_batch_id = original_batch_id self.current_batch_callback_id = original_batch_callback_id end |
Instance Method Details
#_record ⇒ Object
170 171 172 |
# File 'app/models/good_job/batch.rb', line 170 def _record record end |
#active_jobs ⇒ Object
155 156 157 |
# File 'app/models/good_job/batch.rb', line 155 def active_jobs record.jobs.map(&:active_job) end |
#add(active_jobs = nil, &block) ⇒ Array<ActiveJob::Base>
Enqueue jobs and add them to the batch
126 127 128 129 130 131 132 133 134 135 136 137 138 |
# File 'app/models/good_job/batch.rb', line 126 def add(active_jobs = nil, &block) record.save if record.new_record? buffer = Bulk::Buffer.new buffer.add(active_jobs) buffer.capture(&block) if block self.class.within_thread(batch_id: id) do buffer.enqueue end buffer.active_jobs end |
#assign_properties(properties) ⇒ Object
163 164 165 166 167 168 |
# File 'app/models/good_job/batch.rb', line 163 def assign_properties(properties) properties = properties.dup batch_attrs = PROTECTED_PROPERTIES.index_with { |key| properties.delete(key) }.compact record.assign_attributes(batch_attrs) self.properties.merge!(properties) end |
#callback_active_jobs ⇒ Object
159 160 161 |
# File 'app/models/good_job/batch.rb', line 159 def callback_active_jobs record.callback_jobs.map(&:active_job) end |
#enqueue(active_jobs = [], **properties, &block) ⇒ Array<ActiveJob::Base>
Returns the active jobs added to the batch.
90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 |
# File 'app/models/good_job/batch.rb', line 90 def enqueue(active_jobs = [], **properties, &block) assign_properties(properties) if record.new_record? record.save! else record.transaction do record.with_advisory_lock(function: "pg_advisory_xact_lock") do record.enqueued_at_will_change! record.finished_at_will_change! record.update!(enqueued_at: nil, finished_at: nil) end end end active_jobs = add(active_jobs, &block) Rails.application.executor.wrap do buffer = GoodJob::Adapter::InlineBuffer.capture do record.transaction do record.with_advisory_lock(function: "pg_advisory_xact_lock") do record.update!(enqueued_at: Time.current) # During inline execution, this could enqueue and execute further jobs record._continue_discard_or_finish(lock: false) end end end buffer.call end active_jobs end |
#retry ⇒ Object
140 141 142 143 144 145 146 147 148 149 150 151 152 153 |
# File 'app/models/good_job/batch.rb', line 140 def retry Rails.application.executor.wrap do buffer = GoodJob::Adapter::InlineBuffer.capture do record.transaction do record.with_advisory_lock(function: "pg_advisory_xact_lock") do record.update!(discarded_at: nil, finished_at: nil) record.jobs.discarded.each(&:retry_job) record._continue_discard_or_finish(lock: false) end end end buffer.call end end |