Class: KairosMcp::Safety

Inherits:
Object
  • Object
show all
Defined in:
lib/kairos_mcp/safety.rb

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize ⇒ Safety

Returns a new instance of Safety.



43
44
45
46
47
48
49
50
51
# File 'lib/kairos_mcp/safety.rb', line 43

# Builds a Safety instance from the configuration returned by load_config.
#
# The default root is the configured 'safe_root' (or KairosMcp.data_dir when
# that key is absent), expanded to an absolute path. Missing 'allowed_paths',
# 'blocklist' and 'limits' entries fall back to empty collections.
def initialize
  settings = load_config
  @config = settings
  @default_root = File.expand_path(settings['safe_root'] || KairosMcp.data_dir)
  @allowed_paths = settings['allowed_paths'] || []
  @blocklist = settings['blocklist'] || []
  @limits = settings['limits'] || {}

  # Both of these start empty and are filled in later at runtime.
  @workspace_root = nil  # assigned by set_workspace
  @current_user = nil    # assigned by set_user (HTTP mode)
end

Instance Attribute Details

#current_user ⇒ Object (readonly)



41
42
43
# File 'lib/kairos_mcp/safety.rb', line 41

# Read-only accessor for the current user context.
#
# @return [Object, nil] the value last passed to set_user; nil until then
def current_user
  @current_user
end

#workspace_root ⇒ Object (readonly)



41
42
43
# File 'lib/kairos_mcp/safety.rb', line 41

# Read-only accessor for the workspace root.
#
# @return [String, nil] the root chosen by set_workspace; nil until it has run
def workspace_root
  @workspace_root
end

Class Method Details

.clear_policies! ⇒ Object

For testing only



35
36
37
# File 'lib/kairos_mcp/safety.rb', line 35

# Discards every registered policy by swapping in a fresh, empty registry
# while holding the policy mutex. For testing only.
#
# @return [Hash] the new empty registry
def self.clear_policies!
  @policy_mutex.synchronize do
    @policies = {}
  end
end

.policy_for(name) ⇒ Object



24
25
26
# File 'lib/kairos_mcp/safety.rb', line 24

# Looks up the policy registered under +name+.
#
# @param name [#to_sym] policy name
# @return [Proc, nil] the registered block, or nil when none is registered
def self.policy_for(name)
  key = name.to_sym
  @policy_mutex.synchronize { @policies[key] }
end

.register_policy(name, &block) ⇒ Object

Register a named authorization policy for a capability. Keys should match capability method names (e.g. :can_modify_l0).



16
17
18
# File 'lib/kairos_mcp/safety.rb', line 16

# Registers a named authorization policy for a capability, replacing any
# policy already stored under the same name. Keys should match capability
# method names (e.g. :can_modify_l0).
#
# @param name [#to_sym] policy name
# @param block [Proc] the policy block to store
# @return [Proc, nil] the block that was stored
def self.register_policy(name, &block)
  key = name.to_sym
  @policy_mutex.synchronize do
    @policies[key] = block
  end
end

.registered_policy_names ⇒ Object

Thread-safe list of registered policy names. Used by introspection SkillSet for safety visibility.



30
31
32
# File 'lib/kairos_mcp/safety.rb', line 30

# Lists the names of all registered policies as strings. The key snapshot is
# taken while holding the policy mutex.
#
# @return [Array<String>] registered policy names
def self.registered_policy_names
  keys = @policy_mutex.synchronize { @policies.keys }
  keys.map(&:to_s)
end

.unregister_policy(name) ⇒ Object



20
21
22
# File 'lib/kairos_mcp/safety.rb', line 20

# Removes the policy registered under +name+, if any.
#
# @param name [#to_sym] policy name
# @return [Proc, nil] the removed block, or nil when nothing was registered
def self.unregister_policy(name)
  key = name.to_sym
  @policy_mutex.synchronize { @policies.delete(key) }
end

Instance Method Details

#can_manage_grants? ⇒ Boolean

Returns:

  • (Boolean)


92
93
94
95
96
97
98
# File 'lib/kairos_mcp/safety.rb', line 92

# Whether the current user may manage service grants.
#
# Always true when no user context is set. With a user context, the
# :can_manage_grants policy decides; when no such policy is registered the
# answer is false.
#
# @return [Boolean] the policy's verdict, or the fallback described above
def can_manage_grants?
  return true unless @current_user

  rule = self.class.policy_for(:can_manage_grants)
  # Default: deny (unlike can_manage_tokens?, which defaults to allow).
  # Service Grant admin ops should be blocked if the policy SkillSet is not loaded.
  return false unless rule

  rule.call(@current_user)
end

#can_manage_tokens? ⇒ Boolean

Returns:

  • (Boolean)


86
87
88
89
90
# File 'lib/kairos_mcp/safety.rb', line 86

# Whether the current user may manage tokens.
#
# True when no user context is set or when no :can_manage_tokens policy is
# registered; otherwise the registered policy decides.
#
# @return [Boolean] the policy's verdict, or true as the permissive fallback
def can_manage_tokens?
  if @current_user
    rule = self.class.policy_for(:can_manage_tokens)
    return rule.call(@current_user) if rule
  end

  true
end

#can_modify_l0? ⇒ Boolean

Role-based authorization hooks. When no policy is registered (STDIO mode / no Multiuser SkillSet), these return true (permissive fallback). SkillSets register policies via Safety.register_policy to enforce RBAC.

Returns:

  • (Boolean)


68
69
70
71
72
# File 'lib/kairos_mcp/safety.rb', line 68

# Role-based authorization hook for L0 modifications.
#
# True when no user context is set or when no :can_modify_l0 policy is
# registered (permissive fallback); otherwise the registered policy decides.
#
# @return [Boolean] the policy's verdict, or true as the permissive fallback
def can_modify_l0?
  if @current_user
    rule = self.class.policy_for(:can_modify_l0)
    return rule.call(@current_user) if rule
  end

  true
end

#can_modify_l1? ⇒ Boolean

Returns:

  • (Boolean)


74
75
76
77
78
# File 'lib/kairos_mcp/safety.rb', line 74

# Role-based authorization hook for L1 modifications.
#
# True when no user context is set or when no :can_modify_l1 policy is
# registered (permissive fallback); otherwise the registered policy decides.
#
# @return [Boolean] the policy's verdict, or true as the permissive fallback
def can_modify_l1?
  if @current_user
    rule = self.class.policy_for(:can_modify_l1)
    return rule.call(@current_user) if rule
  end

  true
end

#can_modify_l2? ⇒ Boolean

Returns:

  • (Boolean)


80
81
82
83
84
# File 'lib/kairos_mcp/safety.rb', line 80

# Role-based authorization hook for L2 modifications.
#
# True when no user context is set or when no :can_modify_l2 policy is
# registered (permissive fallback); otherwise the registered policy decides.
#
# @return [Boolean] the policy's verdict, or true as the permissive fallback
def can_modify_l2?
  if @current_user
    rule = self.class.policy_for(:can_modify_l2)
    return rule.call(@current_user) if rule
  end

  true
end

#max_read_bytes ⇒ Object



139
140
141
# File 'lib/kairos_mcp/safety.rb', line 139

# Read-size limit taken from the 'max_read_bytes' entry of the limits config.
#
# @return [Object] the configured value, or 100_000 when it is absent or falsy
def max_read_bytes
  configured = @limits['max_read_bytes']
  return configured if configured

  100_000
end

#max_search_lines ⇒ Object



143
144
145
# File 'lib/kairos_mcp/safety.rb', line 143

# Search-line limit taken from the 'max_search_lines' entry of the limits config.
#
# @return [Object] the configured value, or 500 when it is absent or falsy
def max_search_lines
  configured = @limits['max_search_lines']
  return configured if configured

  500
end

#max_tree_depth ⇒ Object



147
148
149
# File 'lib/kairos_mcp/safety.rb', line 147

# Tree-depth limit taken from the 'max_tree_depth' entry of the limits config.
#
# @return [Object] the configured value, or 5 when it is absent or falsy
def max_tree_depth
  configured = @limits['max_tree_depth']
  return configured if configured

  5
end

#safe_root ⇒ Object



119
120
121
# File 'lib/kairos_mcp/safety.rb', line 119

# The root directory currently in effect: the workspace root when one has
# been set, otherwise the default root.
#
# @return [String] the workspace root, or the default root when none is set
def safe_root
  return @workspace_root if @workspace_root

  @default_root
end

#set_user(user_context) ⇒ Object

Set user context from HTTP authentication

Parameters:

  • user_context (Hash, nil)

    { user: "name", role: "owner"|"member"|"guest", … }



56
57
58
59
60
61
# File 'lib/kairos_mcp/safety.rb', line 56

# Stores the user context obtained from HTTP authentication and, when one is
# given, logs the user name and role to stderr.
#
# @param user_context [Hash, nil] e.g. { user: "name", role: "owner" }; nil clears it
# @return [nil]
def set_user(user_context)
  @current_user = user_context
  return unless user_context

  $stderr.puts "[INFO] User context set: #{user_context[:user]} (#{user_context[:role]})"
end

#set_workspace(roots = nil) ⇒ Object

Set workspace root from MCP client (roots) or environment



101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
# File 'lib/kairos_mcp/safety.rb', line 101

# Sets the workspace root from the MCP client's roots list, falling back to
# the KAIROS_WORKSPACE environment variable and then to the default root.
#
# Only the first entry of +roots+ is considered. It may be a Hash with a
# 'uri' key or a plain String. A leading "file://" is stripped and, for such
# file URIs, percent-escapes are decoded (so "my%20project" becomes
# "my project"). Entries that do not yield a String are ignored.
#
# When +roots+ supplies no usable entry, a workspace root set by an earlier
# call is kept; the fallbacks only apply while none is set.
#
# @param roots [Array<Hash, String>, nil] roots reported by the client
# @return [String] the workspace root now in effect
def set_workspace(roots = nil)
  # Converts a file:// URI to a filesystem path; other strings are copied as-is.
  to_path = lambda do |location|
    next location.dup unless location.start_with?('file://')

    # \A anchors at the very start of the string (^ would also match after
    # a newline). Escapes are decoded bytewise so that multi-byte UTF-8
    # sequences such as %C3%A9 are reassembled correctly.
    location.sub(%r{\Afile://}, '').b
            .gsub(/%(\h\h)/) { Regexp.last_match(1).hex.chr }
            .force_encoding(Encoding::UTF_8)
  end

  first_root = roots.first if roots.is_a?(Array)
  candidate = first_root.is_a?(Hash) ? first_root['uri'] : first_root
  # Non-String values (e.g. a malformed 'uri') are skipped rather than crashing.
  @workspace_root = to_path.call(candidate) if candidate.is_a?(String)

  @workspace_root ||= ENV['KAIROS_WORKSPACE']
  @workspace_root ||= @default_root

  $stderr.puts "[INFO] Workspace root set to: #{@workspace_root}"
  @workspace_root
end

#validate_path(path) ⇒ Object



123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
# File 'lib/kairos_mcp/safety.rb', line 123

# Resolves +path+ against the safe root and checks that it may be accessed.
#
# @param path [String] an absolute path, or one relative to the safe root
# @return [String] the expanded absolute path
# @raise [RuntimeError] when the path lies outside the safe root or matches
#   the blocklist
def validate_path(path)
  resolved = File.expand_path(path, safe_root)

  # Containment is checked first, then the blocklist.
  raise "Access denied: Path is outside safe root (#{safe_root})" unless inside_safe_root?(resolved)
  raise "Access denied: File matches blocklist pattern" if blocked?(resolved)

  resolved
end