Class: Tina4::QueueBackends::RabbitmqBackend

Inherits:
Object
  • Object
show all
Defined in:
lib/tina4/queue_backends/rabbitmq_backend.rb

Instance Method Summary collapse

Constructor Details

#initialize(options = {}) ⇒ RabbitmqBackend

Returns a new instance of RabbitmqBackend.



6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
# File 'lib/tina4/queue_backends/rabbitmq_backend.rb', line 6

def initialize(options = {})
  require "bunny"
  @connection = Bunny.new(
    host: options[:host] || "localhost",
    port: options[:port] || 5672,
    username: options[:username] || "guest",
    password: options[:password] || "guest",
    vhost: options[:vhost] || "/"
  )
  @connection.start
  @channel = @connection.create_channel
  @queues = {}
  @exchanges = {}
rescue LoadError
  raise "RabbitMQ backend requires the 'bunny' gem. Install with: gem install bunny"
end

Instance Method Details

#acknowledge(_message) ⇒ Object



43
44
45
# File 'lib/tina4/queue_backends/rabbitmq_backend.rb', line 43

def acknowledge(_message)
  @channel.acknowledge(@last_delivery_tag) if @last_delivery_tag
end

#closeObject



61
62
63
64
# File 'lib/tina4/queue_backends/rabbitmq_backend.rb', line 61

def close
  @channel&.close
  @connection&.close
end

#dead_letter(message) ⇒ Object



51
52
53
54
# File 'lib/tina4/queue_backends/rabbitmq_backend.rb', line 51

def dead_letter(message)
  dlq = get_queue("#{message.topic}.dead_letter")
  dlq.publish(message.to_json, persistent: true)
end

#dequeue(topic) ⇒ Object



28
29
30
31
32
33
34
35
36
37
38
39
40
41
# File 'lib/tina4/queue_backends/rabbitmq_backend.rb', line 28

def dequeue(topic)
  queue = get_queue(topic)
  delivery_info, _properties, payload = queue.pop
  return nil unless payload

  data = JSON.parse(payload)
  msg = Tina4::Job.new(
    topic: data["topic"],
    payload: data["payload"],
    id: data["id"]
  )
  @last_delivery_tag = delivery_info.delivery_tag
  msg
end

#enqueue(message) ⇒ Object



23
24
25
26
# File 'lib/tina4/queue_backends/rabbitmq_backend.rb', line 23

def enqueue(message)
  queue = get_queue(message.topic)
  queue.publish(message.to_json, persistent: true)
end

#requeue(message) ⇒ Object



47
48
49
# File 'lib/tina4/queue_backends/rabbitmq_backend.rb', line 47

def requeue(message)
  enqueue(message)
end

#size(topic) ⇒ Object



56
57
58
59
# File 'lib/tina4/queue_backends/rabbitmq_backend.rb', line 56

def size(topic)
  queue = get_queue(topic)
  queue.message_count
end