Class: Tina4::QueueBackends::LiteBackend

Inherits:
Object
  • Object
show all
Defined in:
lib/tina4/queue_backends/lite_backend.rb

Overview

File-based queue backend — JSON files on disk. Zero dependencies.

Each job is stored as a separate .json file under <dir>/<topic>/. Dead-lettered jobs (those that exhausted their retries) live under the shared <dir>/dead_letter/ directory, tagged with their topic.

Dequeue policy: highest priority first, ties broken oldest-first by the stored created_at (ISO-8601, so lexicographic == chronological). The file name is NOT the ordering key — the stored priority/created_at are.

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(options = {}) ⇒ LiteBackend

Returns a new instance of LiteBackend.



22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
# File 'lib/tina4/queue_backends/lite_backend.rb', line 22

# Build a file-backed queue rooted at options[:dir] and make sure both the
# root and its dead_letter/ sub-directory exist on disk.
#
# Recognised option keys (Symbols); a missing or nil value takes the default:
#   :dir                - queue root. Default: "<Dir.pwd>/.queue".
#   :max_retries        - fail() re-enqueues while attempts < max_retries,
#                         then dead-letters. Default: 3.
#   :retry_backoff      - seconds to delay the next attempt when fail()
#                         re-enqueues. Default: 0 (retry on the very next
#                         pop/consume iteration).
#   :visibility_timeout - reservation window in seconds. A popped job is held
#                         in <topic>/reserved/ with
#                         available_at = now + visibility_timeout; if the
#                         consumer dies before complete()/fail(), the next
#                         pop() reclaims it once the window expires
#                         (incrementing attempts and re-enqueuing, or
#                         dead-lettering past max_retries). <= 0 disables the
#                         reclaim, i.e. the old at-most-once behaviour.
#                         Default: 300.0.
def initialize(options = {})
  root = options[:dir] || File.join(Dir.pwd, ".queue")
  @dir = root
  @dead_letter_dir = File.join(root, "dead_letter")

  @max_retries = options[:max_retries] || 3
  @retry_backoff = options[:retry_backoff] || 0
  @visibility_timeout = options[:visibility_timeout] || 300.0

  [@dir, @dead_letter_dir].each { |path| FileUtils.mkdir_p(path) }
  @mutex = Mutex.new
end

Instance Attribute Details

#max_retries ⇒ Object

Retry policy — settable so a Queue can propagate its own max_retries / retry_backoff onto a backend instance passed directly (legacy path).



20
21
22
# File 'lib/tina4/queue_backends/lite_backend.rb', line 20

# Reader for @max_retries: the attempt limit that fail() compares
# job.attempts against when choosing between re-enqueue and dead-letter.
# Set in the constructor from options[:max_retries] (default 3).
def max_retries
  @max_retries
end

#retry_backoff ⇒ Object

Retry policy — settable so a Queue can propagate its own max_retries / retry_backoff onto a backend instance passed directly (legacy path).



20
21
22
# File 'lib/tina4/queue_backends/lite_backend.rb', line 20

# Reader for @retry_backoff: the delay, in seconds, that fail() passes to
# requeue_job as delay_seconds. Set in the constructor from
# options[:retry_backoff] (default 0).
def retry_backoff
  @retry_backoff
end

#visibility_timeout ⇒ Object

Retry/visibility policy is settable so a Queue can propagate its own configuration onto a backend instance passed directly (legacy path).



46
47
48
# File 'lib/tina4/queue_backends/lite_backend.rb', line 46

# Reader for @visibility_timeout: the reservation window in seconds. Set in
# the constructor from options[:visibility_timeout] (default 300.0); per the
# constructor's notes a value <= 0 disables reclaiming of expired
# reservations.
def visibility_timeout
  @visibility_timeout
end

Instance Method Details

#acknowledge(message) ⇒ Object



117
118
119
# File 'lib/tina4/queue_backends/lite_backend.rb', line 117

# No-op acknowledgement hook: ignores `message` and returns nil.
#
# NOTE(review): the inline comment below only covers the pending file.
# dequeue also calls write_reserved for every job it hands out, and this
# method does not call clear_reservation the way complete()/fail() do.
# Presumably callers are expected to follow up with complete(); confirm,
# otherwise the reservation outlives the acknowledgement.
def acknowledge(message)
  # File already deleted on dequeue — nothing to do.
end

#clear(topic) ⇒ Object

Remove all pending jobs from a topic (and any held reservations). Returns count removed.



289
290
291
292
293
294
295
296
297
298
299
300
301
# File 'lib/tina4/queue_backends/lite_backend.rb', line 289

# Delete every *.json file in the topic's pending directory and in its
# reserved directory.
#
# A directory that does not exist contributes nothing. A file that is already
# gone by the time it is deleted (Errno::ENOENT) is skipped and not counted.
#
# Returns the number of files this call actually deleted.
def clear(topic)
  [topic_path(topic), reserved_path(topic)].sum do |folder|
    next 0 unless Dir.exist?(folder)

    Dir.glob(File.join(folder, "*.json")).count do |json_file|
      File.delete(json_file)
      true
    rescue Errno::ENOENT
      false
    end
  end
end

#complete(message) ⇒ Object



121
122
123
124
125
126
# File 'lib/tina4/queue_backends/lite_backend.rb', line 121

# Mark `message` as finished by removing its reservation record.
# Delegates to clear_reservation with message.topic and message.id and
# returns whatever that helper returns.
def complete(message)
  # The pending file was claimed on dequeue and a reservation record
  # written; complete() is terminal, so drop the reservation. The job is
  # done and gone.
  clear_reservation(message.topic, message.id)
end

#dead_letter(message) ⇒ Object



157
158
159
160
161
162
# File 'lib/tina4/queue_backends/lite_backend.rb', line 157

# Write `message` into the dead-letter directory as "<message.id>.json".
#
# The stored document is message.to_hash with its :status entry set to
# "dead" (the hash returned by to_hash is modified in place). An existing file
# with the same id is overwritten. Returns File.write's result.
def dead_letter(message)
  record = message.to_hash
  record[:status] = "dead"
  target = File.join(@dead_letter_dir, "#{message.id}.json")
  File.write(target, JSON.generate(record))
end

#dead_letter_count(topic) ⇒ Object

Count dead-letter / failed messages for a topic.



178
179
180
181
182
183
184
185
186
187
188
189
# File 'lib/tina4/queue_backends/lite_backend.rb', line 178

# Count the dead-letter files whose stored "topic" equals topic.to_s.
#
# Returns 0 when the dead-letter directory does not exist. Files that cannot
# be counted are skipped rather than aborting the scan:
#   * unparsable JSON (JSON::ParserError);
#   * a file that disappears between the directory listing and the read
#     (Errno::ENOENT), e.g. because purge/retry_job removed it meanwhile;
#   * a JSON document that is not an object (no "topic" key to compare).
def dead_letter_count(topic)
  return 0 unless Dir.exist?(@dead_letter_dir)

  wanted = topic.to_s
  Dir.glob(File.join(@dead_letter_dir, "*.json")).count do |file|
    data = JSON.parse(File.read(file))
    data.is_a?(Hash) && data["topic"] == wanted
  rescue JSON::ParserError, Errno::ENOENT
    false
  end
end

#dead_letters(topic, max_retries: 3) ⇒ Object

Get dead letter jobs for a topic — messages that exceeded max retries. Returns Hashes (raw job data with status “dead”).



200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
# File 'lib/tina4/queue_backends/lite_backend.rb', line 200

# List dead-letter records for a topic, oldest file modification time first.
#
# A record is returned when its stored "topic" equals topic.to_s and its
# "attempts" (missing counts as 0) is >= max_retries. Each returned Hash is
# the parsed JSON with "status" forced to "dead".
#
# Returns [] when the dead-letter directory does not exist. Files that cannot
# be listed are skipped instead of aborting the scan: unparsable JSON, JSON
# that is not an object, and files that vanish between the directory listing
# and the read/stat (Errno::ENOENT), e.g. removed by a concurrent
# purge/retry_job.
def dead_letters(topic, max_retries: 3)
  return [] unless Dir.exist?(@dead_letter_dir)

  wanted = topic.to_s
  stamped = Dir.glob(File.join(@dead_letter_dir, "*.json")).filter_map do |file|
    data = JSON.parse(File.read(file))
    next unless data.is_a?(Hash) && data["topic"] == wanted
    next if (data["attempts"] || 0) < max_retries

    data["status"] = "dead"
    # Stat only the records we keep; pair each with its mtime for sorting.
    [File.mtime(file), data]
  rescue JSON::ParserError, Errno::ENOENT
    nil
  end

  stamped.sort_by(&:first).map(&:last)
end

#dequeue(topic) ⇒ Object



57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
# File 'lib/tina4/queue_backends/lite_backend.rb', line 57

# Claim and return the first available job for `topic`, or nil.
#
# Under @mutex: expired reservations are reclaimed first, then the first
# entry of available_candidates(topic) is taken. Returns nil when there is no
# candidate, or when the pending file has already been deleted by another
# worker (Errno::ENOENT on delete).
def dequeue(topic)
  @mutex.synchronize do
    # Give back reservations whose consumer died mid-flight before choosing.
    reclaim_expired(topic)
    head = available_candidates(topic).first
    next unless head

    # The reservation is written BEFORE the pending file is claimed so that
    # a crash between the two steps can never strand the job. Whoever wins
    # the delete owns the job and is the only one to return it.
    write_reserved(head[:data], topic)
    won = begin
      File.delete(head[:file])
      true
    rescue Errno::ENOENT
      false # already consumed by another worker
    end

    won ? job_from_data(head[:data], topic) : nil
  end
end

#dequeue_batch(topic, count) ⇒ Object



77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
# File 'lib/tina4/queue_backends/lite_backend.rb', line 77

# Claim up to `count` available jobs for `topic` and return them as an Array.
#
# Under @mutex: expired reservations are reclaimed, then the first `count`
# entries of available_candidates(topic) are processed in order. For each one
# the reservation is written before the pending file is deleted; an entry
# whose file is already gone (Errno::ENOENT) is left out of the result.
def dequeue_batch(topic, count)
  @mutex.synchronize do
    reclaim_expired(topic)

    available_candidates(topic).first(count).each_with_object([]) do |entry, jobs|
      write_reserved(entry[:data], topic)
      begin
        File.delete(entry[:file])
      rescue Errno::ENOENT
        next # another worker claimed it first
      end

      job = job_from_data(entry[:data], topic)
      jobs << job if job
    end
  end
end

#enqueue(message) ⇒ Object



48
49
50
51
52
53
54
55
# File 'lib/tina4/queue_backends/lite_backend.rb', line 48

# Persist `message` as "<message.id>.json" inside its topic's directory.
#
# Runs under @mutex. The topic directory is created when missing, and the
# file content is message.to_json; an existing file with the same id is
# overwritten. Returns File.write's result.
def enqueue(message)
  @mutex.synchronize do
    folder = topic_path(message.topic)
    FileUtils.mkdir_p(folder)
    File.write(File.join(folder, "#{message.id}.json"), message.to_json)
  end
end

#fail(job, error = "") ⇒ Object

Record a failed attempt. Increments attempts and stores the error. While attempts < max_retries the job is re-enqueued to pending (after the configured retry_backoff). Once attempts >= max_retries it is moved to the dead-letter store.



136
137
138
139
140
141
142
143
144
145
146
# File 'lib/tina4/queue_backends/lite_backend.rb', line 136

# Record a failed attempt for `job`.
#
# In order: the reservation is dropped (the consumer did acknowledge, just
# with a failure), job.attempts is incremented and job.error set to `error`.
# Once attempts has reached @max_retries the job goes to move_to_dead_letter;
# otherwise it is handed to requeue_job with @retry_backoff as the delay.
# Returns the result of whichever helper ran.
def fail(job, error = "")
  clear_reservation(job.topic, job.id)
  job.attempts += 1
  job.error = error

  return move_to_dead_letter(job, error) if job.attempts >= @max_retries

  requeue_job(job, delay_seconds: @retry_backoff, error: error)
end

#failed(topic, max_retries: 3) ⇒ Object

Jobs that have failed at least once but are still being retried.

Under the auto-retry lifecycle a failed-but-retryable job lives in the pending queue (not the dead-letter dir), so this scans the topic dir for jobs with 0 < attempts < max_retries. Dead-lettered jobs are returned by dead_letters(). Returns Hashes.



309
310
311
312
313
314
315
316
317
318
319
320
321
322
# File 'lib/tina4/queue_backends/lite_backend.rb', line 309

# List pending jobs of a topic that have failed at least once but are still
# retryable: 0 < attempts < max_retries (a missing "attempts" counts as 0).
# Results are the parsed JSON Hashes, oldest file modification time first.
#
# Returns [] when the topic directory does not exist. This scans the live
# pending directory, which dequeue deletes from, so a file may disappear
# between the directory listing and the read/stat; such files
# (Errno::ENOENT) are skipped, as are unparsable files and JSON documents
# that are not objects.
def failed(topic, max_retries: 3)
  dir = topic_path(topic)
  return [] unless Dir.exist?(dir)

  stamped = Dir.glob(File.join(dir, "*.json")).filter_map do |file|
    data = JSON.parse(File.read(file))
    next unless data.is_a?(Hash)

    attempts = data["attempts"] || 0
    next unless attempts > 0 && attempts < max_retries

    # Stat only the records we keep; pair each with its mtime for sorting.
    [File.mtime(file), data]
  rescue JSON::ParserError, Errno::ENOENT
    nil
  end

  stamped.sort_by(&:first).map(&:last)
end

#find_by_id(topic, id) ⇒ Object

Find a specific pending job by id, claim it (delete the file) and return it. Returns nil when no pending job with that id exists.



95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
# File 'lib/tina4/queue_backends/lite_backend.rb', line 95

# Find the pending job whose stored "id" equals id.to_s, claim it and return
# it; returns nil when the topic directory is missing, no pending job has
# that id, or another worker claimed the job first.
#
# Runs under @mutex. Like dequeue, the reservation is written before the
# pending file is deleted, and losing the delete race (Errno::ENOENT) yields
# nil instead of raising. Files that vanish before they can be read, or that
# do not contain valid JSON, are skipped.
def find_by_id(topic, id)
  @mutex.synchronize do
    dir = topic_path(topic)
    return nil unless Dir.exist?(dir)

    target = id.to_s
    Dir.glob(File.join(dir, "*.json")).each do |f|
      begin
        raw = File.read(f)
      rescue Errno::ENOENT
        next # claimed by another worker between the listing and the read
      end

      data = JSON.parse(raw)
      next unless data["id"].to_s == target

      # Reserve (so a dead consumer's job is reclaimable) then claim the
      # pending file — mirrors dequeue.
      write_reserved(data, topic)
      begin
        File.delete(f)
      rescue Errno::ENOENT
        return nil # already consumed by another worker
      end
      return job_from_data(data, topic)
    rescue JSON::ParserError
      next
    end
    nil
  end
end

#purge(topic, status) ⇒ Object

Delete messages by status. For “failed”/“dead”/“dead_letter”, removes from the dead_letter directory. For “completed”/“pending”, removes matching jobs from the topic (pending) directory. Returns count purged.



222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
# File 'lib/tina4/queue_backends/lite_backend.rb', line 222

# Delete stored jobs by status and return how many files were removed.
#
# When dead_status?(status) is true, the dead-letter directory is scanned and
# every record whose "topic" equals topic.to_s is deleted. Otherwise the
# topic's pending directory is scanned and every record whose "status"
# (stringified) equals status.to_s is deleted.
#
# Returns 0 when the directory to scan does not exist. Unparsable files and
# JSON documents that are not objects are left in place. A file that
# disappears before it can be read or deleted (Errno::ENOENT, e.g. claimed
# by another worker) is skipped and not counted.
def purge(topic, status)
  if dead_status?(status)
    dir = @dead_letter_dir
    wanted = topic.to_s
    doomed = ->(data) { data["topic"] == wanted }
  else
    dir = topic_path(topic)
    wanted = status.to_s
    doomed = ->(data) { data["status"].to_s == wanted }
  end
  return 0 unless Dir.exist?(dir)

  Dir.glob(File.join(dir, "*.json")).count do |file|
    data = JSON.parse(File.read(file))
    next false unless data.is_a?(Hash) && doomed.call(data)

    File.delete(file)
    true
  rescue JSON::ParserError, Errno::ENOENT
    false
  end
end

#requeue(message) ⇒ Object



128
129
130
# File 'lib/tina4/queue_backends/lite_backend.rb', line 128

# Put `message` back on its topic's pending queue. A plain alias for
# enqueue(message): no attempt counter or error field is touched here.
def requeue(message)
  enqueue(message)
end

#reserved_count(topic) ⇒ Object

Count currently-reserved (in-flight) jobs for a topic.



171
172
173
174
175
# File 'lib/tina4/queue_backends/lite_backend.rb', line 171

# Number of *.json files in the topic's reserved directory; 0 when that
# directory does not exist.
def reserved_count(topic)
  holding = reserved_path(topic)
  Dir.exist?(holding) ? Dir.glob(File.join(holding, "*.json")).size : 0
end

#retry(job, delay_seconds: 0) ⇒ Object

Explicit re-queue requested by the caller (job.retry()). Always re-enqueues regardless of the retry limit — a manual override, distinct from the automatic fail() path. Increments attempts, clears the error.



151
152
153
154
155
# File 'lib/tina4/queue_backends/lite_backend.rb', line 151

# Manually re-queue `job`, whatever its attempt count: no comparison against
# @max_retries is made here (unlike fail()).
#
# In order: drop the job's reservation, add 1 to job.attempts, then pass the
# job to requeue_job with the caller's delay_seconds (default 0) and
# error: nil. Returns requeue_job's result.
def retry(job, delay_seconds: 0)
  clear_reservation(job.topic, job.id)
  job.attempts += 1
  requeue_job(job, delay_seconds: delay_seconds, error: nil)
end

#retry_failed(topic, max_retries: 3) ⇒ Object

Revive dead-letter jobs (under max_retries) back to pending. Returns the number of jobs re-queued.



257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
# File 'lib/tina4/queue_backends/lite_backend.rb', line 257

# Move dead-letter records of a topic back to the pending queue when their
# stored "attempts" (missing counts as 0) is below max_retries.
#
# Each revived record is rebuilt as a Tina4::Job from its stored topic,
# payload, id, priority (default 0), attempts and error, passed to enqueue,
# and its dead-letter file is then deleted. Unparsable files are skipped.
#
# Returns the number of jobs re-queued; 0 when the dead-letter directory
# does not exist.
def retry_failed(topic, max_retries: 3)
  return 0 unless Dir.exist?(@dead_letter_dir)

  FileUtils.mkdir_p(topic_path(topic))
  revived = 0

  Dir.glob(File.join(@dead_letter_dir, "*.json")).each do |dead_file|
    record = JSON.parse(File.read(dead_file))
    next if record["topic"] != topic.to_s || (record["attempts"] || 0) >= max_retries

    enqueue(
      Tina4::Job.new(
        topic: record["topic"],
        payload: record["payload"],
        id: record["id"],
        priority: record["priority"] || 0,
        attempts: record["attempts"] || 0,
        error: record["error"]
      )
    )
    File.delete(dead_file)
    revived += 1
  rescue JSON::ParserError
    next
  end

  revived
end

#retry_job(topic, job_id: nil, delay_seconds: 0) ⇒ Object

Revive a specific dead-letter job by id back to the pending queue. When job_id is nil, revives every dead-letter for the topic.

This is a manual override (Queue#retry(job_id)) — it always revives a dead-letter regardless of attempt count, mirroring job.retry. Returns true if any job was re-queued.



330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
# File 'lib/tina4/queue_backends/lite_backend.rb', line 330

# Revive dead-letter records of a topic back to the pending queue.
#
# With job_id given, only the first record whose stored "id" matches is
# revived and the scan stops there; with job_id nil, every dead-letter record
# of the topic is revived. Ids are compared as strings on both sides (the
# same comparison find_by_id uses), so a record whose stored id is numeric
# is still found.
#
# Each revived record becomes a Tina4::Job with attempts incremented by one
# (missing attempts counts as 0), error cleared, priority defaulting to 0, and
# available_at set to now + delay_seconds when delay_seconds > 0 (nil
# otherwise). The job is passed to enqueue and the dead-letter file deleted.
# Unparsable files are skipped. No attempt limit is applied.
#
# Returns true if at least one job was re-queued, false otherwise (including
# when the dead-letter directory does not exist).
def retry_job(topic, job_id: nil, delay_seconds: 0)
  return false unless Dir.exist?(@dead_letter_dir)

  available_at = delay_seconds > 0 ? Time.now + delay_seconds : nil
  wanted_topic = topic.to_s
  wanted_id = job_id ? job_id.to_s : nil
  count = 0

  Dir.glob(File.join(@dead_letter_dir, "*.json")).each do |file|
    data = JSON.parse(File.read(file))
    next unless data["topic"] == wanted_topic
    next if wanted_id && data["id"].to_s != wanted_id

    msg = Tina4::Job.new(
      topic: data["topic"],
      payload: data["payload"],
      id: data["id"],
      priority: data["priority"] || 0,
      attempts: (data["attempts"] || 0) + 1,
      available_at: available_at,
      error: nil
    )
    enqueue(msg)
    File.delete(file)
    count += 1
    break if wanted_id # found the specific job, stop scanning
  rescue JSON::ParserError
    next
  end

  count > 0
end

#size(topic) ⇒ Object



164
165
166
167
168
# File 'lib/tina4/queue_backends/lite_backend.rb', line 164

# Number of *.json files in the topic's pending directory; 0 when that
# directory does not exist. Every file is counted, whatever it contains.
def size(topic)
  pending = topic_path(topic)
  Dir.exist?(pending) ? Dir.glob(File.join(pending, "*.json")).size : 0
end

#topics ⇒ Object



191
192
193
194
195
196
# File 'lib/tina4/queue_backends/lite_backend.rb', line 191

# Names of the topic directories under the queue root: every immediate child
# of @dir that is a directory, except "dead_letter". Returns [] when @dir
# does not exist. Order is whatever Dir.children yields.
def topics
  return [] unless Dir.exist?(@dir)

  Dir.children(@dir).select do |entry|
    entry != "dead_letter" && File.directory?(File.join(@dir, entry))
  end
end