Module: DispatchPolicy::JobExtension

Extended by:
ActiveSupport::Concern
Defined in:
lib/dispatch_policy/job_extension.rb

Overview

Hooks into ActiveJob::Base. Adds:

- the `dispatch_policy :name do … end` class macro
- an `around_enqueue` callback that stages jobs declaring a policy
- a `perform_all_later` patch that handles bulk enqueue

Defined Under Namespace

Modules: BulkEnqueue

Class Method Details

.around_enqueue_for(job, block) ⇒ Object

Called by the around_enqueue lambda. Public so it can be tested directly.



# File 'lib/dispatch_policy/job_extension.rb', line 28

def self.around_enqueue_for(job, block)
  return block.call if Bypass.active?
  return block.call unless DispatchPolicy.config.enabled

  policy = DispatchPolicy.registry.fetch(job.class.dispatch_policy_name)
  return block.call unless policy

  if retry_attempt?(job) && policy.bypass_retries?
    return block.call
  end

  # `klass.deserialize(payload)` (used elsewhere — see Forwarder, retries)
  # only sets @serialized_arguments. ActiveJob defers the actual
  # arguments deserialization to perform_now via the private
  # deserialize_arguments_if_needed. If something deserializes a job
  # and re-enqueues it without going through perform_now (e.g. a
  # custom retry path), `job.arguments` would be []. Guard against
  # that here so the context proc always sees the real args.
  ensure_arguments_materialized!(job)

  queue_name    = job.queue_name&.to_s || policy.queue_name
  ctx           = policy.build_context(job.arguments, queue_name: queue_name)
  partition_key = policy.partition_key_for(ctx)
  shard         = policy.shard_for(ctx)
  payload       = Serializer.serialize(job)

  Repository.stage!(
    policy_name:   policy.name,
    partition_key: partition_key,
    queue_name:    queue_name,
    shard:         shard,
    job_class:     job.class.name,
    job_data:      payload,
    context:       ctx.to_jsonb,
    scheduled_at:  scheduled_time(job),
    priority:      job.priority || 0
  )

  job.successfully_enqueued = true
  false # halts the around_enqueue chain so the real adapter never sees the job
end
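
The control flow above can be tried without Rails. What follows is only a sketch: `around_enqueue_sketch` and its `enabled:`, `policy:` and `staged:` parameters are hypothetical stand-ins for `DispatchPolicy.config.enabled`, the registry lookup and `Repository.stage!`, not the gem's API. It shows the one property that matters here: the adapter runs only when `block` is called.

```ruby
# Hypothetical model of the decision flow; the real collaborators are
# replaced by plain keyword arguments so the file runs on its own.
def around_enqueue_sketch(job, block, enabled:, policy:, staged:)
  return block.call unless enabled # feature switched off: pass through
  return block.call unless policy  # no registered policy: pass through

  staged << job                    # stands in for Repository.stage!
  false                            # block is never called, so the adapter is skipped
end

adapter_calls = []
staged        = []

around_enqueue_sketch(:plain_job,  -> { adapter_calls << :plain_job },
                      enabled: true, policy: nil, staged: staged)
around_enqueue_sketch(:policy_job, -> { adapter_calls << :policy_job },
                      enabled: true, policy: :throttle, staged: staged)

p adapter_calls # only :plain_job reached the adapter
p staged        # only :policy_job was staged
```

Because the method takes the job and the block as ordinary arguments, a test can pass a recording lambda like the ones above and assert on whether it was invoked.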

.ensure_arguments_materialized!(job) ⇒ Object

ActiveJob’s `arguments` getter is a plain attr_accessor that returns the in-memory @arguments. After `klass.deserialize(payload)`, that array is empty until perform_now triggers `deserialize_arguments_if_needed` (a private method). Anywhere we read `job.arguments` outside of perform we must materialize first, or the context proc receives [] and falls back to its defaults.



# File 'lib/dispatch_policy/job_extension.rb', line 109

def self.ensure_arguments_materialized!(job)
  return unless job.respond_to?(:deserialize_arguments_if_needed, true)
  job.send(:deserialize_arguments_if_needed)
end
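
To see why the guard is needed, here is a minimal sketch that imitates the lazy behaviour described above. `LazyArgsJob` is a hypothetical stand-in, not ActiveJob itself; it only assumes that arguments stay empty until a private hook copies them over.

```ruby
# Hypothetical stand-in for a job rebuilt from a serialized payload.
class LazyArgsJob
  attr_reader :arguments

  def initialize(serialized_arguments)
    @serialized_arguments = serialized_arguments
    @arguments = []
  end

  private

  # Imitates the private lazy hook: materialize once, then clear the source.
  def deserialize_arguments_if_needed
    return if @serialized_arguments.nil?

    @arguments = @serialized_arguments.dup
    @serialized_arguments = nil
  end
end

def ensure_arguments_materialized!(job)
  # Passing true as the second argument makes respond_to? see private methods.
  return unless job.respond_to?(:deserialize_arguments_if_needed, true)

  job.send(:deserialize_arguments_if_needed)
end

job = LazyArgsJob.new([42, "eu-west"])
p job.arguments # still empty: nothing has triggered the hook yet
ensure_arguments_materialized!(job)
p job.arguments # now holds the real arguments
```

The `respond_to?` check keeps the helper a no-op for objects that lack the hook, so it is safe to call on anything passed through the enqueue path.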

.retry_attempt?(job) ⇒ Boolean

Returns:

  • (Boolean)


# File 'lib/dispatch_policy/job_extension.rb', line 70

def self.retry_attempt?(job)
  (job.respond_to?(:executions) ? job.executions.to_i : 0).positive?
end

.scheduled_time(job) ⇒ Object



# File 'lib/dispatch_policy/job_extension.rb', line 93

def self.scheduled_time(job)
  ts = job.scheduled_at
  return nil if ts.nil?
  return ts   if ts.is_a?(Time)

  Time.at(Float(ts))
rescue ArgumentError, TypeError
  nil
end
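
The coercion rules can be exercised in isolation. In this sketch the method body is copied from the listing above as a top-level method, and `ScheduledJob` is a hypothetical struct standing in for a job; only core Ruby is assumed.

```ruby
# Hypothetical stand-in exposing only the attribute the method reads.
ScheduledJob = Struct.new(:scheduled_at)

def scheduled_time(job)
  ts = job.scheduled_at
  return nil if ts.nil?
  return ts   if ts.is_a?(Time)

  Time.at(Float(ts)) # Float() raises on non-numeric input instead of returning 0.0
rescue ArgumentError, TypeError
  nil
end

at = Time.at(1_700_000_000)

p scheduled_time(ScheduledJob.new(nil)).nil?                        # true: nothing scheduled
p scheduled_time(ScheduledJob.new(at)).equal?(at)                   # true: Time passes through untouched
p scheduled_time(ScheduledJob.new(1_700_000_000.0)) == at           # true: epoch seconds are converted
p scheduled_time(ScheduledJob.new("tomorrow")).nil?                 # true: unparseable input yields nil
```

Using `Float(ts)` rather than `ts.to_f` is what makes the last case return nil: `"tomorrow".to_f` would quietly produce 0.0 and schedule the job at the Unix epoch.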

.stageable?(job) ⇒ Boolean

Whether a job should be staged through admission control rather than handed straight to the adapter. Mirrors the single-enqueue decision in around_enqueue_for: it needs a registered policy and must not be a retry on a bypass_retries policy. Used by the bulk path to split jobs so the ones we don’t own fall through to the adapter instead of being silently dropped.

Returns:

  • (Boolean)


# File 'lib/dispatch_policy/job_extension.rb', line 80

def self.stageable?(job)
  return false unless job.class.respond_to?(:dispatch_policy_name)

  name = job.class.dispatch_policy_name
  return false unless name

  policy = DispatchPolicy.registry.fetch(name)
  return false unless policy
  return false if retry_attempt?(job) && policy.bypass_retries?

  true
end
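
One way the bulk path could use this predicate is a single `partition` call. The sketch below is an assumption about that split, not the BulkEnqueue code: `SketchPolicy`, `SketchJob` and `split_for_bulk` are hypothetical, and the registry lookup is replaced by a `policy` attribute on the job.

```ruby
# Hypothetical stand-ins; none of these names belong to the gem.
SketchPolicy = Struct.new(:bypass_retries) do
  def bypass_retries?
    bypass_retries
  end
end
SketchJob = Struct.new(:label, :policy, :executions)

def retry_attempt?(job)
  (job.respond_to?(:executions) ? job.executions.to_i : 0).positive?
end

# Simplified predicate: the policy comes from the job instead of a registry.
def stageable?(job)
  policy = job.policy
  return false unless policy
  return false if retry_attempt?(job) && policy.bypass_retries?

  true
end

# Returns [jobs to stage, jobs to hand straight to the adapter].
def split_for_bulk(jobs)
  jobs.partition { |job| stageable?(job) }
end

skip_retries = SketchPolicy.new(true)
jobs = [
  SketchJob.new("first run", skip_retries, 0),
  SketchJob.new("retry",     skip_retries, 2),
  SketchJob.new("no policy", nil,          0)
]

staged, passthrough = split_for_bulk(jobs)
p staged.map(&:label)      # ["first run"]
p passthrough.map(&:label) # ["retry", "no policy"]
```

Partitioning, rather than filtering with `select`, keeps the second group in hand so those jobs can still be forwarded to the adapter.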