Class: Tina4::QueueBackends::LiteBackend
- Inherits:
-
Object
- Object
- Tina4::QueueBackends::LiteBackend
- Defined in:
- lib/tina4/queue_backends/lite_backend.rb
Instance Method Summary collapse
- #acknowledge(message) ⇒ Object
-
#clear(topic) ⇒ Object
Remove all pending jobs from a topic.
- #dead_letter(message) ⇒ Object
-
#dead_letter_count(topic) ⇒ Object
Count dead-letter / failed messages for a topic.
-
#dead_letters(topic, max_retries: 3) ⇒ Object
Get dead letter jobs for a topic — messages that exceeded max retries.
- #dequeue(topic) ⇒ Object
- #enqueue(message) ⇒ Object
-
#failed(topic, max_retries: 3) ⇒ Object
Get jobs that failed but are still eligible for retry (under max_retries).
-
#initialize(options = {}) ⇒ LiteBackend
constructor
A new instance of LiteBackend.
-
#purge(topic, status) ⇒ Object
Delete messages by status (completed, failed, dead).
- #requeue(message) ⇒ Object
-
#retry_failed(topic, max_retries: 3) ⇒ Object
Re-queue failed messages (under max_retries) back to pending.
-
#retry_job(topic, delay_seconds: 0) ⇒ Object
Retry all dead letter jobs for this topic.
- #size(topic) ⇒ Object
- #topics ⇒ Object
Constructor Details
#initialize(options = {}) ⇒ LiteBackend
Returns a new instance of LiteBackend.
9 10 11 12 13 14 15 |
# File 'lib/tina4/queue_backends/lite_backend.rb', line 9 def initialize( = {}) @dir = [:dir] || File.join(Dir.pwd, ".queue") @dead_letter_dir = File.join(@dir, "dead_letter") FileUtils.mkdir_p(@dir) FileUtils.mkdir_p(@dead_letter_dir) @mutex = Mutex.new end |
Instance Method Details
#acknowledge(message) ⇒ Object
66 67 68 |
# File 'lib/tina4/queue_backends/lite_backend.rb', line 66 def acknowledge() # File already deleted on dequeue end |
#clear(topic) ⇒ Object
Remove all pending jobs from a topic. Returns count removed.
195 196 197 198 199 200 201 202 203 204 |
# File 'lib/tina4/queue_backends/lite_backend.rb', line 195 def clear(topic) dir = topic_path(topic) return 0 unless Dir.exist?(dir) count = 0 Dir.glob(File.join(dir, "*.json")).each do |file| File.delete(file) count += 1 end count end |
#dead_letter(message) ⇒ Object
74 75 76 77 |
# File 'lib/tina4/queue_backends/lite_backend.rb', line 74 def dead_letter() path = File.join(@dead_letter_dir, "#{.id}.json") File.write(path, .to_json) end |
#dead_letter_count(topic) ⇒ Object
Count dead-letter / failed messages for a topic.
86 87 88 89 90 91 92 93 94 95 96 97 |
# File 'lib/tina4/queue_backends/lite_backend.rb', line 86 def dead_letter_count(topic) return 0 unless Dir.exist?(@dead_letter_dir) count = 0 Dir.glob(File.join(@dead_letter_dir, "*.json")).each do |file| data = JSON.parse(File.read(file)) count += 1 if data["topic"] == topic.to_s rescue JSON::ParserError next end count end |
#dead_letters(topic, max_retries: 3) ⇒ Object
Get dead letter jobs for a topic — messages that exceeded max retries.
107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 |
# File 'lib/tina4/queue_backends/lite_backend.rb', line 107 def dead_letters(topic, max_retries: 3) return [] unless Dir.exist?(@dead_letter_dir) files = Dir.glob(File.join(@dead_letter_dir, "*.json")).sort_by { |f| File.mtime(f) } jobs = [] files.each do |file| data = JSON.parse(File.read(file)) next unless data["topic"] == topic.to_s data["status"] = "dead" jobs << data rescue JSON::ParserError next end jobs end |
#dequeue(topic) ⇒ Object
26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 |
# File 'lib/tina4/queue_backends/lite_backend.rb', line 26 def dequeue(topic) @mutex.synchronize do dir = topic_path(topic) return nil unless Dir.exist?(dir) now = Time.now candidates = [] Dir.glob(File.join(dir, "*.json")).each do |f| data = JSON.parse(File.read(f)) # Skip messages that are not yet available (delayed) if data["available_at"] available_at = Time.parse(data["available_at"]) next if available_at > now end candidates << { file: f, data: data, priority: data["priority"] || 0, mtime: File.mtime(f) } rescue JSON::ParserError next end return nil if candidates.empty? # Sort by priority descending (higher first), then by mtime ascending (oldest first) candidates.sort_by! { |c| [-c[:priority], c[:mtime]] } chosen = candidates.first File.delete(chosen[:file]) data = chosen[:data] Tina4::Job.new( topic: data["topic"] || topic.to_s, payload: data["payload"], id: data["id"], priority: data["priority"] || 0, available_at: data["available_at"] ? Time.parse(data["available_at"]) : nil, attempts: data["attempts"] || 0 ) end end |
#enqueue(message) ⇒ Object
17 18 19 20 21 22 23 24 |
# File 'lib/tina4/queue_backends/lite_backend.rb', line 17 def enqueue() @mutex.synchronize do topic_dir = topic_path(.topic) FileUtils.mkdir_p(topic_dir) path = File.join(topic_dir, "#{.id}.json") File.write(path, .to_json) end end |
#failed(topic, max_retries: 3) ⇒ Object
Get jobs that failed but are still eligible for retry (under max_retries).
207 208 209 210 211 212 213 214 215 216 217 218 219 |
# File 'lib/tina4/queue_backends/lite_backend.rb', line 207 def failed(topic, max_retries: 3) return [] unless Dir.exist?(@dead_letter_dir) jobs = [] Dir.glob(File.join(@dead_letter_dir, "*.json")).sort_by { |f| File.mtime(f) }.each do |file| data = JSON.parse(File.read(file)) next unless data["topic"] == topic.to_s next if (data["attempts"] || 0) >= max_retries jobs << data rescue JSON::ParserError next end jobs end |
#purge(topic, status) ⇒ Object
Delete messages by status (completed, failed, dead). For ‘dead’, removes from the dead_letter directory. For ‘failed’, removes from the topic directory (re-queued failed messages). Returns the number of jobs purged.
129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 |
# File 'lib/tina4/queue_backends/lite_backend.rb', line 129 def purge(topic, status) count = 0 if status.to_s == "dead" return 0 unless Dir.exist?(@dead_letter_dir) Dir.glob(File.join(@dead_letter_dir, "*.json")).each do |file| data = JSON.parse(File.read(file)) if data["topic"] == topic.to_s File.delete(file) count += 1 end rescue JSON::ParserError next end elsif status.to_s == "failed" || status.to_s == "completed" || status.to_s == "pending" dir = topic_path(topic) return 0 unless Dir.exist?(dir) Dir.glob(File.join(dir, "*.json")).each do |file| data = JSON.parse(File.read(file)) if data["status"] == status.to_s File.delete(file) count += 1 end rescue JSON::ParserError next end end count end |
#requeue(message) ⇒ Object
70 71 72 |
# File 'lib/tina4/queue_backends/lite_backend.rb', line 70 def requeue() enqueue() end |
#retry_failed(topic, max_retries: 3) ⇒ Object
Re-queue failed messages (under max_retries) back to pending. Returns the number of jobs re-queued.
164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 |
# File 'lib/tina4/queue_backends/lite_backend.rb', line 164 def retry_failed(topic, max_retries: 3) return 0 unless Dir.exist?(@dead_letter_dir) dir = topic_path(topic) FileUtils.mkdir_p(dir) count = 0 # Dead letter directory contains messages that the Consumer moved there. # Only retry those whose attempts are under max_retries. Dir.glob(File.join(@dead_letter_dir, "*.json")).each do |file| data = JSON.parse(File.read(file)) next unless data["topic"] == topic.to_s next if (data["attempts"] || 0) >= max_retries data["status"] = "pending" msg = Tina4::Job.new( topic: data["topic"], payload: data["payload"], id: data["id"] ) enqueue(msg) File.delete(file) count += 1 rescue JSON::ParserError next end count end |
#retry_job(topic, delay_seconds: 0) ⇒ Object
Retry all dead letter jobs for this topic. Returns true if any were re-queued.
222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 |
# File 'lib/tina4/queue_backends/lite_backend.rb', line 222 def retry_job(topic, delay_seconds: 0) return false unless Dir.exist?(@dead_letter_dir) available_at = delay_seconds > 0 ? Time.now + delay_seconds : nil count = 0 Dir.glob(File.join(@dead_letter_dir, "*.json")).each do |file| data = JSON.parse(File.read(file)) next unless data["topic"] == topic.to_s msg = Tina4::Job.new( topic: data["topic"], payload: data["payload"], id: data["id"], attempts: (data["attempts"] || 0) + 1, available_at: available_at ) enqueue(msg) File.delete(file) count += 1 rescue JSON::ParserError next end count > 0 end |
#size(topic) ⇒ Object
79 80 81 82 83 |
# File 'lib/tina4/queue_backends/lite_backend.rb', line 79 def size(topic) dir = topic_path(topic) return 0 unless Dir.exist?(dir) Dir.glob(File.join(dir, "*.json")).length end |
#topics ⇒ Object
99 100 101 102 103 104 |
# File 'lib/tina4/queue_backends/lite_backend.rb', line 99 def topics return [] unless Dir.exist?(@dir) Dir.children(@dir) .reject { |d| d == "dead_letter" } .select { |d| File.directory?(File.join(@dir, d)) } end |