Class: Lepus::Consumers::Middlewares::Unique

Inherits:
Middleware
  • Object
show all
Defined in:
lib/lepus/consumers/middlewares/unique.rb

Overview

A middleware that releases deduplication locks after message processing.

Works in tandem with Lepus::Producers::Middlewares::Unique. The producer acquires a lock and embeds the lock info in message headers. This consumer middleware reads those headers and releases the lock based on the configured release_on conditions.

Examples:

Release on ack (default)

use :unique

Release on ack or reject

use :unique, release_on: [:ack, :reject]

Release on error (e.g., dead-letter scenarios)

use :unique, release_on: [:ack, :error]

Constant Summary collapse

HEADER_LOCK_KEY =
"x-dedupe-lock-key"
HEADER_LOCK_ID =
"x-dedupe-lock-id"
HEADER_LOCK_TTL =
"x-dedupe-lock-ttl"

Instance Method Summary collapse

Constructor Details

#initialize(release_on: [:ack]) ⇒ Unique

Returns a new instance of Unique.

Parameters:

  • release_on (Array<Symbol>) (defaults to: [:ack])

    Conditions that trigger lock release. Valid values: :ack, :reject, :requeue, :nack, :error. Defaults to [:ack].



29
30
31
32
# File 'lib/lepus/consumers/middlewares/unique.rb', line 29

def initialize(release_on: [:ack])
  super()
  @release_on = Array(release_on)
end

Instance Method Details

#call(message, app) ⇒ Object



34
35
36
37
38
39
40
41
42
43
44
45
# File 'lib/lepus/consumers/middlewares/unique.rb', line 34

def call(message, app)
  result = app.call(message)

  if @release_on.include?(result)
    release_lock(message)
  end

  result
rescue
  release_lock(message) if @release_on.include?(:error)
  raise
end