Class: EventPeople::Broker::Rabbit::RabbitContext

Inherits:
Context
  • Object
show all
Defined in:
lib/event_people/broker/rabbit/rabbit_context.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods inherited from Context

#fail!, #reject!, #success!

Constructor Details

#initialize(channel, delivery_info, retry_count: 0, max_retries: nil, delay_strategy: nil, dlq_name: nil, queue_name: nil, original_payload: nil) ⇒ RabbitContext

Returns a new instance of RabbitContext.



6
7
8
9
10
11
12
13
14
15
# File 'lib/event_people/broker/rabbit/rabbit_context.rb', line 6

def initialize(channel, delivery_info, retry_count: 0, max_retries: nil, delay_strategy: nil, dlq_name: nil, queue_name: nil, original_payload: nil)
  @channel          = channel
  @delivery_info    = delivery_info
  @retry_count      = retry_count.to_i
  @max_retries      = (max_retries || EventPeople::Config::MAX_ATTEMPTS).to_i
  @delay_strategy   = delay_strategy || EventPeople::Config::DELAY_STRATEGY
  @dlq_name         = dlq_name || EventPeople::Config::DLQ_NAME
  @queue_name       = queue_name
  @original_payload = original_payload
end

Instance Attribute Details

#dlq_nameObject (readonly)

Returns the value of attribute dlq_name.



4
5
6
# File 'lib/event_people/broker/rabbit/rabbit_context.rb', line 4

def dlq_name
  @dlq_name
end

#max_retriesObject (readonly)

Returns the value of attribute max_retries.



4
5
6
# File 'lib/event_people/broker/rabbit/rabbit_context.rb', line 4

def max_retries
  @max_retries
end

Instance Method Details

#failObject



25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
# File 'lib/event_people/broker/rabbit/rabbit_context.rb', line 25

def fail
  retry_manager = Rabbit::RetryManager.new(@max_retries, @delay_strategy)

  if retry_manager.should_retry?(@retry_count)
    delay = retry_manager.get_next_delay(@retry_count)
    begin
      @channel.default_exchange.publish(
        @original_payload,
        routing_key: "#{@queue_name}_retry",
        expiration: delay.to_s,
        headers: { 'x-event-people-retries' => @retry_count + 1 }
      )
      @channel.ack(@delivery_info.delivery_tag, false)
    rescue => e
      # If publish+ack fails, nack so the message is redelivered from the main queue.
      # This risks duplication if publish succeeded but ack failed, which is an inherent
      # AMQP at-least-once limitation. We prefer redelivery over silent loss.
      begin
        @channel.nack(@delivery_info.delivery_tag, false, true)
      rescue
        # Channel may already be closed; nothing we can do.
      end
      raise e
    end
  else
    @channel.nack(@delivery_info.delivery_tag, false, false)
  end
end

#is_last_retryObject



17
18
19
# File 'lib/event_people/broker/rabbit/rabbit_context.rb', line 17

def is_last_retry
  @retry_count >= @max_retries - 1
end

#rejectObject



54
55
56
# File 'lib/event_people/broker/rabbit/rabbit_context.rb', line 54

def reject
  @channel.reject(@delivery_info.delivery_tag, false)
end

#successObject



21
22
23
# File 'lib/event_people/broker/rabbit/rabbit_context.rb', line 21

def success
  @channel.ack(@delivery_info.delivery_tag, false)
end