Class: GoodJob::Batch
- Inherits:
-
Object
- Object
- GoodJob::Batch
- Includes:
- GlobalID::Identification
- Defined in:
- app/models/good_job/batch.rb
Overview
NOTE: This class delegates to BatchRecord and is intended to be the public interface for Batches.
Constant Summary collapse
- PROTECTED_PROPERTIES =
%i[ on_finish on_success on_discard callback_queue_name callback_priority description properties ].freeze
Class Method Summary collapse
-
.enqueue(active_jobs = [], **properties, &block) ⇒ GoodJob::BatchRecord
Create a new batch and enqueue it.
- .find(id) ⇒ Object
- .primary_key ⇒ Object
-
.within_thread(batch_id: nil, batch_callback_id: nil) ⇒ Object
Helper method to enqueue jobs and assign them to a batch.
Instance Method Summary collapse
- #_record ⇒ Object
- #active_jobs ⇒ Object
-
#add(active_jobs = nil, &block) ⇒ Array<ActiveJob::Base>
Enqueue jobs and add them to the batch.
- #assign_properties(properties) ⇒ Object
- #callback_active_jobs ⇒ Object
-
#enqueue(active_jobs = [], **properties, &block) ⇒ Array<ActiveJob::Base>
Active jobs added to the batch.
-
#initialize(_record: nil, **properties) ⇒ Batch
constructor
rubocop:disable Lint/UnderscorePrefixedVariableName.
Constructor Details
#initialize(_record: nil, **properties) ⇒ Batch
rubocop:disable Lint/UnderscorePrefixedVariableName
84 85 86 87 |
# File 'app/models/good_job/batch.rb', line 84 def initialize(_record: nil, **properties) # rubocop:disable Lint/UnderscorePrefixedVariableName self.record = _record || BatchRecord.new assign_properties(properties) end |
Class Method Details
.enqueue(active_jobs = [], **properties, &block) ⇒ GoodJob::BatchRecord
Create a new batch and enqueue it
56 57 58 59 60 |
# File 'app/models/good_job/batch.rb', line 56 def self.enqueue(active_jobs = [], **properties, &block) new.tap do |batch| batch.enqueue(active_jobs, **properties, &block) end end |
.find(id) ⇒ Object
66 67 68 |
# File 'app/models/good_job/batch.rb', line 66 def self.find(id) new _record: BatchRecord.find(id) end |
.primary_key ⇒ Object
62 63 64 |
# File 'app/models/good_job/batch.rb', line 62 def self.primary_key :id end |
.within_thread(batch_id: nil, batch_callback_id: nil) ⇒ Object
Helper method to enqueue jobs and assign them to a batch
71 72 73 74 75 76 77 78 79 80 81 82 |
# File 'app/models/good_job/batch.rb', line 71 def self.within_thread(batch_id: nil, batch_callback_id: nil) original_batch_id = current_batch_id original_batch_callback_id = current_batch_callback_id self.current_batch_id = batch_id self.current_batch_callback_id = batch_callback_id yield ensure self.current_batch_id = original_batch_id self.current_batch_callback_id = original_batch_callback_id end |
Instance Method Details
#_record ⇒ Object
148 149 150 |
# File 'app/models/good_job/batch.rb', line 148 def _record record end |
#active_jobs ⇒ Object
133 134 135 |
# File 'app/models/good_job/batch.rb', line 133 def active_jobs record.jobs.map(&:active_job) end |
#add(active_jobs = nil, &block) ⇒ Array<ActiveJob::Base>
Enqueue jobs and add them to the batch
119 120 121 122 123 124 125 126 127 128 129 130 131 |
# File 'app/models/good_job/batch.rb', line 119 def add(active_jobs = nil, &block) record.save if record.new_record? buffer = Bulk::Buffer.new buffer.add(active_jobs) buffer.capture(&block) if block self.class.within_thread(batch_id: id) do buffer.enqueue end buffer.active_jobs end |
#assign_properties(properties) ⇒ Object
141 142 143 144 145 146 |
# File 'app/models/good_job/batch.rb', line 141 def assign_properties(properties) properties = properties.dup batch_attrs = PROTECTED_PROPERTIES.index_with { |key| properties.delete(key) }.compact record.assign_attributes(batch_attrs) self.properties.merge!(properties) end |
#callback_active_jobs ⇒ Object
137 138 139 |
# File 'app/models/good_job/batch.rb', line 137 def callback_active_jobs record.callback_jobs.map(&:active_job) end |
#enqueue(active_jobs = [], **properties, &block) ⇒ Array<ActiveJob::Base>
Returns Active jobs added to the batch.
90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 |
# File 'app/models/good_job/batch.rb', line 90 def enqueue(active_jobs = [], **properties, &block) assign_properties(properties) if record.new_record? record.save! else record.with_advisory_lock(function: "pg_advisory_lock") do record.enqueued_at_will_change! record.finished_at_will_change! record.update!(enqueued_at: nil, finished_at: nil) end end active_jobs = add(active_jobs, &block) Rails.application.executor.wrap do record.with_advisory_lock(function: "pg_advisory_lock") do record.update!(enqueued_at: Time.current) # During inline execution, this could enqueue and execute further jobs record._continue_discard_or_finish(lock: false) end end active_jobs end |