Class: Textus::Ports::Queue
- Inherits:
-
Object
- Object
- Textus::Ports::Queue
- Defined in:
- lib/textus/ports/queue.rb
Overview
File-backed durable job queue under ‘<root>/.run/queue/`. Each job state is a directory; a job is one `<id>.json` file. Claiming is an atomic `rename(2)` from ready/ to leased/ — the rename winner owns the job, so a worker pool needs no central lock. Dedup falls out of the id-as-filename: enqueueing an id that already exists is a no-op. ADR 0038 (runtime subtree), ADR 0108 (instantiable port).
Defined Under Namespace
Classes: Leased
Constant Summary collapse
- STATES =
%i[ready leased done failed].freeze
Instance Method Summary collapse
- #ack(leased) ⇒ Object
- #enqueue(job) ⇒ Object
-
#fail(leased, error:) ⇒ Object
Increment attempts and either requeue (transient) or dead-letter (attempts exhausted).
-
#initialize(root:) ⇒ Queue
constructor
A new instance of Queue.
- #lease(worker_id:, lease_ttl:) ⇒ Object
- #list(state) ⇒ Object
- #purge(state) ⇒ Object
- #ready_ids ⇒ Object
-
#reclaim(now:) ⇒ Object
Return expired leases to ready/ (the holding worker crashed).
- #retry_failed(job_id) ⇒ Object
Constructor Details
Instance Method Details
#ack(leased) ⇒ Object
52 53 54 55 |
# File 'lib/textus/ports/queue.rb', line 52 def ack(leased) dest = File.join(Textus::Layout.queue_state(@root, :done), File.basename(leased.leased_path)) File.rename(leased.leased_path, dest) end |
#enqueue(job) ⇒ Object
21 22 23 24 25 26 |
# File 'lib/textus/ports/queue.rb', line 21 def enqueue(job) dest = path(:ready, job.id) return if File.exist?(dest) # dedup: identical work already queued write_atomic(dest, job.to_h) end |
#fail(leased, error:) ⇒ Object
Increment attempts and either requeue (transient) or dead-letter (attempts exhausted). Returns :requeued or :dead_lettered so the worker can count terminal failures distinctly from retries.
60 61 62 63 64 65 66 67 68 |
# File 'lib/textus/ports/queue.rb', line 60 def fail(leased, error:) job = leased.job job.attempts += 1 job.last_error = error dead = job.attempts >= job.max_attempts write_atomic(path(dead ? :failed : :ready, job.id), job.to_h) File.delete(leased.leased_path) dead ? :dead_lettered : :requeued end |
#lease(worker_id:, lease_ttl:) ⇒ Object
35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 |
# File 'lib/textus/ports/queue.rb', line 35 def lease(worker_id:, lease_ttl:) ready_dir = Textus::Layout.queue_state(@root, :ready) Dir.children(ready_dir).each do |name| src = File.join(ready_dir, name) dst = File.join(Textus::Layout.queue_state(@root, :leased), name) begin File.rename(src, dst) # atomic claim; loser's rename raises ENOENT rescue Errno::ENOENT next # another worker won this one end job = Textus::Domain::Jobs::Job.from_h(JSON.parse(File.read(dst))) stamp_lease(dst, worker_id: worker_id, expires_at: Time.now.utc + lease_ttl) return Leased.new(job: job, leased_path: dst) end nil end |
#list(state) ⇒ Object
93 94 95 |
# File 'lib/textus/ports/queue.rb', line 93 def list(state) Dir.children(Textus::Layout.queue_state(@root, state.to_sym)).map { |f| File.basename(f, ".json") } end |
#purge(state) ⇒ Object
106 107 108 109 |
# File 'lib/textus/ports/queue.rb', line 106 def purge(state) dir = Textus::Layout.queue_state(@root, state.to_sym) Dir.children(dir).each { |f| File.delete(File.join(dir, f)) } end |
#ready_ids ⇒ Object
28 29 30 |
# File 'lib/textus/ports/queue.rb', line 28 def ready_ids Dir.children(Textus::Layout.queue_state(@root, :ready)).map { |f| File.basename(f, ".json") } end |
#reclaim(now:) ⇒ Object
Return expired leases to ready/ (the holding worker crashed). Returns the count reclaimed. At-least-once delivery: a job whose handler actually finished but whose ack was lost will re-run — handlers must be idempotent.
73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 |
# File 'lib/textus/ports/queue.rb', line 73 def reclaim(now:) leased_dir = Textus::Layout.queue_state(@root, :leased) count = 0 Dir.children(leased_dir).each do |name| src = File.join(leased_dir, name) data = JSON.parse(File.read(src)) expires = data.dig("lease", "expires_at") next if expires && Time.parse(expires) > now dst = File.join(Textus::Layout.queue_state(@root, :ready), name) data.delete("lease") File.write(src, JSON.pretty_generate(data)) File.rename(src, dst) count += 1 rescue Errno::ENOENT next # raced with another reclaimer / the worker's ack end count end |
#retry_failed(job_id) ⇒ Object
97 98 99 100 101 102 103 104 |
# File 'lib/textus/ports/queue.rb', line 97 def retry_failed(job_id) src = path(:failed, job_id) data = JSON.parse(File.read(src)) data["attempts"] = 0 data["last_error"] = nil write_atomic(path(:ready, job_id), data) File.delete(src) end |