Module: Legion::MCP::Auth
- Extended by:
- Logging::Helper
- Defined in:
- lib/legion/mcp/auth.rb
Class Method Summary
- .auth_enabled? ⇒ Boolean
- .authenticate(token) ⇒ Object
- .default_identity ⇒ Object
- .identity_from_claims(claims) ⇒ Object
- .jwt_algorithm ⇒ Object
- .jwt_issuer ⇒ Object
- .jwt_token?(token) ⇒ Boolean
- .jwt_verification_key ⇒ Object
- .require_auth? ⇒ Boolean
- .validate_claims!(claims) ⇒ Object
- .verify_api_key(token) ⇒ Object
- .verify_jwt(token) ⇒ Object
Class Method Details
.auth_enabled? ⇒ Boolean
28 29 30 |
# File 'lib/legion/mcp/auth.rb', line 28
# Reports whether MCP authentication is switched on.
#
# @return [Boolean] true only when the :mcp / :auth / :enabled setting
#   compares equal to +true+; any other value (including truthy ones) is false
def auth_enabled?
  enabled = Legion::Settings.dig(:mcp, :auth, :enabled)
  enabled == true
end
.authenticate(token) ⇒ Object
10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 |
# File 'lib/legion/mcp/auth.rb', line 10
# Authenticates a presented token by dispatching to JWT or API-key verification.
#
# A token for which jwt_token? is true goes to verify_jwt; any other truthy
# token goes to verify_api_key. The detected token type and the final outcome
# are written to the debug log.
#
# @param token [String, nil] credential presented by the caller
# @return [Hash] the verifier's result hash, or
#   { authenticated: false, error: 'missing_token' } when token is nil/false
def authenticate(token)
  token_type =
    if !token
      :none
    elsif jwt_token?(token)
      :jwt
    else
      :api_key
    end
  log.debug("[mcp][auth] action=authenticate token_type=#{token_type}")
  return { authenticated: false, error: 'missing_token' } if token_type == :none

  outcome = token_type == :jwt ? verify_jwt(token) : verify_api_key(token)
  log.debug("[mcp][auth] action=authenticate.complete authenticated=#{outcome[:authenticated]}")
  outcome
end
.default_identity ⇒ Object
36 37 38 |
# File 'lib/legion/mcp/auth.rb', line 36
# Builds the identity hash for an anonymous caller.
#
# @return [Hash] a fresh hash with user_id 'anonymous' and risk_tier :low
def default_identity
  identity = {}
  identity[:user_id] = 'anonymous'
  identity[:risk_tier] = :low
  identity
end
.identity_from_claims(claims) ⇒ Object
106 107 108 109 110 111 |
# File 'lib/legion/mcp/auth.rb', line 106
# Maps token claims onto an identity hash and logs the resulting user/risk tier.
#
# @param claims [Hash] claims read with symbol keys (:sub, :risk_tier,
#   :tenant_id, :worker_id)
# @return [Hash] identity with :user_id, :risk_tier (symbolised when present),
#   :tenant_id and :worker_id
def identity_from_claims(claims)
  user_id = claims[:sub]
  risk_tier = claims[:risk_tier]&.to_sym
  built = {
    user_id: user_id,
    risk_tier: risk_tier,
    tenant_id: claims[:tenant_id],
    worker_id: claims[:worker_id]
  }
  log.debug("[mcp][auth] action=identity_from_claims user_id=#{user_id} risk_tier=#{risk_tier}")
  built
end
.jwt_algorithm ⇒ Object
90 91 92 |
# File 'lib/legion/mcp/auth.rb', line 90
# Resolves the JWT signing algorithm name.
#
# @return [Object] the :mcp / :auth / :jwt_algorithm setting, or 'HS256'
#   when that setting is nil/false
def jwt_algorithm
  configured = Legion::Settings.dig(:mcp, :auth, :jwt_algorithm)
  configured || 'HS256'
end
.jwt_issuer ⇒ Object
94 95 96 |
# File 'lib/legion/mcp/auth.rb', line 94
# Resolves the expected JWT issuer.
#
# @return [Object] the :mcp / :auth / :jwt_issuer setting, or 'legion'
#   when that setting is nil/false
def jwt_issuer
  configured = Legion::Settings.dig(:mcp, :auth, :jwt_issuer)
  configured || 'legion'
end
.jwt_token?(token) ⇒ Boolean
40 41 42 |
# File 'lib/legion/mcp/auth.rb', line 40
# Decides whether a token has the dotted shape of a JWT.
#
# Only the number of '.' characters is inspected; the segments themselves
# are not decoded or validated here.
#
# @param token [String] token to classify
# @return [Boolean] true when the token contains exactly two '.' characters
def jwt_token?(token)
  separators = token.count('.')
  separators == 2
end
.jwt_verification_key ⇒ Object
81 82 83 84 85 86 87 88 |
# File 'lib/legion/mcp/auth.rb', line 81
# Picks the key used to verify JWT signatures.
#
# The :mcp / :auth / :jwt_secret setting wins when it is set. Otherwise
# Legion::Crypt::ClusterSecret.cs is used, provided that constant is defined
# and responds to :cs.
#
# @return [Object, nil] the verification key, or nil when neither source is available
def jwt_verification_key
  secret = Legion::Settings.dig(:mcp, :auth, :jwt_secret)
  if secret
    secret
  elsif defined?(Legion::Crypt::ClusterSecret) && Legion::Crypt::ClusterSecret.respond_to?(:cs)
    Legion::Crypt::ClusterSecret.cs
  end
end
.require_auth? ⇒ Boolean
32 33 34 |
# File 'lib/legion/mcp/auth.rb', line 32
# Reports whether callers are required to authenticate.
#
# @return [Boolean] true only when the :mcp / :auth / :require_auth setting
#   compares equal to +true+; any other value (including truthy ones) is false
def require_auth?
  required = Legion::Settings.dig(:mcp, :auth, :require_auth)
  required == true
end
.validate_claims!(claims) ⇒ Object
98 99 100 101 102 103 104 |
# File 'lib/legion/mcp/auth.rb', line 98
# Checks that claims carry a subject and, when an expiry is present, that it
# lies in the future.
#
# @param claims [Hash] claims read with symbol keys (:sub, :exp)
# @return [nil] when the claims pass both checks
# @raise [RuntimeError] 'token missing required sub claim' when :sub is absent
# @raise [RuntimeError] 'token has expired' when :exp (as an integer) is at or
#   before the current epoch second
def validate_claims!(claims)
  raise 'token missing required sub claim' unless claims[:sub]

  expiry = claims[:exp]
  # A missing :exp is accepted; only a present expiry is compared to now.
  raise 'token has expired' if expiry && expiry.to_i <= Time.now.to_i
end
.verify_api_key(token) ⇒ Object
70 71 72 73 74 75 76 77 78 79 |
# File 'lib/legion/mcp/auth.rb', line 70
# Checks a token against the configured list of allowed API keys.
#
# @param token [Object] API key presented by the caller
# @return [Hash] { authenticated: true, identity: {...} } when the key is
#   listed in :mcp / :auth / :allowed_api_keys, otherwise
#   { authenticated: false, error: 'invalid_api_key' }
def verify_api_key(token)
  api_keys = Legion::Settings.dig(:mcp, :auth, :allowed_api_keys) || []
  # NOTE(review): include? compares with ==, which is not a constant-time
  # comparison; consider a timing-safe compare — confirm against threat model.
  key_known = api_keys.include?(token)
  log.debug("[mcp][auth] action=verify_api_key allowed_keys=#{api_keys.size} matched=#{key_known}")
  return { authenticated: false, error: 'invalid_api_key' } unless key_known

  { authenticated: true, identity: { user_id: 'api_key', risk_tier: :low } }
end
.verify_jwt(token) ⇒ Object
44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 |
# File 'lib/legion/mcp/auth.rb', line 44
# Verifies a JWT and converts its claims into an identity.
#
# The token is rejected up front when Legion::Crypt::JWT is not defined or
# when jwt_verification_key returns nothing. Otherwise verification is
# delegated to Legion::Crypt::JWT.verify with expiration and issuer checks
# enabled. Any StandardError raised along the way is reported through
# handle_exception and turned into a failed result instead of propagating.
#
# @param token [String] encoded JWT presented by the caller
# @return [Hash] { authenticated: true, identity: Hash } on success, or
#   { authenticated: false, error: String } where error is
#   'crypt_unavailable', 'jwt_verification_key_unavailable', or the message
#   of the rescued exception
def verify_jwt(token)
  log.debug("[mcp][auth] action=verify_jwt crypt_available=#{defined?(Legion::Crypt::JWT) ? true : false}")
  return { authenticated: false, error: 'crypt_unavailable' } unless defined?(Legion::Crypt::JWT)

  verification_key = jwt_verification_key
  unless verification_key
    log.warn('No JWT verification key available; rejecting token')
    return { authenticated: false, error: 'jwt_verification_key_unavailable' }
  end

  claims = Legion::Crypt::JWT.verify(
    token,
    verification_key: verification_key,
    algorithm: jwt_algorithm,
    issuer: jwt_issuer,
    verify_expiration: true,
    verify_issuer: true
  )
  { authenticated: true, identity: identity_from_claims(claims) }
rescue StandardError => e
  handle_exception(e, level: :warn, operation: 'legion.mcp.auth.verify_jwt')
  # Surface the failure reason to the caller as a plain string.
  { authenticated: false, error: e.message }
end