Class: EventPeople::Broker::Rabbit::RabbitContext

Inherits:
Context
  • Object
show all
Defined in:
lib/event_people/broker/rabbit/rabbit_context.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods inherited from Context

#fail!, #reject!, #success!

Constructor Details

#initialize(channel, delivery_info, retry_count: 0, max_retries: nil, delay_strategy: nil, dlq_name: nil, queue_name: nil, original_payload: nil) ⇒ RabbitContext

Returns a new instance of RabbitContext.



6
7
8
9
10
11
12
13
14
15
# File 'lib/event_people/broker/rabbit/rabbit_context.rb', line 6

def initialize(channel, delivery_info, retry_count: 0, max_retries: nil, delay_strategy: nil, dlq_name: nil, queue_name: nil, original_payload: nil)
  @channel          = channel
  @delivery_info    = delivery_info
  @retry_count      = retry_count.to_i
  @max_retries      = (max_retries || EventPeople::Config::MAX_ATTEMPTS).to_i
  @delay_strategy   = delay_strategy || EventPeople::Config::DELAY_STRATEGY
  @dlq_name         = dlq_name || EventPeople::Config::DLQ_NAME
  @queue_name       = queue_name
  @original_payload = original_payload
end

Instance Attribute Details

#dlq_nameObject (readonly)

Returns the value of attribute dlq_name.



4
5
6
# File 'lib/event_people/broker/rabbit/rabbit_context.rb', line 4

def dlq_name
  @dlq_name
end

#max_retriesObject (readonly)

Returns the value of attribute max_retries.



4
5
6
# File 'lib/event_people/broker/rabbit/rabbit_context.rb', line 4

def max_retries
  @max_retries
end

Instance Method Details

#failObject



25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
# File 'lib/event_people/broker/rabbit/rabbit_context.rb', line 25

def fail
  retry_manager = Rabbit::RetryManager.new(@max_retries, @delay_strategy)

  if retry_manager.should_retry?(@retry_count)
    delay = retry_manager.get_next_delay(@retry_count)
    begin
      @channel.default_exchange.publish(
        @original_payload,
        routing_key: "#{@queue_name}_retry",
        expiration: delay.to_s,
        headers: { 'x-event-people-retries' => @retry_count + 1 }
      )
    rescue => e
      # Publish failed — nack without requeue so the DLX routes to DLQ.
      # Requeuing without incrementing x-event-people-retries would cause an infinite loop.
      begin
        @channel.nack(@delivery_info.delivery_tag, false, false)
      rescue
        # Channel may already be closed; nothing we can do.
      end
      raise e
    end
    begin
      @channel.ack(@delivery_info.delivery_tag, false)
    rescue
      # Publish already succeeded; swallow ack errors. The message may be redelivered
      # once (at-least-once), but that is safer than nacking to DLQ when a retry copy
      # is already enqueued.
    end
  else
    @channel.nack(@delivery_info.delivery_tag, false, false)
  end
end

#is_last_retryObject



17
18
19
# File 'lib/event_people/broker/rabbit/rabbit_context.rb', line 17

def is_last_retry
  @retry_count >= @max_retries - 1
end

#rejectObject



59
60
61
# File 'lib/event_people/broker/rabbit/rabbit_context.rb', line 59

def reject
  @channel.reject(@delivery_info.delivery_tag, false)
end

#successObject



21
22
23
# File 'lib/event_people/broker/rabbit/rabbit_context.rb', line 21

def success
  @channel.ack(@delivery_info.delivery_tag, false)
end