Class: Kaal::Internal::ActiveRecord::DelayedJobRegistry

Inherits:
DelayedJob::Registry show all
Defined in:
lib/kaal/internal/active_record/delayed_job_registry.rb

Overview

Active Record-backed store for delayed jobs.

Class Method Summary collapse

Instance Method Summary collapse

Methods inherited from DelayedJob::Registry

#requires_dispatch_lock?

Constructor Details

#initialize(connection: nil, model: DelayedJobRecord, use_skip_locked: false) ⇒ DelayedJobRegistry

Returns a new instance of DelayedJobRegistry.



15
16
17
18
19
20
# File 'lib/kaal/internal/active_record/delayed_job_registry.rb', line 15

# Builds a registry that reads and writes delayed jobs through +model+.
#
# @param connection [Object, nil] handed unchanged to
#   ConnectionSupport.configure! (defined elsewhere; what it does with a
#   nil connection is not visible here — TODO confirm).
# @param model [Class] class used for every query and insert issued by this
#   registry; defaults to DelayedJobRecord.
# @param use_skip_locked [Boolean] when truthy, #claim_strategy reports
#   :skip_locked and #pop_due takes the skip-locked path; otherwise the
#   delete-confirmation path is used.
def initialize(connection: nil, model: DelayedJobRecord, use_skip_locked: false)
  # Explicit empty parens: the parent initializer is called with no arguments
  # rather than having this method's keywords forwarded to it implicitly.
  super()
  ConnectionSupport.configure!(connection)
  @model = model
  @use_skip_locked = use_skip_locked
end

Class Method Details

.normalize(record) ⇒ Object



84
85
86
87
88
89
90
91
92
93
94
95
96
97
# File 'lib/kaal/internal/active_record/delayed_job_registry.rb', line 84

# Converts a stored record into a plain Hash describing the job.
#
# @param record [#job_id, #run_at, #job_class, #args, #queue, #created_at, nil]
#   a row-like object, or nil/false when nothing was found.
# @return [Hash, nil] a Hash with the keys :job_id, :run_at, :job_class,
#   :args, :queue and :created_at (in that order); nil when +record+ is falsy
#   or when decoding the stored args raises JSON::ParserError.
def self.normalize(record)
  return unless record

  begin
    job = { job_id: record.job_id, run_at: record.run_at, job_class: record.job_class }
    # parse_args lives elsewhere in this class; presumably it JSON-decodes the
    # stored string, which is why a parser error is tolerated below.
    job[:args] = parse_args(record.args)
    job[:queue] = record.queue
    job[:created_at] = record.created_at
    job
  rescue JSON::ParserError
    # An undecodable payload makes the whole record normalize to nil.
    nil
  end
end

Instance Method Details

#all_jobs ⇒ Object



76
77
78
# File 'lib/kaal/internal/active_record/delayed_job_registry.rb', line 76

# Lists every stored job as a normalized Hash, ordered by :run_at and then
# :job_id. Records that normalize to nil are left out of the result.
#
# @return [Array<Hash>]
def all_jobs
  ordered = @model.order(:run_at, :job_id)
  ordered.filter_map do |row|
    self.class.normalize(row)
  end
end

#claim_strategy ⇒ Object



80
81
82
# File 'lib/kaal/internal/active_record/delayed_job_registry.rb', line 80

# Names the claiming approach this registry was configured with.
#
# @return [Symbol] :skip_locked when the skip-locked flag is truthy,
#   :delete_confirmation otherwise.
def claim_strategy
  return :skip_locked if @use_skip_locked

  :delete_confirmation
end

#enqueue(job_id:, run_at:, job_class:, args:, queue: nil, connection: nil) ⇒ Object



22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
# File 'lib/kaal/internal/active_record/delayed_job_registry.rb', line 22

# Stores a new delayed job and returns its normalized form.
#
# @param job_id [Object] identifier of the job; echoed in the duplicate error.
# @param run_at [Object] stored as-is under :run_at.
# @param job_class [Object] stored as-is under :job_class.
# @param args [Object] serialized with JSON.generate before being stored.
# @param queue [Object, nil] stored as-is under :queue.
# @param connection [Object, nil] when given, the row is written through
#   insert_with_connection instead of @model.create!.
# @return [Hash, nil] whatever normalize produces for a fresh, unsaved model
#   instance built from the same attributes.
# @raise [Kaal::DelayedJob::DuplicateJobError] when the write raises
#   ActiveRecord::RecordNotUnique.
def enqueue(job_id:, run_at:, job_class:, args:, queue: nil, connection: nil)
  created_at = Time.now.utc
  row = {
    job_id: job_id, run_at: run_at, job_class: job_class,
    args: JSON.generate(args), queue: queue, created_at: created_at
  }

  connection ? insert_with_connection(connection, row) : @model.create!(row)

  # The return value is built from the attributes, not from the persisted row.
  unsaved = @model.new(row)
  self.class.normalize(unsaved)
rescue ::ActiveRecord::RecordNotUnique
  raise Kaal::DelayedJob::DuplicateJobError, "Delayed job #{job_id.inspect} already exists"
end

#find_job(job_id) ⇒ Object



72
73
74
# File 'lib/kaal/internal/active_record/delayed_job_registry.rb', line 72

# Looks up a single job by its job_id.
#
# @param job_id [Object] value matched against the :job_id column.
# @return [Hash, nil] the normalized job, or nil when the lookup returns
#   nothing (or the record normalizes to nil).
def find_job(job_id)
  record = @model.find_by(job_id: job_id)
  self.class.normalize(record)
end

#pop_due(now:, limit:) ⇒ Object



44
45
46
47
48
# File 'lib/kaal/internal/active_record/delayed_job_registry.rb', line 44

# Removes and returns due jobs, delegating to the variant that matches the
# configured claim strategy.
#
# @param now [Object] forwarded unchanged as +now:+.
# @param limit [Object] forwarded unchanged as +limit:+.
# @return [Object] whatever the selected variant returns.
def pop_due(now:, limit:)
  if @use_skip_locked
    pop_due_with_skip_locked(now: now, limit: limit)
  else
    pop_due_with_delete_confirmation(now: now, limit: limit)
  end
end