Module: DispatchPolicy::JobExtension
- Extended by:
- ActiveSupport::Concern
- Defined in:
- lib/dispatch_policy/job_extension.rb
Overview
Hooks into ActiveJob::Base. Adds:
- the `dispatch_policy :name do … end` class macro
- an `around_enqueue` callback that stages jobs declaring a policy
- a `perform_all_later` patch that handles bulk enqueue
Defined Under Namespace
Modules: BulkEnqueue
Class Method Summary collapse
-
.around_enqueue_for(job, block) ⇒ Object
Called by the around_enqueue lambda.
-
.ensure_arguments_materialized!(job) ⇒ Object
ActiveJob’s ‘arguments` getter is a plain attr_accessor that returns the in-memory @arguments.
- .retry_attempt?(job) ⇒ Boolean
- .scheduled_time(job) ⇒ Object
-
.stageable?(job) ⇒ Boolean
Whether a job should be staged through admission control rather than handed straight to the adapter.
Class Method Details
.around_enqueue_for(job, block) ⇒ Object
Called by the around_enqueue lambda. Public so it can be tested directly.
28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 |
# File 'lib/dispatch_policy/job_extension.rb', line 28 def self.around_enqueue_for(job, block) return block.call if Bypass.active? return block.call unless DispatchPolicy.config.enabled policy = DispatchPolicy.registry.fetch(job.class.dispatch_policy_name) return block.call unless policy if retry_attempt?(job) && policy.bypass_retries? return block.call end # `klass.deserialize(payload)` (used elsewhere — see Forwarder, retries) # only sets @serialized_arguments. ActiveJob defers the actual # arguments deserialization to perform_now via the private # deserialize_arguments_if_needed. If something deserializes a job # and re-enqueues it without going through perform_now (e.g. a # custom retry path), `job.arguments` would be []. Guard against # that here so the context proc always sees the real args. ensure_arguments_materialized!(job) queue_name = job.queue_name&.to_s || policy.queue_name ctx = policy.build_context(job.arguments, queue_name: queue_name) partition_key = policy.partition_key_for(ctx) shard = policy.shard_for(ctx) payload = Serializer.serialize(job) Repository.stage!( policy_name: policy.name, partition_key: partition_key, queue_name: queue_name, shard: shard, job_class: job.class.name, job_data: payload, context: ctx.to_jsonb, scheduled_at: scheduled_time(job), priority: job.priority || 0 ) job.successfully_enqueued = true false # halts the around_enqueue chain so the real adapter never sees the job end |
.ensure_arguments_materialized!(job) ⇒ Object
ActiveJob’s ‘arguments` getter is a plain attr_accessor that returns the in-memory @arguments. After `klass.deserialize(payload)`, that array is empty until perform_now triggers `deserialize_arguments_if_needed` (a private method). Anywhere we read `job.arguments` outside of perform we must materialize first, or the context proc receives [] and falls back to its defaults.
109 110 111 112 |
# File 'lib/dispatch_policy/job_extension.rb', line 109 def self.ensure_arguments_materialized!(job) return unless job.respond_to?(:deserialize_arguments_if_needed, true) job.send(:deserialize_arguments_if_needed) end |
.retry_attempt?(job) ⇒ Boolean
70 71 72 |
# File 'lib/dispatch_policy/job_extension.rb', line 70 def self.retry_attempt?(job) (job.respond_to?(:executions) ? job.executions.to_i : 0).positive? end |
.scheduled_time(job) ⇒ Object
93 94 95 96 97 98 99 100 101 |
# File 'lib/dispatch_policy/job_extension.rb', line 93 def self.scheduled_time(job) ts = job.scheduled_at return nil if ts.nil? return ts if ts.is_a?(Time) Time.at(Float(ts)) rescue ArgumentError, TypeError nil end |
.stageable?(job) ⇒ Boolean
Whether a job should be staged through admission control rather than handed straight to the adapter. Mirrors the single-enqueue decision in around_enqueue_for: it needs a registered policy and must not be a retry on a bypass_retries policy. Used by the bulk path to split jobs so the ones we don’t own fall through to the adapter instead of being silently dropped.
80 81 82 83 84 85 86 87 88 89 90 91 |
# File 'lib/dispatch_policy/job_extension.rb', line 80 def self.stageable?(job) return false unless job.class.respond_to?(:dispatch_policy_name) name = job.class.dispatch_policy_name return false unless name policy = DispatchPolicy.registry.fetch(name) return false unless policy return false if retry_attempt?(job) && policy.bypass_retries? true end |