Module: Legion::MCP::Auth

Extended by:
Logging::Helper
Defined in:
lib/legion/mcp/auth.rb

Class Method Summary collapse

Class Method Details

.auth_enabled? ⇒ Boolean

Returns:

  • (Boolean)


28
29
30
# File 'lib/legion/mcp/auth.rb', line 28

# Reports whether the `mcp.auth.enabled` setting is switched on.
#
# @return [Boolean] true only when the setting compares equal to +true+;
#   any other value (including nil or truthy non-boolean values) yields false
def auth_enabled?
  enabled_flag = Legion::Settings.dig(:mcp, :auth, :enabled)
  enabled_flag == true
end

.authenticate(token) ⇒ Object



10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
# File 'lib/legion/mcp/auth.rb', line 10

# Authenticates a presented credential as either a JWT or a static API key.
#
# @param token [String, nil] credential supplied by the caller
# @return [Hash] `{ authenticated: false, error: 'missing_token' }` when no
#   token is given; otherwise the result hash produced by verify_jwt or
#   verify_api_key
def authenticate(token)
  # Classify the token exactly once so the debug line and the dispatch below
  # can never disagree (and jwt_token? is not evaluated twice).
  token_type = if !token
                 :none
               elsif jwt_token?(token)
                 :jwt
               else
                 :api_key
               end
  log.debug("[mcp][auth] action=authenticate token_type=#{token_type}")
  return { authenticated: false, error: 'missing_token' } if token_type == :none

  result = token_type == :jwt ? verify_jwt(token) : verify_api_key(token)
  log.debug("[mcp][auth] action=authenticate.complete authenticated=#{result[:authenticated]}")
  result
end

.default_identity ⇒ Object



36
37
38
# File 'lib/legion/mcp/auth.rb', line 36

# Builds the fallback identity: user 'anonymous' at the :low risk tier.
#
# @return [Hash] a freshly allocated hash with :user_id and :risk_tier keys
def default_identity
  identity = { user_id: 'anonymous' }
  identity[:risk_tier] = :low
  identity
end

.identity_from_claims(claims) ⇒ Object



106
107
108
109
110
111
# File 'lib/legion/mcp/auth.rb', line 106

# Maps token claims onto the identity hash and logs the resulting user and tier.
#
# @param claims [Hash] claims indexed by the symbols :sub, :risk_tier,
#   :tenant_id and :worker_id (absent entries become nil)
# @return [Hash] identity with :user_id, :risk_tier (converted with to_sym
#   when present), :tenant_id and :worker_id
def identity_from_claims(claims)
  {
    user_id:   claims[:sub],
    risk_tier: claims[:risk_tier]&.to_sym,
    tenant_id: claims[:tenant_id],
    worker_id: claims[:worker_id]
  }.tap do |built|
    log.debug("[mcp][auth] action=identity_from_claims user_id=#{built[:user_id]} risk_tier=#{built[:risk_tier]}")
  end
end

.jwt_algorithm ⇒ Object



90
91
92
# File 'lib/legion/mcp/auth.rb', line 90

# Signing algorithm name used when verifying JWTs.
#
# @return [Object] the `mcp.auth.jwt_algorithm` setting when it is truthy,
#   otherwise the string 'HS256'
def jwt_algorithm
  configured = Legion::Settings.dig(:mcp, :auth, :jwt_algorithm)
  configured || 'HS256'
end

.jwt_issuer ⇒ Object



94
95
96
# File 'lib/legion/mcp/auth.rb', line 94

# Issuer value passed to JWT verification.
#
# @return [Object] the `mcp.auth.jwt_issuer` setting when it is truthy,
#   otherwise the string 'legion'
def jwt_issuer
  configured = Legion::Settings.dig(:mcp, :auth, :jwt_issuer)
  return configured if configured

  'legion'
end

.jwt_token?(token) ⇒ Boolean

Returns:

  • (Boolean)


40
41
42
# File 'lib/legion/mcp/auth.rb', line 40

# Heuristically decides whether a credential looks like a JWT.
#
# The check is purely structural: a String containing exactly two '.'
# characters is treated as a JWT; nothing is decoded or verified here.
#
# @param token [Object] candidate credential
# @return [Boolean] false for anything that is not a String (instead of
#   raising NoMethodError), otherwise whether the dot count equals two
def jwt_token?(token)
  return false unless token.is_a?(String)

  token.count('.') == 2
end

.jwt_verification_keyObject



81
82
83
84
85
86
87
88
# File 'lib/legion/mcp/auth.rb', line 81

# Resolves the key used to verify JWT signatures.
#
# Lookup order: the `mcp.auth.jwt_secret` setting, then
# Legion::Crypt::ClusterSecret.cs when that constant is loaded and responds
# to :cs.
#
# @return [Object, nil] the verification key, or nil when none is available
def jwt_verification_key
  configured = Legion::Settings.dig(:mcp, :auth, :jwt_secret)
  # An empty secret is truthy in Ruby but useless as a key (anyone can sign
  # with ''), so a blank setting is treated the same as an unset one.
  blank = !configured || (configured.respond_to?(:empty?) && configured.empty?)
  return configured unless blank

  return unless defined?(Legion::Crypt::ClusterSecret)

  cluster_secret = Legion::Crypt::ClusterSecret
  cluster_secret.cs if cluster_secret.respond_to?(:cs)
end

.require_auth? ⇒ Boolean

Returns:

  • (Boolean)


32
33
34
# File 'lib/legion/mcp/auth.rb', line 32

# Reports whether the `mcp.auth.require_auth` setting is switched on.
#
# @return [Boolean] true only when the setting compares equal to +true+
def require_auth?
  required_flag = Legion::Settings.dig(:mcp, :auth, :require_auth)
  required_flag == true
end

.validate_claims!(claims) ⇒ Object



98
99
100
101
102
103
104
# File 'lib/legion/mcp/auth.rb', line 98

# Checks the minimum claims a token must carry.
#
# @param claims [Hash] claims indexed by the symbols :sub and :exp
# @return [nil] when the claims are acceptable
# @raise [RuntimeError] 'token missing required sub claim' when :sub is absent
# @raise [RuntimeError] 'token has expired' when :exp is present and its
#   integer value is at or before the current epoch second
def validate_claims!(claims)
  raise 'token missing required sub claim' unless claims[:sub]

  expiry = claims[:exp]
  raise 'token has expired' if expiry && expiry.to_i <= Time.now.to_i
end

.verify_api_key(token) ⇒ Object



70
71
72
73
74
75
76
77
78
79
# File 'lib/legion/mcp/auth.rb', line 70

# Checks a static API key against the `mcp.auth.allowed_api_keys` setting.
#
# @param token [Object] API key presented by the caller
# @return [Hash] `{ authenticated: true, identity: { user_id: 'api_key',
#   risk_tier: :low } }` on an exact match, otherwise
#   `{ authenticated: false, error: 'invalid_api_key' }`
def verify_api_key(token)
  # Array() normalises nil to [] and wraps a scalar setting. Without it a
  # bare String setting would turn include? into a substring test, letting
  # partial (or empty) tokens authenticate.
  allowed = Array(Legion::Settings.dig(:mcp, :auth, :allowed_api_keys))
  # NOTE(review): include? compares with ==, which is not constant-time;
  # consider a constant-time comparison if timing attacks are in scope.
  matched = allowed.include?(token)
  log.debug("[mcp][auth] action=verify_api_key allowed_keys=#{allowed.size} matched=#{matched}")
  if matched
    { authenticated: true, identity: { user_id: 'api_key', risk_tier: :low } }
  else
    { authenticated: false, error: 'invalid_api_key' }
  end
end

.verify_jwt(token) ⇒ Object



44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
# File 'lib/legion/mcp/auth.rb', line 44

# Verifies a JWT and converts its claims into an identity.
#
# Verification is delegated to Legion::Crypt::JWT.verify using the key,
# algorithm and issuer resolved by this module's helpers. Any StandardError
# raised along the way is reported through handle_exception and turned into
# an unauthenticated result carrying the exception message.
#
# @param token [String] compact JWT presented by the caller
# @return [Hash] `{ authenticated: true, identity: Hash }` on success, or
#   `{ authenticated: false, error: String }` on failure
def verify_jwt(token)
  crypt_available = defined?(Legion::Crypt::JWT) ? true : false
  log.debug("[mcp][auth] action=verify_jwt crypt_available=#{crypt_available}")
  return { authenticated: false, error: 'crypt_unavailable' } unless crypt_available

  verification_key = jwt_verification_key

  unless verification_key
    log.warn('No JWT verification key available; rejecting token')
    return { authenticated: false, error: 'jwt_verification_key_unavailable' }
  end

  claims = Legion::Crypt::JWT.verify(
    token,
    verification_key:  verification_key,
    algorithm:         jwt_algorithm,
    issuer:            jwt_issuer,
    verify_expiration: true,
    verify_issuer:     true
  )

  # Enforce this module's own claim requirements as well: without this a
  # correctly signed token lacking `sub` would authenticate with a nil
  # user_id. Failures raise and are handled by the rescue below.
  validate_claims!(claims)

  { authenticated: true, identity: identity_from_claims(claims) }
rescue StandardError => e
  handle_exception(e, level: :warn, operation: 'legion.mcp.auth.verify_jwt')
  { authenticated: false, error: e.message }
end