Class: Textus::Store::Jobs::Queue

Inherits:
Object
  • Object
show all
Defined in:
lib/textus/store/jobs/queue.rb

Defined Under Namespace

Classes: Job, Leased

Constant Summary collapse

# Job state names accepted by #list and #purge; any other value makes
# those methods raise Textus::UsageError.
VALID_STATES =
%w[ready leased done failed].freeze

Instance Method Summary collapse

Constructor Details

#initialize(store:) ⇒ Queue

Returns a new instance of Queue.



41
42
43
# File 'lib/textus/store/jobs/queue.rb', line 41

# Builds a queue backed by the given store.
#
# @param store [Object] persistence handle; the other methods of this class
#   call +execute+ (and, in #enqueue, +query_value+) on it
def initialize(store:)
  @store = store
end

Instance Method Details

#ack(leased) ⇒ Object



86
87
88
89
90
91
# File 'lib/textus/store/jobs/queue.rb', line 86

# Marks a leased job as done and clears its lease.
#
# The UPDATE only matches while the row is still in the 'leased' state, so
# acknowledging a job that has left that state changes nothing.
#
# @param leased [Leased] wrapper whose +job.id+ identifies the row
# @return [Object] whatever +@store.execute+ returns
def ack(leased)
  sql = "UPDATE jobs SET state = 'done', lease = NULL, updated_at = ? WHERE id = ? AND state = 'leased'"
  binds = [iso_now, leased.job.id]
  @store.execute(sql, binds)
end

#enqueue(job) ⇒ Object



45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
# File 'lib/textus/store/jobs/queue.rb', line 45

# Adds a job to the queue in the 'ready' state.
#
# First tries an INSERT OR IGNORE keyed on the job id. If that inserted a row
# (per +SELECT changes()+), the insert result is returned. Otherwise a row
# with that id already exists: if it is 'done' or 'failed' it is reset to
# 'ready' with zero attempts and an empty error list; rows in any other state
# are not matched by the UPDATE and stay as they are.
#
# @param job [Job] supplies id, type, args, role, attempts, max_attempts, errors
# @return [Object] the result of whichever +@store.execute+ call ran last
def enqueue(job)
  stamp = iso_now
  insert_binds = [
    job.id, job.type, JSON.dump(job.args), job.role, job.attempts,
    job.max_attempts, JSON.dump(job.errors), stamp, stamp
  ]
  insert_result = @store.execute(
    "INSERT OR IGNORE INTO jobs (id, type, args, state, role, attempts, max_attempts, errors, lease, created_at, updated_at)
   VALUES (?, ?, ?, 'ready', ?, ?, ?, ?, NULL, ?, ?)",
    insert_binds,
  )

  # Both safe-navigation steps matter: a nil count must fall through to the reset.
  changed_rows = @store.query_value("SELECT changes()")&.to_i
  return insert_result if changed_rows&.positive?

  reset_binds = [job.role, JSON.dump(job.args), JSON.dump([]), job.max_attempts, stamp, job.id]
  @store.execute(
    "UPDATE jobs
       SET state = 'ready', role = ?, args = ?, attempts = 0, errors = ?, lease = NULL, max_attempts = ?, updated_at = ?
     WHERE id = ? AND state IN ('done', 'failed')",
    reset_binds,
  )
end

#fail(leased, error:) ⇒ Object



93
94
95
96
97
98
99
100
101
102
103
104
# File 'lib/textus/store/jobs/queue.rb', line 93

# Records a failed attempt for a leased job and decides its next state.
#
# The attempt counter is incremented and an entry with the attempt number,
# the error and a timestamp is appended to the job's error list. When the new
# attempt count reaches +max_attempts+ the job becomes 'failed'; otherwise it
# goes back to 'ready'. The lease is cleared either way.
#
# @param leased [Leased] wrapper around the job that failed
# @param error [Object] error description stored in the error list (JSON-dumped)
# @return [Symbol] :dead_lettered when attempts are exhausted, else :requeued
def fail(leased, error:)
  failed_job = leased.job
  attempt_no = failed_job.attempts + 1
  entry = { "attempt" => attempt_no, "error" => error, "at" => iso_now }
  history = failed_job.errors + [entry] # new array; the job's own list is not modified
  exhausted = attempt_no >= failed_job.max_attempts
  outcome, next_state = exhausted ? [:dead_lettered, "failed"] : [:requeued, "ready"]

  sql = "UPDATE jobs SET state = ?, attempts = ?, errors = ?, lease = NULL, updated_at = ? WHERE id = ?"
  @store.execute(sql, [next_state, attempt_no, JSON.dump(history), iso_now, failed_job.id])
  outcome
end

#lease(worker_id:, lease_ttl:) ⇒ Object



66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
# File 'lib/textus/store/jobs/queue.rb', line 66

# Claims the next 'ready' job (ordered by created_at, then id) for a worker.
#
# A JSON lease marker holding the worker id, the expiry time and a random
# token is written onto the claimed row. The row is then read back by looking
# for that exact marker, which tells us whether the UPDATE claimed anything.
#
# @param worker_id [Object] identifier stored in the lease marker
# @param lease_ttl [Numeric] added to the current UTC time to get the expiry
# @return [Leased, nil] the claimed job wrapped in Leased, or nil when no row
#   carries the marker
def lease(worker_id:, lease_ttl:)
  leased_at = Time.now.utc
  lease_payload = {
    "worker_id" => worker_id,
    "expires_at" => (leased_at + lease_ttl).iso8601,
    "token" => SecureRandom.hex(8),
  }
  marker = JSON.dump(lease_payload)

  @store.execute(
    "UPDATE jobs
      SET state = 'leased', lease = ?, updated_at = ?
    WHERE id = (
      SELECT id FROM jobs WHERE state = 'ready' ORDER BY created_at, id LIMIT 1
    )",
    [marker, leased_at.iso8601],
  )

  lookup_sql = "SELECT * FROM jobs WHERE state = 'leased' AND lease = ? LIMIT 1"
  claimed = @store.execute(lookup_sql, [marker]).first
  claimed ? Leased.new(job_from_row(claimed)) : nil
end

#list(state) ⇒ Object

Raises:

  • (Textus::UsageError) — when state is not one of VALID_STATES


122
123
124
125
126
127
# File 'lib/textus/store/jobs/queue.rb', line 122

# Lists the ids of all jobs in the given state, ordered by created_at, then id.
#
# @param state [#to_s] state name; must be one of VALID_STATES after +to_s+
# @return [Array] the "id" value of each matching row
# @raise [Textus::UsageError] when the state is not in VALID_STATES
def list(state)
  state_name = state.to_s
  unless VALID_STATES.include?(state_name)
    raise Textus::UsageError, "unknown job state: #{state_name}"
  end

  rows = @store.execute("SELECT id FROM jobs WHERE state = ? ORDER BY created_at, id", [state_name])
  rows.map { |row| row["id"] }
end

#purge(state) ⇒ Object

Raises:

  • (Textus::UsageError) — when state is not one of VALID_STATES


136
137
138
139
140
141
# File 'lib/textus/store/jobs/queue.rb', line 136

# Deletes every job that is currently in the given state.
#
# @param state [#to_s] state name; must be one of VALID_STATES after +to_s+
# @return [Object] whatever +@store.execute+ returns for the DELETE
# @raise [Textus::UsageError] when the state is not in VALID_STATES
def purge(state)
  state_name = state.to_s
  unless VALID_STATES.include?(state_name)
    raise Textus::UsageError, "unknown job state: #{state_name}"
  end

  delete_sql = "DELETE FROM jobs WHERE state = ?"
  @store.execute(delete_sql, [state_name])
end

#ready_ids ⇒ Object



62
63
64
# File 'lib/textus/store/jobs/queue.rb', line 62

# Convenience wrapper: returns whatever #list returns for the :ready state.
def ready_ids
  list(:ready)
end

#reclaim(now:) ⇒ Object



106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
# File 'lib/textus/store/jobs/queue.rb', line 106

# Returns expired leases to the 'ready' state.
#
# Every row in the 'leased' state is inspected. A lease counts as expired
# when its JSON has no "expires_at" (including a NULL lease column) or when
# that time is at or before +now+. Each expired row is set back to 'ready'
# with its lease cleared.
#
# @param now [Time] reference time; it is not modified
# @return [Integer] number of rows judged expired
def reclaim(now:)
  leased_rows = @store.execute("SELECT id, lease FROM jobs WHERE state = 'leased'")
  expired = leased_rows.select do |row|
    expires_at = JSON.parse(row["lease"] || "{}")["expires_at"]
    expires_at.nil? || Time.parse(expires_at) <= now
  end
  return 0 if expired.empty?

  # Time#utc converts its receiver in place (and raises on a frozen Time);
  # getutc returns a new object, so the caller's +now+ stays untouched.
  # Computed once because it is the same for every row.
  stamp = now.getutc.iso8601
  expired.each do |row|
    @store.execute(
      "UPDATE jobs SET state = 'ready', lease = NULL, updated_at = ? WHERE id = ?",
      [stamp, row["id"]],
    )
  end
  expired.size
end

#retry_failed(job_id) ⇒ Object



129
130
131
132
133
134
# File 'lib/textus/store/jobs/queue.rb', line 129

# Puts a 'failed' job back into the 'ready' state with a clean slate.
#
# Attempts are reset to zero, the error list is replaced by an empty JSON
# array and the lease is cleared. Rows that are not in the 'failed' state are
# not matched by the UPDATE.
#
# @param job_id [Object] id of the job to retry
# @return [Object] whatever +@store.execute+ returns
def retry_failed(job_id)
  sql = "UPDATE jobs SET state = 'ready', attempts = 0, errors = ?, lease = NULL, updated_at = ? WHERE id = ? AND state = 'failed'"
  empty_errors = JSON.dump([])
  @store.execute(sql, [empty_errors, iso_now, job_id])
end