Class: SharedBroker::Adapters::RabbitMQ
- Defined in:
- lib/shared_broker/adapters/rabbit_mq.rb
Constant Summary collapse
- EXCHANGE_NAME =
"shared_broker_events"
Instance Method Summary collapse
- #close ⇒ Object
-
#initialize(amqp_url:) ⇒ RabbitMQ
constructor
A new instance of RabbitMQ.
- #publish(topic, message, correlation_id: nil) ⇒ Object
- #redrive_dlq(dlq_name, original_topic, limit: nil) ⇒ Object
- #subscribe(topic, queue_name, max_retries: 3, backoff_base: 2, &block) ⇒ Object
Methods inherited from Base
Constructor Details
#initialize(amqp_url:) ⇒ RabbitMQ
Returns a new instance of RabbitMQ.
13 14 15 16 17 18 |
# File 'lib/shared_broker/adapters/rabbit_mq.rb', line 13

# Connects to the broker and prepares the shared topic exchange.
#
# Builds a Bunny connection from +amqp_url+, starts it, opens a channel on
# it, and declares the durable topic exchange named by EXCHANGE_NAME. The
# connection, channel and exchange are kept in instance variables for the
# other methods of this adapter.
#
# @param amqp_url [Object] connection argument passed unchanged to Bunny.new
#   (presumably an AMQP URL string, going by the name)
# @raise [StandardError] whatever Bunny raises while connecting, opening the
#   channel or declaring the exchange
def initialize(amqp_url:)
  @connection = Bunny.new(amqp_url)
  @connection.start

  begin
    @channel = @connection.create_channel
    @exchange = @channel.topic(EXCHANGE_NAME, durable: true)
  rescue StandardError => error
    # The connection is already started at this point; close it so a failed
    # channel/exchange setup does not leave it open behind the caller's back.
    begin
      @connection.close
    rescue StandardError
      # Best-effort cleanup: the setup error re-raised below is the one the
      # caller needs to see.
    end
    raise error
  end
end
Instance Method Details
#close ⇒ Object
77 78 79 80 |
# File 'lib/shared_broker/adapters/rabbit_mq.rb', line 77

# Closes the channel and then the connection, if they exist and are open.
#
# The connection is closed from an +ensure+ clause so that an error raised
# while closing the channel cannot leave the connection open. Handles that
# report +open?+ as false are skipped, which makes repeated calls harmless.
#
# @raise [StandardError] any error raised by the underlying +close+ calls
def close
  @channel.close if @channel&.open?
ensure
  @connection.close if @connection&.open?
end
#publish(topic, message, correlation_id: nil) ⇒ Object
20 21 22 23 24 25 26 27 28 |
# File 'lib/shared_broker/adapters/rabbit_mq.rb', line 20

# Publishes +message+ as JSON to the shared exchange under +topic+.
#
# @param topic [String] routing key used for the published message
# @param message [Hash] payload; serialised with +to_json+
# @param correlation_id [Object, nil] when given, sent as the message's
#   +correlation_id+ publish option; omitted entirely when nil
# @raise [ArgumentError] if +message+ is not a Hash
# @return [Object] whatever the exchange's +publish+ returns
def publish(topic, message, correlation_id: nil)
  unless message.is_a?(Hash)
    raise ArgumentError, "Message must be a Hash, got #{message.class} with value #{message.inspect}"
  end

  options = { routing_key: topic }
  options[:correlation_id] = correlation_id if correlation_id

  @exchange.publish(message.to_json, options)
end
#redrive_dlq(dlq_name, original_topic, limit: nil) ⇒ Object
59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 |
# File 'lib/shared_broker/adapters/rabbit_mq.rb', line 59

# Moves messages from a dead-letter queue back onto the shared exchange.
#
# Pops messages one at a time from +dlq_name+ with manual acknowledgement,
# republishes each payload unchanged under +original_topic+ (carrying over
# the correlation id when the popped properties have one), and only then
# acknowledges the popped message. Stops when the pop yields no delivery
# info or when +limit+ messages have been moved.
#
# @param dlq_name [String] name of the queue to drain
# @param original_topic [String] routing key for the republished messages
# @param limit [Integer, nil] maximum number of messages to move; nil means
#   no cap
# @return [Integer] number of messages republished and acknowledged
def redrive_dlq(dlq_name, original_topic, limit: nil)
  dlq_queue = @channel.queue(dlq_name, durable: true)
  redriven = 0

  until limit && redriven >= limit
    delivery_info, properties, payload = dlq_queue.pop(manual_ack: true)
    break unless delivery_info

    options = { routing_key: original_topic }
    correlation_id = properties&.correlation_id
    options[:correlation_id] = correlation_id if correlation_id

    # Republish first, acknowledge second: if publishing raises, the message
    # has not been acknowledged on the dead-letter queue.
    @exchange.publish(payload, options)
    @channel.acknowledge(delivery_info.delivery_tag, false)
    redriven += 1
  end

  redriven
end
#subscribe(topic, queue_name, max_retries: 3, backoff_base: 2, &block) ⇒ Object
30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 |
# File 'lib/shared_broker/adapters/rabbit_mq.rb', line 30

# Consumes messages routed to +topic+ from the durable queue +queue_name+.
#
# Declares the queue, binds it to the shared exchange with +topic+ as the
# routing key, and subscribes with manual acknowledgement. For every
# delivery:
#
# 1. The payload is parsed as JSON with symbolised keys. A payload that is
#    not valid JSON cannot succeed on retry, so it is handed to
#    +publish_to_dlq+ and acknowledged without invoking the handler.
# 2. When the parsed payload is a Hash and the delivery carries a
#    correlation id, it is added under +:_correlation_id+.
# 3. The handler block is called. On a StandardError it is retried up to
#    +max_retries+ times, sleeping +backoff_base**attempt+ seconds before
#    each retry; once retries are exhausted the message is handed to
#    +publish_to_dlq+.
# 4. The delivery is acknowledged exactly once, after the handler succeeded
#    or the message was dead-lettered. The acknowledgement is outside the
#    retried section so that a failing ack never re-runs a handler that has
#    already completed.
#
# SharedBroker::ShutdownError raised by the handler skips both the
# acknowledgement and the dead-letter step.
#
# @param topic [String] routing key / binding pattern for the queue
# @param queue_name [String] name of the durable queue to consume from
# @param max_retries [Integer] retries after the first failed attempt
# @param backoff_base [Numeric] base of the exponential sleep between retries
# @yieldparam data [Object] the parsed JSON payload
# @raise [ArgumentError] if no block is given
# @return [Object] whatever the queue's +subscribe+ returns
def subscribe(topic, queue_name, max_retries: 3, backoff_base: 2, &block)
  # Without a handler every delivery would fail and end up dead-lettered;
  # reject the call up front instead.
  raise ArgumentError, "subscribe requires a block" unless block

  queue = @channel.queue(queue_name, durable: true)
  queue.bind(@exchange, routing_key: topic)

  queue.subscribe(manual_ack: true) do |delivery_info, properties, payload|
    begin
      data = JSON.parse(payload, symbolize_names: true)
    rescue JSON::ParserError => e
      publish_to_dlq(queue_name, payload, delivery_info, properties, e)
      @channel.acknowledge(delivery_info.delivery_tag, false)
      next
    end

    # Only a Hash payload can carry the extra key; arrays and scalars are
    # passed to the handler untouched.
    if data.is_a?(Hash) && properties.respond_to?(:correlation_id) && properties.correlation_id
      data[:_correlation_id] = properties.correlation_id
    end

    attempts = 0
    begin
      block.call(data)
    rescue SharedBroker::ShutdownError
      # Do not acknowledge or send to DLQ. Let RabbitMQ re-queue the message.
      next
    rescue => e
      attempts += 1
      if attempts <= max_retries
        sleep(backoff_base**attempts)
        retry
      end
      publish_to_dlq(queue_name, payload, delivery_info, properties, e)
    end

    @channel.acknowledge(delivery_info.delivery_tag, false)
  end
end