Class: Textus::Store::Jobs::Queue

Inherits:
Object
  • Object
show all
Defined in:
lib/textus/store/jobs/queue.rb

Defined Under Namespace

Classes: Job, Leased

Constant Summary collapse

VALID_STATES =
%w[ready leased done failed].freeze

Instance Method Summary collapse

Constructor Details

#initialize(store:) ⇒ Queue

Returns a new instance of Queue.



41
42
43
# File 'lib/textus/store/jobs/queue.rb', line 41

# Builds a queue on top of the given store.
#
# @param store [Object] kept in @store; the other methods of this class
#   call +execute(sql, params)+ on it
def initialize(store:)
  @store = store
end

Instance Method Details

#ack(leased) ⇒ Object



78
79
80
81
82
83
# File 'lib/textus/store/jobs/queue.rb', line 78

# Marks the leased job as done and clears its lease.
#
# The UPDATE only matches a row that is still in the 'leased' state.
#
# @param leased [Leased] wrapper whose +job.id+ identifies the row
# @return [Object] whatever +@store.execute+ returns
def ack(leased)
  sql = "UPDATE jobs SET state = 'done', lease = NULL, updated_at = ? WHERE id = ? AND state = 'leased'"
  bindings = [iso_now, leased.job.id]
  @store.execute(sql, bindings)
end

#enqueue(job) ⇒ Object



45
46
47
48
49
50
51
52
# File 'lib/textus/store/jobs/queue.rb', line 45

# Inserts +job+ into the jobs table in the 'ready' state with no lease.
#
# The statement is INSERT OR IGNORE. +args+ and +errors+ are stored as JSON
# text, and created_at / updated_at receive the same timestamp.
#
# @param job [Job] source of id, type, args, role, attempts, max_attempts and errors
# @return [Object] whatever +@store.execute+ returns
def enqueue(job)
  stamp = iso_now
  sql = "INSERT OR IGNORE INTO jobs (id, type, args, state, role, attempts, max_attempts, errors, lease, created_at, updated_at)\n" \
        "   VALUES (?, ?, ?, 'ready', ?, ?, ?, ?, NULL, ?, ?)"
  bindings = [
    job.id,
    job.type,
    JSON.dump(job.args),
    job.role,
    job.attempts,
    job.max_attempts,
    JSON.dump(job.errors),
    stamp,
    stamp,
  ]
  @store.execute(sql, bindings)
end

#fail(leased, error:) ⇒ Object



85
86
87
88
89
90
91
92
93
94
95
96
# File 'lib/textus/store/jobs/queue.rb', line 85

# Records a failed attempt for the leased job and either requeues it or
# dead-letters it.
#
# The attempt counter is incremented, a new entry is appended to the job's
# error list, and the lease is cleared. Once the new attempt count reaches
# +max_attempts+ the job moves to 'failed'; otherwise it returns to 'ready'.
#
# @param leased [Leased] wrapper whose +job+ supplies id, attempts,
#   max_attempts and errors
# @param error [Object] value stored under "error" in the appended entry
#   (serialised with JSON.dump)
# @return [Symbol] :dead_lettered when the job moved to 'failed', :requeued otherwise
def fail(leased, error:)
  job = leased.job
  # Read the clock once so the error entry's "at" and the row's updated_at
  # always agree.
  now = iso_now
  attempts = job.attempts + 1
  errors = job.errors + [{ "attempt" => attempts, "error" => error, "at" => now }]
  dead = attempts >= job.max_attempts
  # NOTE(review): unlike #ack, this UPDATE is not restricted to rows that are
  # still in the 'leased' state - confirm whether that is intended.
  @store.execute(
    "UPDATE jobs SET state = ?, attempts = ?, errors = ?, lease = NULL, updated_at = ? WHERE id = ?",
    [dead ? "failed" : "ready", attempts, JSON.dump(errors), now, job.id],
  )
  dead ? :dead_lettered : :requeued
end

#lease(worker_id:, lease_ttl:) ⇒ Object



58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
# File 'lib/textus/store/jobs/queue.rb', line 58

# Claims the oldest 'ready' job (ordered by created_at, then id) for a worker.
#
# A JSON lease marker containing the worker id, the expiry time and a random
# token is written onto the claimed row; the row is then read back by
# matching on that exact marker.
#
# @param worker_id [Object] stored under "worker_id" in the lease marker
# @param lease_ttl [Numeric] added to the current UTC time to get the expiry
# @return [Leased, nil] the claimed job wrapped in Leased, or nil when the
#   read-back finds no row carrying this marker
def lease(worker_id:, lease_ttl:)
  leased_at = Time.now.utc
  deadline = leased_at + lease_ttl
  nonce = SecureRandom.hex(8)
  marker = JSON.dump({ "worker_id" => worker_id, "expires_at" => deadline.iso8601, "token" => nonce })

  claim_sql =
    "UPDATE jobs
      SET state = 'leased', lease = ?, updated_at = ?
    WHERE id = (
      SELECT id FROM jobs WHERE state = 'ready' ORDER BY created_at, id LIMIT 1
    )"
  @store.execute(claim_sql, [marker, leased_at.iso8601])

  # The random token makes the marker unique, so it identifies the row just claimed.
  lookup_sql = "SELECT * FROM jobs WHERE state = 'leased' AND lease = ? LIMIT 1"
  claimed = @store.execute(lookup_sql, [marker]).first
  claimed ? Leased.new(job_from_row(claimed)) : nil
end

#list(state) ⇒ Object

Raises:

  • (Textus::UsageError)



114
115
116
117
118
119
# File 'lib/textus/store/jobs/queue.rb', line 114

# Returns the ids of all jobs in the given state, ordered by created_at then id.
#
# @param state [#to_s] one of VALID_STATES (symbol or string)
# @return [Array] the "id" value of every matching row
# @raise [Textus::UsageError] when the state is not in VALID_STATES
def list(state)
  wanted = state.to_s
  unless VALID_STATES.include?(wanted)
    raise Textus::UsageError.new("unknown job state: #{wanted}")
  end

  rows = @store.execute("SELECT id FROM jobs WHERE state = ? ORDER BY created_at, id", [wanted])
  rows.map { |row| row["id"] }
end

#purge(state) ⇒ Object

Raises:

  • (Textus::UsageError)



128
129
130
131
132
133
# File 'lib/textus/store/jobs/queue.rb', line 128

# Deletes every job that is in the given state.
#
# @param state [#to_s] one of VALID_STATES (symbol or string)
# @return [Object] whatever +@store.execute+ returns
# @raise [Textus::UsageError] when the state is not in VALID_STATES
def purge(state)
  target = state.to_s
  unless VALID_STATES.include?(target)
    raise Textus::UsageError.new("unknown job state: #{target}")
  end

  @store.execute("DELETE FROM jobs WHERE state = ?", [target])
end

#ready_ids ⇒ Object



54
55
56
# File 'lib/textus/store/jobs/queue.rb', line 54

# Convenience wrapper: returns whatever #list returns for the :ready state.
def ready_ids
  list(:ready)
end

#reclaim(now:) ⇒ Object



98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
# File 'lib/textus/store/jobs/queue.rb', line 98

# Returns expired leases to the 'ready' state.
#
# Every row in the 'leased' state is loaded and its lease JSON inspected. A
# lease counts as expired when it has no "expires_at" value (including a NULL
# lease column) or when that time is at or before +now+. Each expired row is
# reset to 'ready' with its lease cleared.
#
# @param now [Time] reference time for the expiry comparison; it is not modified
# @return [Integer] number of rows selected as expired
def reclaim(now:)
  leased_rows = @store.execute("SELECT id, lease FROM jobs WHERE state = 'leased'")
  expired = leased_rows.select do |row|
    expires_at = JSON.parse(row["lease"] || "{}")["expires_at"]
    expires_at.nil? || Time.parse(expires_at) <= now
  end

  # Time#utc converts the receiver in place, which would change the caller's
  # object (and raise on a frozen one); getutc returns a new Time instead.
  stamp = now.getutc.iso8601
  expired.each do |row|
    @store.execute(
      "UPDATE jobs SET state = 'ready', lease = NULL, updated_at = ? WHERE id = ?",
      [stamp, row["id"]],
    )
  end
  expired.size
end

#retry_failed(job_id) ⇒ Object



121
122
123
124
125
126
# File 'lib/textus/store/jobs/queue.rb', line 121

# Puts a dead-lettered job back into the 'ready' state.
#
# Attempts are reset to 0, the error list is replaced with an empty JSON
# array and the lease is cleared. Only a row currently in 'failed' matches.
#
# @param job_id [Object] id of the job to retry
# @return [Object] whatever +@store.execute+ returns
def retry_failed(job_id)
  sql = "UPDATE jobs SET state = 'ready', attempts = 0, errors = ?, lease = NULL, updated_at = ? WHERE id = ? AND state = 'failed'"
  no_errors = JSON.dump([])
  @store.execute(sql, [no_errors, iso_now, job_id])
end