Class: Lepus::Producers::Middlewares::Unique

Inherits:
Middleware
  • Object
show all
Defined in:
lib/lepus/producers/middlewares/unique.rb

Overview

A middleware that prevents duplicate messages from being published using the de-dupe gem for Redis-based distributed locking.

When a lock is acquired, the middleware adds x-dedupe-lock-key and x-dedupe-lock-id headers to the message so that a consumer middleware can release the lock after successful processing.

Examples:

class StoryCreatedProducer < Lepus::Producer
  configure(exchange: "story_created")
  use :unique, lock_key: "story", lock_id: ->(msg) { msg.payload[:story_id].to_s }
end

Constant Summary collapse

HEADER_LOCK_KEY =
"x-dedupe-lock-key"
HEADER_LOCK_ID =
"x-dedupe-lock-id"
HEADER_LOCK_TTL =
"x-dedupe-lock-ttl"

Instance Method Summary collapse

Constructor Details

#initialize(lock_key:, lock_id:, ttl: nil) ⇒ Unique

Returns a new instance of Unique.

Parameters:

  • lock_key (String)

    Shared lock namespace (e.g., “story”).

  • lock_id (Proc)

    Callable that extracts a unique ID from the message.

  • ttl (Integer, nil) (defaults to: nil)

    Lock TTL in seconds. Defaults to DeDupe configuration.



26
27
28
29
30
31
# File 'lib/lepus/producers/middlewares/unique.rb', line 26

def initialize(lock_key:, lock_id:, ttl: nil)
  super()
  @lock_key = lock_key
  @lock_id = lock_id
  @ttl = ttl
end

Instance Method Details

#call(message, app) ⇒ Object



33
34
35
36
37
38
39
40
41
42
43
44
# File 'lib/lepus/producers/middlewares/unique.rb', line 33

def call(message, app)
  id = @lock_id.call(message)
  return app.call(message) if id.nil?

  lock_opts = {}
  lock_opts[:ttl] = @ttl if @ttl
  lock = DeDupe::Lock.new(lock_key: @lock_key, lock_id: id.to_s, **lock_opts)
  return unless lock.acquire

  message = add_dedupe_headers(message, id)
  app.call(message)
end