Module: DispatchPolicy::Dispatchable

Extended by:
ActiveSupport::Concern
Defined in:
lib/dispatch_policy/dispatchable.rb

Instance Method Summary

Instance Method Details

#deserialize(job_data) ⇒ Object



116
117
118
119
120
121
# File 'lib/dispatch_policy/dispatchable.rb', line 116

# Restores the dispatch bookkeeping attributes from a serialized job payload,
# after the inherited +deserialize+ has processed the same payload.
#
# @param job_data [Hash] serialized payload; the "_dispatch_partitions" and
#   "_dispatch_admitted_at" entries are read from it
# @return [Time, nil] the parsed admission timestamp, or nil when the payload
#   carries none
def deserialize(job_data)
  super

  self._dispatch_partitions = job_data["_dispatch_partitions"]

  # The admission time is parsed from an ISO 8601 string; a missing entry
  # leaves the attribute nil.
  admitted_stamp = job_data["_dispatch_admitted_at"]
  self._dispatch_admitted_at = (Time.iso8601(admitted_stamp) if admitted_stamp)
end

#enqueue(options = {}) ⇒ Object



89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
# File 'lib/dispatch_policy/dispatchable.rb', line 89

# Enqueues the job, staging it through DispatchPolicy instead of handing it
# to the inherited +enqueue+ when staging applies.
#
# The inherited +enqueue+ is used (and its result returned) when the job
# class reports no dispatch policy, when the caller passes
# +:_bypass_staging+ (the key is removed before forwarding), or when
# DispatchPolicy is not enabled.
#
# @param options [Hash] enqueue options; +:wait+, +:wait_until+, +:queue+
#   and +:priority+ are applied to the job before it is staged
# @return [Object] +self+ when the job was staged, otherwise whatever the
#   inherited +enqueue+ returns
def enqueue(options = {})
  job_class = self.class

  return super unless job_class.dispatch_policy?
  return super(options.except(:_bypass_staging)) if options[:_bypass_staging]
  return super unless DispatchPolicy.enabled?

  # Mirror Active Job's scheduling option handling before staging.
  # +:wait_until+ is applied after +:wait+, so it wins when both are given.
  if (delay = options[:wait])
    self.scheduled_at = delay.seconds.from_now
  end
  if (run_at = options[:wait_until])
    self.scheduled_at = run_at
  end
  if (queue_part = options[:queue])
    self.queue_name = job_class.queue_name_from_part(queue_part)
  end
  if (requested_priority = options[:priority])
    self.priority = requested_priority.to_i
  end

  DispatchPolicy::StagedJob.stage!(job_instance: self, policy: job_class.resolved_dispatch_policy)

  self
end

#serialize ⇒ Object



109
110
111
112
113
114
# File 'lib/dispatch_policy/dispatchable.rb', line 109

# Builds the serialized job payload, extending the hash produced by the
# inherited +serialize+ with the dispatch bookkeeping entries.
#
# @return [Hash] the inherited payload plus "_dispatch_partitions" (an empty
#   hash when unset) and "_dispatch_admitted_at" (an ISO 8601 string with
#   six fractional digits, or nil when unset)
def serialize
  base_payload = super

  dispatch_fields = {
    "_dispatch_partitions"  => _dispatch_partitions || {},
    "_dispatch_admitted_at" => _dispatch_admitted_at&.iso8601(6)
  }

  base_payload.merge(dispatch_fields)
end