Class: Textus::Store::Jobs::Queue
- Inherits: Object
- Inheritance hierarchy: Object → Textus::Store::Jobs::Queue
- Defined in:
- lib/textus/store/jobs/queue.rb
Defined Under Namespace
Constant Summary collapse
- VALID_STATES = %w[ready leased done failed].freeze
Instance Method Summary collapse
- #ack(leased) ⇒ Object
- #enqueue(job) ⇒ Object
- #fail(leased, error:) ⇒ Object
- #initialize(store:) ⇒ Queue (constructor) — a new instance of Queue.
- #lease(worker_id:, lease_ttl:) ⇒ Object
- #list(state) ⇒ Object
- #purge(state) ⇒ Object
- #ready_ids ⇒ Object
- #reclaim(now:) ⇒ Object
- #retry_failed(job_id) ⇒ Object
Constructor Details
#initialize(store:) ⇒ Queue
Returns a new instance of Queue.
41 42 43 |
# File 'lib/textus/store/jobs/queue.rb', line 41

# Builds a queue on top of the given store.
#
# @param store [Object] backing store; the other methods of this class only
#   ever call `execute(sql, binds)` on it
def initialize(store:)
  # Kept as-is; every query in this class goes through this handle.
  @store = store
end
Instance Method Details
#ack(leased) ⇒ Object
78 79 80 81 82 83 |
# File 'lib/textus/store/jobs/queue.rb', line 78

# Marks a leased job as finished.
#
# The row is moved to the 'done' state and its lease column is cleared. The
# statement only matches rows that are still in the 'leased' state, so it is
# a no-op for a job that has since left that state.
#
# @param leased [Object] lease wrapper; only `leased.job.id` is read
# @return [Object] whatever the store's `execute` returns
def ack(leased)
  sql = "UPDATE jobs SET state = 'done', lease = NULL, updated_at = ? WHERE id = ? AND state = 'leased'"
  binds = [iso_now, leased.job.id]
  @store.execute(sql, binds)
end
#enqueue(job) ⇒ Object
45 46 47 48 49 50 51 52 |
# File 'lib/textus/store/jobs/queue.rb', line 45

# Inserts a job into the queue in the 'ready' state with no lease.
#
# Uses INSERT OR IGNORE, so a row that conflicts with an existing one is
# skipped by the database instead of raising. `args` and `errors` are stored
# as JSON text; `created_at` and `updated_at` receive the same timestamp.
#
# @param job [Object] must respond to id, type, args, role, attempts,
#   max_attempts and errors
# @return [Object] whatever the store's `execute` returns
def enqueue(job)
  timestamp = iso_now
  binds = [
    job.id,
    job.type,
    JSON.dump(job.args),
    job.role,
    job.attempts,
    job.max_attempts,
    JSON.dump(job.errors),
    timestamp, # created_at
    timestamp, # updated_at
  ]
  @store.execute(
    "INSERT OR IGNORE INTO jobs " \
    "(id, type, args, state, role, attempts, max_attempts, errors, lease, created_at, updated_at) " \
    "VALUES (?, ?, ?, 'ready', ?, ?, ?, ?, NULL, ?, ?)",
    binds,
  )
end
#fail(leased, error:) ⇒ Object
85 86 87 88 89 90 91 92 93 94 95 96 |
# File 'lib/textus/store/jobs/queue.rb', line 85

# Records a failed attempt for a leased job and decides its next state.
#
# The attempt counter is incremented and an entry with "attempt", "error" and
# "at" keys is appended to the job's error history. When the new attempt
# count reaches `max_attempts` the job is moved to 'failed'; otherwise it
# goes back to 'ready'. The lease is cleared in both cases.
#
# NOTE(review): unlike #ack, the UPDATE matches on id alone and does not
# require the row to still be in the 'leased' state.
#
# @param leased [Object] lease wrapper; `leased.job` supplies id, attempts,
#   errors and max_attempts
# @param error [Object] description of the failure, stored via JSON.dump
# @return [Symbol] :dead_lettered when attempts are exhausted, else :requeued
def fail(leased, error:)
  job = leased.job
  attempt_number = job.attempts + 1
  history = job.errors + [{ "attempt" => attempt_number, "error" => error, "at" => iso_now }]
  exhausted = attempt_number >= job.max_attempts

  next_state = if exhausted
                 "failed"
               else
                 "ready"
               end

  @store.execute(
    "UPDATE jobs SET state = ?, attempts = ?, errors = ?, lease = NULL, updated_at = ? WHERE id = ?",
    [next_state, attempt_number, JSON.dump(history), iso_now, job.id],
  )

  return :dead_lettered if exhausted

  :requeued
end
#lease(worker_id:, lease_ttl:) ⇒ Object
58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 |
# File 'lib/textus/store/jobs/queue.rb', line 58

# Claims the next ready job for a worker.
#
# The candidate is the first 'ready' row ordered by created_at, then id. It
# is switched to 'leased' and stamped with a JSON lease holding the worker
# id, the expiry time and a random token. The claimed row is then read back
# by matching on that exact lease text, which the random token makes
# specific to this call.
#
# @param worker_id [Object] identifier stored in the lease
# @param lease_ttl [Numeric] added to the current UTC time to get the expiry
# @return [Leased, nil] wrapper around the claimed job, or nil when no row
#   carries the freshly written lease (e.g. nothing was ready)
def lease(worker_id:, lease_ttl:)
  leased_at = Time.now.utc
  deadline = leased_at + lease_ttl
  lease_json = JSON.dump(
    {
      "worker_id" => worker_id,
      "expires_at" => deadline.iso8601,
      "token" => SecureRandom.hex(8),
    },
  )

  claim_sql =
    "UPDATE jobs SET state = 'leased', lease = ?, updated_at = ? " \
    "WHERE id = ( SELECT id FROM jobs WHERE state = 'ready' ORDER BY created_at, id LIMIT 1 )"
  @store.execute(claim_sql, [lease_json, leased_at.iso8601])

  claimed = @store.execute(
    "SELECT * FROM jobs WHERE state = 'leased' AND lease = ? LIMIT 1",
    [lease_json],
  ).first

  claimed ? Leased.new(job_from_row(claimed)) : nil
end
#list(state) ⇒ Object
114 115 116 117 118 119 |
# File 'lib/textus/store/jobs/queue.rb', line 114

# Lists the ids of all jobs in a given state, ordered by created_at then id.
#
# @param state [#to_s] one of VALID_STATES (symbol or string)
# @return [Array] the "id" value of each matching row
# @raise [Textus::UsageError] when the state is not in VALID_STATES
def list(state)
  state_name = state.to_s
  unless VALID_STATES.include?(state_name)
    raise Textus::UsageError, "unknown job state: #{state_name}"
  end

  rows = @store.execute("SELECT id FROM jobs WHERE state = ? ORDER BY created_at, id", [state_name])
  rows.map { |row| row["id"] }
end
#purge(state) ⇒ Object
128 129 130 131 132 133 |
# File 'lib/textus/store/jobs/queue.rb', line 128

# Deletes every job that is currently in the given state.
#
# @param state [#to_s] one of VALID_STATES (symbol or string)
# @return [Object] whatever the store's `execute` returns
# @raise [Textus::UsageError] when the state is not in VALID_STATES
def purge(state)
  state_name = state.to_s
  unless VALID_STATES.include?(state_name)
    raise Textus::UsageError, "unknown job state: #{state_name}"
  end

  @store.execute("DELETE FROM jobs WHERE state = ?", [state_name])
end
#ready_ids ⇒ Object
54 55 56 |
# File 'lib/textus/store/jobs/queue.rb', line 54

# Convenience shortcut for listing the ids of jobs in the 'ready' state.
#
# @return [Array] same result as `list(:ready)`
def ready_ids
  list(:ready)
end
#reclaim(now:) ⇒ Object
98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 |
# File 'lib/textus/store/jobs/queue.rb', line 98

# Returns jobs whose lease has run out to the 'ready' state.
#
# Every 'leased' row is inspected; a lease counts as expired when it has no
# "expires_at" value (including a NULL lease column) or when that time is at
# or before `now`. Each expired row gets its lease cleared and its state set
# back to 'ready'.
#
# @param now [Time] reference time for the expiry comparison; it is not
#   modified by this method
# @return [Integer] number of rows whose lease was found expired
def reclaim(now:)
  leased_rows = @store.execute("SELECT id, lease FROM jobs WHERE state = 'leased'")

  expired = leased_rows.select do |row|
    lease = JSON.parse(row["lease"] || "{}")
    expires_at = lease["expires_at"]
    expires_at.nil? || Time.parse(expires_at) <= now
  end

  # getutc returns a new Time; Time#utc would convert the caller's object in
  # place. The timestamp is the same for every row, so compute it once.
  reclaimed_at = now.getutc.iso8601

  expired.each do |row|
    # Re-check the state in the UPDATE itself: the row may have been acked or
    # failed since the SELECT above, and must not be resurrected as 'ready'.
    @store.execute(
      "UPDATE jobs SET state = 'ready', lease = NULL, updated_at = ? WHERE id = ? AND state = 'leased'",
      [reclaimed_at, row["id"]],
    )
  end

  expired.size
end
#retry_failed(job_id) ⇒ Object
121 122 123 124 125 126 |
# File 'lib/textus/store/jobs/queue.rb', line 121

# Puts a failed job back into the 'ready' state for a fresh run.
#
# The attempt counter is reset to 0, the error history is replaced with an
# empty JSON array and the lease is cleared. Only rows currently in the
# 'failed' state are matched, so other jobs are left untouched.
#
# @param job_id [Object] id of the job to retry
# @return [Object] whatever the store's `execute` returns
def retry_failed(job_id)
  sql = "UPDATE jobs SET state = 'ready', attempts = 0, errors = ?, lease = NULL, updated_at = ? " \
        "WHERE id = ? AND state = 'failed'"
  binds = [JSON.dump([]), iso_now, job_id]
  @store.execute(sql, binds)
end