Class: Tina4::QueueBackends::RabbitmqBackend

Inherits:
Object
  • Object
show all
Defined in:
lib/tina4/queue_backends/rabbitmq_backend.rb

Instance Method Summary collapse

Constructor Details

#initialize(options = {}) ⇒ RabbitmqBackend

Returns a new instance of RabbitmqBackend.



6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
# File 'lib/tina4/queue_backends/rabbitmq_backend.rb', line 6

def initialize(options = {})
  require "bunny"
  @connection = Bunny.new(
    host: options[:host] || "localhost",
    port: options[:port] || 5672,
    username: options[:username] || "guest",
    password: options[:password] || "guest",
    vhost: options[:vhost] || "/"
  )
  @connection.start
  @channel = @connection.create_channel
  @queues = {}
  @exchanges = {}
rescue LoadError
  raise "RabbitMQ backend requires the 'bunny' gem. Install with: gem install bunny"
end

Instance Method Details

#closeObject



77
78
79
80
# File 'lib/tina4/queue_backends/rabbitmq_backend.rb', line 77

def close
  @channel&.close
  @connection&.close
end

#complete(_message) ⇒ Object

Acknowledge the in-flight message as done (terminal). Named complete() to match the lite/mongo backends AND the Job#complete lifecycle, which calls backend.complete (not acknowledge) — so ‘job.complete` now actually acks the broker message instead of being a silent no-op. multiple:false acks only this delivery. The stored tag is cleared so a double-complete is a safe no-op rather than a second ack on an unknown tag.



56
57
58
59
60
61
# File 'lib/tina4/queue_backends/rabbitmq_backend.rb', line 56

def complete(_message)
  return unless @last_delivery_tag

  @channel.acknowledge(@last_delivery_tag, false)
  @last_delivery_tag = nil
end

#dead_letter(message) ⇒ Object



67
68
69
70
# File 'lib/tina4/queue_backends/rabbitmq_backend.rb', line 67

def dead_letter(message)
  dlq = get_queue("#{message.topic}.dead_letter")
  dlq.publish(message.to_json, persistent: true)
end

#dequeue(topic) ⇒ Object



28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
# File 'lib/tina4/queue_backends/rabbitmq_backend.rb', line 28

def dequeue(topic)
  queue = get_queue(topic)
  # Manual ack: do NOT let bunny's default auto-ack remove the message on
  # pop. The message stays in-flight (unacked) until complete() acks it, so
  # a consumer crash before complete() makes the broker redeliver it
  # (at-least-once delivery) — parity with the Python/PHP masters, whose
  # basic_get uses auto_ack=false / no-ack=false. With the old auto-ack pop
  # the stored delivery_tag had already been acked, so a later
  # channel.acknowledge raised PRECONDITION_FAILED and closed the channel.
  delivery_info, _properties, payload = queue.pop(manual_ack: true)
  return nil unless payload

  data = JSON.parse(payload)
  msg = Tina4::Job.new(
    topic: data["topic"],
    payload: data["payload"],
    id: data["id"]
  )
  @last_delivery_tag = delivery_info.delivery_tag
  msg
end

#enqueue(message) ⇒ Object



23
24
25
26
# File 'lib/tina4/queue_backends/rabbitmq_backend.rb', line 23

def enqueue(message)
  queue = get_queue(message.topic)
  queue.publish(message.to_json, persistent: true)
end

#requeue(message) ⇒ Object



63
64
65
# File 'lib/tina4/queue_backends/rabbitmq_backend.rb', line 63

def requeue(message)
  enqueue(message)
end

#size(topic) ⇒ Object



72
73
74
75
# File 'lib/tina4/queue_backends/rabbitmq_backend.rb', line 72

def size(topic)
  queue = get_queue(topic)
  queue.message_count
end