Class: Textus::Store::Jobs::Queue
- Inherits:
-
Object
- Object
- Textus::Store::Jobs::Queue
- Defined in:
- lib/textus/store/jobs/queue.rb
Defined Under Namespace
Constant Summary collapse
- VALID_STATES =
%w[ready leased done failed].freeze
Instance Method Summary collapse
- #ack(leased) ⇒ Object
- #enqueue(job) ⇒ Object
- #fail(leased, error:) ⇒ Object
-
#initialize(store:) ⇒ Queue
constructor
A new instance of Queue.
- #lease(worker_id:, lease_ttl:) ⇒ Object
- #list(state) ⇒ Object
- #purge(state) ⇒ Object
- #ready_ids ⇒ Object
- #reclaim(now:) ⇒ Object
- #retry_failed(job_id) ⇒ Object
Constructor Details
#initialize(store:) ⇒ Queue
Returns a new instance of Queue.
41 42 43 |
# File 'lib/textus/store/jobs/queue.rb', line 41 def initialize(store:) @store = store end |
Instance Method Details
#ack(leased) ⇒ Object
86 87 88 89 90 91 |
# File 'lib/textus/store/jobs/queue.rb', line 86 def ack(leased) @store.execute( "UPDATE jobs SET state = 'done', lease = NULL, updated_at = ? WHERE id = ? AND state = 'leased'", [iso_now, leased.job.id], ) end |
#enqueue(job) ⇒ Object
45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 |
# File 'lib/textus/store/jobs/queue.rb', line 45 def enqueue(job) now = iso_now inserted = @store.execute( "INSERT OR IGNORE INTO jobs (id, type, args, state, role, attempts, max_attempts, errors, lease, created_at, updated_at) VALUES (?, ?, ?, 'ready', ?, ?, ?, ?, NULL, ?, ?)", [job.id, job.type, JSON.dump(job.args), job.role, job.attempts, job.max_attempts, JSON.dump(job.errors), now, now], ) return inserted if @store.query_value("SELECT changes()")&.to_i&.positive? @store.execute( "UPDATE jobs SET state = 'ready', role = ?, args = ?, attempts = 0, errors = ?, lease = NULL, max_attempts = ?, updated_at = ? WHERE id = ? AND state IN ('done', 'failed')", [job.role, JSON.dump(job.args), JSON.dump([]), job.max_attempts, now, job.id], ) end |
#fail(leased, error:) ⇒ Object
93 94 95 96 97 98 99 100 101 102 103 104 |
# File 'lib/textus/store/jobs/queue.rb', line 93 def fail(leased, error:) job = leased.job attempts = job.attempts + 1 errors = job.errors + [{ "attempt" => attempts, "error" => error, "at" => iso_now }] dead = attempts >= job.max_attempts state = dead ? "failed" : "ready" @store.execute( "UPDATE jobs SET state = ?, attempts = ?, errors = ?, lease = NULL, updated_at = ? WHERE id = ?", [state, attempts, JSON.dump(errors), iso_now, job.id], ) dead ? :dead_lettered : :requeued end |
#lease(worker_id:, lease_ttl:) ⇒ Object
66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 |
# File 'lib/textus/store/jobs/queue.rb', line 66 def lease(worker_id:, lease_ttl:) now = Time.now.utc expires_at = now + lease_ttl token = SecureRandom.hex(8) marked_lease = JSON.dump({ "worker_id" => worker_id, "expires_at" => expires_at.iso8601, "token" => token }) @store.execute( "UPDATE jobs SET state = 'leased', lease = ?, updated_at = ? WHERE id = ( SELECT id FROM jobs WHERE state = 'ready' ORDER BY created_at, id LIMIT 1 )", [marked_lease, now.iso8601], ) row = @store.execute("SELECT * FROM jobs WHERE state = 'leased' AND lease = ? LIMIT 1", [marked_lease]).first return nil unless row Leased.new(job_from_row(row)) end |
#list(state) ⇒ Object
122 123 124 125 126 127 |
# File 'lib/textus/store/jobs/queue.rb', line 122 def list(state) state = state.to_s raise Textus::UsageError.new("unknown job state: #{state}") unless VALID_STATES.include?(state) @store.execute("SELECT id FROM jobs WHERE state = ? ORDER BY created_at, id", [state]).map { |row| row["id"] } end |
#purge(state) ⇒ Object
136 137 138 139 140 141 |
# File 'lib/textus/store/jobs/queue.rb', line 136 def purge(state) state = state.to_s raise Textus::UsageError.new("unknown job state: #{state}") unless VALID_STATES.include?(state) @store.execute("DELETE FROM jobs WHERE state = ?", [state]) end |
#ready_ids ⇒ Object
62 63 64 |
# File 'lib/textus/store/jobs/queue.rb', line 62 def ready_ids list(:ready) end |
#reclaim(now:) ⇒ Object
106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 |
# File 'lib/textus/store/jobs/queue.rb', line 106 def reclaim(now:) rows = @store.execute("SELECT id, lease FROM jobs WHERE state = 'leased'") expired = rows.select do |row| lease = JSON.parse(row["lease"] || "{}") expires_at = lease["expires_at"] expires_at.nil? || Time.parse(expires_at) <= now end expired.each do |row| @store.execute( "UPDATE jobs SET state = 'ready', lease = NULL, updated_at = ? WHERE id = ?", [now.utc.iso8601, row["id"]], ) end expired.size end |
#retry_failed(job_id) ⇒ Object
129 130 131 132 133 134 |
# File 'lib/textus/store/jobs/queue.rb', line 129 def retry_failed(job_id) @store.execute( "UPDATE jobs SET state = 'ready', attempts = 0, errors = ?, lease = NULL, updated_at = ? WHERE id = ? AND state = 'failed'", [JSON.dump([]), iso_now, job_id], ) end |