Class: Kaal::DelayedJob::MemoryEngine

Inherits:
Registry
  • Object
show all
Includes:
Support::HashTools
Defined in:
lib/kaal/delayed_job/memory_engine.rb

Overview

In-memory delayed job store for single-process development and tests.

Instance Method Summary collapse

Methods included from Support::HashTools

constantize, deep_dup, deep_merge, duplicable?, stringify_keys, symbolize_keys

Methods inherited from Registry

#requires_dispatch_lock?

Constructor Details

#initialize ⇒ MemoryEngine

Returns a new instance of MemoryEngine.



16
17
18
19
20
# File 'lib/kaal/delayed_job/memory_engine.rb', line 16

# Builds an engine with no stored jobs.
#
# Runs the parent initializer first, then creates the lock and the empty
# job table that the other methods of this class read and write under that lock.
def initialize
  super
  @mutex = Mutex.new
  @jobs = {}
end

Instance Method Details

#all_jobs ⇒ Object



51
52
53
54
55
# File 'lib/kaal/delayed_job/memory_engine.rb', line 51

# Lists every stored job, ordered by +:run_at+ and then by +:job_id+.
#
# Each element is produced by +deep_dup+, so the returned hashes are copies
# rather than the stored job hashes themselves.
#
# @return [Array<Hash>] copies of the stored jobs in run order
def all_jobs
  @mutex.synchronize do
    ordered = @jobs.values.sort_by { |entry| entry.values_at(:run_at, :job_id) }
    ordered.map { |entry| deep_dup(entry) }
  end
end

#claim_strategy ⇒ Object



61
62
63
# File 'lib/kaal/delayed_job/memory_engine.rb', line 61

# Names the claim strategy this engine reports.
#
# NOTE(review): presumably consumed by the dispatcher to decide how due jobs
# are claimed (see +pop_due+) — confirm against the caller.
#
# @return [Symbol] always +:atomic_pop+
def claim_strategy = :atomic_pop

#clear ⇒ Object



57
58
59
# File 'lib/kaal/delayed_job/memory_engine.rb', line 57

# Removes every stored job.
#
# +Hash#clear+ returns its receiver, so returning its result would hand the
# live internal table to the caller, who could then mutate the store without
# holding the mutex. The readers in this class return copies; this method
# likewise returns a detached empty hash.
#
# @return [Hash] a new empty hash that is not the internal job table
def clear
  @mutex.synchronize do
    @jobs.clear
    {}
  end
end

#enqueue(job_id:, run_at:, job_class:, args:, queue: nil, **) ⇒ Object



22
23
24
25
26
27
28
29
30
# File 'lib/kaal/delayed_job/memory_engine.rb', line 22

# Stores a new delayed job under +job_id+.
#
# The duplicate check and the insert happen inside one mutex section, so two
# callers cannot both register the same id. Extra keyword arguments are
# accepted and ignored.
#
# @param job_id [Object] key the job is stored under
# @param run_at [Object] passed through to +build_job+
# @param job_class [Object] passed through to +build_job+
# @param args [Object] passed through to +build_job+
# @param queue [Object, nil] passed through to +build_job+
# @return [Object] a +deep_dup+ copy of the job built by +build_job+
# @raise [DuplicateJobError] if a job with +job_id+ is already stored
def enqueue(job_id:, run_at:, job_class:, args:, queue: nil, **)
  @mutex.synchronize do
    if @jobs.key?(job_id)
      raise DuplicateJobError, "Delayed job #{job_id.inspect} already exists"
    end

    record = build_job(job_id:, run_at:, job_class:, args:, queue:)
    @jobs.store(job_id, record)
    deep_dup(record)
  end
end

#find_job(job_id) ⇒ Object



47
48
49
# File 'lib/kaal/delayed_job/memory_engine.rb', line 47

# Looks up a single job by id.
#
# The lookup result is passed through +deep_dup+ before being returned, so
# callers receive a copy rather than the stored hash. A missing id means
# +deep_dup+ is called with +nil+.
#
# @param job_id [Object] key the job was stored under
# @return [Object] +deep_dup+ of the stored job, or of +nil+ when absent
def find_job(job_id)
  @mutex.synchronize do
    stored = @jobs[job_id]
    deep_dup(stored)
  end
end

#pop_due(now:, limit:) ⇒ Object



32
33
34
35
36
37
38
39
40
41
42
43
44
45
# File 'lib/kaal/delayed_job/memory_engine.rb', line 32

# Removes and returns up to +limit+ jobs whose +:run_at+ is at or before +now+.
#
# Selection, removal and copying all happen inside one mutex section.
# Candidates are ordered by +:run_at+ and then +:job_id+ before the limit is
# applied, so the earliest due jobs are taken first.
#
# @param now [Object] value each job's +:run_at+ is compared against with +<=+
# @param limit [Integer] maximum number of jobs to remove
# @return [Array] +deep_dup+ copies of the removed jobs, in run order
def pop_due(now:, limit:)
  @mutex.synchronize do
    ready = @jobs.each_value.select { |entry| entry[:run_at] <= now }
    claimed = ready.sort_by { |entry| [entry[:run_at], entry[:job_id]] }.first(limit)

    # Remove every claimed job from the store before any copy is made.
    claimed.each { |entry| @jobs.delete(entry.fetch(:job_id)) }
    claimed.map { |entry| deep_dup(entry) }
  end
end