Class: Kaal::DelayedJob::DatabaseEngine

Inherits:
Registry
  • Object
show all
Defined in:
lib/kaal/delayed_job/database_engine.rb

Overview

Sequel-backed delayed-job store persisted in kaal_delayed_jobs.

Class Method Summary

Instance Method Summary

Methods inherited from Registry

#requires_dispatch_lock?

Constructor Details

#initialize(database:, use_skip_locked: false) ⇒ DatabaseEngine

Returns a new instance of DatabaseEngine.



15
16
17
18
19
# File 'lib/kaal/delayed_job/database_engine.rb', line 15

# Builds an engine around the given database handle.
#
# @param database [Object] handed unchanged to Kaal::Persistence::Database.new
# @param use_skip_locked [Boolean] stored in @use_skip_locked; when truthy the
#   engine reports and uses the :skip_locked claim strategy, otherwise
#   :delete_confirmation
def initialize(database:, use_skip_locked: false)
  super()
  @use_skip_locked = use_skip_locked
  @database = Kaal::Persistence::Database.new(database)
end

Class Method Details

.normalize_row(row) ⇒ Object



56
57
58
59
60
61
62
63
64
65
66
67
68
69
# File 'lib/kaal/delayed_job/database_engine.rb', line 56

# Converts a stored row into the public job hash.
#
# Only the keys :job_id, :run_at, :job_class, :args, :queue and :created_at are
# copied, in that order; every value is taken as-is except :args, which is
# passed through parse_args first.
#
# @param row [#[], nil] a row addressable by symbol keys, or nil/false
# @return [Hash, nil] the normalized job, or nil when +row+ is falsy or
#   parse_args raises JSON::ParserError
def self.normalize_row(row)
  return unless row

  %i[job_id run_at job_class args queue created_at].to_h do |field|
    raw_value = row[field]
    # :args is the only column that needs decoding before it is exposed.
    [field, field == :args ? parse_args(raw_value) : raw_value]
  end
rescue JSON::ParserError
  # An undecodable args payload makes the whole row unusable for callers.
  nil
end

Instance Method Details

#all_jobs ⇒ Object



48
49
50
# File 'lib/kaal/delayed_job/database_engine.rb', line 48

# Lists every row of kaal_delayed_jobs ordered by run_at, then job_id.
#
# Rows for which normalize_row returns nil are left out of the result.
#
# @return [Array<Hash>] normalized jobs
def all_jobs
  ordered_rows = connection[:kaal_delayed_jobs].order(:run_at, :job_id)
  normalizer = self.class
  ordered_rows.filter_map { |stored_row| normalizer.normalize_row(stored_row) }
end

#claim_strategy ⇒ Object



52
53
54
# File 'lib/kaal/delayed_job/database_engine.rb', line 52

# Names the strategy selected by the use_skip_locked flag.
#
# @return [Symbol] :skip_locked when @use_skip_locked is truthy,
#   :delete_confirmation otherwise
def claim_strategy
  return :skip_locked if @use_skip_locked

  :delete_confirmation
end

#enqueue(job_id:, run_at:, job_class:, args:, queue: nil, connection: nil) ⇒ Object



21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
# File 'lib/kaal/delayed_job/database_engine.rb', line 21

# Inserts a new delayed job and returns it in normalized form.
#
# The args are serialized with JSON.generate before insertion, and created_at
# is set to the current UTC time.
#
# @param job_id [Object] identifier stored in the job_id column
# @param run_at [Object] value stored in the run_at column
# @param job_class [Object] value stored in the job_class column
# @param args [Object] JSON-serializable arguments
# @param queue [Object, nil] optional queue name
# @param connection [Object, nil] forwarded to dataset_for to pick the dataset
# @return [Hash, nil] the result of normalize_row on the inserted payload
# @raise [DuplicateJobError] when the insert violates a unique constraint
def enqueue(job_id:, run_at:, job_class:, args:, queue: nil, connection: nil)
  enqueued_at = Time.now.utc
  serialized_args = JSON.generate(args)
  record = { job_id:, run_at:, job_class:, args: serialized_args, queue:, created_at: enqueued_at }

  dataset_for(connection).insert(record)
  # Normalizing the just-inserted payload gives callers the same shape as reads.
  self.class.normalize_row(record)
rescue ::Sequel::UniqueConstraintViolation
  raise DuplicateJobError, "Delayed job #{job_id.inspect} already exists"
end

#find_job(job_id, connection: @database.connection) ⇒ Object



44
45
46
# File 'lib/kaal/delayed_job/database_engine.rb', line 44

# Looks up a single delayed job by its job_id.
#
# @param job_id [Object] value matched against the job_id column
# @param connection [#[]] source of the kaal_delayed_jobs dataset; defaults to
#   @database.connection
# @return [Hash, nil] the normalized job, or nil when normalize_row yields nil
#   (for example when no row matches)
def find_job(job_id, connection: @database.connection)
  matching_row = connection[:kaal_delayed_jobs].where(job_id:).first
  self.class.normalize_row(matching_row)
end

#pop_due(now:, limit:) ⇒ Object



38
39
40
41
42
# File 'lib/kaal/delayed_job/database_engine.rb', line 38

# Dispatches to the pop implementation selected by the use_skip_locked flag.
#
# @param now [Object] forwarded unchanged to the chosen implementation
# @param limit [Object] forwarded unchanged to the chosen implementation
# @return [Object] whatever the chosen implementation returns
def pop_due(now:, limit:)
  if @use_skip_locked
    pop_due_with_skip_locked(now:, limit:)
  else
    pop_due_with_delete_confirmation(now:, limit:)
  end
end