Class: StandardSingpass::Myinfo::Security
- Inherits:
-
Object
- Object
- StandardSingpass::Myinfo::Security
- Extended by:
- T::Sig
- Defined in:
- lib/standard_singpass/myinfo/security.rb
Defined Under Namespace
Classes: DecryptionError, ValidationError
Constant Summary collapse
- JWKS_CACHE_TTL =
T.let(1.hour, ActiveSupport::Duration)
- ALLOWED_ALGORITHMS =
FAPI 2.0 mandates ES256 for all JWS signatures.
T.let(%w[ES256].freeze, T::Array[String])
Class Method Summary collapse
- .build_client_assertion(client_id:, audience:, signing_key:, signing_kid:, code: nil) ⇒ Object
- .build_dpop_proof(http_method:, url:, key_pair:, access_token: nil) ⇒ Object
- .decrypt_jwe(jwe_string, private_keys:) ⇒ Object
- .generate_ephemeral_key_pair ⇒ Object
- .generate_pkce_pair ⇒ Object
- .validate_jws(jws_string, jwks_url:) ⇒ Object
Class Method Details
.build_client_assertion(client_id:, audience:, signing_key:, signing_kid:, code: nil) ⇒ Object
62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 |
# File 'lib/standard_singpass/myinfo/security.rb', line 62 def self.build_client_assertion(client_id:, audience:, signing_key:, signing_kid:, code: nil) key = signing_key.is_a?(OpenSSL::PKey::PKey) ? signing_key : OpenSSL::PKey.read(signing_key) header = { alg: "ES256", kid: signing_kid, typ: "JWT" } claims = { iss: client_id, sub: client_id, aud: audience, iat: Time.current.to_i, exp: (Time.current + 2.minutes).to_i, jti: SecureRandom.uuid } claims[:code] = code if code.present? JWT.encode(claims, key, "ES256", header) end |
.build_dpop_proof(http_method:, url:, key_pair:, access_token: nil) ⇒ Object
31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 |
# File 'lib/standard_singpass/myinfo/security.rb', line 31 def self.build_dpop_proof(http_method:, url:, key_pair:, access_token: nil) jwk = JWT::JWK.new(key_pair) header = { typ: "dpop+jwt", alg: "ES256", jwk: jwk.export(include_private: false) } htu = URI(url).tap { |u| u.query = nil; u.fragment = nil }.to_s claims = { htm: http_method.upcase, htu:, iat: Time.current.to_i, exp: (Time.current + 2.minutes).to_i, jti: SecureRandom.uuid } if access_token ath = Base64.urlsafe_encode64( Digest::SHA256.digest(access_token), padding: false ) claims[:ath] = ath end JWT.encode(claims, key_pair, "ES256", header) end |
.decrypt_jwe(jwe_string, private_keys:) ⇒ Object
88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 |
# File 'lib/standard_singpass/myinfo/security.rb', line 88 def self.decrypt_jwe(jwe_string, private_keys:) header_kid = extract_jwe_kid(jwe_string) raise DecryptionError, "JWE header missing kid field" unless header_kid matching_key = private_keys.find { |k| k[:kid] == header_kid } raise DecryptionError, "No matching decryption key found" unless matching_key alg = extract_jwe_alg(jwe_string) unless EcdhJwe::SUPPORTED_ALGS.include?(alg) raise DecryptionError, "Unsupported JWE alg #{alg.inspect}; FAPI 2.0 requires #{EcdhJwe::SUPPORTED_ALGS.join('/')}" end EcdhJwe.decrypt(jwe_string, private_key: resolve_key(matching_key[:key])) rescue EcdhJwe::DecryptionFailed => e raise DecryptionError, "JWE decryption failed: #{e.}" rescue ArgumentError => e raise DecryptionError, "Malformed JWE: #{e.}" end |
.generate_ephemeral_key_pair ⇒ Object
25 26 27 |
# File 'lib/standard_singpass/myinfo/security.rb', line 25 def self.generate_ephemeral_key_pair OpenSSL::PKey::EC.generate("prime256v1") end |
.generate_pkce_pair ⇒ Object
15 16 17 18 19 20 21 |
# File 'lib/standard_singpass/myinfo/security.rb', line 15 def self.generate_pkce_pair code_verifier = SecureRandom.urlsafe_base64(48) code_challenge = Base64.urlsafe_encode64( Digest::SHA256.digest(code_verifier), padding: false ) { code_verifier:, code_challenge: } end |
.validate_jws(jws_string, jwks_url:) ⇒ Object
110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 |
# File 'lib/standard_singpass/myinfo/security.rb', line 110 def self.validate_jws(jws_string, jwks_url:) jwks_data = fetch_jwks(jwks_url) begin decode_with_jwks(jws_string, jwks_data) rescue JWT::VerificationError # Retry once with a fresh JWKS fetch in case of key rotation jwks_data = fetch_jwks(jwks_url, force_refresh: true) begin decode_with_jwks(jws_string, jwks_data) rescue JWT::DecodeError => e raise ValidationError, "JWS validation failed: #{e.}" end rescue JWT::DecodeError => e raise ValidationError, "JWS validation failed: #{e.}" end end |