Module: Delayed::Backend::Base::ClassMethods

Defined in:
lib/delayed/backend/base.rb

Instance Method Summary collapse

Instance Method Details

#enqueue(*args) ⇒ Object

Add a job to the queue



10
11
12
13
# File 'lib/delayed/backend/base.rb', line 10

# Add a job to the queue.
#
# All arguments are forwarded untouched to Delayed::Backend::JobPreparer; the
# options it prepares are then handed to enqueue_job.
#
# Returns whatever enqueue_job returns.
def enqueue(*args)
  preparer = Delayed::Backend::JobPreparer.new(*args)
  enqueue_job(preparer.prepare)
end

#enqueue_all(jobs) ⇒ Object

Bulk-enqueue an array of pre-built Delayed::Job instances. Callers are responsible for running JobPreparer and instantiating each job via `Delayed::Job.new(…)` before calling this method.



30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
# File 'lib/delayed/backend/base.rb', line 30

# Bulk-enqueue a collection of already-instantiated jobs.
#
# Each job's payload is checked with assert_payload_not_active_job! and
# assert_no_enqueue_hook!, after which assert_delay_jobs_not_proc! runs once.
# Inside the :enqueue lifecycle callbacks the jobs are either handed to
# bulk_insert_all (when Delayed::Worker.delay_jobs is truthy) or invoked one
# by one.
#
# Returns 0 for an empty collection, otherwise jobs.size.
def enqueue_all(jobs)
  return 0 if jobs.empty?

  jobs.each do |pending|
    assert_payload_not_active_job!(pending.payload_object)
    assert_no_enqueue_hook!(pending.payload_object)
  end
  assert_delay_jobs_not_proc!

  Delayed.lifecycle.run_callbacks(:enqueue, jobs) do
    Delayed::Worker.delay_jobs ? bulk_insert_all(jobs) : jobs.each(&:invoke_job)
  end

  jobs.size
end

#enqueue_job(options) ⇒ Object



15
16
17
18
19
20
21
22
23
24
25
# File 'lib/delayed/backend/base.rb', line 15

# Build a single job from +options+ and enqueue it.
#
# The new job's payload is checked with assert_payload_not_active_job! and
# assert_no_enqueue_hook!, then assert_delay_jobs_not_proc! runs. Inside the
# :enqueue lifecycle callbacks the job is saved when
# Delayed::Worker.delay_jobs is truthy and invoked immediately otherwise.
#
# Returns the job instance created by new(options).
def enqueue_job(options)
  job = new(options)

  assert_payload_not_active_job!(job.payload_object)
  assert_no_enqueue_hook!(job.payload_object)
  assert_delay_jobs_not_proc!

  Delayed.lifecycle.run_callbacks(:enqueue, [job]) do
    if Delayed::Worker.delay_jobs
      job.save
    else
      job.invoke_job
    end
  end

  job
end

#name_assignable? ⇒ Boolean

Returns:

  • (Boolean)


71
72
73
# File 'lib/delayed/backend/base.rb', line 71

# Whether column_names contains an entry equal to 'name'.
#
# Returns the result of column_names.include?('name').
def name_assignable?
  known_columns = column_names
  known_columns.include?('name')
end

#recover_from(_error) ⇒ Object

Allow the backend to attempt recovery from reserve errors



64
# File 'lib/delayed/backend/base.rb', line 64

# Hook that lets a backend attempt recovery from reserve errors.
#
# This default implementation ignores the error and returns nil.
def recover_from(_error)
  nil
end

#reserve(worker, max_run_time = Worker.max_run_time) ⇒ Object



50
51
52
53
54
55
56
57
58
59
60
61
# File 'lib/delayed/backend/base.rb', line 50

# Claim jobs for +worker+.
#
# Candidates come from find_available, which is given the worker's name, its
# read_ahead value and max_run_time. Each candidate is offered
# lock_exclusively! in turn; when a lock attempt returns a falsy result the
# next candidate is tried instead. Once worker.max_claims locks have
# succeeded, the remaining candidates are skipped without a lock attempt.
#
# Returns the candidates whose lock attempt returned a truthy result.
def reserve(worker, max_run_time = Worker.max_run_time)
  claimed_count = 0
  candidates = find_available(worker.name, worker.read_ahead, max_run_time)

  candidates.select do |candidate|
    unless claimed_count >= worker.max_claims
      locked = candidate.lock_exclusively!(max_run_time, worker.name)
      claimed_count += 1 if locked
      locked
    end
  end
end

#work_off(num = 100) ⇒ Object



66
67
68
69
# File 'lib/delayed/backend/base.rb', line 66

# Deprecated entry point kept for backward compatibility.
#
# Emits a deprecation warning via Kernel#warn and then delegates to
# Delayed::Worker.new.work_off(num).
#
# num - value forwarded to Delayed::Worker#work_off (default: 100).
#
# Returns whatever Delayed::Worker#work_off returns.
def work_off(num = 100)
  # The replacement call is wrapped in a closed pair of backticks so the
  # message renders as a proper code span.
  warn '[DEPRECATION] `Delayed::Job.work_off` is deprecated. Use `Delayed::Worker.new.work_off` instead.'
  Delayed::Worker.new.work_off(num)
end