Class: KairosMcp::DslAst::DriftDetector

Inherits:
Object
  • Object
show all
Defined in:
lib/kairos_mcp/dsl_ast/drift_detector.rb

Overview

Deterministic content <-> definition drift detection No LLM usage — keyword matching and structural analysis only. NOTE: Drift thresholds and policies may become a SkillSet in Phase 3.

Constant Summary collapse

ASSERTION_KEYWORDS =

Keywords that indicate structural assertions in natural language content

%w[must required always never shall mandatory].freeze

Class Method Summary collapse

Class Method Details

.collect_node_keywords(definition) ⇒ Object

Collect searchable keywords from all definition nodes



152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
# File 'lib/kairos_mcp/dsl_ast/drift_detector.rb', line 152

def self.collect_node_keywords(definition)
  keywords = []

  definition.nodes.each do |node|
    # Add name parts
    node.name.to_s.split('_').each { |w| keywords << w.downcase }

    # Add option values that are strings
    (node.options || {}).each_value do |v|
      case v
      when String
        v.split(/\s+/).each { |w| keywords << w.downcase if w.length > 3 }
      when Symbol
        keywords << v.to_s.downcase
      end
    end
  end

  keywords.uniq
end

.content_assertions_not_covered(content, definition) ⇒ Object

Find content lines with assertion keywords not covered by any definition node



125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
# File 'lib/kairos_mcp/dsl_ast/drift_detector.rb', line 125

def self.content_assertions_not_covered(content, definition)
  uncovered = []
  node_keywords = collect_node_keywords(definition)

  content.each_line do |line|
    stripped = line.strip
    next if stripped.empty?
    next if stripped.start_with?('#', '|', '-') # Skip headers, tables, list markers that are structural

    # Check if this line contains an assertion keyword
    line_lower = stripped.downcase
    has_assertion = ASSERTION_KEYWORDS.any? { |kw| line_lower.include?(kw) }
    next unless has_assertion

    # Check if any definition node keyword appears in this line
    covered = node_keywords.any? { |kw| line_lower.include?(kw) }
    unless covered
      # Truncate long lines
      display = stripped.length > 80 ? "#{stripped[0..77]}..." : stripped
      uncovered << display
    end
  end

  uncovered
end

.detect(skill) ⇒ DriftReport

Detect drift between content and definition layers

Parameters:

Returns:



49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
# File 'lib/kairos_mcp/dsl_ast/drift_detector.rb', line 49

def self.detect(skill)
  items = []
  content = skill.content || ""
  definition = skill.definition
  content_lower = content.downcase

  # No definition => no drift analysis possible
  unless definition && definition.nodes && !definition.nodes.empty?
    return DriftReport.new(
      skill_id: skill.id,
      items: [],
      coverage_ratio: nil,
      timestamp: Time.now.iso8601
    )
  end

  # Check 1: definition nodes reflected in content (definition_orphaned)
  covered_count = 0
  definition.nodes.each do |node|
    if node_reflected_in_content?(node, content_lower)
      covered_count += 1
    else
      items << DriftItem.new(
        direction: :definition_orphaned,
        severity: :warning,
        description: "Definition node '#{node.name}' (#{node.type}) has no corresponding mention in content",
        node_name: node.name
      )
    end
  end

  # Check 2: content assertions not covered by definition (content_uncovered)
  uncovered = content_assertions_not_covered(content, definition)
  uncovered.each do |assertion|
    items << DriftItem.new(
      direction: :content_uncovered,
      severity: :info,
      description: "Content assertion \"#{assertion}\" not covered by any definition node",
      node_name: nil
    )
  end

  # Coverage ratio: proportion of definition nodes reflected in content
  total_nodes = definition.nodes.size
  ratio = total_nodes > 0 ? covered_count.to_f / total_nodes : 1.0

  DriftReport.new(
    skill_id: skill.id,
    items: items,
    coverage_ratio: ratio.round(2),
    timestamp: Time.now.iso8601
  )
end

.node_reflected_in_content?(node, content_lower) ⇒ Boolean

Check if a definition node’s name or key options appear in the content

Returns:

  • (Boolean)


106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
# File 'lib/kairos_mcp/dsl_ast/drift_detector.rb', line 106

def self.node_reflected_in_content?(node, content_lower)
  # Convert node name from snake_case to words for matching
  name_words = node.name.to_s.split('_')

  # Check if any name word appears in content
  name_match = name_words.any? { |word| content_lower.include?(word.downcase) }
  return true if name_match

  # Also check key option values
  opts = node.options || {}
  opts.each_value do |v|
    next unless v.is_a?(String)
    return true if content_lower.include?(v.downcase)
  end

  false
end