Module: Legion::Crypt::Spiffe::IdentityHelpers

Includes:
Logging::Helper
Defined in:
lib/legion/crypt/spiffe/identity_helpers.rb

Overview

Helpers for signing, verifying, and inspecting SPIFFE SVIDs.

These methods work directly with X509Svid and JwtSvid structs returned by WorkloadApiClient. No external gem is required —all operations use the Ruby stdlib OpenSSL bindings.

Constant Summary

Constants included from Logging::Helper

Logging::Helper::CompatLogger

Instance Method Summary collapse

Methods included from Logging::Helper

#handle_exception, #log

Instance Method Details

#extract_spiffe_id_from_cert(cert_pem) ⇒ SpiffeId?

Extract the SPIFFE ID embedded in an X.509 certificate’s SAN URI extension. Returns a SpiffeId struct or nil if none is found.

Parameters:

  • cert_pem (String)

    PEM-encoded X.509 certificate.

Returns:



67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
# File 'lib/legion/crypt/spiffe/identity_helpers.rb', line 67

def extract_spiffe_id_from_cert(cert_pem)
  cert = OpenSSL::X509::Certificate.new(cert_pem)
  san  = cert.extensions.find { |e| e.oid == 'subjectAltName' }
  return nil unless san

  san.value.split(',').each do |entry|
    entry = entry.strip
    next unless entry.start_with?('URI:spiffe://')

    uri = entry.sub('URI:', '')
    return Legion::Crypt::Spiffe.parse_id(uri)
  rescue InvalidSpiffeIdError => e
    handle_exception(e, level: :debug, operation: 'crypt.spiffe.extract_spiffe_id_from_cert', san_entry: entry)
    next
  end

  nil
rescue OpenSSL::X509::CertificateError => e
  handle_exception(e, level: :debug, operation: 'crypt.spiffe.extract_spiffe_id_from_cert')
  nil
end

#sign_with_svid(data, svid:) ⇒ String

Sign arbitrary data with the private key from an X.509 SVID. Returns the signature as a Base64-encoded string.

Parameters:

  • data (String)

    The bytes to sign (any encoding; treated as binary).

  • svid (X509Svid)

    An X509Svid whose key_pem is populated.

Returns:

  • (String)

    Base64-encoded DER signature.



24
25
26
27
28
29
30
31
32
33
34
35
36
37
# File 'lib/legion/crypt/spiffe/identity_helpers.rb', line 24

def sign_with_svid(data, svid:)
  raise SvidError, 'Cannot sign: SVID is nil' if svid.nil?
  raise SvidError, "Cannot sign: SVID '#{svid.spiffe_id}' has expired" if svid.expired?
  raise SvidError, 'Cannot sign: SVID private key is missing' if svid.key_pem.nil?

  key       = OpenSSL::PKey.read(svid.key_pem)
  digest    = OpenSSL::Digest.new('SHA256')
  signature = key.sign(digest, data.b)
  log.debug("SPIFFE signed payload with SVID id=#{svid.spiffe_id}")
  Base64.strict_encode64(signature)
rescue StandardError => e
  handle_exception(e, level: :error, operation: 'crypt.spiffe.sign_with_svid', spiffe_id: svid&.spiffe_id&.to_s)
  raise
end

#svid_identity(svid) ⇒ Hash

Return a hash of identity information extracted from an SVID.

Parameters:

Returns:

  • (Hash)


114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
# File 'lib/legion/crypt/spiffe/identity_helpers.rb', line 114

def svid_identity(svid)
  return {} if svid.nil?

  base = {
    spiffe_id:     svid.spiffe_id.to_s,
    trust_domain:  svid.spiffe_id.trust_domain,
    workload_path: svid.spiffe_id.path,
    expiry:        svid.expiry,
    expired:       svid.expired?
  }

  case svid
  when X509Svid
    base.merge(type: :x509, ttl_seconds: svid.ttl.to_i)
  when JwtSvid
    base.merge(type: :jwt, audience: svid.audience)
  else
    base
  end
end

#trusted_cert?(cert_pem, svid:) ⇒ Boolean

Validate that a certificate chain is trusted by the bundle embedded in the given SVID. Returns true if the leaf cert chains up to the bundle CA, false otherwise.

Parameters:

  • cert_pem (String)

    PEM-encoded leaf certificate to validate.

  • svid (X509Svid)

    SVID whose bundle_pem contains the trust anchor.

Returns:

  • (Boolean)


96
97
98
99
100
101
102
103
104
105
106
107
108
# File 'lib/legion/crypt/spiffe/identity_helpers.rb', line 96

def trusted_cert?(cert_pem, svid:)
  raise SvidError, 'Cannot check trust: SVID is nil' if svid.nil?
  return false if svid.bundle_pem.nil?

  store = OpenSSL::X509::Store.new
  store.add_cert(OpenSSL::X509::Certificate.new(svid.bundle_pem))

  leaf = OpenSSL::X509::Certificate.new(cert_pem)
  store.verify(leaf)
rescue OpenSSL::X509::CertificateError, OpenSSL::X509::StoreError => e
  handle_exception(e, level: :warn, operation: 'crypt.spiffe.trusted_cert?', spiffe_id: svid&.spiffe_id&.to_s)
  false
end

#verify_svid_signature(data, signature_b64:, svid:) ⇒ Boolean

Verify a Base64-encoded signature produced by sign_with_svid.

Parameters:

  • data (String)

    Original data that was signed.

  • signature_b64 (String)

    Base64-encoded signature from sign_with_svid.

  • svid (X509Svid)

    The SVID whose public certificate is used for verification.

Returns:

  • (Boolean)

    true if the signature is valid.



45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
# File 'lib/legion/crypt/spiffe/identity_helpers.rb', line 45

def verify_svid_signature(data, signature_b64:, svid:)
  raise SvidError, 'Cannot verify: SVID is nil' if svid.nil?
  raise SvidError, "Cannot verify: SVID '#{svid.spiffe_id}' has expired" if svid.expired?
  raise SvidError, 'Cannot verify: SVID certificate is missing' if svid.cert_pem.nil?

  cert      = OpenSSL::X509::Certificate.new(svid.cert_pem)
  digest    = OpenSSL::Digest.new('SHA256')
  signature = Base64.strict_decode64(signature_b64)
  result = cert.public_key.verify(digest, signature, data.b)
  log.debug("SPIFFE signature verification completed id=#{svid.spiffe_id} valid=#{result}")
  result
rescue OpenSSL::PKey::PKeyError, OpenSSL::X509::CertificateError, ArgumentError => e
  handle_exception(e, level: :warn, operation: 'crypt.spiffe.verify_svid_signature', spiffe_id: svid&.spiffe_id&.to_s)
  log.warn("[SPIFFE] SVID signature verification error: #{e.message}")
  false
end