Class: ZZQ::Routing::TopicTrie

Inherits:
Object
  • Object
show all
Defined in:
lib/zzq/routing/topic_trie.rb

Overview

Per-level topic-filter trie. Each node holds literal children, one wildcard (“”) child, and one hash-wildcard (“#”) child. Subscriptions are stored at the terminal node of their filter.

Matches are collected by walking a topic down the trie, traversing literal + ‘+’ branches in parallel and collecting ‘#’ subtree entries at every step. Honours the $SYS-vs-bare-wildcard rule (MQTT-4.7.2-1): filters starting with ‘+` or `#` at the root do not match topics starting with `$SYS`.

Shared-subscription filters (“$share/<group>/<filter>”) are stored with their group stripped off but tagged with the group name on each SubscriberEntry; fan-out chooses exactly one entry per group (MVP: round-robin).

Not thread-safe — a broker-owned trie is mutated from the broker fibers only.

Defined Under Namespace

Classes: Node, SubscriberEntry

Instance Method Summary collapse

Constructor Details

#initializeTopicTrie

Returns a new instance of TopicTrie.



50
51
52
53
54
# File 'lib/zzq/routing/topic_trie.rb', line 50

def initialize
  @root = Node.new
  # Round-robin cursor for shared groups: group_key => index.
  @share_cursor = Hash.new(0)
end

Instance Method Details

#add(filter, session, entry) ⇒ Object

Insert a subscription. filter may be a plain MQTT filter (“a/+/c”, “#”, “$SYS/b/+”) or a shared-subscription filter (“$share/group/a/+/c”).



60
61
62
63
64
# File 'lib/zzq/routing/topic_trie.rb', line 60

def add(filter, session, entry)
  real_filter, group = strip_share(filter)
  node, _segs = walk_or_create(real_filter)
  node.entries << SubscriberEntry.new(session: session, entry: entry, share_group: group)
end

#empty?Boolean

Returns:

  • (Boolean)


109
# File 'lib/zzq/routing/topic_trie.rb', line 109

def empty? = @root.empty?

#match(topic) ⇒ Object

Return the matching subscriber entries for topic. One entry per (session, filter); shared-group collapse is performed here so fan-out sees exactly one delivery per $share group.



101
102
103
104
105
106
# File 'lib/zzq/routing/topic_trie.rb', line 101

def match(topic)
  segs = topic.split("/", -1)
  collected = []
  traverse(@root, segs, 0, topic.start_with?("$"), collected)
  collapse_shared(collected)
end

#remove(filter, session) ⇒ Object

Remove the entry for (session, filter). Returns true if found.



68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
# File 'lib/zzq/routing/topic_trie.rb', line 68

def remove(filter, session)
  real_filter, _group = strip_share(filter)
  segs = real_filter.split("/", -1)
  path = [@root]
  segs.each do |seg|
    node = path.last
    nxt =
      case seg
      when "+" then node.plus_child
      when "#" then node.hash_child
      else node.children[seg]
      end
    return false unless nxt
    path << nxt
  end
  leaf = path.last
  before = leaf.entries.size
  leaf.entries.reject! { |e| e.session.equal?(session) && e.entry[:filter] == filter }
  return false if leaf.entries.size == before
  prune(path, segs)
  true
end

#remove_session(session) ⇒ Object

Drop every entry belonging to session, pruning empty nodes.



93
94
95
# File 'lib/zzq/routing/topic_trie.rb', line 93

def remove_session(session)
  walk_prune(@root, []) { |e| e.session.equal?(session) }
end