Class: BBK::AMQP::RejectionPolicies::Republish

Inherits:
Object
  • Object
show all
Defined in:
lib/bbk/amqp/rejection_policies/republish.rb

Constant Summary collapse

REPUBLISH_COUNTER_KEY =
'x-republish-count'.freeze

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(publisher, logger: BBK::Utils::Logger.default) ⇒ Republish

Returns a new instance of Republish.



10
11
12
13
# File 'lib/bbk/amqp/rejection_policies/republish.rb', line 10

def initialize(publisher, logger: BBK::Utils::Logger.default)
  @publisher = publisher
  @logger = ActiveSupport::TaggedLogging.new(logger).tagged(self.class.name)
end

Instance Attribute Details

#loggerObject (readonly)

Returns the value of attribute logger.



8
9
10
# File 'lib/bbk/amqp/rejection_policies/republish.rb', line 8

def logger
  @logger
end

Instance Method Details

#call(message, error, *_args, **_kwargs) ⇒ Object



15
16
17
18
19
20
21
# File 'lib/bbk/amqp/rejection_policies/republish.rb', line 15

def call(message, error, *_args, **_kwargs)
  if message.delivery_info[:redelivered] || message.headers.key?(REPUBLISH_COUNTER_KEY)
    republish_message(message, error)
  else
    requeue_message(message, error)
  end
end

#republish_message(message, error) ⇒ Object



28
29
30
31
32
33
34
# File 'lib/bbk/amqp/rejection_policies/republish.rb', line 28

def republish_message(message, error)
  logger.warn "Republish message #{message.headers[:type]}[#{message.headers[:message_id]}]. Error: #{error.inspect}"
  msg = message.clone
  msg.headers[REPUBLISH_COUNTER_KEY] = msg.headers.fetch(REPUBLISH_COUNTER_KEY, 0).to_i + 1
  @publisher.publish_message(msg.delivery_info[:queue], msg, exchange: '').value!
  message.ack
end

#requeue_message(message, error) ⇒ Object



23
24
25
26
# File 'lib/bbk/amqp/rejection_policies/republish.rb', line 23

def requeue_message(message, error)
  logger.warn "Requeue message #{message.headers[:type]}[#{message.headers[:message_id]}] delivery tag: #{message.delivery_info[:delivery_tag].to_i}. Error: #{error.inspect}"
  message.delivery_info[:channel].reject message.delivery_info[:delivery_tag], true
end