Class: ZZQ::Routing::TopicTrie
- Inherits:
-
Object
- Object
- ZZQ::Routing::TopicTrie
- Defined in:
- lib/zzq/routing/topic_trie.rb
Overview
Per-level topic-filter trie. Each node holds literal children, one wildcard (“”) child, and one hash-wildcard (“#”) child. Subscriptions are stored at the terminal node of their filter.
Matches are collected by walking a topic down the trie, traversing literal + ‘+’ branches in parallel and collecting ‘#’ subtree entries at every step. Honours the $SYS-vs-bare-wildcard rule (MQTT-4.7.2-1): filters starting with ‘+` or `#` at the root do not match topics starting with `$SYS`.
Shared-subscription filters (“$share/<group>/<filter>”) are stored with their group stripped off but tagged with the group name on each SubscriberEntry; fan-out chooses exactly one entry per group (MVP: round-robin).
Not thread-safe — a broker-owned trie is mutated from the broker fibers only.
Defined Under Namespace
Classes: Node, SubscriberEntry
Instance Method Summary collapse
-
#add(filter, session, entry) ⇒ Object
Insert a subscription.
- #empty? ⇒ Boolean
-
#initialize ⇒ TopicTrie
constructor
A new instance of TopicTrie.
-
#match(topic) ⇒ Object
Return the matching subscriber entries for
topic. -
#remove(filter, session) ⇒ Object
Remove the entry for (session, filter).
-
#remove_session(session) ⇒ Object
Drop every entry belonging to
session, pruning empty nodes.
Constructor Details
Instance Method Details
#add(filter, session, entry) ⇒ Object
Insert a subscription. filter may be a plain MQTT filter (“a/+/c”, “#”, “$SYS/b/+”) or a shared-subscription filter (“$share/group/a/+/c”).
60 61 62 63 64 |
# File 'lib/zzq/routing/topic_trie.rb', line 60 def add(filter, session, entry) real_filter, group = strip_share(filter) node, _segs = walk_or_create(real_filter) node.entries << SubscriberEntry.new(session: session, entry: entry, share_group: group) end |
#empty? ⇒ Boolean
109 |
# File 'lib/zzq/routing/topic_trie.rb', line 109 def empty? = @root.empty? |
#match(topic) ⇒ Object
Return the matching subscriber entries for topic. One entry per (session, filter); shared-group collapse is performed here so fan-out sees exactly one delivery per $share group.
101 102 103 104 105 106 |
# File 'lib/zzq/routing/topic_trie.rb', line 101 def match(topic) segs = topic.split("/", -1) collected = [] traverse(@root, segs, 0, topic.start_with?("$"), collected) collapse_shared(collected) end |
#remove(filter, session) ⇒ Object
Remove the entry for (session, filter). Returns true if found.
68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 |
# File 'lib/zzq/routing/topic_trie.rb', line 68 def remove(filter, session) real_filter, _group = strip_share(filter) segs = real_filter.split("/", -1) path = [@root] segs.each do |seg| node = path.last nxt = case seg when "+" then node.plus_child when "#" then node.hash_child else node.children[seg] end return false unless nxt path << nxt end leaf = path.last before = leaf.entries.size leaf.entries.reject! { |e| e.session.equal?(session) && e.entry[:filter] == filter } return false if leaf.entries.size == before prune(path, segs) true end |
#remove_session(session) ⇒ Object
Drop every entry belonging to session, pruning empty nodes.
93 94 95 |
# File 'lib/zzq/routing/topic_trie.rb', line 93 def remove_session(session) walk_prune(@root, []) { |e| e.session.equal?(session) } end |