Module: Delayed::Backend::Base::ClassMethods
- Defined in:
- lib/delayed/backend/base.rb
Instance Method Summary collapse
-
#enqueue(*args) ⇒ Object
Add a job to the queue.
-
#enqueue_all(jobs) ⇒ Object
Bulk-enqueue an array of pre-built Delayed::Job instances.
- #enqueue_job(options) ⇒ Object
- #name_assignable? ⇒ Boolean
-
#recover_from(_error) ⇒ Object
Allow the backend to attempt recovery from reserve errors.
- #reserve(worker, max_run_time = Worker.max_run_time) ⇒ Object
- #work_off(num = 100) ⇒ Object
Instance Method Details
#enqueue(*args) ⇒ Object
Add a job to the queue
10 11 12 13 |
# File 'lib/delayed/backend/base.rb', line 10 def enqueue(*args) = Delayed::Backend::JobPreparer.new(*args).prepare enqueue_job() end |
#enqueue_all(jobs) ⇒ Object
Bulk-enqueue an array of pre-built Delayed::Job instances. Callers are responsible for running JobPreparer and instantiating each job via ‘Delayed::Job.new(…)` before calling this method.
30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 |
# File 'lib/delayed/backend/base.rb', line 30 def enqueue_all(jobs) return 0 if jobs.empty? jobs.each do |job| assert_payload_not_active_job!(job.payload_object) assert_no_enqueue_hook!(job.payload_object) end assert_delay_jobs_not_proc! Delayed.lifecycle.run_callbacks(:enqueue, jobs) do if Delayed::Worker.delay_jobs bulk_insert_all(jobs) else jobs.each(&:invoke_job) end end jobs.size end |
#enqueue_job(options) ⇒ Object
15 16 17 18 19 20 21 22 23 24 25 |
# File 'lib/delayed/backend/base.rb', line 15 def enqueue_job() new().tap do |job| assert_payload_not_active_job!(job.payload_object) assert_no_enqueue_hook!(job.payload_object) assert_delay_jobs_not_proc! Delayed.lifecycle.run_callbacks(:enqueue, [job]) do Delayed::Worker.delay_jobs ? job.save : job.invoke_job end end end |
#name_assignable? ⇒ Boolean
71 72 73 |
# File 'lib/delayed/backend/base.rb', line 71 def name_assignable? column_names.include?('name') end |
#recover_from(_error) ⇒ Object
Allow the backend to attempt recovery from reserve errors
64 |
# File 'lib/delayed/backend/base.rb', line 64 def recover_from(_error); end |
#reserve(worker, max_run_time = Worker.max_run_time) ⇒ Object
50 51 52 53 54 55 56 57 58 59 60 61 |
# File 'lib/delayed/backend/base.rb', line 50 def reserve(worker, max_run_time = Worker.max_run_time) # We get up to 5 jobs from the db. In case we cannot get exclusive access to a job we try the next. # this leads to a more even distribution of jobs across the worker processes claims = 0 find_available(worker.name, worker.read_ahead, max_run_time).select do |job| next if claims >= worker.max_claims job.lock_exclusively!(max_run_time, worker.name).tap do |result| claims += 1 if result end end end |