Class: ZZQ::Routing::RetainedStore
- Inherits:
-
Object
- Object
- ZZQ::Routing::RetainedStore
- Defined in:
- lib/zzq/routing/retained_store.rb
Overview
Exact-topic map of retained application messages.
MQTT spec: the last PUBLISH with RETAIN=1 for a given topic is stored. A zero-length payload + RETAIN=1 deletes the retained message for that topic (MQTT-3.3.1-6).
On SUBSCRIBE, the broker walks this store and delivers every retained message whose topic matches the new filter.
Instance Method Summary collapse
-
#clear ⇒ Object
Test helper: wipe everything.
- #delete(topic) ⇒ Object
-
#each_matching(filter) ⇒ Object
Enumerate retained messages matching
filter. - #empty? ⇒ Boolean
-
#initialize(persistence: nil) ⇒ RetainedStore
constructor
persistenceis any Persistence::Interface subclass. - #size ⇒ Object
-
#store(message) ⇒ Object
Upsert or delete per RETAIN semantics.
Constructor Details
#initialize(persistence: nil) ⇒ RetainedStore
persistence is any Persistence::Interface subclass. Default is in-memory (no durability). On construction the in-process cache is warmed from the backend’s load_retained.
19 20 21 22 23 |
# File 'lib/zzq/routing/retained_store.rb', line 19 def initialize(persistence: nil) @store = {} # topic => ZZQ::Message @persistence = persistence @persistence&.load_retained { |msg| @store[msg.topic] = msg } end |
Instance Method Details
#clear ⇒ Object
Test helper: wipe everything.
61 62 63 64 |
# File 'lib/zzq/routing/retained_store.rb', line 61 def clear @persistence&.clear_retained @store.clear end |
#delete(topic) ⇒ Object
41 42 43 44 |
# File 'lib/zzq/routing/retained_store.rb', line 41 def delete(topic) @persistence&.delete_retained(topic) @store.delete(topic) end |
#each_matching(filter) ⇒ Object
Enumerate retained messages matching filter.
48 49 50 51 52 53 |
# File 'lib/zzq/routing/retained_store.rb', line 48 def each_matching(filter) return enum_for(:each_matching, filter) unless block_given? @store.each_value do |msg| yield msg if TopicMatch.match?(filter, msg.topic) end end |
#empty? ⇒ Boolean
57 |
# File 'lib/zzq/routing/retained_store.rb', line 57 def empty? = @store.empty? |
#size ⇒ Object
56 |
# File 'lib/zzq/routing/retained_store.rb', line 56 def size = @store.size |
#store(message) ⇒ Object
Upsert or delete per RETAIN semantics. Returns the stored message (nil if the empty-payload rule deleted it).
28 29 30 31 32 33 34 35 36 37 38 |
# File 'lib/zzq/routing/retained_store.rb', line 28 def store() if .payload.empty? @store.delete(.topic) @persistence&.delete_retained(.topic) nil else @store[.topic] = @persistence&.save_retained() end end |