Module: DispatchPolicy::JobExtension::BulkEnqueue
- Defined in:
- lib/dispatch_policy/job_extension.rb
Overview
Rails 7.1+ exposes ActiveJob.perform_all_later. We override it to route jobs declaring a dispatch_policy through a single bulk INSERT, while delegating jobs without a policy to the original enqueue_all path.
Instance Method Summary collapse
Instance Method Details
#perform_all_later(*jobs) ⇒ Object
120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 |
# File 'lib/dispatch_policy/job_extension.rb', line 120 def perform_all_later(*jobs) flat = jobs.flatten return super if flat.empty? # Critical: respect Bypass exactly like the per-job around_enqueue # does. Forwarder.dispatch deserializes admitted jobs and calls # ActiveJob.perform_all_later under Bypass.with — without this # check, BulkEnqueue would re-stage them, creating an infinite # admission loop with the wrong context (job.arguments is still [] # at that point because ActiveJob defers deserialization). return super if DispatchPolicy::Bypass.active? return super unless DispatchPolicy.config.enabled return super unless DispatchPolicy.registry.size.positive? # Split exactly like the single path decides: jobs we own get staged, # everything else (no policy, unregistered policy name, or a retry on # a bypass_retries policy) goes straight to the adapter. Dropping them # — as a `next unless policy` inside the row builder would — silently # loses jobs the caller expected to be enqueued. to_stage, to_adapter = flat.partition { |job| JobExtension.stageable?(job) } super(to_adapter) if to_adapter.any? return nil if to_stage.empty? rows = to_stage.map do |job| policy = DispatchPolicy.registry.fetch(job.class.dispatch_policy_name) # See JobExtension.ensure_arguments_materialized! — we need this # for the same reason as the single-enqueue path. JobExtension.ensure_arguments_materialized!(job) queue_name = job.queue_name&.to_s || policy.queue_name ctx = policy.build_context(job.arguments, queue_name: queue_name) partition_key = policy.partition_key_for(ctx) shard = policy.shard_for(ctx) payload = Serializer.serialize(job) { policy_name: policy.name, partition_key: partition_key, queue_name: queue_name, shard: shard, job_class: job.class.name, job_data: payload, context: ctx.to_jsonb, scheduled_at: JobExtension.scheduled_time(job), priority: job.priority || 0 } end # Only mark enqueued AFTER the INSERT commits. If stage_many! raises, # a caller that rescues and inspects successfully_enqueued? must not # be told the jobs were enqueued when they weren't. Repository.stage_many!(rows) to_stage.each { |job| job.successfully_enqueued = true } nil # ActiveJob.perform_all_later contract returns nil end |