Module: Legion::Crypt::Spiffe::IdentityHelpers
- Includes:
- Logging::Helper
- Defined in:
- lib/legion/crypt/spiffe/identity_helpers.rb
Overview
Helpers for signing, verifying, and inspecting SPIFFE SVIDs.
These methods work directly with X509Svid and JwtSvid structs returned by WorkloadApiClient. No external gem is required —all operations use the Ruby stdlib OpenSSL bindings.
Constant Summary
Constants included from Logging::Helper
Instance Method Summary collapse
-
#extract_spiffe_id_from_cert(cert_pem) ⇒ SpiffeId?
Extract the SPIFFE ID embedded in an X.509 certificate’s SAN URI extension.
-
#sign_with_svid(data, svid:) ⇒ String
Sign arbitrary data with the private key from an X.509 SVID.
-
#svid_identity(svid) ⇒ Hash
Return a hash of identity information extracted from an SVID.
-
#trusted_cert?(cert_pem, svid:) ⇒ Boolean
Validate that a certificate chain is trusted by the bundle embedded in the given SVID.
-
#verify_svid_signature(data, signature_b64:, svid:) ⇒ Boolean
Verify a Base64-encoded signature produced by sign_with_svid.
Methods included from Logging::Helper
Instance Method Details
#extract_spiffe_id_from_cert(cert_pem) ⇒ SpiffeId?
Extract the SPIFFE ID embedded in an X.509 certificate’s SAN URI extension. Returns a SpiffeId struct or nil if none is found.
67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 |
# File 'lib/legion/crypt/spiffe/identity_helpers.rb', line 67 def extract_spiffe_id_from_cert(cert_pem) cert = OpenSSL::X509::Certificate.new(cert_pem) san = cert.extensions.find { |e| e.oid == 'subjectAltName' } return nil unless san san.value.split(',').each do |entry| entry = entry.strip next unless entry.start_with?('URI:spiffe://') uri = entry.sub('URI:', '') return Legion::Crypt::Spiffe.parse_id(uri) rescue InvalidSpiffeIdError => e handle_exception(e, level: :debug, operation: 'crypt.spiffe.extract_spiffe_id_from_cert', san_entry: entry) next end nil rescue OpenSSL::X509::CertificateError => e handle_exception(e, level: :debug, operation: 'crypt.spiffe.extract_spiffe_id_from_cert') nil end |
#sign_with_svid(data, svid:) ⇒ String
Sign arbitrary data with the private key from an X.509 SVID. Returns the signature as a Base64-encoded string.
24 25 26 27 28 29 30 31 32 33 34 35 36 37 |
# File 'lib/legion/crypt/spiffe/identity_helpers.rb', line 24 def sign_with_svid(data, svid:) raise SvidError, 'Cannot sign: SVID is nil' if svid.nil? raise SvidError, "Cannot sign: SVID '#{svid.spiffe_id}' has expired" if svid.expired? raise SvidError, 'Cannot sign: SVID private key is missing' if svid.key_pem.nil? key = OpenSSL::PKey.read(svid.key_pem) digest = OpenSSL::Digest.new('SHA256') signature = key.sign(digest, data.b) log.debug("SPIFFE signed payload with SVID id=#{svid.spiffe_id}") Base64.strict_encode64(signature) rescue StandardError => e handle_exception(e, level: :error, operation: 'crypt.spiffe.sign_with_svid', spiffe_id: svid&.spiffe_id&.to_s) raise end |
#svid_identity(svid) ⇒ Hash
Return a hash of identity information extracted from an SVID.
114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 |
# File 'lib/legion/crypt/spiffe/identity_helpers.rb', line 114 def svid_identity(svid) return {} if svid.nil? base = { spiffe_id: svid.spiffe_id.to_s, trust_domain: svid.spiffe_id.trust_domain, workload_path: svid.spiffe_id.path, expiry: svid.expiry, expired: svid.expired? } case svid when X509Svid base.merge(type: :x509, ttl_seconds: svid.ttl.to_i) when JwtSvid base.merge(type: :jwt, audience: svid.audience) else base end end |
#trusted_cert?(cert_pem, svid:) ⇒ Boolean
Validate that a certificate chain is trusted by the bundle embedded in the given SVID. Returns true if the leaf cert chains up to the bundle CA, false otherwise.
96 97 98 99 100 101 102 103 104 105 106 107 108 |
# File 'lib/legion/crypt/spiffe/identity_helpers.rb', line 96 def trusted_cert?(cert_pem, svid:) raise SvidError, 'Cannot check trust: SVID is nil' if svid.nil? return false if svid.bundle_pem.nil? store = OpenSSL::X509::Store.new store.add_cert(OpenSSL::X509::Certificate.new(svid.bundle_pem)) leaf = OpenSSL::X509::Certificate.new(cert_pem) store.verify(leaf) rescue OpenSSL::X509::CertificateError, OpenSSL::X509::StoreError => e handle_exception(e, level: :warn, operation: 'crypt.spiffe.trusted_cert?', spiffe_id: svid&.spiffe_id&.to_s) false end |
#verify_svid_signature(data, signature_b64:, svid:) ⇒ Boolean
Verify a Base64-encoded signature produced by sign_with_svid.
45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 |
# File 'lib/legion/crypt/spiffe/identity_helpers.rb', line 45 def verify_svid_signature(data, signature_b64:, svid:) raise SvidError, 'Cannot verify: SVID is nil' if svid.nil? raise SvidError, "Cannot verify: SVID '#{svid.spiffe_id}' has expired" if svid.expired? raise SvidError, 'Cannot verify: SVID certificate is missing' if svid.cert_pem.nil? cert = OpenSSL::X509::Certificate.new(svid.cert_pem) digest = OpenSSL::Digest.new('SHA256') signature = Base64.strict_decode64(signature_b64) result = cert.public_key.verify(digest, signature, data.b) log.debug("SPIFFE signature verification completed id=#{svid.spiffe_id} valid=#{result}") result rescue OpenSSL::PKey::PKeyError, OpenSSL::X509::CertificateError, ArgumentError => e handle_exception(e, level: :warn, operation: 'crypt.spiffe.verify_svid_signature', spiffe_id: svid&.spiffe_id&.to_s) log.warn("[SPIFFE] SVID signature verification error: #{e.}") false end |