Module: DispatchPolicy::JobExtension

Extended by:
ActiveSupport::Concern
Defined in:
lib/dispatch_policy/job_extension.rb

Overview

Hooks into ActiveJob::Base. Adds:

- the `dispatch_policy :name do … end` class macro
- an `around_enqueue` callback that stages jobs declaring a policy
- a `perform_all_later` patch that handles bulk enqueue

Defined Under Namespace

Modules: BulkEnqueue

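Class Method Summary

- .around_enqueue_for(job, block) ⇒ Object
- .ensure_arguments_materialized!(job) ⇒ Object
- .retry_attempt?(job) ⇒ Boolean
- .scheduled_time(job) ⇒ Object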

Class Method Details

.around_enqueue_for(job, block) ⇒ Object

Called by the around_enqueue lambda. Public so it can be tested directly.



# File 'lib/dispatch_policy/job_extension.rb', line 28

def self.around_enqueue_for(job, block)
  return block.call if Bypass.active?
  return block.call unless DispatchPolicy.config.enabled

  policy = DispatchPolicy.registry.fetch(job.class.dispatch_policy_name)
  return block.call unless policy

  if retry_attempt?(job) && policy.bypass_retries?
    return block.call
  end

  # `klass.deserialize(payload)` (used elsewhere — see Forwarder, retries)
  # only sets @serialized_arguments. ActiveJob defers the actual
  # arguments deserialization to perform_now via the private
  # deserialize_arguments_if_needed. If something deserializes a job
  # and re-enqueues it without going through perform_now (e.g. a
  # custom retry path), `job.arguments` would be []. Guard against
  # that here so the context proc always sees the real args.
  ensure_arguments_materialized!(job)

  queue_name    = job.queue_name&.to_s || policy.queue_name
  ctx           = policy.build_context(job.arguments, queue_name: queue_name)
  partition_key = policy.partition_key_for(ctx)
  shard         = policy.shard_for(ctx)
  payload       = Serializer.serialize(job)

  Repository.stage!(
    policy_name:   policy.name,
    partition_key: partition_key,
    queue_name:    queue_name,
    shard:         shard,
    job_class:     job.class.name,
    job_data:      payload,
    context:       ctx.to_jsonb,
    scheduled_at:  scheduled_time(job),
    priority:      job.priority || 0
  )

  job.successfully_enqueued = true
  false # halts the around_enqueue chain so the real adapter never sees the job
end
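
The pass-through versus staging decision above can be illustrated without ActiveJob. The following is a minimal sketch, not the module's implementation: `around_enqueue_sketch`, the `staged` array (standing in for `Repository.stage!`), and the `adapter_saw` array (standing in for the real queue adapter) are all hypothetical names introduced for illustration.

```ruby
# Mirrors the control flow of around_enqueue_for in miniature. `staged` is a
# hypothetical in-memory stand-in for Repository.stage!.
def around_enqueue_sketch(job, block, policy:, staged:)
  return block.call unless policy # no policy: pass through to the adapter

  staged << { policy_name: policy, job: job }
  false # the block is never called, so the adapter never sees the job
end

adapter_saw = [] # what the stand-in "adapter" received
staged      = [] # what was staged instead

# A job without a policy reaches the adapter; a job with one is staged.
around_enqueue_sketch("job-a", -> { adapter_saw << "job-a" }, policy: nil, staged: staged)
around_enqueue_sketch("job-b", -> { adapter_saw << "job-b" }, policy: :per_tenant, staged: staged)
```

In this sketch the job without a policy ends up in `adapter_saw`, while the job with a policy ends up only in `staged`, which is the same split the real method makes between `block.call` and `Repository.stage!`.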

.ensure_arguments_materialized!(job) ⇒ Object

ActiveJob's `arguments` getter is a plain attr_accessor that returns the in-memory @arguments. After `klass.deserialize(payload)`, that array is empty until perform_now triggers `deserialize_arguments_if_needed` (a private method). Anywhere we read `job.arguments` outside of perform we must materialize first, or the context proc receives [] and falls back to its defaults.



# File 'lib/dispatch_policy/job_extension.rb', line 90

def self.ensure_arguments_materialized!(job)
  return unless job.respond_to?(:deserialize_arguments_if_needed, true)
  job.send(:deserialize_arguments_if_needed)
end
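
To make the empty-arguments problem concrete, here is a small sketch that reproduces it with a stand-in class. `LazyJob` is hypothetical and only imitates the lazy behaviour described above; it is not ActiveJob. The helper is copied from the listing as a top-level method so the example runs on its own.

```ruby
# Hypothetical stand-in for a freshly deserialized job (not ActiveJob itself).
class LazyJob
  attr_accessor :arguments

  def initialize(serialized_arguments)
    @arguments = []
    @serialized_arguments = serialized_arguments
  end

  private

  # Plays the role of ActiveJob's private deserialize_arguments_if_needed.
  def deserialize_arguments_if_needed
    return unless @serialized_arguments

    @arguments = @serialized_arguments.dup
    @serialized_arguments = nil
  end
end

def ensure_arguments_materialized!(job)
  # Passing `true` makes respond_to? consider private methods as well.
  return unless job.respond_to?(:deserialize_arguments_if_needed, true)

  job.send(:deserialize_arguments_if_needed)
end

job    = LazyJob.new([42, "tenant-a"])
before = job.arguments.dup # still empty: nothing has been materialized yet
ensure_arguments_materialized!(job)
after  = job.arguments     # now holds the real arguments
```

The `respond_to?(..., true)` guard means objects without the private hook are left alone rather than raising `NoMethodError`.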

.retry_attempt?(job) ⇒ Boolean

Returns:

  • (Boolean)


# File 'lib/dispatch_policy/job_extension.rb', line 70

def self.retry_attempt?(job)
  (job.respond_to?(:executions) ? job.executions.to_i : 0).positive?
end
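
A short sketch of how this predicate behaves for a few inputs. `FakeJob` is a hypothetical Struct used in place of a real job, and the method body is copied from the listing as a top-level method so the example is self-contained.

```ruby
def retry_attempt?(job)
  (job.respond_to?(:executions) ? job.executions.to_i : 0).positive?
end

# Hypothetical stand-in exposing only the `executions` counter.
FakeJob = Struct.new(:executions)

first_try  = retry_attempt?(FakeJob.new(0))   # never executed
retried    = retry_attempt?(FakeJob.new(2))   # executed twice already
unset      = retry_attempt?(FakeJob.new(nil)) # nil.to_i is 0
no_counter = retry_attempt?(Object.new)       # no `executions` method at all
```

Only the job with a positive `executions` count is treated as a retry; a missing or nil counter is treated the same as a first attempt.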

.scheduled_time(job) ⇒ Object



# File 'lib/dispatch_policy/job_extension.rb', line 74

def self.scheduled_time(job)
  ts = job.scheduled_at
  return nil if ts.nil?
  return ts   if ts.is_a?(Time)

  Time.at(Float(ts))
rescue ArgumentError, TypeError
  nil
end
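
The normalization above accepts a `Time`, anything `Float()` can parse as epoch seconds, or nil, and maps unparseable values to nil. The sketch below exercises each branch. `FakeScheduledJob` is a hypothetical Struct standing in for a job, and the method body is copied from the listing as a top-level method.

```ruby
def scheduled_time(job)
  ts = job.scheduled_at
  return nil if ts.nil?
  return ts if ts.is_a?(Time)

  Time.at(Float(ts))
rescue ArgumentError, TypeError
  nil
end

# Hypothetical stand-in exposing only `scheduled_at`.
FakeScheduledJob = Struct.new(:scheduled_at)

now          = Time.at(1_700_000_000)
from_time    = scheduled_time(FakeScheduledJob.new(now))              # returned as-is
from_float   = scheduled_time(FakeScheduledJob.new(1_700_000_000.5))  # epoch seconds
from_string  = scheduled_time(FakeScheduledJob.new("1700000000"))     # numeric string
from_garbage = scheduled_time(FakeScheduledJob.new("tomorrow"))       # Float() raises ArgumentError
unscheduled  = scheduled_time(FakeScheduledJob.new(nil))              # no schedule set
```

Because unparseable values are rescued and mapped to nil, a malformed `scheduled_at` is staged as if the job had no schedule instead of aborting the enqueue.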