Class: DispatchPolicy::StagedJob

Inherits:
ApplicationRecord show all
Defined in:
app/models/dispatch_policy/staged_job.rb

Class Method Summary collapse

Instance Method Summary collapse

Class Method Details

.context_for(job_instance, policy) ⇒ Object

Merge the job’s ActiveJob metadata (queue_name, priority) into the context hash so gate lambdas can partition_by :queue_name without the user having to pass it as a kwarg. User-provided keys win.



18
19
20
21
22
23
24
25
# File 'app/models/dispatch_policy/staged_job.rb', line 18

def self.context_for(job_instance, policy)
  built = policy.context_builder.call(job_instance.arguments)
  return built unless built.is_a?(Hash)
  {
    queue_name: job_instance.queue_name,
    priority:   job_instance.priority
  }.merge(built.symbolize_keys)
end

.mark_completed_by_active_job_id(active_job_id) ⇒ Object



79
80
81
82
83
# File 'app/models/dispatch_policy/staged_job.rb', line 79

def self.mark_completed_by_active_job_id(active_job_id)
  return 0 if active_job_id.blank?
  where(active_job_id: active_job_id, completed_at: nil)
    .update_all(completed_at: Time.current, lease_expires_at: nil)
end

.stage!(job_instance:, policy:) ⇒ Object

Stages a job in the admission queue. Returns the created row, or nil if the policy declares a dedupe_key and an active row already exists.



29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
# File 'app/models/dispatch_policy/staged_job.rb', line 29

def self.stage!(job_instance:, policy:)
  dedupe_key = policy.build_dedupe_key(job_instance.arguments)

  if dedupe_key && exists?(policy_name: policy.name, dedupe_key: dedupe_key, completed_at: nil)
    return nil
  end

  create!(
    job_class:       job_instance.class.name,
    policy_name:     policy.name,
    arguments:       job_instance.serialize,
    snapshot:        policy.build_snapshot(job_instance.arguments),
    context:         context_for(job_instance, policy),
    priority:        job_instance.priority || 100,
    not_before_at:   job_instance.scheduled_at,
    staged_at:       Time.current,
    dedupe_key:      dedupe_key,
    round_robin_key: policy.build_round_robin_key(job_instance.arguments)
  )
rescue ActiveRecord::RecordNotUnique
  nil
end

.stage_many!(policy:, jobs:) ⇒ Object

Batch-insert variant of stage!.



53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
# File 'app/models/dispatch_policy/staged_job.rb', line 53

def self.stage_many!(policy:, jobs:)
  return 0 if jobs.empty?

  now = Time.current
  rows = jobs.map do |job_instance|
    {
      job_class:       job_instance.class.name,
      policy_name:     policy.name,
      arguments:       job_instance.serialize,
      snapshot:        policy.build_snapshot(job_instance.arguments),
      context:         context_for(job_instance, policy),
      priority:        job_instance.priority || 100,
      not_before_at:   job_instance.scheduled_at,
      staged_at:       now,
      dedupe_key:      policy.build_dedupe_key(job_instance.arguments),
      round_robin_key: policy.build_round_robin_key(job_instance.arguments),
      partitions:      {},
      created_at:      now,
      updated_at:      now
    }
  end

  result = insert_all(rows, unique_by: :idx_dp_staged_dedupe_active)
  result.rows.size
end

Instance Method Details

#instantiate_active_jobObject



101
102
103
# File 'app/models/dispatch_policy/staged_job.rb', line 101

def instantiate_active_job
  ActiveJob::Base.deserialize(arguments)
end

#mark_admitted!(partitions:) ⇒ Object



85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
# File 'app/models/dispatch_policy/staged_job.rb', line 85

def mark_admitted!(partitions:)
  now = Time.current
  job = instantiate_active_job
  job._dispatch_partitions  = partitions
  job._dispatch_admitted_at = now

  update!(
    admitted_at:      now,
    lease_expires_at: now + DispatchPolicy.config.lease_duration,
    active_job_id:    job.job_id,
    partitions:       partitions
  )

  job
end