Class: DispatchPolicy::StagedJob
- Inherits:
-
ApplicationRecord
- Object
- ActiveRecord::Base
- ApplicationRecord
- DispatchPolicy::StagedJob
- Defined in:
- app/models/dispatch_policy/staged_job.rb
Class Method Summary collapse
-
.context_for(job_instance, policy) ⇒ Object
Merge the job’s ActiveJob metadata (queue_name, priority) into the context hash so gate lambdas can partition_by :queue_name without the user having to pass it as a kwarg.
- .mark_completed_by_active_job_id(active_job_id) ⇒ Object
-
.stage!(job_instance:, policy:) ⇒ Object
Stages a job in the admission queue.
-
.stage_many!(policy:, jobs:) ⇒ Object
Batch-insert variant of stage!.
Instance Method Summary collapse
Class Method Details
.context_for(job_instance, policy) ⇒ Object
Merge the job’s ActiveJob metadata (queue_name, priority) into the context hash so gate lambdas can partition_by :queue_name without the user having to pass it as a kwarg. User-provided keys win.
18 19 20 21 22 23 24 25 |
# File 'app/models/dispatch_policy/staged_job.rb', line 18 def self.context_for(job_instance, policy) built = policy.context_builder.call(job_instance.arguments) return built unless built.is_a?(Hash) { queue_name: job_instance.queue_name, priority: job_instance.priority }.merge(built.symbolize_keys) end |
.mark_completed_by_active_job_id(active_job_id) ⇒ Object
79 80 81 82 83 |
# File 'app/models/dispatch_policy/staged_job.rb', line 79 def self.mark_completed_by_active_job_id(active_job_id) return 0 if active_job_id.blank? where(active_job_id: active_job_id, completed_at: nil) .update_all(completed_at: Time.current, lease_expires_at: nil) end |
.stage!(job_instance:, policy:) ⇒ Object
Stages a job in the admission queue. Returns the created row, or nil if the policy declares a dedupe_key and an active row already exists.
29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 |
# File 'app/models/dispatch_policy/staged_job.rb', line 29 def self.stage!(job_instance:, policy:) dedupe_key = policy.build_dedupe_key(job_instance.arguments) if dedupe_key && exists?(policy_name: policy.name, dedupe_key: dedupe_key, completed_at: nil) return nil end create!( job_class: job_instance.class.name, policy_name: policy.name, arguments: job_instance.serialize, snapshot: policy.build_snapshot(job_instance.arguments), context: context_for(job_instance, policy), priority: job_instance.priority || 100, not_before_at: job_instance.scheduled_at, staged_at: Time.current, dedupe_key: dedupe_key, round_robin_key: policy.build_round_robin_key(job_instance.arguments) ) rescue ActiveRecord::RecordNotUnique nil end |
.stage_many!(policy:, jobs:) ⇒ Object
Batch-insert variant of stage!.
53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 |
# File 'app/models/dispatch_policy/staged_job.rb', line 53 def self.stage_many!(policy:, jobs:) return 0 if jobs.empty? now = Time.current rows = jobs.map do |job_instance| { job_class: job_instance.class.name, policy_name: policy.name, arguments: job_instance.serialize, snapshot: policy.build_snapshot(job_instance.arguments), context: context_for(job_instance, policy), priority: job_instance.priority || 100, not_before_at: job_instance.scheduled_at, staged_at: now, dedupe_key: policy.build_dedupe_key(job_instance.arguments), round_robin_key: policy.build_round_robin_key(job_instance.arguments), partitions: {}, created_at: now, updated_at: now } end result = insert_all(rows, unique_by: :idx_dp_staged_dedupe_active) result.rows.size end |
Instance Method Details
#instantiate_active_job ⇒ Object
101 102 103 |
# File 'app/models/dispatch_policy/staged_job.rb', line 101 def instantiate_active_job ActiveJob::Base.deserialize(arguments) end |
#mark_admitted!(partitions:) ⇒ Object
85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 |
# File 'app/models/dispatch_policy/staged_job.rb', line 85 def mark_admitted!(partitions:) now = Time.current job = instantiate_active_job job._dispatch_partitions = partitions job._dispatch_admitted_at = now update!( admitted_at: now, lease_expires_at: now + DispatchPolicy.config.lease_duration, active_job_id: job.job_id, partitions: partitions ) job end |