Module: Legion::MCP::ToolGovernance
- Defined in:
- lib/legion/mcp/tool_governance.rb
Constant Summary collapse
- RISK_TIER_ORDER =
{ low: 0, medium: 1, high: 2, critical: 3 }.freeze
- DEFAULT_TOOL_TIERS =
{ 'legion.list_workers' => :low, 'legion.show_worker' => :low, 'legion.list_tasks' => :low, 'legion.get_task' => :low, 'legion.get_status' => :low, 'legion.get_config' => :low, 'legion.describe_runner' => :low, 'legion.list_extensions' => :low, 'legion.run_task' => :medium, 'legion.create_schedule' => :medium, 'legion.worker_lifecycle' => :high, 'legion.enable_extension' => :high, 'legion.disable_extension' => :high, 'legion.delete_task' => :high, 'legion.rbac_assignments' => :high, 'legion.rbac_grants' => :high }.freeze
Class Method Summary collapse
- .audit_enabled? ⇒ Boolean
- .audit_invocation(tool_name:, identity:, params:, result:) ⇒ Object
- .custom_tiers ⇒ Object
- .definition_tier(tool) ⇒ Object
Returns the mcp_tier declared on the tool class via the definition DSL, or nil if absent.
- .filter_tools(tools, identity) ⇒ Object
- .governance_enabled? ⇒ Boolean
- .tool_name(tool) ⇒ Object
Class Method Details
.audit_enabled? ⇒ Boolean
60 61 62 |
# File 'lib/legion/mcp/tool_governance.rb', line 60

# Reports whether MCP tool invocations should be audited.
#
# Reads the +mcp.governance.audit_invocations+ setting. Auditing is treated as
# on unless the setting is exactly +false+, so an absent (nil) value leaves it
# enabled.
#
# @return [Boolean]
def audit_enabled?
  setting = Legion::Settings.dig(:mcp, :governance, :audit_invocations)
  setting != false
end
.audit_invocation(tool_name:, identity:, params:, result:) ⇒ Object
44 45 46 47 48 49 50 51 52 53 54 |
# File 'lib/legion/mcp/tool_governance.rb', line 44

# Records an audit entry for a single MCP tool invocation.
#
# Does nothing when auditing is disabled or when Legion::Audit is not loaded.
# Only the parameter *keys* are recorded, never the parameter values.
#
# @param tool_name [#to_s] name of the invoked tool; recorded as "mcp.<tool_name>"
# @param identity [#dig, nil] caller identity; :worker_id is preferred over
#   :user_id, and 'unknown' is used when neither is present
# @param params [#keys, nil] invocation parameters
# @param result [#dig, nil] invocation result; a truthy :error marks a failure
# @return [Object, nil] whatever Legion::Audit.record returns, or nil when skipped
def audit_invocation(tool_name:, identity:, params:, result:)
  return unless audit_enabled?
  return unless defined?(Legion::Audit)

  principal = identity&.dig(:worker_id) || identity&.dig(:user_id) || 'unknown'
  action = "mcp.#{tool_name}"
  param_keys = params&.keys
  # A nil result has no :error, so it is recorded as a success.
  succeeded = !result&.dig(:error)

  Legion::Audit.record(
    event_type: 'mcp_tool_invocation',
    principal_id: principal,
    action: action,
    resource: 'mcp_tool',
    detail: { param_keys: param_keys, success: succeeded }
  )
end
.custom_tiers ⇒ Object
64 65 66 |
# File 'lib/legion/mcp/tool_governance.rb', line 64

# Returns the operator-configured tool risk tiers.
#
# Reads the +mcp.governance.tool_risk_tiers+ setting and substitutes an empty
# Hash when nothing is configured.
#
# @return [Object] the configured value (presumably a Hash of tool name => tier;
#   verify against the settings schema), or {} when the setting is nil/false
def custom_tiers
  configured = Legion::Settings.dig(:mcp, :governance, :tool_risk_tiers)
  configured || {}
end
.definition_tier(tool) ⇒ Object
Returns the mcp_tier declared on the tool class via the definition DSL, or nil if absent. Tool classes built by FunctionDiscovery expose mcp_tier as a singleton method.
80 81 82 83 84 85 86 87 88 89 90 |
# File 'lib/legion/mcp/tool_governance.rb', line 80

# Returns the mcp_tier declared on the tool class via the definition DSL, or
# nil if absent. Tool classes built by FunctionDiscovery expose mcp_tier as a
# singleton method.
#
# The declared value is lower-cased and symbolized; anything that is not a key
# of RISK_TIER_ORDER is treated as if no tier had been declared.
#
# @param tool [Object] tool (class) that may respond to +mcp_tier+
# @return [Symbol, nil] a RISK_TIER_ORDER key, or nil
def definition_tier(tool)
  declared = tool.mcp_tier if tool.respond_to?(:mcp_tier)
  return if declared.nil?

  candidate = declared.to_s.downcase.to_sym
  candidate if RISK_TIER_ORDER.key?(candidate)
end
.filter_tools(tools, identity) ⇒ Object
29 30 31 32 33 34 35 36 37 38 39 40 41 42 |
# File 'lib/legion/mcp/tool_governance.rb', line 29

# Selects the tools the given identity is allowed to see.
#
# When governance is disabled the +tools+ collection is returned untouched.
# Otherwise a tool is kept when its tier rank is less than or equal to the
# rank of the identity's +:risk_tier+ (default +:low+).
#
# Tier precedence per tool, highest first:
# 1. the definition-level mcp_tier on the tool class (see definition_tier),
# 2. custom_tiers from Settings,
# 3. DEFAULT_TOOL_TIERS,
# 4. +:low+.
#
# Tier values coming from the identity and from custom_tiers are normalized
# the same way definition_tier normalizes them (to_s.downcase.to_sym), and
# custom tier keys are compared as Strings. Without this, a String tier such
# as "critical" was ranked 0 and the tool became visible to every caller, and
# a Symbol-keyed override never replaced its String-keyed default. Custom
# entries whose tier is not a RISK_TIER_ORDER key are ignored so a typo cannot
# silently lower a tool's default tier.
#
# @param tools [Enumerable] tools to filter
# @param identity [#dig, nil] caller identity; +:risk_tier+ is read from it
# @return [Enumerable, Array] +tools+ itself when governance is disabled,
#   otherwise the result of +tools.select+
def filter_tools(tools, identity)
  return tools unless governance_enabled?

  # Maps any tier-ish value to a RISK_TIER_ORDER key, or nil when unrecognised.
  normalize = lambda do |tier|
    candidate = tier.to_s.downcase.to_sym
    candidate if RISK_TIER_ORDER.key?(candidate)
  end

  # Unknown or missing identity tiers fall back to rank 0 (fail closed).
  caller_rank = RISK_TIER_ORDER.fetch(normalize.call(identity&.dig(:risk_tier)), 0)

  overrides = custom_tiers.each_with_object({}) do |(name, tier), acc|
    valid_tier = normalize.call(tier)
    acc[name.to_s] = valid_tier if valid_tier
  end

  # DEFAULT_TOOL_TIERS is the fallback; custom tiers (from Settings) override it;
  # definition-level mcp_tier on the tool class takes highest precedence.
  fallback_tiers = DEFAULT_TOOL_TIERS.merge(overrides)

  tools.select do |tool|
    tool_tier = definition_tier(tool) || fallback_tiers[tool_name(tool).to_s] || :low
    RISK_TIER_ORDER.fetch(tool_tier, 0) <= caller_rank
  end
end
.governance_enabled? ⇒ Boolean
56 57 58 |
# File 'lib/legion/mcp/tool_governance.rb', line 56

# Reports whether tool governance is switched on.
#
# Governance is opt-in: only an +mcp.governance.enabled+ setting that compares
# equal to +true+ enables it. Any other value, including nil, leaves it off.
#
# @return [Boolean]
def governance_enabled?
  flag = Legion::Settings.dig(:mcp, :governance, :enabled)
  flag == true
end
.tool_name(tool) ⇒ Object
68 69 70 71 72 73 74 75 76 |
# File 'lib/legion/mcp/tool_governance.rb', line 68

# Resolves a display/lookup name for a tool.
#
# Tries +tool.tool_name+, then +tool.name+, and finally +tool.to_s+. A reader
# that exists but returns nil is skipped rather than returned, so a tool whose
# +tool_name+ is unset, or an anonymous class whose +name+ is nil, still
# resolves to a usable value instead of nil.
#
# @param tool [Object] tool class or instance
# @return [Object] the first non-nil of tool_name / name, otherwise +tool.to_s+
def tool_name(tool)
  %i[tool_name name].each do |reader|
    next unless tool.respond_to?(reader)

    value = tool.public_send(reader)
    return value unless value.nil?
  end

  tool.to_s
end