Module: PQCrypto::JWT::JWA::MLDSAStreaming
Constant Summary collapse
- DEFAULT_CHUNK_SIZE =
1 << 20
- STREAMING_DECODE_ERRORS =
[JSON::ParserError, ArgumentError, TypeError, EncodingError].freeze
- STREAMING_VERIFY_ERRORS =
[PQCrypto::Error, PQCrypto::JWT::Error].freeze
Instance Method Summary collapse
- #sign_io(signing_key:, payload_io: nil, io: nil, header_fields: {}, chunk_size: DEFAULT_CHUNK_SIZE) ⇒ Object
- #streaming_supported? ⇒ Boolean
- #verify_io(verification_key:, token:, payload_io:, chunk_size: DEFAULT_CHUNK_SIZE, strict: false) ⇒ Object
- #verify_io!(**kwargs) ⇒ Object
Instance Method Details
#sign_io(signing_key:, payload_io: nil, io: nil, header_fields: {}, chunk_size: DEFAULT_CHUNK_SIZE) ⇒ Object
20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 |
# File 'lib/pq_crypto/jwt/jwa/ml_dsa_streaming.rb', line 20 def sign_io(signing_key:, payload_io: nil, io: nil, header_fields: {}, chunk_size: DEFAULT_CHUNK_SIZE) ensure_streaming! ensure_key!(signing_key, PQCrypto::Signature::SecretKey, "signing") validate_chunk_size!(chunk_size) source = payload_io || io raise ArgumentError, "payload_io must respond to #read" unless source.respond_to?(:read) header = normalize_signing_header!(header_fields).merge("alg" => alg) encoded_header = base64url(JSON.generate(header)) input = DetachedSigningInputIO.new(encoded_header, source, chunk_size: chunk_size) signature = signing_key.sign_io(input, chunk_size: chunk_size, context: EMPTY_CONTEXT) "#{encoded_header}..#{base64url(signature)}" rescue PQCrypto::JWT::Error, ArgumentError raise rescue StandardError => e raise ::JWT::EncodeError, e. end |
#streaming_supported? ⇒ Boolean
14 15 16 17 18 |
# File 'lib/pq_crypto/jwt/jwa/ml_dsa_streaming.rb', line 14 def streaming_supported? PQCrypto::Signature.supported.include?(pq_crypto_algorithm) rescue PQCrypto::Error false end |
#verify_io(verification_key:, token:, payload_io:, chunk_size: DEFAULT_CHUNK_SIZE, strict: false) ⇒ Object
38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 |
# File 'lib/pq_crypto/jwt/jwa/ml_dsa_streaming.rb', line 38 def verify_io(verification_key:, token:, payload_io:, chunk_size: DEFAULT_CHUNK_SIZE, strict: false) ensure_streaming! ensure_key!(verification_key, PQCrypto::Signature::PublicKey, "verification") validate_chunk_size!(chunk_size) return streaming_decode_failure!(strict, "token must be a String") unless token.is_a?(String) return streaming_decode_failure!(strict, "payload_io must respond to #read") unless payload_io.respond_to?(:read) encoded_header, encoded_payload, encoded_signature, extra = token.split(".", -1) unless extra.nil? && encoded_payload == "" && encoded_signature return streaming_decode_failure!(strict, "compact detached JWS must have exactly three segments and an empty payload segment") end header = JSON.parse(Base64.urlsafe_decode64(encoded_header)) return streaming_decode_failure!(strict, "unsupported streaming JWS protected header") unless supported_streaming_header?(header) signature = Base64.urlsafe_decode64(encoded_signature) return streaming_verify_failure!(strict, "invalid #{alg} signature length") unless signature_length_valid?(signature) input = DetachedSigningInputIO.new(encoded_header, payload_io, chunk_size: chunk_size) return streaming_verify_failure!(strict, "Streaming JWS verification failed") unless verification_key.verify_io(input, signature, chunk_size: chunk_size, context: EMPTY_CONTEXT) [payload_io.respond_to?(:pos) ? payload_io.pos : nil, header] rescue *STREAMING_DECODE_ERRORS => e streaming_decode_failure!(strict, e.) rescue *STREAMING_VERIFY_ERRORS => e streaming_verify_failure!(strict, e.) end |
#verify_io!(**kwargs) ⇒ Object
66 67 68 69 |
# File 'lib/pq_crypto/jwt/jwa/ml_dsa_streaming.rb', line 66 def verify_io!(**kwargs) verify_io(**kwargs) || raise(::JWT::VerificationError, "Streaming JWS verification failed") true end |