Module: DispatchPolicy::JobExtension::BulkEnqueue
- Defined in:
- lib/dispatch_policy/job_extension.rb
Overview
Rails 7.1+ exposes ActiveJob.perform_all_later. We override it to route jobs declaring a dispatch_policy through a single bulk INSERT, while delegating jobs without a policy to the original enqueue_all path.
Instance Method Summary collapse
Instance Method Details
#perform_all_later(*jobs) ⇒ Object
101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 |
# File 'lib/dispatch_policy/job_extension.rb', line 101 def perform_all_later(*jobs) flat = jobs.flatten return super if flat.empty? # Critical: respect Bypass exactly like the per-job around_enqueue # does. Forwarder.dispatch deserializes admitted jobs and calls # ActiveJob.perform_all_later under Bypass.with — without this # check, BulkEnqueue would re-stage them, creating an infinite # admission loop with the wrong context (job.arguments is still [] # at that point because ActiveJob defers deserialization). return super if DispatchPolicy::Bypass.active? return super unless DispatchPolicy.config.enabled return super unless DispatchPolicy.registry.size.positive? with_policy, without_policy = flat.partition do |j| j.class.respond_to?(:dispatch_policy_name) && j.class.dispatch_policy_name end super(without_policy) if without_policy.any? return nil if with_policy.empty? rows = with_policy.filter_map do |job| policy = DispatchPolicy.registry.fetch(job.class.dispatch_policy_name) next unless policy # See JobExtension.ensure_arguments_materialized! — we need this # for the same reason as the single-enqueue path. JobExtension.ensure_arguments_materialized!(job) queue_name = job.queue_name&.to_s || policy.queue_name ctx = policy.build_context(job.arguments, queue_name: queue_name) partition_key = policy.partition_key_for(ctx) shard = policy.shard_for(ctx) payload = Serializer.serialize(job) job.successfully_enqueued = true { policy_name: policy.name, partition_key: partition_key, queue_name: queue_name, shard: shard, job_class: job.class.name, job_data: payload, context: ctx.to_jsonb, scheduled_at: JobExtension.scheduled_time(job), priority: job.priority || 0 } end Repository.stage_many!(rows) if rows.any? nil # ActiveJob.perform_all_later contract returns nil end |