Class: Tina4::QueueBackends::MongoBackend

Inherits:
Object
  • Object
show all
Defined in:
lib/tina4/queue_backends/mongo_backend.rb

Instance Method Summary

Constructor Details

#initialize(options = {}) ⇒ MongoBackend

Returns a new instance of MongoBackend.



6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
# File 'lib/tina4/queue_backends/mongo_backend.rb', line 6

# Creates the MongoDB client for this queue backend and sets up its indexes.
#
# Every setting is read from +options+ first and falls back to the matching
# TINA4_MONGO_* environment variable (and then to a literal default where
# one is given below).
#
# @param options [Hash] connection settings
# @option options [String] :uri connection string; when set it is passed to
#   Mongo::Client on its own and host/port/credentials/db are not used
# @option options [String] :host defaults to "localhost"
# @option options [Integer, String] :port defaults to 27017
# @option options [String] :username optional
# @option options [String] :password optional
# @option options [String] :db defaults to "tina4"
# @option options [String] :collection defaults to "tina4_queue"
# @raise [RuntimeError] when a LoadError occurs (the 'mongo' gem is missing)
def initialize(options = {})
  require "mongo"

  connection_uri = options[:uri] || ENV["TINA4_MONGO_URI"]
  mongo_host = options[:host] || ENV.fetch("TINA4_MONGO_HOST", "localhost")
  mongo_port = (options[:port] || ENV.fetch("TINA4_MONGO_PORT", 27017)).to_i
  user = options[:username] || ENV["TINA4_MONGO_USERNAME"]
  secret = options[:password] || ENV["TINA4_MONGO_PASSWORD"]
  database_name = options[:db] || ENV.fetch("TINA4_MONGO_DB", "tina4")
  @collection_name = options[:collection] || ENV.fetch("TINA4_MONGO_COLLECTION", "tina4_queue")

  @client =
    if connection_uri
      Mongo::Client.new(connection_uri)
    else
      # Credentials are only included when present; compact drops the nil ones.
      client_options = { database: database_name, user: user, password: secret }.compact
      Mongo::Client.new(["#{mongo_host}:#{mongo_port}"], client_options)
    end

  @db = @client.database
  create_indexes
rescue LoadError
  raise "MongoDB backend requires the 'mongo' gem. Install with: gem install mongo"
end

Instance Method Details

#acknowledge(message) ⇒ Object



59
60
61
# File 'lib/tina4/queue_backends/mongo_backend.rb', line 59

# Deletes the stored document whose _id matches the given message.
#
# @param message [#id] the message being acknowledged
# @return [Object] whatever the collection's delete_one call returns
def acknowledge(message)
  store = collection
  store.delete_one({ _id: message.id })
end

#close ⇒ Object



106
107
108
# File 'lib/tina4/queue_backends/mongo_backend.rb', line 106

# Closes the client connection when one has been set.
#
# @return [Object, nil] the result of the client's close call, or nil when
#   @client is nil
def close
  return if @client.nil?

  @client.close
end

#dead_letter(message) ⇒ Object



71
72
73
74
75
76
77
# File 'lib/tina4/queue_backends/mongo_backend.rb', line 71

# Marks a message as dead and moves it to the "<topic>.dead_letter" topic.
#
# The update is issued with upsert: true, so the call is not restricted to
# documents that already exist.
#
# @param message [#id, #topic] the message to dead-letter
# @return [Object] whatever the collection's find_one_and_update call returns
def dead_letter(message)
  selector = { _id: message.id }
  changes = { "$set" => { status: "dead", topic: "#{message.topic}.dead_letter" } }
  collection.find_one_and_update(selector, changes, upsert: true)
end

#dead_letters(topic, max_retries: 3) ⇒ Object



83
84
85
86
87
88
89
90
91
# File 'lib/tina4/queue_backends/mongo_backend.rb', line 83

# Lists the dead-lettered jobs stored under "<topic>.dead_letter".
#
# @param topic [String] base topic name (without the ".dead_letter" suffix)
# @param max_retries [Integer] accepted but not used by this method
# @return [Array<Tina4::Job>] one job per matching document, built from its
#   "topic", "payload" and "_id" fields
def dead_letters(topic, max_retries: 3)
  query = { topic: "#{topic}.dead_letter", status: "dead" }
  collection.find(query).map do |record|
    Tina4::Job.new(topic: record["topic"], payload: record["payload"], id: record["_id"])
  end
end

#dequeue(topic) ⇒ Object



43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
# File 'lib/tina4/queue_backends/mongo_backend.rb', line 43

# Claims one pending document for the topic and returns it as a job.
#
# A single find_one_and_update call flips the status from "pending" to
# "processing", using a created_at: 1 sort and return_document: :after.
#
# @param topic [String] topic to take a job from
# @return [Tina4::Job, nil] the claimed job, or nil when the update call
#   returned no document
def dequeue(topic)
  pending_filter = { topic: topic, status: "pending" }
  claim = { "$set" => { status: "processing" } }
  claimed = collection.find_one_and_update(
    pending_filter, claim, sort: { created_at: 1 }, return_document: :after
  )
  return unless claimed

  Tina4::Job.new(topic: claimed["topic"], payload: claimed["payload"], id: claimed["_id"])
end

#enqueue(message) ⇒ Object



32
33
34
35
36
37
38
39
40
41
# File 'lib/tina4/queue_backends/mongo_backend.rb', line 32

# Stores a message in the collection as a new "pending" document.
#
# @param message [#id, #topic, #payload, #created_at, #attempts] the message
#   to persist; created_at is expected to be a Time
# @return [Object] whatever the collection's insert_one call returns
def enqueue(message)
  collection.insert_one(
    _id: message.id,
    topic: message.topic,
    payload: message.payload,
    # getutc returns a UTC copy. Time#utc would convert the caller's Time
    # object in place (and raise FrozenError on a frozen non-UTC Time).
    created_at: message.created_at.getutc,
    attempts: message.attempts,
    status: "pending"
  )
end

#purge(topic, status) ⇒ Object



93
94
95
96
# File 'lib/tina4/queue_backends/mongo_backend.rb', line 93

# Deletes every document for the topic that has the given status.
#
# @param topic [String] topic to purge
# @param status [#to_s] status to match; converted with to_s before querying
# @return [Object] the deleted_count reported by the delete_many result
def purge(topic, status)
  criteria = { topic: topic, status: status.to_s }
  collection.delete_many(criteria).deleted_count
end

#requeue(message) ⇒ Object



63
64
65
66
67
68
69
# File 'lib/tina4/queue_backends/mongo_backend.rb', line 63

# Puts a message back into the "pending" state and bumps its attempt counter.
#
# The update is issued with upsert: true, so the call is not restricted to
# documents that already exist.
#
# @param message [#id] the message to requeue
# @return [Object] whatever the collection's find_one_and_update call returns
def requeue(message)
  selector = { _id: message.id }
  changes = { "$set" => { status: "pending" }, "$inc" => { attempts: 1 } }
  collection.find_one_and_update(selector, changes, upsert: true)
end

#retry_failed(topic, max_retries: 3) ⇒ Object



98
99
100
101
102
103
104
# File 'lib/tina4/queue_backends/mongo_backend.rb', line 98

# Resets "failed" documents of a topic back to "pending" when their attempts
# value is below max_retries.
#
# @param topic [String] topic whose failed documents should be retried
# @param max_retries [Integer] exclusive upper bound used in the "$lt" filter
# @return [Object] the modified_count reported by the update_many result
def retry_failed(topic, max_retries: 3)
  retryable = { topic: topic, status: "failed", attempts: { "$lt" => max_retries } }
  reset = { "$set" => { status: "pending" } }
  collection.update_many(retryable, reset).modified_count
end

#size(topic) ⇒ Object



79
80
81
# File 'lib/tina4/queue_backends/mongo_backend.rb', line 79

# Counts the documents of a topic that are still "pending".
#
# @param topic [String] topic to count
# @return [Object] whatever the collection's count_documents call returns
def size(topic)
  pending = { topic: topic, status: "pending" }
  collection.count_documents(pending)
end