Class: Kaal::DelayedJob::MemoryEngine
Overview
In-memory delayed job store for single-process development and tests.
Instance Method Summary
collapse
constantize, deep_dup, deep_merge, duplicable?, stringify_keys, symbolize_keys
Methods inherited from Registry
#requires_dispatch_lock?
Constructor Details
Returns a new instance of MemoryEngine.
16
17
18
19
20
|
# File 'lib/kaal/delayed_job/memory_engine.rb', line 16
# Sets up an empty engine.
#
# Runs the parent initializer first, then creates the lock and the
# empty job table that the other methods of this class read and write.
def initialize
  super
  # Guards every access to @jobs.
  @mutex = Mutex.new
  # Stored jobs, keyed by job_id.
  @jobs = {}
end
|
Instance Method Details
#all_jobs ⇒ Object
51
52
53
54
55
|
# File 'lib/kaal/delayed_job/memory_engine.rb', line 51
# Lists every stored job.
#
# Jobs are ordered by :run_at, with :job_id as the tie-breaker. Each
# entry is passed through deep_dup before being returned, so callers
# do not receive the stored objects themselves.
#
# @return [Array] copies of all stored jobs, in sorted order
def all_jobs
  @mutex.synchronize do
    ordered = @jobs.values.sort_by { |job| job.values_at(:run_at, :job_id) }
    ordered.map { |job| deep_dup(job) }
  end
end
|
#claim_strategy ⇒ Object
61
62
63
|
# File 'lib/kaal/delayed_job/memory_engine.rb', line 61
# Identifies how this engine hands out jobs.
#
# @return [Symbol] always :atomic_pop
def claim_strategy = :atomic_pop
|
#clear ⇒ Object
57
58
59
|
# File 'lib/kaal/delayed_job/memory_engine.rb', line 57
# Removes every stored job while holding the lock.
#
# @return [Hash] the (now empty) job table, as returned by Hash#clear
def clear
  @mutex.synchronize do
    @jobs.clear
  end
end
|
#enqueue(job_id:, run_at:, job_class:, args:, queue: nil, **) ⇒ Object
22
23
24
25
26
27
28
29
30
|
# File 'lib/kaal/delayed_job/memory_engine.rb', line 22
# Stores a new job under job_id.
#
# The job record is produced by build_job from the given keywords. Any
# additional keyword arguments are accepted and ignored.
#
# @param job_id key under which the job is stored
# @param run_at forwarded to build_job
# @param job_class forwarded to build_job
# @param args forwarded to build_job
# @param queue forwarded to build_job (defaults to nil)
# @return a deep_dup copy of the stored job
# @raise [DuplicateJobError] if a job with the same job_id is already stored
def enqueue(job_id:, run_at:, job_class:, args:, queue: nil, **)
  @mutex.synchronize do
    if @jobs.key?(job_id)
      raise DuplicateJobError, "Delayed job #{job_id.inspect} already exists"
    end

    stored = build_job(job_id:, run_at:, job_class:, args:, queue:)
    @jobs.store(job_id, stored)
    # Return a copy so the caller cannot mutate the stored record.
    deep_dup(stored)
  end
end
|
#find_job(job_id) ⇒ Object
47
48
49
|
# File 'lib/kaal/delayed_job/memory_engine.rb', line 47
# Looks up a single job by its id.
#
# The stored entry (nil when the id is absent from the table) is
# passed through deep_dup before being returned.
#
# @param job_id key to look up
# @return the deep_dup result for the stored entry
def find_job(job_id)
  @mutex.synchronize do
    stored = @jobs[job_id]
    deep_dup(stored)
  end
end
|
#pop_due(now:, limit:) ⇒ Object
32
33
34
35
36
37
38
39
40
41
42
43
44
45
|
# File 'lib/kaal/delayed_job/memory_engine.rb', line 32
# Removes and returns jobs that are due.
#
# A job is due when its :run_at is less than or equal to now. Due jobs
# are ordered by :run_at, then :job_id, and at most limit of them are
# taken. Selection and removal happen inside one lock acquisition.
#
# @param now upper bound (inclusive) compared against each job's :run_at
# @param limit maximum number of jobs to take
# @return [Array] deep_dup copies of the removed jobs, in sorted order
def pop_due(now:, limit:)
  @mutex.synchronize do
    ready = @jobs.each_value.select { |job| job[:run_at] <= now }
    claimed = ready.sort_by { |job| [job[:run_at], job[:job_id]] }.first(limit)
    # Remove all claimed jobs from the table before building the result.
    claimed.each { |job| @jobs.delete(job.fetch(:job_id)) }
    claimed.map { |job| deep_dup(job) }
  end
end
|