Class: ZZQ::Routing::RetainedStore

Inherits:
Object
  • Object
show all
Defined in:
lib/zzq/routing/retained_store.rb

Overview

Exact-topic map of retained application messages.

MQTT spec: the last PUBLISH with RETAIN=1 for a given topic is stored. A zero-length payload + RETAIN=1 deletes the retained message for that topic (MQTT-3.3.1-6).

On SUBSCRIBE, the broker walks this store and delivers every retained message whose topic matches the new filter.

Instance Method Summary collapse

Constructor Details

#initialize(persistence: nil) ⇒ RetainedStore

persistence is any Persistence::Interface subclass. Default is in-memory (no durability). On construction the in-process cache is warmed from the backend’s load_retained.



19
20
21
22
23
# File 'lib/zzq/routing/retained_store.rb', line 19

def initialize(persistence: nil)
  @store       = {}  # topic => ZZQ::Message
  @persistence = persistence
  @persistence&.load_retained { |msg| @store[msg.topic] = msg }
end

Instance Method Details

#clearObject

Test helper: wipe everything.



61
62
63
64
# File 'lib/zzq/routing/retained_store.rb', line 61

def clear
  @persistence&.clear_retained
  @store.clear
end

#delete(topic) ⇒ Object



41
42
43
44
# File 'lib/zzq/routing/retained_store.rb', line 41

def delete(topic)
  @persistence&.delete_retained(topic)
  @store.delete(topic)
end

#each_matching(filter) ⇒ Object

Enumerate retained messages matching filter.



48
49
50
51
52
53
# File 'lib/zzq/routing/retained_store.rb', line 48

def each_matching(filter)
  return enum_for(:each_matching, filter) unless block_given?
  @store.each_value do |msg|
    yield msg if TopicMatch.match?(filter, msg.topic)
  end
end

#empty?Boolean

Returns:

  • (Boolean)


57
# File 'lib/zzq/routing/retained_store.rb', line 57

def empty? = @store.empty?

#sizeObject



56
# File 'lib/zzq/routing/retained_store.rb', line 56

def size = @store.size

#store(message) ⇒ Object

Upsert or delete per RETAIN semantics. Returns the stored message (nil if the empty-payload rule deleted it).



28
29
30
31
32
33
34
35
36
37
38
# File 'lib/zzq/routing/retained_store.rb', line 28

def store(message)
  if message.payload.empty?
    @store.delete(message.topic)
    @persistence&.delete_retained(message.topic)
    nil
  else
    @store[message.topic] = message
    @persistence&.save_retained(message)
    message
  end
end