Class: Textus::Ports::JobStore

Inherits:
Object
  • Object
show all
Defined in:
lib/textus/ports/job_store.rb,
lib/textus/ports/job_store/job.rb

Overview

File-backed durable job queue under ‘<root>/.run/queue/`. Each job state is a directory; a job is one `<id>.json` file. Claiming is an atomic `rename(2)` from ready/ to leased/ — the rename winner owns the job, so a worker pool needs no central lock. Dedup falls out of the id-as-filename: enqueueing an id that already exists is a no-op. ADR 0038 (runtime subtree), ADR 0108 (instantiable port).

Defined Under Namespace

Classes: Job, Leased

Constant Summary collapse

STATES =
%i[ready leased done failed].freeze

Instance Method Summary collapse

Constructor Details

#initialize(root:) ⇒ JobStore

Returns a new instance of JobStore.



16
17
18
19
# File 'lib/textus/ports/job_store.rb', line 16

def initialize(root:)
  @root = root
  STATES.each { |s| FileUtils.mkdir_p(Textus::Layout.queue_state(root, s)) }
end

Instance Method Details

#ack(leased) ⇒ Object



51
52
53
54
# File 'lib/textus/ports/job_store.rb', line 51

def ack(leased)
  dest = File.join(Textus::Layout.queue_state(@root, :done), File.basename(leased.leased_path))
  File.rename(leased.leased_path, dest)
end

#enqueue(job) ⇒ Object



21
22
23
24
25
26
# File 'lib/textus/ports/job_store.rb', line 21

def enqueue(job)
  dest = path(:ready, job.id)
  return if File.exist?(dest)

  write_atomic(dest, job.to_h)
end

#fail(leased, error:) ⇒ Object



56
57
58
59
60
61
62
63
64
# File 'lib/textus/ports/job_store.rb', line 56

def fail(leased, error:)
  job = leased.job
  job.attempts += 1
  job.last_error = error
  dead = job.attempts >= job.max_attempts
  write_atomic(path(dead ? :failed : :ready, job.id), job.to_h)
  File.delete(leased.leased_path)
  dead ? :dead_lettered : :requeued
end

#lease(worker_id:, lease_ttl:) ⇒ Object



34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
# File 'lib/textus/ports/job_store.rb', line 34

def lease(worker_id:, lease_ttl:)
  ready_dir = Textus::Layout.queue_state(@root, :ready)
  Dir.children(ready_dir).each do |name|
    src = File.join(ready_dir, name)
    dst = File.join(Textus::Layout.queue_state(@root, :leased), name)
    begin
      File.rename(src, dst)
    rescue Errno::ENOENT
      next
    end
    job = Job.from_h(JSON.parse(File.read(dst)))
    stamp_lease(dst, worker_id: worker_id, expires_at: Time.now.utc + lease_ttl)
    return Leased.new(job: job, leased_path: dst)
  end
  nil
end

#list(state) ⇒ Object



86
87
88
# File 'lib/textus/ports/job_store.rb', line 86

def list(state)
  Dir.children(Textus::Layout.queue_state(@root, state.to_sym)).map { |f| File.basename(f, ".json") }
end

#purge(state) ⇒ Object



99
100
101
102
# File 'lib/textus/ports/job_store.rb', line 99

def purge(state)
  dir = Textus::Layout.queue_state(@root, state.to_sym)
  Dir.children(dir).each { |f| File.delete(File.join(dir, f)) }
end

#ready_idsObject



28
29
30
# File 'lib/textus/ports/job_store.rb', line 28

def ready_ids
  Dir.children(Textus::Layout.queue_state(@root, :ready)).map { |f| File.basename(f, ".json") }
end

#reclaim(now:) ⇒ Object



66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
# File 'lib/textus/ports/job_store.rb', line 66

def reclaim(now:)
  leased_dir = Textus::Layout.queue_state(@root, :leased)
  count = 0
  Dir.children(leased_dir).each do |name|
    src = File.join(leased_dir, name)
    data = JSON.parse(File.read(src))
    expires = data.dig("lease", "expires_at")
    next if expires && Time.parse(expires) > now

    dst = File.join(Textus::Layout.queue_state(@root, :ready), name)
    data.delete("lease")
    File.write(src, JSON.pretty_generate(data))
    File.rename(src, dst)
    count += 1
  rescue Errno::ENOENT
    next
  end
  count
end

#retry_failed(job_id) ⇒ Object



90
91
92
93
94
95
96
97
# File 'lib/textus/ports/job_store.rb', line 90

def retry_failed(job_id)
  src = path(:failed, job_id)
  data = JSON.parse(File.read(src))
  data["attempts"] = 0
  data["last_error"] = nil
  write_atomic(path(:ready, job_id), data)
  File.delete(src)
end