Module: DispatchPolicy::JobExtension
- Extended by: ActiveSupport::Concern
- Defined in: lib/dispatch_policy/job_extension.rb
Overview
Hooks into ActiveJob::Base. Adds:
- the `dispatch_policy :name do … end` class macro
- an `around_enqueue` callback that stages jobs declaring a policy
- a `perform_all_later` patch that handles bulk enqueue
Defined Under Namespace
Modules: BulkEnqueue
Class Method Summary
- .around_enqueue_for(job, block) ⇒ Object
  Called by the around_enqueue lambda.
- .ensure_arguments_materialized!(job) ⇒ Object
  ActiveJob's `arguments` getter is a plain attr_accessor that returns the in-memory @arguments.
- .retry_attempt?(job) ⇒ Boolean
- .scheduled_time(job) ⇒ Object
Class Method Details
.around_enqueue_for(job, block) ⇒ Object
Called by the around_enqueue lambda. Public so it can be tested directly.
    # File 'lib/dispatch_policy/job_extension.rb', line 28

    def self.around_enqueue_for(job, block)
      return block.call if Bypass.active?
      return block.call unless DispatchPolicy.config.enabled

      policy = DispatchPolicy.registry.fetch(job.class.dispatch_policy_name)
      return block.call unless policy

      if retry_attempt?(job) && policy.bypass_retries?
        return block.call
      end

      # `klass.deserialize(payload)` (used elsewhere — see Forwarder, retries)
      # only sets @serialized_arguments. ActiveJob defers the actual
      # arguments deserialization to perform_now via the private
      # deserialize_arguments_if_needed. If something deserializes a job
      # and re-enqueues it without going through perform_now (e.g. a
      # custom retry path), `job.arguments` would be []. Guard against
      # that here so the context proc always sees the real args.
      ensure_arguments_materialized!(job)

      queue_name    = job.queue_name&.to_s || policy.queue_name
      ctx           = policy.build_context(job.arguments, queue_name: queue_name)
      partition_key = policy.partition_key_for(ctx)
      shard         = policy.shard_for(ctx)

      payload = Serializer.serialize(job)
      Repository.stage!(
        policy_name:   policy.name,
        partition_key: partition_key,
        queue_name:    queue_name,
        shard:         shard,
        job_class:     job.class.name,
        job_data:      payload,
        context:       ctx.to_jsonb,
        scheduled_at:  scheduled_time(job),
        priority:      job.priority || 0
      )

      job.successfully_enqueued = true
      false # halts the around_enqueue chain so the real adapter never sees the job
    end
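The control flow of this method (guard clauses that call the block, a staging path that never does) can be illustrated without loading Rails. The following is a minimal sketch only: `StubJob`, `StubPolicy`, `stage_or_enqueue`, and the `staged` array are hypothetical stand-ins invented for this example, not part of DispatchPolicy or ActiveJob, and the real method performs more checks than are shown here.

```ruby
# Hypothetical stand-ins for illustration; none of these are DispatchPolicy APIs.
StubJob    = Struct.new(:arguments, :queue_name, :executions, :successfully_enqueued)
StubPolicy = Struct.new(:queue_name, :bypass_retries)

# Mirrors the shape of around_enqueue_for: each guard clause calls the block
# (a normal enqueue), while the staging path records the job and never calls it.
def stage_or_enqueue(job, block, policy:, staged:)
  return block.call unless policy
  return block.call if job.executions.to_i.positive? && policy.bypass_retries

  staged << { queue_name: job.queue_name || policy.queue_name, arguments: job.arguments }
  job.successfully_enqueued = true
  false # the block was never called, so the adapter never sees the job
end

staged        = []
adapter_calls = []
policy        = StubPolicy.new("default", true)

# A first-time enqueue is staged and the adapter block is skipped.
fresh = StubJob.new([42], nil, 0, false)
fresh_result = stage_or_enqueue(fresh, -> { adapter_calls << fresh; :enqueued },
                                policy: policy, staged: staged)

# A retry (executions > 0) with bypass enabled goes straight to the adapter.
retried = StubJob.new([42], "mailers", 1, false)
retried_result = stage_or_enqueue(retried, -> { adapter_calls << retried; :enqueued },
                                  policy: policy, staged: staged)
```

In this sketch the staged job falls back to the policy's queue name because the job did not set one, which corresponds to the `job.queue_name&.to_s || policy.queue_name` line in the source.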
.ensure_arguments_materialized!(job) ⇒ Object
ActiveJob's `arguments` getter is a plain attr_accessor that returns the in-memory @arguments. After `klass.deserialize(payload)`, that array is empty until perform_now triggers `deserialize_arguments_if_needed` (a private method). Anywhere we read `job.arguments` outside of perform, we must materialize first, or the context proc receives [] and falls back to its defaults.
    # File 'lib/dispatch_policy/job_extension.rb', line 90

    def self.ensure_arguments_materialized!(job)
      return unless job.respond_to?(:deserialize_arguments_if_needed, true)
      job.send(:deserialize_arguments_if_needed)
    end
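The failure mode described above can be reproduced with a small stand-in class. This is a sketch under assumptions: `LazyJob` is a hypothetical class that only imitates the lazy-deserialization behaviour described in this section (it is not ActiveJob), and `materialize_arguments!` is a local copy of the guard written for the example.

```ruby
# Hypothetical class imitating the lazy argument deserialization described above.
class LazyJob
  attr_accessor :arguments

  def initialize
    @arguments = []
    @serialized_arguments = nil
  end

  # Like the behaviour described for klass.deserialize: only the serialized
  # form is stored; @arguments stays empty.
  def self.deserialize(payload)
    job = new
    job.instance_variable_set(:@serialized_arguments, payload["arguments"])
    job
  end

  private

  def deserialize_arguments_if_needed
    return unless @serialized_arguments

    @arguments = @serialized_arguments.dup
    @serialized_arguments = nil
  end
end

# Local copy of the guard: call the private hook only if the object has one.
def materialize_arguments!(job)
  return unless job.respond_to?(:deserialize_arguments_if_needed, true)

  job.send(:deserialize_arguments_if_needed)
end

job    = LazyJob.deserialize("arguments" => [7, "eu-west"])
before = job.arguments.dup # still empty: nothing has materialized the args yet
materialize_arguments!(job)
after  = job.arguments     # now holds the real arguments
```

Passing `true` as the second argument to `respond_to?` is what lets the check see a private method, and the early return keeps the guard harmless for objects that have no such hook.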
.retry_attempt?(job) ⇒ Boolean
    # File 'lib/dispatch_policy/job_extension.rb', line 70

    def self.retry_attempt?(job)
      (job.respond_to?(:executions) ? job.executions.to_i : 0).positive?
    end
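As a rough illustration of how this predicate treats different inputs, the sketch below copies the expression into a standalone method and feeds it stub objects. `CountedJob` and `retry_attempt_sketch?` are hypothetical names used only here; the assumption is that a job which has already run at least once reports a positive `executions` count.

```ruby
# Hypothetical stub exposing only the attribute the predicate reads.
CountedJob = Struct.new(:executions)

# Standalone copy of the expression from retry_attempt?.
def retry_attempt_sketch?(job)
  (job.respond_to?(:executions) ? job.executions.to_i : 0).positive?
end

first_enqueue = retry_attempt_sketch?(CountedJob.new(0))   # never executed
after_failure = retry_attempt_sketch?(CountedJob.new(2))   # executed twice already
nil_counter   = retry_attempt_sketch?(CountedJob.new(nil)) # nil.to_i is 0
no_counter    = retry_attempt_sketch?(Object.new)          # no executions method at all
```

The `respond_to?` check and the `to_i` call mean the predicate degrades to "not a retry" rather than raising when the counter is missing or nil.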
.scheduled_time(job) ⇒ Object
    # File 'lib/dispatch_policy/job_extension.rb', line 74

    def self.scheduled_time(job)
      ts = job.scheduled_at
      return nil if ts.nil?
      return ts if ts.is_a?(Time)

      Time.at(Float(ts))
    rescue ArgumentError, TypeError
      nil
    end
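The coercion rules in this method can be exercised in isolation. The sketch below is a standalone copy that takes the timestamp directly instead of a job; `coerce_scheduled_time` is a hypothetical name for this example, and the sample inputs are illustrative rather than taken from the library's tests.

```ruby
# Standalone copy of the coercion logic, taking the raw timestamp value.
def coerce_scheduled_time(ts)
  return nil if ts.nil?
  return ts if ts.is_a?(Time)

  # Kernel#Float is strict: it raises instead of silently returning 0.0.
  Time.at(Float(ts))
rescue ArgumentError, TypeError
  nil
end

now = Time.at(1_700_000_000)

from_nil     = coerce_scheduled_time(nil)             # no schedule
from_time    = coerce_scheduled_time(now)             # Time passes through untouched
from_float   = coerce_scheduled_time(1_700_000_000.5) # epoch seconds as a Float
from_string  = coerce_scheduled_time("1700000000")    # numeric string
from_garbage = coerce_scheduled_time("tomorrow")      # Float raises ArgumentError
from_symbol  = coerce_scheduled_time(:soon)           # Float raises TypeError
```

Using `Float(ts)` rather than `ts.to_f` appears to be the point of the rescue clause: `"tomorrow".to_f` would quietly yield 0.0 and schedule the job at the epoch, whereas the strict conversion lets unparseable values fall back to nil.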