Class: Takagi::Message::DeduplicationCache

Inherits:
Object
Defined in:
lib/takagi/message/deduplication_cache.rb

Overview

Implements message deduplication as described in RFC 7252 §4.5 (Message Deduplication).

A duplicate is identified by matching both the Message ID and the source endpoint. When a duplicate CON message arrives, the server resends the cached response instead of processing the request again.

Cache entries expire after EXCHANGE_LIFETIME (247 seconds per RFC 7252 §4.8.2).
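
The request path implied by this overview can be sketched as follows. This is a minimal illustration under stated assumptions, not Takagi's server code: SketchCache, handle_con, and the array-based cache key are hypothetical stand-ins. Only the check_duplicate and store_response method names are taken from this page.

```ruby
# Hypothetical stand-in for the cache, exposing the two methods documented here.
# Expiry is omitted to keep the flow readable.
class SketchCache
  def initialize
    @cache = {}
    @mutex = Mutex.new
  end

  def check_duplicate(message_id, source_endpoint)
    @mutex.synchronize { @cache[[source_endpoint, message_id]] }
  end

  def store_response(message_id, source_endpoint, response_data)
    @mutex.synchronize { @cache[[source_endpoint, message_id]] = response_data }
  end
end

# Hypothetical handler: returns the response and whether the request was
# processed or answered from the cache.
def handle_con(cache, message_id, source_endpoint)
  cached = cache.check_duplicate(message_id, source_endpoint)
  return [cached, :resent] if cached

  response = yield # run the application logic exactly once
  cache.store_response(message_id, source_endpoint, response)
  [response, :processed]
end

cache = SketchCache.new
calls = 0
first  = handle_con(cache, 0x1234, "192.0.2.1:5683") { calls += 1; "ACK 2.05" }
second = handle_con(cache, 0x1234, "192.0.2.1:5683") { calls += 1; "ACK 2.05" }
```

In this sketch the retransmitted request gets the same response back, while the application block runs only for the first copy.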

Defined Under Namespace

Classes: CacheEntry

Constant Summary

EXCHANGE_LIFETIME =

RFC 7252 §4.8.2: EXCHANGE_LIFETIME = 247 seconds

247
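
For readers wondering where 247 comes from, the value can be reproduced from the default transmission parameters in RFC 7252 §4.8 and the formulas in §4.8.2. The constant names below follow the RFC; EXCHANGE_LIFETIME_DERIVED is a name chosen for this sketch so it does not shadow the library constant.

```ruby
# Default transmission parameters (RFC 7252 §4.8 and §4.8.2).
ACK_TIMEOUT       = 2    # seconds
ACK_RANDOM_FACTOR = 1.5
MAX_RETRANSMIT    = 4
MAX_LATENCY       = 100  # seconds
PROCESSING_DELAY  = ACK_TIMEOUT

# Time from the first transmission to the last retransmission.
MAX_TRANSMIT_SPAN = ACK_TIMEOUT * ((2**MAX_RETRANSMIT) - 1) * ACK_RANDOM_FACTOR

# Time from starting to send a CON message until no ACK can be expected anymore.
EXCHANGE_LIFETIME_DERIVED = MAX_TRANSMIT_SPAN + (2 * MAX_LATENCY) + PROCESSING_DELAY
```

With these defaults MAX_TRANSMIT_SPAN is 45 seconds, so the lifetime is 45 + 200 + 2 = 247 seconds. A deployment that changes the transmission parameters would presumably need a different value.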

Instance Method Summary

Constructor Details

#initialize ⇒ DeduplicationCache

Returns a new instance of DeduplicationCache.



# File 'lib/takagi/message/deduplication_cache.rb', line 22

def initialize
  @cache = {}
  @mutex = Mutex.new
end

Instance Method Details

#check_duplicate(message_id, source_endpoint) ⇒ String?

Check if message is a duplicate and return cached response if available

Parameters:

  • message_id (Integer)

    The CoAP Message ID

  • source_endpoint (String)

    Source IP:Port identifier

Returns:

  • (String, nil)

    Cached response data or nil if not a duplicate



# File 'lib/takagi/message/deduplication_cache.rb', line 31

def check_duplicate(message_id, source_endpoint)
  @mutex.synchronize do
    cleanup_expired_entries
    key = cache_key(message_id, source_endpoint)
    entry = @cache[key]
    entry&.response_data
  end
end
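
The private cleanup_expired_entries helper called above is not shown on this page. One plausible shape for it, assuming each entry carries a Float timestamp like the one store_response records with Time.now.to_f, is sketched below. The names Entry and purge_expired are hypothetical, and the clock is passed in as an argument only to make the example deterministic.

```ruby
EXCHANGE_LIFETIME = 247 # seconds, same value as the documented constant

# Assumed entry shape: the cached response plus the time it was stored.
Entry = Struct.new(:response_data, :timestamp)

# Hypothetical helper: delete every entry older than EXCHANGE_LIFETIME
# relative to `now` (Float seconds since the epoch).
def purge_expired(cache, now = Time.now.to_f)
  cache.delete_if { |_key, entry| now - entry.timestamp > EXCHANGE_LIFETIME }
end

cache = {
  "a" => Entry.new("old response",   1000.0), # 250 s old at now = 1250.0
  "b" => Entry.new("fresh response", 1200.0)  # 50 s old at now = 1250.0
}
purge_expired(cache, 1250.0)
```

Judging from the source shown on this page, cleanup is invoked from check_duplicate and not from store_response, so expired entries are pruned on lookup rather than on a timer.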

#clear ⇒ Object

Clear all cache entries (useful for testing)



# File 'lib/takagi/message/deduplication_cache.rb', line 56

def clear
  @mutex.synchronize { @cache.clear }
end

#stats ⇒ Object

Get cache statistics: a Hash with the number of cached entries (:size) and the list of cache keys (:entries)



# File 'lib/takagi/message/deduplication_cache.rb', line 61

def stats
  @mutex.synchronize do
    {
      size: @cache.size,
      entries: @cache.keys
    }
  end
end

#store_response(message_id, source_endpoint, response_data) ⇒ Object

Store a response for future duplicate detection

Parameters:

  • message_id (Integer)

    The CoAP Message ID

  • source_endpoint (String)

    Source IP:Port identifier

  • response_data (String)

    The serialized response to cache



# File 'lib/takagi/message/deduplication_cache.rb', line 44

def store_response(message_id, source_endpoint, response_data)
  @mutex.synchronize do
    key = cache_key(message_id, source_endpoint)
    @cache[key] = CacheEntry.new(
      response_data,
      Time.now.to_f,
      source_endpoint
    )
  end
end
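
Neither CacheEntry nor the private cache_key helper is defined on this page. The sketch below shows one way they could look, so that the same Message ID from two different endpoints does not collide. It is an assumption for illustration: the field names timestamp and source_endpoint, the SketchEntry struct, and the "endpoint:id" key format are all hypothetical. Only response_data is confirmed by the source above.

```ruby
# Assumed field order mirrors the three positional arguments passed to
# CacheEntry.new in store_response.
SketchEntry = Struct.new(:response_data, :timestamp, :source_endpoint)

# Hypothetical key builder: combines the endpoint and the Message ID so that
# Message IDs are scoped per sender.
def sketch_cache_key(message_id, source_endpoint)
  "#{source_endpoint}:#{message_id}"
end

store = {}
[["192.0.2.1:5683", "resp-A"], ["192.0.2.2:5683", "resp-B"]].each do |endpoint, data|
  # Same Message ID (7) from two different senders.
  store[sketch_cache_key(7, endpoint)] = SketchEntry.new(data, Time.now.to_f, endpoint)
end
```

Both entries survive in this sketch because the key includes the endpoint, which matches the overview's requirement that duplicates are matched on Message ID and source endpoint together.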