Class: KairosMcp::Safety
- Inherits:
-
Object
- Object
- KairosMcp::Safety
- Defined in:
- lib/kairos_mcp/safety.rb
Instance Attribute Summary collapse
-
#current_user ⇒ Object
readonly
.
-
#workspace_root ⇒ Object
readonly
.
Class Method Summary collapse
-
.clear_policies! ⇒ Object
For testing only.
- .policy_for(name) ⇒ Object
-
.register_policy(name, &block) ⇒ Object
Register a named authorization policy for a capability.
-
.registered_policy_names ⇒ Object
Thread-safe list of registered policy names.
- .unregister_policy(name) ⇒ Object
Instance Method Summary collapse
- #can_manage_grants? ⇒ Boolean
- #can_manage_tokens? ⇒ Boolean
-
#can_modify_l0? ⇒ Boolean
Role-based authorization hooks.
- #can_modify_l1? ⇒ Boolean
- #can_modify_l2? ⇒ Boolean
-
#initialize ⇒ Safety
constructor
A new instance of Safety.
- #max_read_bytes ⇒ Object
- #max_search_lines ⇒ Object
- #max_tree_depth ⇒ Object
- #safe_root ⇒ Object
-
#set_user(user_context) ⇒ Object
Set user context from HTTP authentication.
-
#set_workspace(roots = nil) ⇒ Object
Set workspace root from MCP client (roots) or environment.
- #validate_path(path) ⇒ Object
Constructor Details
#initialize ⇒ Safety
Returns a new instance of Safety.
43 44 45 46 47 48 49 50 51 |
# File 'lib/kairos_mcp/safety.rb', line 43 def initialize @config = load_config @default_root = File.(@config['safe_root'] || KairosMcp.data_dir) @workspace_root = nil # Set dynamically via set_workspace @current_user = nil # Set dynamically via set_user (HTTP mode) @allowed_paths = @config['allowed_paths'] || [] @blocklist = @config['blocklist'] || [] @limits = @config['limits'] || {} end |
Instance Attribute Details
#current_user ⇒ Object (readonly)
41 42 43 |
# File 'lib/kairos_mcp/safety.rb', line 41 def current_user @current_user end |
#workspace_root ⇒ Object (readonly)
41 42 43 |
# File 'lib/kairos_mcp/safety.rb', line 41 def workspace_root @workspace_root end |
Class Method Details
.clear_policies! ⇒ Object
For testing only
35 36 37 |
# File 'lib/kairos_mcp/safety.rb', line 35 def self.clear_policies! @policy_mutex.synchronize { @policies = {} } end |
.policy_for(name) ⇒ Object
24 25 26 |
# File 'lib/kairos_mcp/safety.rb', line 24 def self.policy_for(name) @policy_mutex.synchronize { @policies[name.to_sym] } end |
.register_policy(name, &block) ⇒ Object
Register a named authorization policy for a capability. Keys should match capability method names (e.g. :can_modify_l0).
16 17 18 |
# File 'lib/kairos_mcp/safety.rb', line 16 def self.register_policy(name, &block) @policy_mutex.synchronize { @policies[name.to_sym] = block } end |
.registered_policy_names ⇒ Object
Thread-safe list of registered policy names. Used by introspection SkillSet for safety visibility.
30 31 32 |
# File 'lib/kairos_mcp/safety.rb', line 30 def self.registered_policy_names @policy_mutex.synchronize { @policies.keys.map(&:to_s) } end |
.unregister_policy(name) ⇒ Object
20 21 22 |
# File 'lib/kairos_mcp/safety.rb', line 20 def self.unregister_policy(name) @policy_mutex.synchronize { @policies.delete(name.to_sym) } end |
Instance Method Details
#can_manage_grants? ⇒ Boolean
92 93 94 95 96 97 98 |
# File 'lib/kairos_mcp/safety.rb', line 92 def can_manage_grants? return true unless @current_user policy = self.class.policy_for(:can_manage_grants) # Default: deny (unlike can_manage_tokens? which defaults to allow). # Service Grant admin ops should be blocked if the policy SkillSet is not loaded. policy ? policy.call(@current_user) : false end |
#can_manage_tokens? ⇒ Boolean
86 87 88 89 90 |
# File 'lib/kairos_mcp/safety.rb', line 86 def can_manage_tokens? return true unless @current_user policy = self.class.policy_for(:can_manage_tokens) policy ? policy.call(@current_user) : true end |
#can_modify_l0? ⇒ Boolean
Role-based authorization hooks. When no policy is registered (STDIO mode / no Multiuser SkillSet), these return true (permissive fallback). SkillSets register policies via Safety.register_policy to enforce RBAC.
68 69 70 71 72 |
# File 'lib/kairos_mcp/safety.rb', line 68 def can_modify_l0? return true unless @current_user policy = self.class.policy_for(:can_modify_l0) policy ? policy.call(@current_user) : true end |
#can_modify_l1? ⇒ Boolean
74 75 76 77 78 |
# File 'lib/kairos_mcp/safety.rb', line 74 def can_modify_l1? return true unless @current_user policy = self.class.policy_for(:can_modify_l1) policy ? policy.call(@current_user) : true end |
#can_modify_l2? ⇒ Boolean
80 81 82 83 84 |
# File 'lib/kairos_mcp/safety.rb', line 80 def can_modify_l2? return true unless @current_user policy = self.class.policy_for(:can_modify_l2) policy ? policy.call(@current_user) : true end |
#max_read_bytes ⇒ Object
139 140 141 |
# File 'lib/kairos_mcp/safety.rb', line 139 def max_read_bytes @limits['max_read_bytes'] || 100_000 end |
#max_search_lines ⇒ Object
143 144 145 |
# File 'lib/kairos_mcp/safety.rb', line 143 def max_search_lines @limits['max_search_lines'] || 500 end |
#max_tree_depth ⇒ Object
147 148 149 |
# File 'lib/kairos_mcp/safety.rb', line 147 def max_tree_depth @limits['max_tree_depth'] || 5 end |
#safe_root ⇒ Object
119 120 121 |
# File 'lib/kairos_mcp/safety.rb', line 119 def safe_root @workspace_root || @default_root end |
#set_user(user_context) ⇒ Object
Set user context from HTTP authentication
56 57 58 59 60 61 |
# File 'lib/kairos_mcp/safety.rb', line 56 def set_user(user_context) @current_user = user_context if user_context $stderr.puts "[INFO] User context set: #{user_context[:user]} (#{user_context[:role]})" end end |
#set_workspace(roots = nil) ⇒ Object
Set workspace root from MCP client (roots) or environment
101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 |
# File 'lib/kairos_mcp/safety.rb', line 101 def set_workspace(roots = nil) if roots && roots.is_a?(Array) && !roots.empty? root = roots.first if root.is_a?(Hash) && root['uri'] uri = root['uri'] @workspace_root = uri.sub(/^file:\/\//, '') elsif root.is_a?(String) @workspace_root = root.sub(/^file:\/\//, '') end end @workspace_root ||= ENV['KAIROS_WORKSPACE'] @workspace_root ||= @default_root $stderr.puts "[INFO] Workspace root set to: #{@workspace_root}" @workspace_root end |
#validate_path(path) ⇒ Object
123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 |
# File 'lib/kairos_mcp/safety.rb', line 123 def validate_path(path) absolute_path = File.(path, safe_root) # 1. Check if path is within safe_root unless inside_safe_root?(absolute_path) raise "Access denied: Path is outside safe root (#{safe_root})" end # 2. Check blocklist if blocked?(absolute_path) raise "Access denied: File matches blocklist pattern" end absolute_path end |