Class: Tina4::QueueBackends::RabbitmqBackend
- Inherits: Object
  - Object
  - Tina4::QueueBackends::RabbitmqBackend
- Defined in: lib/tina4/queue_backends/rabbitmq_backend.rb
Instance Method Summary
- #close ⇒ Object
- #complete(_message) ⇒ Object
  Acknowledge the in-flight message as done (terminal).
- #dead_letter(message) ⇒ Object
- #dequeue(topic) ⇒ Object
- #enqueue(message) ⇒ Object
- #initialize(options = {}) ⇒ RabbitmqBackend (constructor)
  A new instance of RabbitmqBackend.
- #requeue(message) ⇒ Object
- #size(topic) ⇒ Object
Constructor Details
#initialize(options = {}) ⇒ RabbitmqBackend
Returns a new instance of RabbitmqBackend.
    # File 'lib/tina4/queue_backends/rabbitmq_backend.rb', line 6

    def initialize(options = {})
      require "bunny"
      @connection = Bunny.new(
        host: options[:host] || "localhost",
        port: options[:port] || 5672,
        username: options[:username] || "guest",
        password: options[:password] || "guest",
        vhost: options[:vhost] || "/"
      )
      @connection.start
      @channel = @connection.create_channel
      @queues = {}
      @exchanges = {}
    rescue LoadError
      raise "RabbitMQ backend requires the 'bunny' gem. Install with: gem install bunny"
    end
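The constructor falls back to a stock local RabbitMQ setup for every option that is missing. The sketch below mirrors that defaulting rule in a standalone helper so it can be run without a broker or the bunny gem. `RABBITMQ_DEFAULTS` and `connection_settings` are hypothetical names used only for illustration; they are not part of Tina4 or bunny.

```ruby
# Hypothetical helper mirroring the defaults in RabbitmqBackend#initialize.
# It only builds the settings hash; it does not open a connection.
RABBITMQ_DEFAULTS = {
  host: "localhost",
  port: 5672,
  username: "guest",
  password: "guest",
  vhost: "/"
}.freeze

def connection_settings(options = {})
  # Same `options[key] || default` rule as the constructor: nil and false
  # fall back to the default, and only symbol keys are consulted.
  RABBITMQ_DEFAULTS.each_with_object({}) do |(key, default), settings|
    settings[key] = options[key] || default
  end
end

settings = connection_settings({ host: "mq.internal", port: 5673 })
puts settings.inspect
```

Because the lookup uses symbol keys, a plain Hash with string keys such as `{ "host" => "mq.internal" }` would fall back to `localhost` without any warning.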
Instance Method Details
#close ⇒ Object
    # File 'lib/tina4/queue_backends/rabbitmq_backend.rb', line 77

    def close
      @channel&.close
      @connection&.close
    end
#complete(_message) ⇒ Object
Acknowledge the in-flight message as done (terminal). The method is named complete() to match the lite/mongo backends and the Job#complete lifecycle, which calls backend.complete (not acknowledge), so `job.complete` now actually acks the broker message instead of being a silent no-op. Passing multiple: false acks only this delivery. The stored delivery tag is cleared afterwards, so a double-complete is a safe no-op rather than a second ack on an unknown tag.
    # File 'lib/tina4/queue_backends/rabbitmq_backend.rb', line 56

    def complete(_message)
      return unless @last_delivery_tag

      @channel.acknowledge(@last_delivery_tag, false)
      @last_delivery_tag = nil
    end
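The guard-then-clear pattern in complete can be exercised without a broker. The sketch below is a minimal illustration, assuming a fake channel that rejects unknown delivery tags in roughly the way the description above says a real broker does. `FakeChannel`, `AckOnceBackend`, `deliver`, and `track` are hypothetical names, not Tina4 or bunny APIs.

```ruby
# Stand-in for a Bunny channel: records acks and rejects unknown tags,
# loosely imitating the broker's PRECONDITION_FAILED behaviour.
class FakeChannel
  attr_reader :acked

  def initialize
    @outstanding = []
    @acked = []
  end

  # Hand out a new delivery tag for an in-flight message.
  def deliver
    tag = @outstanding.size + @acked.size + 1
    @outstanding << tag
    tag
  end

  def acknowledge(tag, _multiple = false)
    raise "PRECONDITION_FAILED: unknown delivery tag #{tag}" unless @outstanding.delete(tag)

    @acked << tag
  end
end

# Hypothetical minimal backend containing only the ack bookkeeping.
class AckOnceBackend
  def initialize(channel)
    @channel = channel
    @last_delivery_tag = nil
  end

  # Remember the tag of the most recently dequeued message.
  def track(tag)
    @last_delivery_tag = tag
  end

  def complete(_message = nil)
    return unless @last_delivery_tag # nothing in flight: safe no-op

    @channel.acknowledge(@last_delivery_tag, false)
    @last_delivery_tag = nil # forget the tag so it is acked only once
  end
end

channel = FakeChannel.new
backend = AckOnceBackend.new(channel)
backend.track(channel.deliver)
backend.complete
backend.complete # second call returns early instead of raising
puts channel.acked.inspect
```

As in the listing above, the argument to complete is ignored: the ack always targets the most recently dequeued delivery, so this pattern suits a dequeue-then-complete loop that handles one message at a time.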
#dead_letter(message) ⇒ Object
    # File 'lib/tina4/queue_backends/rabbitmq_backend.rb', line 67

    def dead_letter(message)
      dlq = get_queue("#{message.topic}.dead_letter")
      dlq.publish(message.to_json, persistent: true)
    end
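dead_letter publishes the serialized job to a queue named after the original topic with a `.dead_letter` suffix. The sketch below illustrates that naming rule with in-memory arrays instead of RabbitMQ queues. `FakeJob` and `InMemoryRouter` are hypothetical stand-ins, and the JSON shape is an assumption based on the three keys that dequeue reads back ("topic", "payload", "id").

```ruby
require "json"

# Stand-in for Tina4::Job, assuming to_json emits the same three keys that
# dequeue reads back ("topic", "payload", "id").
FakeJob = Struct.new(:topic, :payload, :id) do
  def to_json(*args)
    { "topic" => topic, "payload" => payload, "id" => id }.to_json(*args)
  end
end

# Hypothetical in-memory router: queue name => array of serialized messages.
class InMemoryRouter
  attr_reader :queues

  def initialize
    @queues = Hash.new { |hash, name| hash[name] = [] }
  end

  def enqueue(job)
    @queues[job.topic] << job.to_json
  end

  # Same naming rule as the backend: "<topic>.dead_letter".
  def dead_letter(job)
    @queues["#{job.topic}.dead_letter"] << job.to_json
  end
end

router = InMemoryRouter.new
job = FakeJob.new("emails", { "to" => "a@example.com" }, "42")
router.dead_letter(job)

puts router.queues.keys.inspect
puts JSON.parse(router.queues["emails.dead_letter"].first)["topic"]
```

Under this assumption the serialized job keeps its original topic, so anything that later inspects the dead-letter queue can still tell where the message came from.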
#dequeue(topic) ⇒ Object
    # File 'lib/tina4/queue_backends/rabbitmq_backend.rb', line 28

    def dequeue(topic)
      queue = get_queue(topic)
      # Manual ack: do NOT let bunny's default auto-ack remove the message on
      # pop. The message stays in-flight (unacked) until complete() acks it, so
      # a consumer crash before complete() makes the broker redeliver it
      # (at-least-once delivery), matching the Python/PHP masters, whose
      # basic_get uses auto_ack=false / no-ack=false. With the old auto-ack pop
      # the stored delivery_tag had already been acked, so a later
      # channel.acknowledge raised PRECONDITION_FAILED and closed the channel.
      delivery_info, _properties, payload = queue.pop(manual_ack: true)
      return nil unless payload

      data = JSON.parse(payload)
      msg = Tina4::Job.new(
        topic: data["topic"],
        payload: data["payload"],
        id: data["id"]
      )
      @last_delivery_tag = delivery_info.delivery_tag
      msg
    end
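The at-least-once behaviour described in the dequeue comment can be modelled with a small in-memory queue. This is a sketch under stated assumptions, not how RabbitMQ or bunny is implemented: `FakeQueue`, `ack`, and `consumer_lost` are hypothetical names, and the model covers only the ready/unacked bookkeeping for a single consumer.

```ruby
# In-memory stand-in for one broker queue, modelling only manual acks.
class FakeQueue
  def initialize
    @ready = []   # messages waiting to be delivered
    @unacked = {} # delivery_tag => payload, delivered but not yet acked
    @next_tag = 0
  end

  def publish(payload)
    @ready << payload
  end

  # Returns [delivery_tag, payload], or [nil, nil] when the queue is empty.
  def pop(manual_ack: false)
    payload = @ready.shift
    return [nil, nil] unless payload

    @next_tag += 1
    # With auto-ack the message is gone immediately; with manual ack it
    # stays in flight until ack(tag) is called.
    @unacked[@next_tag] = payload if manual_ack
    [@next_tag, payload]
  end

  def ack(tag)
    raise "PRECONDITION_FAILED: unknown delivery tag #{tag}" unless @unacked.key?(tag)

    @unacked.delete(tag)
  end

  # Simulates the consumer's channel closing: unacked messages become ready again.
  def consumer_lost
    @ready.unshift(*@unacked.values)
    @unacked.clear
  end

  def message_count
    @ready.size
  end
end

queue = FakeQueue.new
queue.publish('{"topic":"emails","payload":{"to":"a@example.com"},"id":"1"}')

_tag, _payload = queue.pop(manual_ack: true)
queue.consumer_lost # crash before ack: the message is redelivered
puts queue.message_count

tag, _payload = queue.pop(manual_ack: true)
queue.ack(tag) # the equivalent of complete()
queue.consumer_lost
puts queue.message_count
```

In this model, popping with `manual_ack: false` never records the tag, so a later `ack` raises. That is the same failure mode the source comment attributes to the old auto-ack pop.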
#enqueue(message) ⇒ Object
    # File 'lib/tina4/queue_backends/rabbitmq_backend.rb', line 23

    def enqueue(message)
      queue = get_queue(message.topic)
      queue.publish(message.to_json, persistent: true)
    end
#requeue(message) ⇒ Object
    # File 'lib/tina4/queue_backends/rabbitmq_backend.rb', line 63

    def requeue(message)
      enqueue(message)
    end
#size(topic) ⇒ Object
    # File 'lib/tina4/queue_backends/rabbitmq_backend.rb', line 72

    def size(topic)
      queue = get_queue(topic)
      queue.message_count
    end