Class: SharedBroker::Adapters::RabbitMQ
- Defined in:
- lib/shared_broker/adapters/rabbit_mq.rb
Constant Summary collapse
- EXCHANGE_NAME =
"shared_broker_events"
Instance Method Summary collapse
- #close ⇒ Object
-
#initialize(amqp_url:) ⇒ RabbitMQ
constructor
A new instance of RabbitMQ.
- #publish(topic, message, correlation_id: nil) ⇒ Object
- #subscribe(topic, queue_name, max_retries: 3, backoff_base: 2, &block) ⇒ Object
Constructor Details
#initialize(amqp_url:) ⇒ RabbitMQ
Returns a new instance of RabbitMQ.
13 14 15 16 17 18 |
# File 'lib/shared_broker/adapters/rabbit_mq.rb', line 13 def initialize(amqp_url:) @connection = Bunny.new(amqp_url) @connection.start @channel = @connection.create_channel @exchange = @channel.topic(EXCHANGE_NAME, durable: true) end |
Instance Method Details
#close ⇒ Object
57 58 59 60 |
# File 'lib/shared_broker/adapters/rabbit_mq.rb', line 57 def close @channel.close if @channel @connection.close if @connection end |
#publish(topic, message, correlation_id: nil) ⇒ Object
20 21 22 23 24 25 26 27 28 |
# File 'lib/shared_broker/adapters/rabbit_mq.rb', line 20 def publish(topic, , correlation_id: nil) unless .is_a?(Hash) raise ArgumentError, "Message must be a Hash, got #{.class} with value #{.inspect}" end = { routing_key: topic } [:correlation_id] = correlation_id if correlation_id @exchange.publish(.to_json, ) end |
#subscribe(topic, queue_name, max_retries: 3, backoff_base: 2, &block) ⇒ Object
30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 |
# File 'lib/shared_broker/adapters/rabbit_mq.rb', line 30 def subscribe(topic, queue_name, max_retries: 3, backoff_base: 2, &block) queue = @channel.queue(queue_name, durable: true) queue.bind(@exchange, routing_key: topic) queue.subscribe(manual_ack: true) do |delivery_info, , payload| data = JSON.parse(payload, symbolize_names: true) if .respond_to?(:correlation_id) && .correlation_id data[:_correlation_id] = .correlation_id end attempts = 0 begin block.call(data) @channel.acknowledge(delivery_info.delivery_tag, false) rescue => e attempts += 1 if attempts <= max_retries sleep(backoff_base**attempts) retry else publish_to_dlq(queue_name, payload, delivery_info, , e) @channel.acknowledge(delivery_info.delivery_tag, false) end end end end |