Module: ActionSubscriber::MessageRetry

Defined in:
lib/action_subscriber/message_retry.rb

Constant Summary collapse

# Default backoff table used by redeliver_message_with_backoff.
# Key:   the number of the attempt about to be made (last attempt + 1).
# Value: the delay handed to the retry queue as its "x-message-ttl"
#        (RabbitMQ reads that argument as milliseconds -- confirm against
#        the broker docs). Each step is five times the previous one.
# An attempt number with no entry here is not redelivered.
SCHEDULE = {
  2 => 100,
  3 => 500,
  4 => 2_500,
  5 => 12_500,
  6 => 62_500,
  7 => 312_500,
  8 => 1_562_500,
  9 => 7_812_500,
  10 => 39_062_500,
}.freeze

Class Method Summary collapse

Class Method Details

.get_last_attempt_number(env) ⇒ Object

Private Implementation



26
27
28
29
# File 'lib/action_subscriber/message_retry.rb', line 26

# Reads the attempt counter carried in the message's "as-attempt" header.
#
# @param env [#headers] message environment whose headers respond to `fetch`
# @return [Integer] the header value converted with `to_i`; 1 when the
#   header is absent
def self.get_last_attempt_number(env)
  env.headers.fetch("as-attempt", "1").to_i
end

.redeliver_message_with_backoff(env, backoff_schedule = SCHEDULE) ⇒ Object



15
16
17
18
19
20
21
22
23
# File 'lib/action_subscriber/message_retry.rb', line 15

# Republishes the message to a per-delay retry queue so that it is attempted
# again after a backoff.
#
# The next attempt number (last attempt + 1) is looked up in
# +backoff_schedule+; when the schedule has no entry for it, nothing is
# published and nil is returned.
#
# @param env message environment; must provide `headers`, `queue`,
#   `encoded_payload`, `content_type` and `channel`
# @param backoff_schedule [Hash{Integer => Integer}] attempt number => delay
# @return [Object, nil] whatever with_exchange returns, or nil when the
#   schedule is exhausted
def self.redeliver_message_with_backoff(env, backoff_schedule = SCHEDULE)
  upcoming_attempt = get_last_attempt_number(env) + 1
  delay = backoff_schedule[upcoming_attempt]
  return unless delay

  # One retry queue per distinct delay, named after the source queue.
  retry_queue = "#{env.queue}.retry_#{delay}"

  with_exchange(env, delay, retry_queue) do |exchange|
    payload = env.encoded_payload
    publish_options = retry_options(env, upcoming_attempt, retry_queue)
    exchange.publish(payload, publish_options)
  end
end

.retry_headers(env, attempt) ⇒ Object



31
32
33
34
35
36
37
38
# File 'lib/action_subscriber/message_retry.rb', line 31

# Builds the headers for a redelivered message.
#
# Copies the incoming headers minus "x-death", then sets "as-attempt" to the
# given attempt (as a string) and "x-dead-letter-routing-key" to the source
# queue. The headers on +env+ are not modified.
#
# @param env [#headers, #queue] message environment
# @param attempt [#to_s] attempt number to record on the message
# @return [Hash] headers for the republished message
def self.retry_headers(env, attempt)
  carried_over = env.headers.reject { |name, _value| name == "x-death" }

  carried_over.merge(
    "as-attempt" => attempt.to_s,
    "x-dead-letter-routing-key" => env.queue
  )
end

.retry_options(env, attempt, retry_queue_name) ⇒ Object



40
41
42
43
44
45
46
# File 'lib/action_subscriber/message_retry.rb', line 40

# Assembles the options hash passed to `exchange.publish` for a retry.
#
# @param env [#content_type, #headers, #queue] message environment
# @param attempt [#to_s] attempt number recorded in the retry headers
# @param retry_queue_name [String] used as the routing key
# @return [Hash] with the keys :content_type, :routing_key and :headers
def self.retry_options(env, attempt, retry_queue_name)
  publish_options = {}
  publish_options[:content_type] = env.content_type
  publish_options[:routing_key] = retry_queue_name
  publish_options[:headers] = retry_headers(env, attempt)
  publish_options
end

.with_exchange(env, ttl, retry_queue_name) ⇒ Object



48
49
50
51
52
53
54
55
56
57
58
# File 'lib/action_subscriber/message_retry.rb', line 48

# Puts the message's channel into confirm mode, declares the retry queue,
# yields the exchange to publish through, then waits for publisher confirms.
#
# The retry queue is declared with +ttl+ as its "x-message-ttl" and with
# dead-lettering pointed at the default exchange using the source queue name
# as the routing key.
#
# @param env [#channel, #queue] message environment
# @param ttl [Integer] value for the retry queue's "x-message-ttl"
# @param retry_queue_name [String] name of the retry queue to declare
# @yieldparam exchange the exchange obtained from `channel.topic("")`
# @return [Object] the result of `channel.wait_for_confirms`
def self.with_exchange(env, ttl, retry_queue_name)
  amqp_channel = env.channel
  amqp_channel.confirm_select

  # an empty string is the default exchange [see bunny docs](http://rubybunny.info/articles/exchanges.html#default_exchange)
  default_exchange = amqp_channel.topic("")

  retry_queue_arguments = {
    "x-dead-letter-exchange" => "",
    "x-message-ttl" => ttl,
    "x-dead-letter-routing-key" => env.queue,
  }
  # Declared for its side effect only; the queue object itself is not needed.
  amqp_channel.queue(retry_queue_name, :arguments => retry_queue_arguments)

  yield(default_exchange)
  amqp_channel.wait_for_confirms
end