Class: Textus::Ports::Queue

Inherits:
Object
  • Object
show all
Defined in:
lib/textus/ports/queue.rb

Overview

File-backed durable job queue under `<root>/.run/queue/`. Each job state is a directory; a job is one `<id>.json` file. Claiming is an atomic `rename(2)` from ready/ to leased/ — the rename winner owns the job, so a worker pool needs no central lock. Dedup falls out of the id-as-filename: enqueueing an id that already exists is a no-op. ADR 0038 (runtime subtree), ADR 0108 (instantiable port).

Defined Under Namespace

Classes: Leased

Constant Summary collapse

STATES =
%i[ready leased done failed].freeze

Instance Method Summary collapse

Constructor Details

#initialize(root:) ⇒ Queue

Returns a new instance of Queue.



16
17
18
19
# File 'lib/textus/ports/queue.rb', line 16

# Remember the queue root and make sure one directory exists per job state.
#
# @param root the root handed to Textus::Layout.queue_state to locate each
#   state directory
def initialize(root:)
  @root = root
  STATES.each do |state|
    # mkdir_p is a no-op for directories that already exist.
    FileUtils.mkdir_p(Textus::Layout.queue_state(root, state))
  end
end

Instance Method Details

#ack(leased) ⇒ Object



52
53
54
55
# File 'lib/textus/ports/queue.rb', line 52

# Acknowledge a leased job by moving its file from its leased path into the
# done/ state directory, keeping the same file name.
#
# @param leased an object responding to #leased_path (the job file's path)
# @return [Integer] 0, the return value of File.rename
def ack(leased)
  done_dir = Textus::Layout.queue_state(@root, :done)
  claimed_path = leased.leased_path
  File.rename(claimed_path, File.join(done_dir, File.basename(claimed_path)))
end

#enqueue(job) ⇒ Object



21
22
23
24
25
26
# File 'lib/textus/ports/queue.rb', line 21

# Queue a job by writing it under ready/ as a file named after its id.
#
# Dedup: when a ready file for the same id already exists, nothing is
# written and nil is returned.
#
# @param job an object responding to #id and #to_h
# @return the result of write_atomic, or nil when the id is already queued
def enqueue(job)
  target = path(:ready, job.id)
  write_atomic(target, job.to_h) unless File.exist?(target)
end

#fail(leased, error:) ⇒ Object

Increment attempts and either requeue (transient) or dead-letter (attempts exhausted). Returns :requeued or :dead_lettered so the worker can count terminal failures distinctly from retries.



60
61
62
63
64
65
66
67
68
# File 'lib/textus/ports/queue.rb', line 60

# Record a failed attempt on a leased job, then either requeue it or
# dead-letter it once its attempts reach max_attempts.
#
# The updated job is written to its destination state before the leased
# file is deleted. Note that leased.job is mutated in place.
#
# @param leased an object responding to #job and #leased_path
# @param error the value stored on the job as last_error
# @return [Symbol] :dead_lettered when attempts are exhausted, else :requeued
def fail(leased, error:)
  job = leased.job
  job.attempts += 1
  job.last_error = error

  exhausted = job.attempts >= job.max_attempts
  state, outcome = exhausted ? %i[failed dead_lettered] : %i[ready requeued]

  write_atomic(path(state, job.id), job.to_h)
  File.delete(leased.leased_path)
  outcome
end

#lease(worker_id:, lease_ttl:) ⇒ Object



35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
# File 'lib/textus/ports/queue.rb', line 35

# Claim one ready job for a worker.
#
# Walks the entries of ready/ and tries to rename each into leased/. The
# first rename that succeeds is the claim; an ENOENT means the entry was
# gone by the time we tried (another worker got it), so the next entry is
# tried. The job is parsed from the claimed file before the lease stamp is
# applied to it.
#
# @param worker_id identifier forwarded to stamp_lease
# @param lease_ttl added to the current UTC time to get the lease expiry
# @return [Leased, nil] the claimed job and its leased path, or nil when no
#   entry could be claimed
def lease(worker_id:, lease_ttl:)
  ready_dir = Textus::Layout.queue_state(@root, :ready)

  Dir.children(ready_dir).each do |filename|
    claimed = File.join(Textus::Layout.queue_state(@root, :leased), filename)
    begin
      # The rename is the claim: whoever wins it owns the job.
      File.rename(File.join(ready_dir, filename), claimed)
    rescue Errno::ENOENT
      next # lost the race for this entry
    end

    job = Textus::Domain::Jobs::Job.from_h(JSON.parse(File.read(claimed)))
    expiry = Time.now.utc + lease_ttl
    stamp_lease(claimed, worker_id: worker_id, expires_at: expiry)
    return Leased.new(job: job, leased_path: claimed)
  end

  nil
end

#list(state) ⇒ Object



93
94
95
# File 'lib/textus/ports/queue.rb', line 93

# List the job ids currently filed under a state directory.
#
# @param state [#to_sym] queue state whose directory is listed
# @return [Array<String>] entry names with any ".json" suffix removed, in
#   the order Dir.children reports them
def list(state)
  state_dir = Textus::Layout.queue_state(@root, state.to_sym)
  Dir.children(state_dir).map do |entry|
    File.basename(entry, ".json")
  end
end

#purge(state) ⇒ Object



106
107
108
109
# File 'lib/textus/ports/queue.rb', line 106

# Delete every entry in one state's directory.
#
# @param state [#to_sym] queue state whose directory is emptied
# @return [Array<String>] the entry names that were iterated
def purge(state)
  state_dir = Textus::Layout.queue_state(@root, state.to_sym)
  Dir.children(state_dir).each do |entry|
    File.delete(File.join(state_dir, entry))
  end
end

#ready_ids ⇒ Object



28
29
30
# File 'lib/textus/ports/queue.rb', line 28

# Ids of the jobs currently waiting in ready/.
#
# Delegates to #list so the "directory entries -> ids" mapping lives in one
# place instead of being duplicated here.
#
# @return [Array<String>] entry names of ready/ with ".json" stripped
def ready_ids
  list(:ready)
end

#reclaim(now:) ⇒ Object

Return expired leases to ready/ (the holding worker crashed). Returns the count reclaimed. At-least-once delivery: a job whose handler actually finished but whose ack was lost will re-run — handlers must be idempotent.



73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
# File 'lib/textus/ports/queue.rb', line 73

# Return expired leases to ready/ and report how many were moved.
#
# A leased file is reclaimed when its "lease.expires_at" is not later than
# +now+, or when it carries no expires_at at all. The "lease" key is
# stripped from the file before it is renamed back into ready/.
#
# Delivery is at-least-once: a job whose handler finished but whose ack was
# lost will run again, so handlers must be idempotent.
#
# @param now [Time] the instant leases are compared against
# @return [Integer] number of jobs moved back to ready/
def reclaim(now:)
  leased_dir = Textus::Layout.queue_state(@root, :leased)
  count = 0
  Dir.children(leased_dir).each do |name|
    src = File.join(leased_dir, name)
    data = JSON.parse(File.read(src))
    expires = data.dig("lease", "expires_at")
    next if expires && Time.parse(expires) > now

    dst = File.join(Textus::Layout.queue_state(@root, :ready), name)
    data.delete("lease")
    payload = JSON.pretty_generate(data)
    # Open WITHOUT O_CREAT. File.write would silently re-create src if the
    # worker's ack (or another reclaimer) renamed it away after our read,
    # resurrecting a finished job into ready/. With WRONLY|TRUNC a vanished
    # file raises ENOENT and falls into the rescue below.
    File.open(src, File::WRONLY | File::TRUNC) { |f| f.write(payload) }
    File.rename(src, dst)
    count += 1
  rescue Errno::ENOENT
    next # raced with another reclaimer / the worker's ack
  end
  count
end

#retry_failed(job_id) ⇒ Object



97
98
99
100
101
102
103
104
# File 'lib/textus/ports/queue.rb', line 97

# Move a dead-lettered job back to ready/ with a clean slate.
#
# Reads the job from failed/, resets "attempts" to 0 and "last_error" to
# nil, writes the result under ready/, then deletes the failed/ file.
#
# @param job_id id of the job, used to locate its file in failed/ and ready/
# @return [Integer] the return value of File.delete (number of files removed)
def retry_failed(job_id)
  failed_path = path(:failed, job_id)
  payload = JSON.parse(File.read(failed_path))
  { "attempts" => 0, "last_error" => nil }.each do |key, value|
    payload[key] = value
  end
  write_atomic(path(:ready, job_id), payload)
  File.delete(failed_path)
end