Class: SharedBroker::Adapters::RabbitMQ

Inherits:
Base
  • Object
show all
Defined in:
lib/shared_broker/adapters/rabbit_mq.rb

Constant Summary collapse

EXCHANGE_NAME =
"shared_broker_events"

Instance Method Summary collapse

Methods inherited from Base

#publish_batch

Constructor Details

#initialize(amqp_url:) ⇒ RabbitMQ

Returns a new instance of RabbitMQ.



13
14
15
16
17
18
# File 'lib/shared_broker/adapters/rabbit_mq.rb', line 13

# Connects to the broker and prepares the topic exchange used by this adapter.
#
# Starts a Bunny session for +amqp_url+, opens a channel on it, and declares
# the durable topic exchange named by EXCHANGE_NAME.
#
# @param amqp_url [String] connection URL handed to Bunny.new
def initialize(amqp_url:)
  @connection = Bunny.new(amqp_url).tap(&:start)
  @channel = @connection.create_channel
  @exchange = @channel.topic(EXCHANGE_NAME, durable: true)
end

Instance Method Details

#close ⇒ Object



77
78
79
80
# File 'lib/shared_broker/adapters/rabbit_mq.rb', line 77

# Releases the channel and the connection held by this adapter.
#
# The connection is closed in an +ensure+ clause so that a failure while
# closing the channel cannot leak the underlying connection. The channel is
# only closed while it still reports itself open, which makes repeated calls
# safe (Bunny raises when asked to close an already-closed channel).
#
# @return [Object] unspecified; callers should not rely on the return value
# @raise [StandardError] whatever the channel raises while closing; the
#   connection is still closed before the error propagates
def close
  @channel.close if @channel&.open?
ensure
  @connection&.close
end

#publish(topic, message, correlation_id: nil) ⇒ Object



20
21
22
23
24
25
26
27
28
# File 'lib/shared_broker/adapters/rabbit_mq.rb', line 20

# Serializes +message+ to JSON and publishes it to the exchange, routed by
# +topic+.
#
# @param topic [String] routing key for the message
# @param message [Hash] payload; serialized with +to_json+
# @param correlation_id [String, nil] forwarded as the AMQP correlation id
#   when truthy, otherwise omitted from the publish options
# @return [Object] whatever the exchange's +publish+ returns
# @raise [ArgumentError] if +message+ is not a Hash
def publish(topic, message, correlation_id: nil)
  payload_is_hash = message.is_a?(Hash)
  raise ArgumentError, "Message must be a Hash, got #{message.class} with value #{message.inspect}" unless payload_is_hash

  publish_options =
    if correlation_id
      { routing_key: topic, correlation_id: correlation_id }
    else
      { routing_key: topic }
    end

  @exchange.publish(message.to_json, publish_options)
end

#redrive_dlq(dlq_name, original_topic, limit: nil) ⇒ Object



59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
# File 'lib/shared_broker/adapters/rabbit_mq.rb', line 59

# Moves messages from a dead-letter queue back onto the exchange.
#
# Messages are popped one at a time with manual acknowledgement, republished
# with +original_topic+ as the routing key (keeping the correlation id when
# the message has one), and only then acknowledged on the DLQ. Stops when the
# queue is empty or +limit+ messages have been redriven.
#
# @param dlq_name [String] name of the durable dead-letter queue to drain
# @param original_topic [String] routing key used when republishing
# @param limit [Integer, nil] maximum number of messages to move; nil means
#   drain until the queue is empty
# @return [Integer] number of messages republished and acknowledged
def redrive_dlq(dlq_name, original_topic, limit: nil)
  dlq_queue = @channel.queue(dlq_name, durable: true)
  count = 0

  until limit && count >= limit
    delivery_info, properties, payload = dlq_queue.pop(manual_ack: true)
    # An empty queue yields no delivery info; nothing left to redrive.
    break unless delivery_info

    options = { routing_key: original_topic }
    correlation_id = properties&.correlation_id
    options[:correlation_id] = correlation_id if correlation_id
    @exchange.publish(payload, options)

    # Acknowledge only after the republish call has returned.
    @channel.acknowledge(delivery_info.delivery_tag, false)
    count += 1
  end

  count
end

#subscribe(topic, queue_name, max_retries: 3, backoff_base: 2, &block) ⇒ Object



30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
# File 'lib/shared_broker/adapters/rabbit_mq.rb', line 30

# Binds a durable queue to the exchange and consumes it with manual acks.
#
# Each delivery is JSON-decoded (symbol keys), tagged with
# +:_correlation_id+ when the message carries one, and passed to the block.
# Outcomes per delivery:
#
# * handler succeeds - the message is acknowledged;
# * handler raises SharedBroker::ShutdownError - the message is left
#   unacknowledged and is not dead-lettered;
# * handler raises any other StandardError - it is retried up to
#   +max_retries+ times, sleeping +backoff_base**attempt+ seconds between
#   tries, then handed to +publish_to_dlq+ and acknowledged;
# * payload cannot be decoded - it is handed to +publish_to_dlq+ and
#   acknowledged immediately, since retrying cannot fix a malformed payload
#   and leaving it unacknowledged would wedge the message.
#
# The acknowledgement happens outside the retried section so that a failing
# ack never causes the handler to run again for the same delivery.
#
# @param topic [String] routing key pattern used for the queue binding
# @param queue_name [String] name of the durable queue to consume
# @param max_retries [Integer] retries after the first failed attempt
# @param backoff_base [Numeric] base of the exponential backoff, in seconds
# @yieldparam data [Object] decoded payload (a Hash for messages sent via #publish)
# @return [Object] whatever the queue's +subscribe+ returns
# @raise [ArgumentError] if no block is given
def subscribe(topic, queue_name, max_retries: 3, backoff_base: 2, &block)
  # Without a handler every delivery would fail and end up in the DLQ.
  raise ArgumentError, "subscribe requires a block" unless block

  queue = @channel.queue(queue_name, durable: true)
  queue.bind(@exchange, routing_key: topic)

  queue.subscribe(manual_ack: true) do |delivery_info, properties, payload|
    begin
      data = JSON.parse(payload, symbolize_names: true)
    rescue JSON::ParserError, TypeError => e
      publish_to_dlq(queue_name, payload, delivery_info, properties, e)
      @channel.acknowledge(delivery_info.delivery_tag, false)
      next
    end

    if data.is_a?(Hash) && properties.respond_to?(:correlation_id) && properties.correlation_id
      data[:_correlation_id] = properties.correlation_id
    end

    attempts = 0
    begin
      block.call(data)
    rescue SharedBroker::ShutdownError
      # Do not acknowledge or send to DLQ. Let RabbitMQ re-queue the message.
      next
    rescue => e
      attempts += 1
      if attempts <= max_retries
        sleep(backoff_base**attempts)
        retry
      end
      publish_to_dlq(queue_name, payload, delivery_info, properties, e)
    end

    @channel.acknowledge(delivery_info.delivery_tag, false)
  end
end