Class: Tina4::QueueBackends::MongoBackend
- Inherits: Object
  - Object
    - Tina4::QueueBackends::MongoBackend
- Defined in:
- lib/tina4/queue_backends/mongo_backend.rb
Instance Method Summary
- #acknowledge(message) ⇒ Object
- #close ⇒ Object
- #dead_letter(message) ⇒ Object
- #dead_letters(topic, max_retries: 3) ⇒ Object
- #dequeue(topic) ⇒ Object
- #enqueue(message) ⇒ Object
- #initialize(options = {}) ⇒ MongoBackend (constructor) — A new instance of MongoBackend.
- #purge(topic, status) ⇒ Object
- #requeue(message) ⇒ Object
- #retry_failed(topic, max_retries: 3) ⇒ Object
- #size(topic) ⇒ Object
Constructor Details
#initialize(options = {}) ⇒ MongoBackend
Returns a new instance of MongoBackend.
6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 |
# File 'lib/tina4/queue_backends/mongo_backend.rb', line 6

# Builds the Mongo client for the queue and ensures its indexes exist.
#
# Each setting is taken from +options+ first, then from the matching
# environment variable, then (where one exists) from a hard-coded default.
#
# @param options [Hash] connection settings
# @option options [String] :uri full connection URI (env: TINA4_MONGO_URI)
# @option options [String] :host server host (env: TINA4_MONGO_HOST, default "localhost")
# @option options [Integer, String] :port server port (env: TINA4_MONGO_PORT, default 27017)
# @option options [String] :username user name (env: TINA4_MONGO_USERNAME)
# @option options [String] :password password (env: TINA4_MONGO_PASSWORD)
# @option options [String] :db database name (env: TINA4_MONGO_DB, default "tina4")
# @option options [String] :collection collection name (env: TINA4_MONGO_COLLECTION, default "tina4_queue")
# @raise [RuntimeError] when the 'mongo' gem cannot be loaded
def initialize(options = {})
  # Loaded lazily so the gem is only required when this backend is used.
  require "mongo"

  uri = options[:uri] || ENV["TINA4_MONGO_URI"]
  host = options[:host] || ENV.fetch("TINA4_MONGO_HOST", "localhost")
  port = (options[:port] || ENV.fetch("TINA4_MONGO_PORT", 27017)).to_i
  username = options[:username] || ENV["TINA4_MONGO_USERNAME"]
  password = options[:password] || ENV["TINA4_MONGO_PASSWORD"]
  db_name = options[:db] || ENV.fetch("TINA4_MONGO_DB", "tina4")
  @collection_name = options[:collection] || ENV.fetch("TINA4_MONGO_COLLECTION", "tina4_queue")

  if uri
    # With a URI, host/port/credentials/db_name above are not passed to the client.
    @client = Mongo::Client.new(uri)
  else
    client_options = { database: db_name }
    client_options[:user] = username if username
    client_options[:password] = password if password
    @client = Mongo::Client.new(["#{host}:#{port}"], client_options)
  end

  @db = @client.database
  create_indexes
rescue LoadError
  raise "MongoDB backend requires the 'mongo' gem. Install with: gem install mongo"
end
Instance Method Details
#acknowledge(message) ⇒ Object
59 60 61 |
# File 'lib/tina4/queue_backends/mongo_backend.rb', line 59

# Deletes the stored document for a message once it has been handled.
#
# @param message [#id] job whose +id+ is used as the document +_id+
# @return [Object] the result of +collection.delete_one+
def acknowledge(message)
  collection.delete_one(_id: message.id)
end
#close ⇒ Object
106 107 108 |
# File 'lib/tina4/queue_backends/mongo_backend.rb', line 106

# Closes the underlying client connection, if one was created.
#
# @return [Object, nil] the result of the client's +close+, or nil when
#   no client is set
def close
  @client.close unless @client.nil?
end
#dead_letter(message) ⇒ Object
71 72 73 74 75 76 77 |
# File 'lib/tina4/queue_backends/mongo_backend.rb', line 71

# Marks a message as dead and moves it onto its topic's dead-letter topic.
#
# The document's status becomes "dead" and its topic is rewritten to
# "<message.topic>.dead_letter".
#
# @param message [#id, #topic] job to move to the dead-letter topic
# @return [Object] the result of +collection.find_one_and_update+
def dead_letter(message)
  collection.find_one_and_update(
    { _id: message.id },
    { "$set" => { status: "dead", topic: "#{message.topic}.dead_letter" } },
    # NOTE(review): with upsert a missing _id presumably creates a new
    # document holding only these fields (no payload) — confirm intended.
    upsert: true
  )
end
#dead_letters(topic, max_retries: 3) ⇒ Object
83 84 85 86 87 88 89 90 91 |
# File 'lib/tina4/queue_backends/mongo_backend.rb', line 83

# Lists the dead-lettered jobs for a topic.
#
# Looks up documents whose topic is "<topic>.dead_letter" and whose status
# is "dead", wrapping each one in a Tina4::Job.
#
# @param topic [String] base topic name (without the ".dead_letter" suffix)
# @param max_retries [Integer] accepted but not used by this method
# @return [Array<Tina4::Job>] one job per matching document
def dead_letters(topic, max_retries: 3)
  dead_filter = { topic: "#{topic}.dead_letter", status: "dead" }

  collection.find(dead_filter).map do |record|
    Tina4::Job.new(topic: record["topic"], payload: record["payload"], id: record["_id"])
  end
end
#dequeue(topic) ⇒ Object
43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 |
# File 'lib/tina4/queue_backends/mongo_backend.rb', line 43

# Claims the oldest pending document on a topic and returns it as a job.
#
# The claimed document's status is switched from "pending" to "processing"
# in the same find-and-update call; candidates are sorted by ascending
# +created_at+.
#
# @param topic [String] topic to take a job from
# @return [Tina4::Job, nil] the claimed job, or nil when nothing matched
def dequeue(topic)
  pending_filter = { topic: topic, status: "pending" }
  claim = { "$set" => { status: "processing" } }

  claimed = collection.find_one_and_update(
    pending_filter,
    claim,
    sort: { created_at: 1 },
    return_document: :after
  )
  return unless claimed

  Tina4::Job.new(topic: claimed["topic"], payload: claimed["payload"], id: claimed["_id"])
end
#enqueue(message) ⇒ Object
32 33 34 35 36 37 38 39 40 41 |
# File 'lib/tina4/queue_backends/mongo_backend.rb', line 32

# Stores a message in the queue collection with status "pending".
#
# @param message [#id, #topic, #payload, #created_at, #attempts] job to
#   store; +created_at+ is converted with +utc+ before insertion
# @return [Object] the result of +collection.insert_one+
def enqueue(message)
  collection.insert_one(
    _id: message.id,
    topic: message.topic,
    payload: message.payload,
    created_at: message.created_at.utc,
    attempts: message.attempts,
    status: "pending"
  )
end
#purge(topic, status) ⇒ Object
93 94 95 96 |
# File 'lib/tina4/queue_backends/mongo_backend.rb', line 93

# Deletes every document on a topic that has the given status.
#
# @param topic [String] topic to purge
# @param status [#to_s] status to match; converted with +to_s+
# @return [Integer] the +deleted_count+ reported by the delete
def purge(topic, status)
  collection.delete_many(topic: topic, status: status.to_s).deleted_count
end
#requeue(message) ⇒ Object
63 64 65 66 67 68 69 |
# File 'lib/tina4/queue_backends/mongo_backend.rb', line 63

# Puts a message back on the queue and records another attempt.
#
# Sets the document's status to "pending" and increments +attempts+ by 1.
#
# @param message [#id] job whose +id+ is used as the document +_id+
# @return [Object] the result of +collection.find_one_and_update+
def requeue(message)
  collection.find_one_and_update(
    { _id: message.id },
    { "$set" => { status: "pending" }, "$inc" => { attempts: 1 } },
    # NOTE(review): with upsert a missing _id presumably creates a new
    # document without topic/payload — confirm intended.
    upsert: true
  )
end
#retry_failed(topic, max_retries: 3) ⇒ Object
98 99 100 101 102 103 104 |
# File 'lib/tina4/queue_backends/mongo_backend.rb', line 98

# Resets failed documents on a topic back to "pending".
#
# Only documents with status "failed" and fewer than +max_retries+
# attempts are updated.
#
# @param topic [String] topic whose failed documents should be retried
# @param max_retries [Integer] exclusive upper bound on +attempts+
# @return [Integer] the +modified_count+ reported by the update
def retry_failed(topic, max_retries: 3)
  retryable = { topic: topic, status: "failed", attempts: { "$lt" => max_retries } }
  back_to_pending = { "$set" => { status: "pending" } }

  collection.update_many(retryable, back_to_pending).modified_count
end
#size(topic) ⇒ Object
79 80 81 |
# File 'lib/tina4/queue_backends/mongo_backend.rb', line 79

# Counts the documents on a topic that are still "pending".
#
# @param topic [String] topic to count
# @return [Integer] result of +collection.count_documents+ for that filter
def size(topic)
  pending_filter = { topic: topic, status: "pending" }
  collection.count_documents(pending_filter)
end