Module: Legion::MCP::ToolGovernance
- Extended by:
- Logging::Helper
- Defined in:
- lib/legion/mcp/tool_governance.rb
Constant Summary collapse
- RISK_TIER_ORDER =
{ low: 0, medium: 1, high: 2, critical: 3 }.freeze
- DEFAULT_TOOL_TIERS =
{ 'legion.list_workers' => :low, 'legion.show_worker' => :low, 'legion.list_tasks' => :low, 'legion.get_task' => :low, 'legion.get_status' => :low, 'legion.get_config' => :low, 'legion.describe_runner' => :low, 'legion.list_extensions' => :low, 'legion.run_task' => :medium, 'legion.create_schedule' => :medium, 'legion.worker_lifecycle' => :high, 'legion.enable_extension' => :high, 'legion.disable_extension' => :high, 'legion.delete_task' => :high, 'legion.rbac_assignments' => :high, 'legion.rbac_grants' => :high }.freeze
Class Method Summary collapse
- .audit_enabled? ⇒ Boolean
- .audit_invocation(tool_name:, identity:, params:, result:) ⇒ Object
- .custom_tiers ⇒ Object
- .definition_tier(tool) ⇒ Object
Returns the mcp_tier declared on the tool class via the definition DSL, or nil if absent.
- .emit_governance_filter(identity, before_count, after_count, blocked_count) ⇒ Object
- .filter_by_risk_tier(tools, risk_tier) ⇒ Object
- .filter_by_role(tools, role) ⇒ Object
- .filter_tools(tools, identity) ⇒ Object
- .governance_enabled? ⇒ Boolean
- .role_allowlist(role) ⇒ Object
- .tool_name(tool) ⇒ Object
Class Method Details
.audit_enabled? ⇒ Boolean
126 127 128 |
# File 'lib/legion/mcp/tool_governance.rb', line 126
#
# Tells whether MCP tool invocations should be audited.
#
# Auditing stays on unless the :mcp / :governance / :audit_invocations
# setting compares equal to false; any other value, including nil, leaves
# it enabled.
#
# @return [Boolean]
def audit_enabled?
  setting = Legion::Settings.dig(:mcp, :governance, :audit_invocations)
  return false if setting == false

  true
end
.audit_invocation(tool_name:, identity:, params:, result:) ⇒ Object
89 90 91 92 93 94 95 96 97 98 99 100 101 |
# File 'lib/legion/mcp/tool_governance.rb', line 89
#
# Records one MCP tool invocation through Legion::Audit.
#
# Does nothing when auditing is disabled (see audit_enabled?) or when the
# Legion::Audit constant is not defined. Only the parameter keys are
# recorded, not the parameter values.
#
# @param tool_name [#to_s] name of the invoked tool
# @param identity [#dig, nil] caller identity; :worker_id is preferred over
#   :user_id as the principal, and 'unknown' is used when neither is present
# @param params [#keys, nil] invocation parameters
# @param result [#dig, nil] invocation result; counted as a success when
#   its :error entry is nil or false
# @return [Object, nil] whatever Legion::Audit.record returns, or nil when
#   nothing was recorded
def audit_invocation(tool_name:, identity:, params:, result:)
  return unless audit_enabled?
  return unless defined?(Legion::Audit)

  log.debug("[mcp][governance] action=audit_invocation tool_name=#{tool_name} " \
            "user=#{identity&.dig(:user_id)}")

  principal = identity&.dig(:worker_id) || identity&.dig(:user_id) || 'unknown'
  detail = { param_keys: params&.keys, success: !result&.dig(:error) }

  Legion::Audit.record(
    event_type: 'mcp_tool_invocation',
    principal_id: principal,
    action: "mcp.#{tool_name}",
    resource: 'mcp_tool',
    detail: detail
  )
end
.custom_tiers ⇒ Object
130 131 132 |
# File 'lib/legion/mcp/tool_governance.rb', line 130
#
# Reads the per-tool risk tier overrides from the
# :mcp / :governance / :tool_risk_tiers setting.
#
# @return [Object] the configured value, or an empty Hash when the setting
#   is nil or false
def custom_tiers
  configured = Legion::Settings.dig(:mcp, :governance, :tool_risk_tiers)
  configured || {}
end
.definition_tier(tool) ⇒ Object
Returns the mcp_tier declared on the tool class via the definition DSL, or nil if absent. Tool classes built by FunctionDiscovery expose mcp_tier as a singleton method.
146 147 148 149 150 151 152 153 154 155 156 |
# File 'lib/legion/mcp/tool_governance.rb', line 146
#
# Looks up the risk tier a tool declares about itself through #mcp_tier.
#
# The declared value is stringified, downcased and symbolized before it is
# checked against the keys of RISK_TIER_ORDER.
#
# @param tool [Object] any tool object; it need not respond to #mcp_tier
# @return [Symbol, nil] the normalized tier, or nil when the tool has no
#   #mcp_tier, declares nil, or declares a tier RISK_TIER_ORDER does not know
def definition_tier(tool)
  return unless tool.respond_to?(:mcp_tier)

  declared = tool.mcp_tier
  return if declared.nil?

  candidate = declared.to_s.downcase.to_sym
  RISK_TIER_ORDER.key?(candidate) ? candidate : nil
end
.emit_governance_filter(identity, before_count, after_count, blocked_count) ⇒ Object
103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 |
# File 'lib/legion/mcp/tool_governance.rb', line 103
#
# Emits a :tools_filtered governance event through Legion::MCP::Audit.
#
# Does nothing when Legion::MCP::Audit is not defined. The conversation,
# request and trace ids are read from thread-local storage. Any
# StandardError raised while emitting is passed to handle_exception
# rather than propagated.
#
# @param identity [Object] caller identity, forwarded unchanged
# @param before_count [Integer] number of tools before filtering
# @param after_count [Integer] number of tools after filtering
# @param blocked_count [Integer] number of tools removed by filtering
# @return [Object, nil] whatever emit_governance (or handle_exception)
#   returns, or nil when nothing was emitted
def emit_governance_filter(identity, before_count, after_count, blocked_count)
  return unless defined?(Legion::MCP::Audit)

  context = Thread.current
  payload = {
    event: :tools_filtered,
    identity: identity,
    before_count: before_count,
    after_count: after_count,
    blocked_count: blocked_count,
    reason: 'risk_tier_or_role_filter',
    conversation_id: context[:legion_mcp_conversation_id],
    request_id: context[:legion_mcp_request_id],
    trace_id: context[:legion_mcp_trace_id],
    timestamp: Time.now.utc.iso8601
  }
  Legion::MCP::Audit.emit_governance(**payload)
rescue StandardError => e
  handle_exception(e, level: :warn, handled: true, operation: 'mcp.governance.emit_audit')
end
.filter_by_risk_tier(tools, risk_tier) ⇒ Object
45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 |
# File 'lib/legion/mcp/tool_governance.rb', line 45
#
# Keeps only the tools whose risk tier is at or below the caller's tier.
#
# A tool's tier is resolved in this order, first match wins:
#   1. the tier returned by definition_tier(tool);
#   2. custom_tiers merged over DEFAULT_TOOL_TIERS, looked up by tool name;
#   3. :medium, so tools without tier metadata are not exposed to low-tier
#      identities (safe-by-default).
#
# custom_tiers comes from configuration, so its keys are stringified and
# the resolved tier is stringified, downcased and symbolized before it is
# ranked. A tier that RISK_TIER_ORDER does not know ranks as :medium rather
# than as the lowest tier, so a misspelled or string-valued override can
# never make a tool more widely visible than an untiered tool.
#
# @param tools [#select, #size] the candidate tools
# @param risk_tier [Symbol, nil] the caller's tier; nil, or any value that
#   is not a key of RISK_TIER_ORDER, is ranked as the lowest tier
# @return [Object] the tools selected from +tools+
def filter_by_risk_tier(tools, risk_tier)
  tier_value = RISK_TIER_ORDER[risk_tier || :low] || 0
  log.debug("[mcp][governance] action=filter_by_risk_tier risk_tier=#{risk_tier || :low} " \
            "tier_value=#{tier_value} tools_in=#{tools.size}")

  # Settings may hand back symbol keys; tool names are compared as strings.
  fallback_tiers = DEFAULT_TOOL_TIERS.merge(custom_tiers.transform_keys(&:to_s))
  unknown_tier_value = RISK_TIER_ORDER.fetch(:medium)

  result = tools.select do |tool|
    tool_tier = definition_tier(tool) || fallback_tiers[tool_name(tool).to_s] || :medium
    # Fail closed: an unrecognized tier is ranked as :medium, never as 0.
    tool_value = RISK_TIER_ORDER[tool_tier.to_s.downcase.to_sym] || unknown_tier_value
    tool_value <= tier_value
  end

  log.debug("[mcp][governance] action=filter_by_risk_tier.complete tools_out=#{result.size}")
  result
end
.filter_by_role(tools, role) ⇒ Object
63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 |
# File 'lib/legion/mcp/tool_governance.rb', line 63
#
# Restricts tools to those permitted by the role's allowlist.
#
# The allowlist entries from role_allowlist are treated as File.fnmatch?
# glob patterns and matched against each tool's name. When the role is nil
# or false, or the allowlist contains '*', +tools+ is returned unfiltered.
#
# @param tools [#select, #size] the candidate tools
# @param role [Object, nil] the caller's role
# @return [Object] +tools+ itself when unfiltered, otherwise the tools
#   whose name matches at least one allowlist pattern
def filter_by_role(tools, role)
  return tools unless role

  patterns = role_allowlist(role)
  unrestricted = patterns.include?('*')
  log.debug("[mcp][governance] action=filter_by_role role=#{role} " \
            "allowlist_size=#{patterns.size} wildcard=#{unrestricted}")
  return tools if unrestricted

  kept = tools.select do |tool|
    candidate = tool_name(tool).to_s
    patterns.any? { |glob| File.fnmatch?(glob, candidate) }
  end

  log.debug("[mcp][governance] action=filter_by_role.complete before=#{tools.size} after=#{kept.size}")
  kept
end
.filter_tools(tools, identity) ⇒ Object
31 32 33 34 35 36 37 38 39 40 41 42 43 |
# File 'lib/legion/mcp/tool_governance.rb', line 31
#
# Applies the governance filters to a tool list for one caller.
#
# The risk-tier filter runs only when governance_enabled? is true. The role
# filter runs whenever +identity+ is a Hash with a truthy :role, whether or
# not governance is enabled. A governance event is emitted only if at least
# one tool was removed.
#
# @param tools [#size] the candidate tools
# @param identity [Hash, nil] caller identity; :risk_tier and :role are read
# @return [Object] the tools remaining after filtering
def filter_tools(tools, identity)
  original_count = tools.size
  permitted = tools

  permitted = filter_by_risk_tier(permitted, identity&.dig(:risk_tier)) if governance_enabled?

  if identity.is_a?(Hash) && identity[:role]
    permitted = filter_by_role(permitted, identity[:role])
  end

  removed = original_count - permitted.size
  emit_governance_filter(identity, original_count, permitted.size, removed) if removed.positive?

  log.debug("[mcp][governance] action=filter_tools before=#{original_count} after=#{permitted.size} " \
            "governance_enabled=#{governance_enabled?} " \
            "risk_tier=#{identity&.dig(:risk_tier)} role=#{identity[:role] if identity.is_a?(Hash)}")
  permitted
end
.governance_enabled? ⇒ Boolean
122 123 124 |
# File 'lib/legion/mcp/tool_governance.rb', line 122
#
# Tells whether risk-tier governance is switched on.
#
# Governance is opt-in: it is enabled only when the
# :mcp / :governance / :enabled setting compares equal to true.
#
# @return [Boolean]
def governance_enabled?
  flag = Legion::Settings.dig(:mcp, :governance, :enabled)
  flag == true
end
.role_allowlist(role) ⇒ Object
79 80 81 82 83 84 85 86 87 |
# File 'lib/legion/mcp/tool_governance.rb', line 79
#
# Resolves the tool allowlist configured for a role under :mcp / :roles.
#
# The role is looked up by symbol first and then by string, and the entry's
# tool list is read from :tools first and then from 'tools'.
#
# NOTE(review): a role with no Hash entry receives the wildcard, i.e. every
# tool; confirm this is the intended default for unrecognized roles.
#
# @param role [#to_sym, #to_s] the role name
# @return [Array] the configured tool patterns, or ['*'] when the roles
#   setting or the role's entry is not a Hash
def role_allowlist(role)
  role_map = Legion::Settings.dig(:mcp, :roles)

  if role_map.is_a?(Hash)
    entry = role_map[role.to_sym] || role_map[role.to_s]
    return Array(entry[:tools] || entry['tools']) if entry.is_a?(Hash)
  end

  ['*']
end
.tool_name(tool) ⇒ Object
134 135 136 137 138 139 140 141 142 |
# File 'lib/legion/mcp/tool_governance.rb', line 134
#
# Extracts a name from a tool object.
#
# Prefers #tool_name, then #name, and finally falls back to #to_s.
#
# @param tool [Object] the tool
# @return [Object] the value of the first of those methods the tool
#   responds to
def tool_name(tool)
  return tool.tool_name if tool.respond_to?(:tool_name)
  return tool.name if tool.respond_to?(:name)

  tool.to_s
end