Module: Legion::MCP::ToolGovernance

Extended by:
Logging::Helper
Defined in:
lib/legion/mcp/tool_governance.rb

Constant Summary collapse

# Numeric rank of every recognised risk tier; a larger number is a riskier tier.
# filter_by_risk_tier compares these ranks to decide visibility, and
# definition_tier uses the keys to validate tiers declared on tool classes.
RISK_TIER_ORDER =
{ low: 0, medium: 1, high: 2, critical: 3 }.freeze
# Built-in tool-name => risk-tier table. It is the lowest-precedence source:
# filter_by_risk_tier merges the tiers returned by custom_tiers over it, and a
# tier declared on the tool class itself wins over both.
DEFAULT_TOOL_TIERS =
{
  'legion.list_workers'      => :low,
  'legion.show_worker'       => :low,
  'legion.list_tasks'        => :low,
  'legion.get_task'          => :low,
  'legion.get_status'        => :low,
  'legion.get_config'        => :low,
  'legion.describe_runner'   => :low,
  'legion.list_extensions'   => :low,
  'legion.run_task'          => :medium,
  'legion.create_schedule'   => :medium,
  'legion.worker_lifecycle'  => :high,
  'legion.enable_extension'  => :high,
  'legion.disable_extension' => :high,
  'legion.delete_task'       => :high,
  'legion.rbac_assignments'  => :high,
  'legion.rbac_grants'       => :high
}.freeze

Class Method Summary collapse

Class Method Details

.audit_enabled? ⇒ Boolean

Returns:

  • (Boolean)


126
127
128
# File 'lib/legion/mcp/tool_governance.rb', line 126

# Reports whether MCP tool invocations should be audited.
#
# Auditing is opt-out: only an explicit `false` under
# mcp.governance.audit_invocations turns it off; an unset (nil) or any other
# value leaves it on.
#
# @return [Boolean]
def audit_enabled?
  configured = Legion::Settings.dig(:mcp, :governance, :audit_invocations)
  configured != false
end

.audit_invocation(tool_name:, identity:, params:, result:) ⇒ Object



89
90
91
92
93
94
95
96
97
98
99
100
101
# File 'lib/legion/mcp/tool_governance.rb', line 89

# Writes one audit record for an MCP tool invocation.
#
# Does nothing (returns nil) when auditing is switched off or when
# Legion::Audit is not loaded. Only the parameter *keys* are recorded, never
# the parameter values.
#
# @param tool_name [#to_s] name of the invoked tool
# @param identity [Hash, nil] caller identity; :worker_id is preferred over
#   :user_id as the principal, falling back to 'unknown'
# @param params [Hash, nil] invocation parameters
# @param result [Hash, nil] invocation result; a truthy :error marks failure
# @return [Object, nil] whatever Legion::Audit.record returns, or nil when skipped
def audit_invocation(tool_name:, identity:, params:, result:)
  return unless audit_enabled?
  return unless defined?(Legion::Audit)

  user_id = identity&.dig(:user_id)
  log.debug("[mcp][governance] action=audit_invocation tool_name=#{tool_name} user=#{user_id}")

  principal = identity&.dig(:worker_id) || user_id || 'unknown'
  succeeded = !result&.dig(:error)
  Legion::Audit.record(
    event_type:   'mcp_tool_invocation',
    principal_id: principal,
    action:       "mcp.#{tool_name}",
    resource:     'mcp_tool',
    detail:       { param_keys: params&.keys, success: succeeded }
  )
end

.custom_tiers ⇒ Object



130
131
132
# File 'lib/legion/mcp/tool_governance.rb', line 130

# Reads the operator-supplied tool risk tiers from
# mcp.governance.tool_risk_tiers.
#
# @return [Object] the configured value, or an empty Hash when the setting is
#   unset (nil) or false
def custom_tiers
  configured = Legion::Settings.dig(:mcp, :governance, :tool_risk_tiers)
  configured || {}
end

.definition_tier(tool) ⇒ Object

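Returns the mcp_tier declared on the tool class via the definition DSL, normalized to a lowercase Symbol, or nil if the tool declares none or the declared value is not a recognized risk tier. Tool classes built by FunctionDiscovery expose mcp_tier as a singleton method.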



146
147
148
149
150
151
152
153
154
155
156
# File 'lib/legion/mcp/tool_governance.rb', line 146

# Looks up the risk tier a tool declares about itself.
#
# The declared value is lower-cased and symbolized before being checked
# against RISK_TIER_ORDER, so 'HIGH' and :high are equivalent.
#
# @param tool [Object] any tool; only consulted if it responds to #mcp_tier
# @return [Symbol, nil] the normalized tier, or nil when the tool has no
#   #mcp_tier, declares nil, or declares a value that is not a known tier
def definition_tier(tool)
  declared = tool.mcp_tier if tool.respond_to?(:mcp_tier)
  return if declared.nil?

  candidate = declared.to_s.downcase.to_sym
  candidate if RISK_TIER_ORDER.key?(candidate)
end

.emit_governance_filter(identity, before_count, after_count, blocked_count) ⇒ Object



103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
# File 'lib/legion/mcp/tool_governance.rb', line 103

# Emits a :tools_filtered governance event describing how many tools were
# removed for an identity.
#
# Does nothing when Legion::MCP::Audit is not loaded. Emission is best-effort:
# any StandardError raised while building or sending the event is passed to
# handle_exception at :warn level instead of propagating to the caller.
#
# @param identity [Object] caller identity, forwarded unchanged
# @param before_count [Integer] number of tools before filtering
# @param after_count [Integer] number of tools after filtering
# @param blocked_count [Integer] number of tools removed
# @return [Object, nil] the emitter's (or handle_exception's) return value,
#   or nil when no audit emitter is loaded
def emit_governance_filter(identity, before_count, after_count, blocked_count)
  return unless defined?(Legion::MCP::Audit)

  # Correlation ids are read from thread-local storage of the current thread.
  context = Thread.current
  payload = {
    event:           :tools_filtered,
    identity:        identity,
    before_count:    before_count,
    after_count:     after_count,
    blocked_count:   blocked_count,
    reason:          'risk_tier_or_role_filter',
    conversation_id: context[:legion_mcp_conversation_id],
    request_id:      context[:legion_mcp_request_id],
    trace_id:        context[:legion_mcp_trace_id],
    timestamp:       Time.now.utc.iso8601
  }
  Legion::MCP::Audit.emit_governance(**payload)
rescue StandardError => e
  handle_exception(e, level: :warn, handled: true, operation: 'mcp.governance.emit_audit')
end

.filter_by_risk_tier(tools, risk_tier) ⇒ Object



45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
# File 'lib/legion/mcp/tool_governance.rb', line 45

# Keeps only the tools whose risk tier does not exceed the identity's tier.
#
# Tier resolution per tool, highest precedence first:
#   1. the tier declared on the tool class (definition_tier),
#   2. custom_tiers from Settings,
#   3. DEFAULT_TOOL_TIERS,
#   4. :medium when nothing is known about the tool.
#
# Tier values and custom-tier keys are normalized before use, because
# Settings may deliver them as Strings or Symbols:
# * custom-tier keys are stringified so they match tool names;
# * tiers are lower-cased and symbolized ('HIGH' == :high);
# * a tool tier that is still not a key of RISK_TIER_ORDER ranks as :medium
#   rather than 0, so a typo in configuration cannot silently expose a tool
#   to low-tier identities (safe-by-default);
# * an identity tier that is missing or unrecognised ranks as :low.
#
# @param tools [Array] tools to filter
# @param risk_tier [Symbol, String, nil] the identity's maximum allowed tier
# @return [Array] the tools the identity may see
def filter_by_risk_tier(tools, risk_tier)
  tier_value = RISK_TIER_ORDER[(risk_tier || :low).to_s.downcase.to_sym] || 0
  log.debug("[mcp][governance] action=filter_by_risk_tier risk_tier=#{risk_tier || :low} " \
            "tier_value=#{tier_value} tools_in=#{tools.size}")

  fallback_tiers = DEFAULT_TOOL_TIERS.merge(custom_tiers.transform_keys(&:to_s))
  unknown_rank = RISK_TIER_ORDER.fetch(:medium)
  result = tools.select do |tool|
    tool_tier = definition_tier(tool) || fallback_tiers[tool_name(tool).to_s] || :medium
    RISK_TIER_ORDER.fetch(tool_tier.to_s.downcase.to_sym, unknown_rank) <= tier_value
  end
  log.debug("[mcp][governance] action=filter_by_risk_tier.complete tools_out=#{result.size}")
  result
end

.filter_by_role(tools, role) ⇒ Object



63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
# File 'lib/legion/mcp/tool_governance.rb', line 63

# Restricts tools to those permitted by a role's allowlist.
#
# Allowlist entries are glob patterns matched against the tool name with
# File.fnmatch?. A literal '*' entry means "no restriction" and short-circuits
# the match. A falsy role skips filtering entirely.
#
# @param tools [Array] tools to filter
# @param role [String, Symbol, nil] role whose allowlist applies
# @return [Array] the same +tools+ object when unrestricted, otherwise a new
#   Array of the permitted tools
def filter_by_role(tools, role)
  return tools unless role

  patterns = role_allowlist(role)
  unrestricted = patterns.include?('*')
  log.debug("[mcp][governance] action=filter_by_role role=#{role} " \
            "allowlist_size=#{patterns.size} wildcard=#{unrestricted}")
  return tools if unrestricted

  permitted = tools.select do |tool|
    candidate = tool_name(tool).to_s
    patterns.any? { |glob| File.fnmatch?(glob, candidate) }
  end
  log.debug("[mcp][governance] action=filter_by_role.complete before=#{tools.size} after=#{permitted.size}")
  permitted
end

.filter_tools(tools, identity) ⇒ Object



31
32
33
34
35
36
37
38
39
40
41
42
43
# File 'lib/legion/mcp/tool_governance.rb', line 31

# Applies governance filtering to a tool list for one identity.
#
# Risk-tier filtering runs only when governance is enabled; role filtering
# runs whenever the identity is a Hash carrying a truthy :role. When at least
# one tool was removed, a governance event is emitted via
# emit_governance_filter.
#
# The risk tier and role are extracted once, behind guards, so an identity
# that is neither nil nor a Hash (for example a bare String principal) is
# treated as having no tier and no role instead of raising NoMethodError.
#
# @param tools [Array] candidate tools
# @param identity [Hash, nil, Object] caller identity; :risk_tier and :role
#   are the only keys read here
# @return [Array] the tools visible to the identity
def filter_tools(tools, identity)
  before = tools.size
  # Read the flag once so the filtering decision and the log line agree.
  enforce_tiers = governance_enabled?
  risk_tier = identity.dig(:risk_tier) if identity.respond_to?(:dig)
  role = identity[:role] if identity.is_a?(Hash)

  tools = filter_by_risk_tier(tools, risk_tier) if enforce_tiers
  tools = filter_by_role(tools, role) if role

  blocked_count = before - tools.size
  emit_governance_filter(identity, before, tools.size, blocked_count) if blocked_count.positive?

  log.debug("[mcp][governance] action=filter_tools before=#{before} after=#{tools.size} " \
            "governance_enabled=#{enforce_tiers} " \
            "risk_tier=#{risk_tier} role=#{role}")
  tools
end

.governance_enabled? ⇒ Boolean

Returns:

  • (Boolean)


122
123
124
# File 'lib/legion/mcp/tool_governance.rb', line 122

# Reports whether risk-tier governance is switched on.
#
# Governance is opt-in: only a literal `true` under mcp.governance.enabled
# enables it; unset, nil, or any other value (including truthy Strings)
# leaves it off.
#
# @return [Boolean]
def governance_enabled?
  flag = Legion::Settings.dig(:mcp, :governance, :enabled)
  flag == true
end

.role_allowlist(role) ⇒ Object



79
80
81
82
83
84
85
86
87
# File 'lib/legion/mcp/tool_governance.rb', line 79

# Resolves the tool-name patterns a role is allowed to use, read from
# mcp.roles in Settings.
#
# The role is looked up by Symbol first, then by String; the role's tool list
# is read from :tools first, then 'tools'.
#
# NOTE(review): a role that is missing from a configured mcp.roles Hash (or
# whose entry is not a Hash) receives the '*' wildcard, i.e. unrestricted
# access — confirm this fail-open default is intended.
#
# @param role [#to_sym, #to_s] role name
# @return [Array] ['*'] when roles are not configured as a Hash or the role
#   has no Hash entry; otherwise the role's tools coerced to an Array (empty
#   when the entry lists none)
def role_allowlist(role)
  configured = Legion::Settings.dig(:mcp, :roles)
  entry = (configured[role.to_sym] || configured[role.to_s]) if configured.is_a?(Hash)
  return ['*'] unless entry.is_a?(Hash)

  Array(entry[:tools] || entry['tools'])
end

.tool_name(tool) ⇒ Object



134
135
136
137
138
139
140
141
142
# File 'lib/legion/mcp/tool_governance.rb', line 134

# Derives a name for a tool using whatever the object offers.
#
# Preference order: #tool_name, then #name, then #to_s.
#
# @param tool [Object] a tool class or instance
# @return [Object] the value of the first available accessor
def tool_name(tool)
  return tool.tool_name if tool.respond_to?(:tool_name)
  return tool.name if tool.respond_to?(:name)

  tool.to_s
end