Class: Tina4::QueueBackends::LiteBackend

Inherits:
Object
  • Object
show all
Defined in:
lib/tina4/queue_backends/lite_backend.rb

Instance Method Summary collapse

Constructor Details

#initialize(options = {}) ⇒ LiteBackend

Returns a new instance of LiteBackend.



8
9
10
11
12
13
14
# File 'lib/tina4/queue_backends/lite_backend.rb', line 8

def initialize(options = {})
  @dir = options[:dir] || File.join(Dir.pwd, ".queue")
  @dead_letter_dir = File.join(@dir, "dead_letter")
  FileUtils.mkdir_p(@dir)
  FileUtils.mkdir_p(@dead_letter_dir)
  @mutex = Mutex.new
end

Instance Method Details

#acknowledge(message) ⇒ Object



45
46
47
# File 'lib/tina4/queue_backends/lite_backend.rb', line 45

def acknowledge(message)
  # File already deleted on dequeue
end

#dead_letter(message) ⇒ Object



53
54
55
56
# File 'lib/tina4/queue_backends/lite_backend.rb', line 53

def dead_letter(message)
  path = File.join(@dead_letter_dir, "#{message.id}.json")
  File.write(path, message.to_json)
end

#dequeue(topic) ⇒ Object



25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
# File 'lib/tina4/queue_backends/lite_backend.rb', line 25

def dequeue(topic)
  @mutex.synchronize do
    dir = topic_path(topic)
    return nil unless Dir.exist?(dir)

    files = Dir.glob(File.join(dir, "*.json")).sort_by { |f| File.mtime(f) }
    return nil if files.empty?

    file = files.first
    data = JSON.parse(File.read(file))
    File.delete(file)

    Tina4::QueueMessage.new(
      topic: data["topic"],
      payload: data["payload"],
      id: data["id"]
    )
  end
end

#enqueue(message) ⇒ Object



16
17
18
19
20
21
22
23
# File 'lib/tina4/queue_backends/lite_backend.rb', line 16

def enqueue(message)
  @mutex.synchronize do
    topic_dir = topic_path(message.topic)
    FileUtils.mkdir_p(topic_dir)
    path = File.join(topic_dir, "#{message.id}.json")
    File.write(path, message.to_json)
  end
end

#requeue(message) ⇒ Object



49
50
51
# File 'lib/tina4/queue_backends/lite_backend.rb', line 49

def requeue(message)
  enqueue(message)
end

#size(topic) ⇒ Object



58
59
60
61
62
# File 'lib/tina4/queue_backends/lite_backend.rb', line 58

def size(topic)
  dir = topic_path(topic)
  return 0 unless Dir.exist?(dir)
  Dir.glob(File.join(dir, "*.json")).length
end

#topicsObject



64
65
66
67
68
69
# File 'lib/tina4/queue_backends/lite_backend.rb', line 64

def topics
  return [] unless Dir.exist?(@dir)
  Dir.children(@dir)
     .reject { |d| d == "dead_letter" }
     .select { |d| File.directory?(File.join(@dir, d)) }
end