moving to scripts

This commit is contained in:
eneller
2021-11-16 23:55:48 +01:00
parent f591ca2077
commit 14bfb7f96f
2575 changed files with 465862 additions and 0 deletions

View File

@@ -0,0 +1,27 @@
# This file is dual licensed under the terms of the Apache License, Version
# 2.0, and the BSD License. See the LICENSE file in the root of this repository
# for complete details.
import typing
from cryptography.hazmat.backends.interfaces import Backend
_default_backend: typing.Optional[Backend] = None
def default_backend() -> Backend:
global _default_backend
if _default_backend is None:
from cryptography.hazmat.backends.openssl.backend import backend
_default_backend = backend
return _default_backend
def _get_backend(backend: typing.Optional[Backend]) -> Backend:
if backend is None:
return default_backend()
else:
return backend

View File

@@ -0,0 +1,449 @@
# This file is dual licensed under the terms of the Apache License, Version
# 2.0, and the BSD License. See the LICENSE file in the root of this repository
# for complete details.
import abc
import typing
if typing.TYPE_CHECKING:
from cryptography.hazmat.primitives.asymmetric.types import (
PRIVATE_KEY_TYPES,
)
from cryptography.hazmat.primitives import hashes
from cryptography.x509.base import (
Certificate,
CertificateBuilder,
CertificateRevocationList,
CertificateRevocationListBuilder,
CertificateSigningRequest,
CertificateSigningRequestBuilder,
RevokedCertificate,
RevokedCertificateBuilder,
)
from cryptography.x509.name import Name
class CipherBackend(metaclass=abc.ABCMeta):
@abc.abstractmethod
def cipher_supported(self, cipher, mode):
"""
Return True if the given cipher and mode are supported.
"""
@abc.abstractmethod
def create_symmetric_encryption_ctx(self, cipher, mode):
"""
Get a CipherContext that can be used for encryption.
"""
@abc.abstractmethod
def create_symmetric_decryption_ctx(self, cipher, mode):
"""
Get a CipherContext that can be used for decryption.
"""
class HashBackend(metaclass=abc.ABCMeta):
@abc.abstractmethod
def hash_supported(self, algorithm):
"""
Return True if the hash algorithm is supported by this backend.
"""
@abc.abstractmethod
def create_hash_ctx(self, algorithm):
"""
Create a HashContext for calculating a message digest.
"""
class HMACBackend(metaclass=abc.ABCMeta):
@abc.abstractmethod
def hmac_supported(self, algorithm):
"""
Return True if the hash algorithm is supported for HMAC by this
backend.
"""
@abc.abstractmethod
def create_hmac_ctx(self, key, algorithm):
"""
Create a context for calculating a message authentication code.
"""
class CMACBackend(metaclass=abc.ABCMeta):
@abc.abstractmethod
def cmac_algorithm_supported(self, algorithm):
"""
Returns True if the block cipher is supported for CMAC by this backend
"""
@abc.abstractmethod
def create_cmac_ctx(self, algorithm):
"""
Create a context for calculating a message authentication code.
"""
class PBKDF2HMACBackend(metaclass=abc.ABCMeta):
@abc.abstractmethod
def pbkdf2_hmac_supported(self, algorithm):
"""
Return True if the hash algorithm is supported for PBKDF2 by this
backend.
"""
@abc.abstractmethod
def derive_pbkdf2_hmac(
self, algorithm, length, salt, iterations, key_material
):
"""
Return length bytes derived from provided PBKDF2 parameters.
"""
class RSABackend(metaclass=abc.ABCMeta):
@abc.abstractmethod
def generate_rsa_private_key(self, public_exponent, key_size):
"""
Generate an RSAPrivateKey instance with public_exponent and a modulus
of key_size bits.
"""
@abc.abstractmethod
def rsa_padding_supported(self, padding):
"""
Returns True if the backend supports the given padding options.
"""
@abc.abstractmethod
def generate_rsa_parameters_supported(self, public_exponent, key_size):
"""
Returns True if the backend supports the given parameters for key
generation.
"""
@abc.abstractmethod
def load_rsa_private_numbers(self, numbers):
"""
Returns an RSAPrivateKey provider.
"""
@abc.abstractmethod
def load_rsa_public_numbers(self, numbers):
"""
Returns an RSAPublicKey provider.
"""
class DSABackend(metaclass=abc.ABCMeta):
@abc.abstractmethod
def generate_dsa_parameters(self, key_size):
"""
Generate a DSAParameters instance with a modulus of key_size bits.
"""
@abc.abstractmethod
def generate_dsa_private_key(self, parameters):
"""
Generate a DSAPrivateKey instance with parameters as a DSAParameters
object.
"""
@abc.abstractmethod
def generate_dsa_private_key_and_parameters(self, key_size):
"""
Generate a DSAPrivateKey instance using key size only.
"""
@abc.abstractmethod
def dsa_hash_supported(self, algorithm):
"""
Return True if the hash algorithm is supported by the backend for DSA.
"""
@abc.abstractmethod
def dsa_parameters_supported(self, p, q, g):
"""
Return True if the parameters are supported by the backend for DSA.
"""
@abc.abstractmethod
def load_dsa_private_numbers(self, numbers):
"""
Returns a DSAPrivateKey provider.
"""
@abc.abstractmethod
def load_dsa_public_numbers(self, numbers):
"""
Returns a DSAPublicKey provider.
"""
@abc.abstractmethod
def load_dsa_parameter_numbers(self, numbers):
"""
Returns a DSAParameters provider.
"""
class EllipticCurveBackend(metaclass=abc.ABCMeta):
@abc.abstractmethod
def elliptic_curve_signature_algorithm_supported(
self, signature_algorithm, curve
):
"""
Returns True if the backend supports the named elliptic curve with the
specified signature algorithm.
"""
@abc.abstractmethod
def elliptic_curve_supported(self, curve):
"""
Returns True if the backend supports the named elliptic curve.
"""
@abc.abstractmethod
def generate_elliptic_curve_private_key(self, curve):
"""
Return an object conforming to the EllipticCurvePrivateKey interface.
"""
@abc.abstractmethod
def load_elliptic_curve_public_numbers(self, numbers):
"""
Return an EllipticCurvePublicKey provider using the given numbers.
"""
@abc.abstractmethod
def load_elliptic_curve_private_numbers(self, numbers):
"""
Return an EllipticCurvePrivateKey provider using the given numbers.
"""
@abc.abstractmethod
def elliptic_curve_exchange_algorithm_supported(self, algorithm, curve):
"""
Returns whether the exchange algorithm is supported by this backend.
"""
@abc.abstractmethod
def derive_elliptic_curve_private_key(self, private_value, curve):
"""
Compute the private key given the private value and curve.
"""
class PEMSerializationBackend(metaclass=abc.ABCMeta):
@abc.abstractmethod
def load_pem_private_key(self, data, password):
"""
Loads a private key from PEM encoded data, using the provided password
if the data is encrypted.
"""
@abc.abstractmethod
def load_pem_public_key(self, data):
"""
Loads a public key from PEM encoded data.
"""
@abc.abstractmethod
def load_pem_parameters(self, data):
"""
Load encryption parameters from PEM encoded data.
"""
class DERSerializationBackend(metaclass=abc.ABCMeta):
@abc.abstractmethod
def load_der_private_key(self, data, password):
"""
Loads a private key from DER encoded data. Uses the provided password
if the data is encrypted.
"""
@abc.abstractmethod
def load_der_public_key(self, data):
"""
Loads a public key from DER encoded data.
"""
@abc.abstractmethod
def load_der_parameters(self, data):
"""
Load encryption parameters from DER encoded data.
"""
class X509Backend(metaclass=abc.ABCMeta):
@abc.abstractmethod
def create_x509_csr(
self,
builder: "CertificateSigningRequestBuilder",
private_key: "PRIVATE_KEY_TYPES",
algorithm: typing.Optional["hashes.HashAlgorithm"],
) -> "CertificateSigningRequest":
"""
Create and sign an X.509 CSR from a CSR builder object.
"""
@abc.abstractmethod
def create_x509_certificate(
self,
builder: "CertificateBuilder",
private_key: "PRIVATE_KEY_TYPES",
algorithm: typing.Optional["hashes.HashAlgorithm"],
) -> "Certificate":
"""
Create and sign an X.509 certificate from a CertificateBuilder object.
"""
@abc.abstractmethod
def create_x509_crl(
self,
builder: "CertificateRevocationListBuilder",
private_key: "PRIVATE_KEY_TYPES",
algorithm: typing.Optional["hashes.HashAlgorithm"],
) -> "CertificateRevocationList":
"""
Create and sign an X.509 CertificateRevocationList from a
CertificateRevocationListBuilder object.
"""
@abc.abstractmethod
def create_x509_revoked_certificate(
self, builder: "RevokedCertificateBuilder"
) -> "RevokedCertificate":
"""
Create a RevokedCertificate object from a RevokedCertificateBuilder
object.
"""
@abc.abstractmethod
def x509_name_bytes(self, name: "Name") -> bytes:
"""
Compute the DER encoded bytes of an X509 Name object.
"""
class DHBackend(metaclass=abc.ABCMeta):
@abc.abstractmethod
def generate_dh_parameters(self, generator, key_size):
"""
Generate a DHParameters instance with a modulus of key_size bits.
Using the given generator. Often 2 or 5.
"""
@abc.abstractmethod
def generate_dh_private_key(self, parameters):
"""
Generate a DHPrivateKey instance with parameters as a DHParameters
object.
"""
@abc.abstractmethod
def generate_dh_private_key_and_parameters(self, generator, key_size):
"""
Generate a DHPrivateKey instance using key size only.
Using the given generator. Often 2 or 5.
"""
@abc.abstractmethod
def load_dh_private_numbers(self, numbers):
"""
Load a DHPrivateKey from DHPrivateNumbers
"""
@abc.abstractmethod
def load_dh_public_numbers(self, numbers):
"""
Load a DHPublicKey from DHPublicNumbers.
"""
@abc.abstractmethod
def load_dh_parameter_numbers(self, numbers):
"""
Load DHParameters from DHParameterNumbers.
"""
@abc.abstractmethod
def dh_parameters_supported(self, p, g, q=None):
"""
Returns whether the backend supports DH with these parameter values.
"""
@abc.abstractmethod
def dh_x942_serialization_supported(self):
"""
Returns True if the backend supports the serialization of DH objects
with subgroup order (q).
"""
class ScryptBackend(metaclass=abc.ABCMeta):
@abc.abstractmethod
def derive_scrypt(self, key_material, salt, length, n, r, p):
"""
Return bytes derived from provided Scrypt parameters.
"""
@abc.abstractmethod
def scrypt_supported(self):
"""
Return True if Scrypt is supported.
"""
# This is the catch-all for future backend methods and inherits all the
# other interfaces as well so we can just use Backend for typing.
class Backend(
CipherBackend,
CMACBackend,
DERSerializationBackend,
DHBackend,
DSABackend,
EllipticCurveBackend,
HashBackend,
HMACBackend,
PBKDF2HMACBackend,
RSABackend,
PEMSerializationBackend,
ScryptBackend,
X509Backend,
metaclass=abc.ABCMeta,
):
@abc.abstractmethod
def load_pem_pkcs7_certificates(self, data):
"""
Returns a list of x509.Certificate
"""
@abc.abstractmethod
def load_der_pkcs7_certificates(self, data):
"""
Returns a list of x509.Certificate
"""
@abc.abstractmethod
def pkcs7_sign(self, builder, encoding, options):
"""
Returns bytes
"""
@abc.abstractmethod
def load_key_and_certificates_from_pkcs12(self, data, password):
"""
Returns a tuple of (key, cert, [certs])
"""
@abc.abstractmethod
def serialize_key_and_certificates_to_pkcs12(
self, name, key, cert, cas, encryption_algorithm
):
"""
Returns bytes
"""

View File

@@ -0,0 +1,9 @@
# This file is dual licensed under the terms of the Apache License, Version
# 2.0, and the BSD License. See the LICENSE file in the root of this repository
# for complete details.
from cryptography.hazmat.backends.openssl.backend import backend
__all__ = ["backend"]

View File

@@ -0,0 +1,165 @@
# This file is dual licensed under the terms of the Apache License, Version
# 2.0, and the BSD License. See the LICENSE file in the root of this repository
# for complete details.
from cryptography.exceptions import InvalidTag
_ENCRYPT = 1
_DECRYPT = 0
def _aead_cipher_name(cipher):
from cryptography.hazmat.primitives.ciphers.aead import (
AESCCM,
AESGCM,
ChaCha20Poly1305,
)
if isinstance(cipher, ChaCha20Poly1305):
return b"chacha20-poly1305"
elif isinstance(cipher, AESCCM):
return "aes-{}-ccm".format(len(cipher._key) * 8).encode("ascii")
else:
assert isinstance(cipher, AESGCM)
return "aes-{}-gcm".format(len(cipher._key) * 8).encode("ascii")
def _aead_setup(backend, cipher_name, key, nonce, tag, tag_len, operation):
evp_cipher = backend._lib.EVP_get_cipherbyname(cipher_name)
backend.openssl_assert(evp_cipher != backend._ffi.NULL)
ctx = backend._lib.EVP_CIPHER_CTX_new()
ctx = backend._ffi.gc(ctx, backend._lib.EVP_CIPHER_CTX_free)
res = backend._lib.EVP_CipherInit_ex(
ctx,
evp_cipher,
backend._ffi.NULL,
backend._ffi.NULL,
backend._ffi.NULL,
int(operation == _ENCRYPT),
)
backend.openssl_assert(res != 0)
res = backend._lib.EVP_CIPHER_CTX_set_key_length(ctx, len(key))
backend.openssl_assert(res != 0)
res = backend._lib.EVP_CIPHER_CTX_ctrl(
ctx,
backend._lib.EVP_CTRL_AEAD_SET_IVLEN,
len(nonce),
backend._ffi.NULL,
)
backend.openssl_assert(res != 0)
if operation == _DECRYPT:
res = backend._lib.EVP_CIPHER_CTX_ctrl(
ctx, backend._lib.EVP_CTRL_AEAD_SET_TAG, len(tag), tag
)
backend.openssl_assert(res != 0)
elif cipher_name.endswith(b"-ccm"):
res = backend._lib.EVP_CIPHER_CTX_ctrl(
ctx, backend._lib.EVP_CTRL_AEAD_SET_TAG, tag_len, backend._ffi.NULL
)
backend.openssl_assert(res != 0)
nonce_ptr = backend._ffi.from_buffer(nonce)
key_ptr = backend._ffi.from_buffer(key)
res = backend._lib.EVP_CipherInit_ex(
ctx,
backend._ffi.NULL,
backend._ffi.NULL,
key_ptr,
nonce_ptr,
int(operation == _ENCRYPT),
)
backend.openssl_assert(res != 0)
return ctx
def _set_length(backend, ctx, data_len):
intptr = backend._ffi.new("int *")
res = backend._lib.EVP_CipherUpdate(
ctx, backend._ffi.NULL, intptr, backend._ffi.NULL, data_len
)
backend.openssl_assert(res != 0)
def _process_aad(backend, ctx, associated_data):
outlen = backend._ffi.new("int *")
res = backend._lib.EVP_CipherUpdate(
ctx, backend._ffi.NULL, outlen, associated_data, len(associated_data)
)
backend.openssl_assert(res != 0)
def _process_data(backend, ctx, data):
outlen = backend._ffi.new("int *")
buf = backend._ffi.new("unsigned char[]", len(data))
res = backend._lib.EVP_CipherUpdate(ctx, buf, outlen, data, len(data))
backend.openssl_assert(res != 0)
return backend._ffi.buffer(buf, outlen[0])[:]
def _encrypt(backend, cipher, nonce, data, associated_data, tag_length):
from cryptography.hazmat.primitives.ciphers.aead import AESCCM
cipher_name = _aead_cipher_name(cipher)
ctx = _aead_setup(
backend, cipher_name, cipher._key, nonce, None, tag_length, _ENCRYPT
)
# CCM requires us to pass the length of the data before processing anything
# However calling this with any other AEAD results in an error
if isinstance(cipher, AESCCM):
_set_length(backend, ctx, len(data))
_process_aad(backend, ctx, associated_data)
processed_data = _process_data(backend, ctx, data)
outlen = backend._ffi.new("int *")
res = backend._lib.EVP_CipherFinal_ex(ctx, backend._ffi.NULL, outlen)
backend.openssl_assert(res != 0)
backend.openssl_assert(outlen[0] == 0)
tag_buf = backend._ffi.new("unsigned char[]", tag_length)
res = backend._lib.EVP_CIPHER_CTX_ctrl(
ctx, backend._lib.EVP_CTRL_AEAD_GET_TAG, tag_length, tag_buf
)
backend.openssl_assert(res != 0)
tag = backend._ffi.buffer(tag_buf)[:]
return processed_data + tag
def _decrypt(backend, cipher, nonce, data, associated_data, tag_length):
from cryptography.hazmat.primitives.ciphers.aead import AESCCM
if len(data) < tag_length:
raise InvalidTag
tag = data[-tag_length:]
data = data[:-tag_length]
cipher_name = _aead_cipher_name(cipher)
ctx = _aead_setup(
backend, cipher_name, cipher._key, nonce, tag, tag_length, _DECRYPT
)
# CCM requires us to pass the length of the data before processing anything
# However calling this with any other AEAD results in an error
if isinstance(cipher, AESCCM):
_set_length(backend, ctx, len(data))
_process_aad(backend, ctx, associated_data)
# CCM has a different error path if the tag doesn't match. Errors are
# raised in Update and Final is irrelevant.
if isinstance(cipher, AESCCM):
outlen = backend._ffi.new("int *")
buf = backend._ffi.new("unsigned char[]", len(data))
res = backend._lib.EVP_CipherUpdate(ctx, buf, outlen, data, len(data))
if res != 1:
backend._consume_errors()
raise InvalidTag
processed_data = backend._ffi.buffer(buf, outlen[0])[:]
else:
processed_data = _process_data(backend, ctx, data)
outlen = backend._ffi.new("int *")
res = backend._lib.EVP_CipherFinal_ex(ctx, backend._ffi.NULL, outlen)
if res == 0:
backend._consume_errors()
raise InvalidTag
return processed_data

View File

@@ -0,0 +1,272 @@
# This file is dual licensed under the terms of the Apache License, Version
# 2.0, and the BSD License. See the LICENSE file in the root of this repository
# for complete details.
from cryptography import utils
from cryptography.exceptions import InvalidTag, UnsupportedAlgorithm, _Reasons
from cryptography.hazmat.primitives import ciphers
from cryptography.hazmat.primitives.ciphers import modes
@utils.register_interface(ciphers.CipherContext)
@utils.register_interface(ciphers.AEADCipherContext)
@utils.register_interface(ciphers.AEADEncryptionContext)
@utils.register_interface(ciphers.AEADDecryptionContext)
class _CipherContext(object):
_ENCRYPT = 1
_DECRYPT = 0
_MAX_CHUNK_SIZE = 2 ** 30 - 1
def __init__(self, backend, cipher, mode, operation):
self._backend = backend
self._cipher = cipher
self._mode = mode
self._operation = operation
self._tag = None
if isinstance(self._cipher, ciphers.BlockCipherAlgorithm):
self._block_size_bytes = self._cipher.block_size // 8
else:
self._block_size_bytes = 1
ctx = self._backend._lib.EVP_CIPHER_CTX_new()
ctx = self._backend._ffi.gc(
ctx, self._backend._lib.EVP_CIPHER_CTX_free
)
registry = self._backend._cipher_registry
try:
adapter = registry[type(cipher), type(mode)]
except KeyError:
raise UnsupportedAlgorithm(
"cipher {} in {} mode is not supported "
"by this backend.".format(
cipher.name, mode.name if mode else mode
),
_Reasons.UNSUPPORTED_CIPHER,
)
evp_cipher = adapter(self._backend, cipher, mode)
if evp_cipher == self._backend._ffi.NULL:
msg = "cipher {0.name} ".format(cipher)
if mode is not None:
msg += "in {0.name} mode ".format(mode)
msg += (
"is not supported by this backend (Your version of OpenSSL "
"may be too old. Current version: {}.)"
).format(self._backend.openssl_version_text())
raise UnsupportedAlgorithm(msg, _Reasons.UNSUPPORTED_CIPHER)
if isinstance(mode, modes.ModeWithInitializationVector):
iv_nonce = self._backend._ffi.from_buffer(
mode.initialization_vector
)
elif isinstance(mode, modes.ModeWithTweak):
iv_nonce = self._backend._ffi.from_buffer(mode.tweak)
elif isinstance(mode, modes.ModeWithNonce):
iv_nonce = self._backend._ffi.from_buffer(mode.nonce)
elif isinstance(cipher, modes.ModeWithNonce):
iv_nonce = self._backend._ffi.from_buffer(cipher.nonce)
else:
iv_nonce = self._backend._ffi.NULL
# begin init with cipher and operation type
res = self._backend._lib.EVP_CipherInit_ex(
ctx,
evp_cipher,
self._backend._ffi.NULL,
self._backend._ffi.NULL,
self._backend._ffi.NULL,
operation,
)
self._backend.openssl_assert(res != 0)
# set the key length to handle variable key ciphers
res = self._backend._lib.EVP_CIPHER_CTX_set_key_length(
ctx, len(cipher.key)
)
self._backend.openssl_assert(res != 0)
if isinstance(mode, modes.GCM):
res = self._backend._lib.EVP_CIPHER_CTX_ctrl(
ctx,
self._backend._lib.EVP_CTRL_AEAD_SET_IVLEN,
len(iv_nonce),
self._backend._ffi.NULL,
)
self._backend.openssl_assert(res != 0)
if mode.tag is not None:
res = self._backend._lib.EVP_CIPHER_CTX_ctrl(
ctx,
self._backend._lib.EVP_CTRL_AEAD_SET_TAG,
len(mode.tag),
mode.tag,
)
self._backend.openssl_assert(res != 0)
self._tag = mode.tag
# pass key/iv
res = self._backend._lib.EVP_CipherInit_ex(
ctx,
self._backend._ffi.NULL,
self._backend._ffi.NULL,
self._backend._ffi.from_buffer(cipher.key),
iv_nonce,
operation,
)
# Check for XTS mode duplicate keys error
errors = self._backend._consume_errors()
lib = self._backend._lib
if res == 0 and (
(
lib.CRYPTOGRAPHY_OPENSSL_111D_OR_GREATER
and errors[0]._lib_reason_match(
lib.ERR_LIB_EVP, lib.EVP_R_XTS_DUPLICATED_KEYS
)
)
or (
lib.Cryptography_HAS_PROVIDERS
and errors[0]._lib_reason_match(
lib.ERR_LIB_PROV, lib.PROV_R_XTS_DUPLICATED_KEYS
)
)
):
raise ValueError("In XTS mode duplicated keys are not allowed")
self._backend.openssl_assert(res != 0, errors=errors)
# We purposely disable padding here as it's handled higher up in the
# API.
self._backend._lib.EVP_CIPHER_CTX_set_padding(ctx, 0)
self._ctx = ctx
def update(self, data: bytes) -> bytes:
buf = bytearray(len(data) + self._block_size_bytes - 1)
n = self.update_into(data, buf)
return bytes(buf[:n])
def update_into(self, data: bytes, buf) -> int:
total_data_len = len(data)
if len(buf) < (total_data_len + self._block_size_bytes - 1):
raise ValueError(
"buffer must be at least {} bytes for this "
"payload".format(len(data) + self._block_size_bytes - 1)
)
data_processed = 0
total_out = 0
outlen = self._backend._ffi.new("int *")
baseoutbuf = self._backend._ffi.from_buffer(buf)
baseinbuf = self._backend._ffi.from_buffer(data)
while data_processed != total_data_len:
outbuf = baseoutbuf + total_out
inbuf = baseinbuf + data_processed
inlen = min(self._MAX_CHUNK_SIZE, total_data_len - data_processed)
res = self._backend._lib.EVP_CipherUpdate(
self._ctx, outbuf, outlen, inbuf, inlen
)
if res == 0 and isinstance(self._mode, modes.XTS):
self._backend._consume_errors()
raise ValueError(
"In XTS mode you must supply at least a full block in the "
"first update call. For AES this is 16 bytes."
)
else:
self._backend.openssl_assert(res != 0)
data_processed += inlen
total_out += outlen[0]
return total_out
def finalize(self) -> bytes:
if (
self._operation == self._DECRYPT
and isinstance(self._mode, modes.ModeWithAuthenticationTag)
and self.tag is None
):
raise ValueError(
"Authentication tag must be provided when decrypting."
)
buf = self._backend._ffi.new("unsigned char[]", self._block_size_bytes)
outlen = self._backend._ffi.new("int *")
res = self._backend._lib.EVP_CipherFinal_ex(self._ctx, buf, outlen)
if res == 0:
errors = self._backend._consume_errors()
if not errors and isinstance(self._mode, modes.GCM):
raise InvalidTag
self._backend.openssl_assert(
errors[0]._lib_reason_match(
self._backend._lib.ERR_LIB_EVP,
self._backend._lib.EVP_R_DATA_NOT_MULTIPLE_OF_BLOCK_LENGTH,
)
or (
self._backend._lib.Cryptography_HAS_PROVIDERS
and errors[0]._lib_reason_match(
self._backend._lib.ERR_LIB_PROV,
self._backend._lib.PROV_R_WRONG_FINAL_BLOCK_LENGTH,
)
),
errors=errors,
)
raise ValueError(
"The length of the provided data is not a multiple of "
"the block length."
)
if (
isinstance(self._mode, modes.GCM)
and self._operation == self._ENCRYPT
):
tag_buf = self._backend._ffi.new(
"unsigned char[]", self._block_size_bytes
)
res = self._backend._lib.EVP_CIPHER_CTX_ctrl(
self._ctx,
self._backend._lib.EVP_CTRL_AEAD_GET_TAG,
self._block_size_bytes,
tag_buf,
)
self._backend.openssl_assert(res != 0)
self._tag = self._backend._ffi.buffer(tag_buf)[:]
res = self._backend._lib.EVP_CIPHER_CTX_reset(self._ctx)
self._backend.openssl_assert(res == 1)
return self._backend._ffi.buffer(buf)[: outlen[0]]
def finalize_with_tag(self, tag: bytes) -> bytes:
tag_len = len(tag)
if tag_len < self._mode._min_tag_length:
raise ValueError(
"Authentication tag must be {} bytes or longer.".format(
self._mode._min_tag_length
)
)
elif tag_len > self._block_size_bytes:
raise ValueError(
"Authentication tag cannot be more than {} bytes.".format(
self._block_size_bytes
)
)
res = self._backend._lib.EVP_CIPHER_CTX_ctrl(
self._ctx, self._backend._lib.EVP_CTRL_AEAD_SET_TAG, len(tag), tag
)
self._backend.openssl_assert(res != 0)
self._tag = tag
return self.finalize()
def authenticate_additional_data(self, data: bytes) -> None:
outlen = self._backend._ffi.new("int *")
res = self._backend._lib.EVP_CipherUpdate(
self._ctx,
self._backend._ffi.NULL,
outlen,
self._backend._ffi.from_buffer(data),
len(data),
)
self._backend.openssl_assert(res != 0)
tag = utils.read_only_property("_tag")

View File

@@ -0,0 +1,80 @@
# This file is dual licensed under the terms of the Apache License, Version
# 2.0, and the BSD License. See the LICENSE file in the root of this repository
# for complete details.
from cryptography import utils
from cryptography.exceptions import (
InvalidSignature,
UnsupportedAlgorithm,
_Reasons,
)
from cryptography.hazmat.primitives import constant_time
from cryptography.hazmat.primitives.ciphers.modes import CBC
class _CMACContext(object):
def __init__(self, backend, algorithm, ctx=None):
if not backend.cmac_algorithm_supported(algorithm):
raise UnsupportedAlgorithm(
"This backend does not support CMAC.",
_Reasons.UNSUPPORTED_CIPHER,
)
self._backend = backend
self._key = algorithm.key
self._algorithm = algorithm
self._output_length = algorithm.block_size // 8
if ctx is None:
registry = self._backend._cipher_registry
adapter = registry[type(algorithm), CBC]
evp_cipher = adapter(self._backend, algorithm, CBC)
ctx = self._backend._lib.CMAC_CTX_new()
self._backend.openssl_assert(ctx != self._backend._ffi.NULL)
ctx = self._backend._ffi.gc(ctx, self._backend._lib.CMAC_CTX_free)
key_ptr = self._backend._ffi.from_buffer(self._key)
res = self._backend._lib.CMAC_Init(
ctx,
key_ptr,
len(self._key),
evp_cipher,
self._backend._ffi.NULL,
)
self._backend.openssl_assert(res == 1)
self._ctx = ctx
algorithm = utils.read_only_property("_algorithm")
def update(self, data: bytes) -> None:
res = self._backend._lib.CMAC_Update(self._ctx, data, len(data))
self._backend.openssl_assert(res == 1)
def finalize(self) -> bytes:
buf = self._backend._ffi.new("unsigned char[]", self._output_length)
length = self._backend._ffi.new("size_t *", self._output_length)
res = self._backend._lib.CMAC_Final(self._ctx, buf, length)
self._backend.openssl_assert(res == 1)
self._ctx = None
return self._backend._ffi.buffer(buf)[:]
def copy(self) -> "_CMACContext":
copied_ctx = self._backend._lib.CMAC_CTX_new()
copied_ctx = self._backend._ffi.gc(
copied_ctx, self._backend._lib.CMAC_CTX_free
)
res = self._backend._lib.CMAC_CTX_copy(copied_ctx, self._ctx)
self._backend.openssl_assert(res == 1)
return _CMACContext(self._backend, self._algorithm, ctx=copied_ctx)
def verify(self, signature: bytes) -> None:
digest = self.finalize()
if not constant_time.bytes_eq(digest, signature):
raise InvalidSignature("Signature did not match digest.")

View File

@@ -0,0 +1,34 @@
# This file is dual licensed under the terms of the Apache License, Version
# 2.0, and the BSD License. See the LICENSE file in the root of this repository
# for complete details.
from cryptography import x509
_DISTPOINT_TYPE_FULLNAME = 0
_DISTPOINT_TYPE_RELATIVENAME = 1
# CRLReason ::= ENUMERATED {
# unspecified (0),
# keyCompromise (1),
# cACompromise (2),
# affiliationChanged (3),
# superseded (4),
# cessationOfOperation (5),
# certificateHold (6),
# -- value 7 is not used
# removeFromCRL (8),
# privilegeWithdrawn (9),
# aACompromise (10) }
_CRL_ENTRY_REASON_ENUM_TO_CODE = {
x509.ReasonFlags.unspecified: 0,
x509.ReasonFlags.key_compromise: 1,
x509.ReasonFlags.ca_compromise: 2,
x509.ReasonFlags.affiliation_changed: 3,
x509.ReasonFlags.superseded: 4,
x509.ReasonFlags.cessation_of_operation: 5,
x509.ReasonFlags.certificate_hold: 6,
x509.ReasonFlags.remove_from_crl: 8,
x509.ReasonFlags.privilege_withdrawn: 9,
x509.ReasonFlags.aa_compromise: 10,
}

View File

@@ -0,0 +1,293 @@
# This file is dual licensed under the terms of the Apache License, Version
# 2.0, and the BSD License. See the LICENSE file in the root of this repository
# for complete details.
from cryptography.exceptions import UnsupportedAlgorithm, _Reasons
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import dh
def _dh_params_dup(dh_cdata, backend):
lib = backend._lib
ffi = backend._ffi
param_cdata = lib.DHparams_dup(dh_cdata)
backend.openssl_assert(param_cdata != ffi.NULL)
param_cdata = ffi.gc(param_cdata, lib.DH_free)
if lib.CRYPTOGRAPHY_IS_LIBRESSL:
# In libressl DHparams_dup don't copy q
q = ffi.new("BIGNUM **")
lib.DH_get0_pqg(dh_cdata, ffi.NULL, q, ffi.NULL)
q_dup = lib.BN_dup(q[0])
res = lib.DH_set0_pqg(param_cdata, ffi.NULL, q_dup, ffi.NULL)
backend.openssl_assert(res == 1)
return param_cdata
def _dh_cdata_to_parameters(dh_cdata, backend):
param_cdata = _dh_params_dup(dh_cdata, backend)
return _DHParameters(backend, param_cdata)
class _DHParameters(dh.DHParameters):
def __init__(self, backend, dh_cdata):
self._backend = backend
self._dh_cdata = dh_cdata
def parameter_numbers(self) -> dh.DHParameterNumbers:
p = self._backend._ffi.new("BIGNUM **")
g = self._backend._ffi.new("BIGNUM **")
q = self._backend._ffi.new("BIGNUM **")
self._backend._lib.DH_get0_pqg(self._dh_cdata, p, q, g)
self._backend.openssl_assert(p[0] != self._backend._ffi.NULL)
self._backend.openssl_assert(g[0] != self._backend._ffi.NULL)
if q[0] == self._backend._ffi.NULL:
q_val = None
else:
q_val = self._backend._bn_to_int(q[0])
return dh.DHParameterNumbers(
p=self._backend._bn_to_int(p[0]),
g=self._backend._bn_to_int(g[0]),
q=q_val,
)
def generate_private_key(self) -> dh.DHPrivateKey:
return self._backend.generate_dh_private_key(self)
def parameter_bytes(
self,
encoding: serialization.Encoding,
format: serialization.ParameterFormat,
) -> bytes:
if format is not serialization.ParameterFormat.PKCS3:
raise ValueError("Only PKCS3 serialization is supported")
if not self._backend._lib.Cryptography_HAS_EVP_PKEY_DHX:
q = self._backend._ffi.new("BIGNUM **")
self._backend._lib.DH_get0_pqg(
self._dh_cdata,
self._backend._ffi.NULL,
q,
self._backend._ffi.NULL,
)
if q[0] != self._backend._ffi.NULL:
raise UnsupportedAlgorithm(
"DH X9.42 serialization is not supported",
_Reasons.UNSUPPORTED_SERIALIZATION,
)
return self._backend._parameter_bytes(encoding, format, self._dh_cdata)
def _get_dh_num_bits(backend, dh_cdata) -> int:
p = backend._ffi.new("BIGNUM **")
backend._lib.DH_get0_pqg(dh_cdata, p, backend._ffi.NULL, backend._ffi.NULL)
backend.openssl_assert(p[0] != backend._ffi.NULL)
return backend._lib.BN_num_bits(p[0])
class _DHPrivateKey(dh.DHPrivateKey):
def __init__(self, backend, dh_cdata, evp_pkey):
self._backend = backend
self._dh_cdata = dh_cdata
self._evp_pkey = evp_pkey
self._key_size_bytes = self._backend._lib.DH_size(dh_cdata)
@property
def key_size(self) -> int:
return _get_dh_num_bits(self._backend, self._dh_cdata)
def private_numbers(self) -> dh.DHPrivateNumbers:
p = self._backend._ffi.new("BIGNUM **")
g = self._backend._ffi.new("BIGNUM **")
q = self._backend._ffi.new("BIGNUM **")
self._backend._lib.DH_get0_pqg(self._dh_cdata, p, q, g)
self._backend.openssl_assert(p[0] != self._backend._ffi.NULL)
self._backend.openssl_assert(g[0] != self._backend._ffi.NULL)
if q[0] == self._backend._ffi.NULL:
q_val = None
else:
q_val = self._backend._bn_to_int(q[0])
pub_key = self._backend._ffi.new("BIGNUM **")
priv_key = self._backend._ffi.new("BIGNUM **")
self._backend._lib.DH_get0_key(self._dh_cdata, pub_key, priv_key)
self._backend.openssl_assert(pub_key[0] != self._backend._ffi.NULL)
self._backend.openssl_assert(priv_key[0] != self._backend._ffi.NULL)
return dh.DHPrivateNumbers(
public_numbers=dh.DHPublicNumbers(
parameter_numbers=dh.DHParameterNumbers(
p=self._backend._bn_to_int(p[0]),
g=self._backend._bn_to_int(g[0]),
q=q_val,
),
y=self._backend._bn_to_int(pub_key[0]),
),
x=self._backend._bn_to_int(priv_key[0]),
)
def exchange(self, peer_public_key: dh.DHPublicKey) -> bytes:
if not isinstance(peer_public_key, _DHPublicKey):
raise TypeError("peer_public_key must be a DHPublicKey")
ctx = self._backend._lib.EVP_PKEY_CTX_new(
self._evp_pkey, self._backend._ffi.NULL
)
self._backend.openssl_assert(ctx != self._backend._ffi.NULL)
ctx = self._backend._ffi.gc(ctx, self._backend._lib.EVP_PKEY_CTX_free)
res = self._backend._lib.EVP_PKEY_derive_init(ctx)
self._backend.openssl_assert(res == 1)
res = self._backend._lib.EVP_PKEY_derive_set_peer(
ctx, peer_public_key._evp_pkey
)
# Invalid kex errors here in OpenSSL 3.0 because checks were moved
# to EVP_PKEY_derive_set_peer
self._exchange_assert(res == 1)
keylen = self._backend._ffi.new("size_t *")
res = self._backend._lib.EVP_PKEY_derive(
ctx, self._backend._ffi.NULL, keylen
)
# Invalid kex errors here in OpenSSL < 3
self._exchange_assert(res == 1)
self._backend.openssl_assert(keylen[0] > 0)
buf = self._backend._ffi.new("unsigned char[]", keylen[0])
res = self._backend._lib.EVP_PKEY_derive(ctx, buf, keylen)
self._backend.openssl_assert(res == 1)
key = self._backend._ffi.buffer(buf, keylen[0])[:]
pad = self._key_size_bytes - len(key)
if pad > 0:
key = (b"\x00" * pad) + key
return key
def _exchange_assert(self, ok):
if not ok:
errors_with_text = self._backend._consume_errors_with_text()
raise ValueError(
"Error computing shared key.",
errors_with_text,
)
def public_key(self) -> dh.DHPublicKey:
dh_cdata = _dh_params_dup(self._dh_cdata, self._backend)
pub_key = self._backend._ffi.new("BIGNUM **")
self._backend._lib.DH_get0_key(
self._dh_cdata, pub_key, self._backend._ffi.NULL
)
self._backend.openssl_assert(pub_key[0] != self._backend._ffi.NULL)
pub_key_dup = self._backend._lib.BN_dup(pub_key[0])
self._backend.openssl_assert(pub_key_dup != self._backend._ffi.NULL)
res = self._backend._lib.DH_set0_key(
dh_cdata, pub_key_dup, self._backend._ffi.NULL
)
self._backend.openssl_assert(res == 1)
evp_pkey = self._backend._dh_cdata_to_evp_pkey(dh_cdata)
return _DHPublicKey(self._backend, dh_cdata, evp_pkey)
def parameters(self) -> dh.DHParameters:
return _dh_cdata_to_parameters(self._dh_cdata, self._backend)
def private_bytes(
self,
encoding: serialization.Encoding,
format: serialization.PrivateFormat,
encryption_algorithm: serialization.KeySerializationEncryption,
) -> bytes:
if format is not serialization.PrivateFormat.PKCS8:
raise ValueError(
"DH private keys support only PKCS8 serialization"
)
if not self._backend._lib.Cryptography_HAS_EVP_PKEY_DHX:
q = self._backend._ffi.new("BIGNUM **")
self._backend._lib.DH_get0_pqg(
self._dh_cdata,
self._backend._ffi.NULL,
q,
self._backend._ffi.NULL,
)
if q[0] != self._backend._ffi.NULL:
raise UnsupportedAlgorithm(
"DH X9.42 serialization is not supported",
_Reasons.UNSUPPORTED_SERIALIZATION,
)
return self._backend._private_key_bytes(
encoding,
format,
encryption_algorithm,
self,
self._evp_pkey,
self._dh_cdata,
)
class _DHPublicKey(dh.DHPublicKey):
def __init__(self, backend, dh_cdata, evp_pkey):
self._backend = backend
self._dh_cdata = dh_cdata
self._evp_pkey = evp_pkey
self._key_size_bits = _get_dh_num_bits(self._backend, self._dh_cdata)
@property
def key_size(self) -> int:
return self._key_size_bits
def public_numbers(self) -> dh.DHPublicNumbers:
p = self._backend._ffi.new("BIGNUM **")
g = self._backend._ffi.new("BIGNUM **")
q = self._backend._ffi.new("BIGNUM **")
self._backend._lib.DH_get0_pqg(self._dh_cdata, p, q, g)
self._backend.openssl_assert(p[0] != self._backend._ffi.NULL)
self._backend.openssl_assert(g[0] != self._backend._ffi.NULL)
if q[0] == self._backend._ffi.NULL:
q_val = None
else:
q_val = self._backend._bn_to_int(q[0])
pub_key = self._backend._ffi.new("BIGNUM **")
self._backend._lib.DH_get0_key(
self._dh_cdata, pub_key, self._backend._ffi.NULL
)
self._backend.openssl_assert(pub_key[0] != self._backend._ffi.NULL)
return dh.DHPublicNumbers(
parameter_numbers=dh.DHParameterNumbers(
p=self._backend._bn_to_int(p[0]),
g=self._backend._bn_to_int(g[0]),
q=q_val,
),
y=self._backend._bn_to_int(pub_key[0]),
)
def parameters(self) -> dh.DHParameters:
return _dh_cdata_to_parameters(self._dh_cdata, self._backend)
def public_bytes(
self,
encoding: serialization.Encoding,
format: serialization.PublicFormat,
) -> bytes:
if format is not serialization.PublicFormat.SubjectPublicKeyInfo:
raise ValueError(
"DH public keys support only "
"SubjectPublicKeyInfo serialization"
)
if not self._backend._lib.Cryptography_HAS_EVP_PKEY_DHX:
q = self._backend._ffi.new("BIGNUM **")
self._backend._lib.DH_get0_pqg(
self._dh_cdata,
self._backend._ffi.NULL,
q,
self._backend._ffi.NULL,
)
if q[0] != self._backend._ffi.NULL:
raise UnsupportedAlgorithm(
"DH X9.42 serialization is not supported",
_Reasons.UNSUPPORTED_SERIALIZATION,
)
return self._backend._public_key_bytes(
encoding, format, self, self._evp_pkey, None
)

View File

@@ -0,0 +1,289 @@
# This file is dual licensed under the terms of the Apache License, Version
# 2.0, and the BSD License. See the LICENSE file in the root of this repository
# for complete details.
import typing
from cryptography import utils
from cryptography.exceptions import InvalidSignature
from cryptography.hazmat.backends.openssl.utils import (
_calculate_digest_and_algorithm,
_check_not_prehashed,
_warn_sign_verify_deprecated,
)
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import (
AsymmetricSignatureContext,
AsymmetricVerificationContext,
dsa,
utils as asym_utils,
)
def _dsa_sig_sign(backend, private_key, data):
sig_buf_len = backend._lib.DSA_size(private_key._dsa_cdata)
sig_buf = backend._ffi.new("unsigned char[]", sig_buf_len)
buflen = backend._ffi.new("unsigned int *")
# The first parameter passed to DSA_sign is unused by OpenSSL but
# must be an integer.
res = backend._lib.DSA_sign(
0, data, len(data), sig_buf, buflen, private_key._dsa_cdata
)
backend.openssl_assert(res == 1)
backend.openssl_assert(buflen[0])
return backend._ffi.buffer(sig_buf)[: buflen[0]]
def _dsa_sig_verify(backend, public_key, signature, data):
# The first parameter passed to DSA_verify is unused by OpenSSL but
# must be an integer.
res = backend._lib.DSA_verify(
0, data, len(data), signature, len(signature), public_key._dsa_cdata
)
if res != 1:
backend._consume_errors()
raise InvalidSignature
class _DSAVerificationContext(AsymmetricVerificationContext):
def __init__(self, backend, public_key, signature, algorithm):
self._backend = backend
self._public_key = public_key
self._signature = signature
self._algorithm = algorithm
self._hash_ctx = hashes.Hash(self._algorithm, self._backend)
def update(self, data: bytes):
self._hash_ctx.update(data)
def verify(self) -> None:
data_to_verify = self._hash_ctx.finalize()
_dsa_sig_verify(
self._backend, self._public_key, self._signature, data_to_verify
)
class _DSASignatureContext(AsymmetricSignatureContext):
def __init__(
self,
backend,
private_key: dsa.DSAPrivateKey,
algorithm: hashes.HashAlgorithm,
):
self._backend = backend
self._private_key = private_key
self._algorithm = algorithm
self._hash_ctx = hashes.Hash(self._algorithm, self._backend)
def update(self, data: bytes) -> None:
self._hash_ctx.update(data)
def finalize(self) -> bytes:
data_to_sign = self._hash_ctx.finalize()
return _dsa_sig_sign(self._backend, self._private_key, data_to_sign)
class _DSAParameters(dsa.DSAParameters):
def __init__(self, backend, dsa_cdata):
self._backend = backend
self._dsa_cdata = dsa_cdata
def parameter_numbers(self) -> dsa.DSAParameterNumbers:
p = self._backend._ffi.new("BIGNUM **")
q = self._backend._ffi.new("BIGNUM **")
g = self._backend._ffi.new("BIGNUM **")
self._backend._lib.DSA_get0_pqg(self._dsa_cdata, p, q, g)
self._backend.openssl_assert(p[0] != self._backend._ffi.NULL)
self._backend.openssl_assert(q[0] != self._backend._ffi.NULL)
self._backend.openssl_assert(g[0] != self._backend._ffi.NULL)
return dsa.DSAParameterNumbers(
p=self._backend._bn_to_int(p[0]),
q=self._backend._bn_to_int(q[0]),
g=self._backend._bn_to_int(g[0]),
)
def generate_private_key(self) -> dsa.DSAPrivateKey:
return self._backend.generate_dsa_private_key(self)
class _DSAPrivateKey(dsa.DSAPrivateKey):
def __init__(self, backend, dsa_cdata, evp_pkey):
self._backend = backend
self._dsa_cdata = dsa_cdata
self._evp_pkey = evp_pkey
p = self._backend._ffi.new("BIGNUM **")
self._backend._lib.DSA_get0_pqg(
dsa_cdata, p, self._backend._ffi.NULL, self._backend._ffi.NULL
)
self._backend.openssl_assert(p[0] != backend._ffi.NULL)
self._key_size = self._backend._lib.BN_num_bits(p[0])
key_size = utils.read_only_property("_key_size")
def signer(
self, signature_algorithm: hashes.HashAlgorithm
) -> AsymmetricSignatureContext:
_warn_sign_verify_deprecated()
_check_not_prehashed(signature_algorithm)
return _DSASignatureContext(self._backend, self, signature_algorithm)
def private_numbers(self) -> dsa.DSAPrivateNumbers:
p = self._backend._ffi.new("BIGNUM **")
q = self._backend._ffi.new("BIGNUM **")
g = self._backend._ffi.new("BIGNUM **")
pub_key = self._backend._ffi.new("BIGNUM **")
priv_key = self._backend._ffi.new("BIGNUM **")
self._backend._lib.DSA_get0_pqg(self._dsa_cdata, p, q, g)
self._backend.openssl_assert(p[0] != self._backend._ffi.NULL)
self._backend.openssl_assert(q[0] != self._backend._ffi.NULL)
self._backend.openssl_assert(g[0] != self._backend._ffi.NULL)
self._backend._lib.DSA_get0_key(self._dsa_cdata, pub_key, priv_key)
self._backend.openssl_assert(pub_key[0] != self._backend._ffi.NULL)
self._backend.openssl_assert(priv_key[0] != self._backend._ffi.NULL)
return dsa.DSAPrivateNumbers(
public_numbers=dsa.DSAPublicNumbers(
parameter_numbers=dsa.DSAParameterNumbers(
p=self._backend._bn_to_int(p[0]),
q=self._backend._bn_to_int(q[0]),
g=self._backend._bn_to_int(g[0]),
),
y=self._backend._bn_to_int(pub_key[0]),
),
x=self._backend._bn_to_int(priv_key[0]),
)
def public_key(self) -> dsa.DSAPublicKey:
dsa_cdata = self._backend._lib.DSAparams_dup(self._dsa_cdata)
self._backend.openssl_assert(dsa_cdata != self._backend._ffi.NULL)
dsa_cdata = self._backend._ffi.gc(
dsa_cdata, self._backend._lib.DSA_free
)
pub_key = self._backend._ffi.new("BIGNUM **")
self._backend._lib.DSA_get0_key(
self._dsa_cdata, pub_key, self._backend._ffi.NULL
)
self._backend.openssl_assert(pub_key[0] != self._backend._ffi.NULL)
pub_key_dup = self._backend._lib.BN_dup(pub_key[0])
res = self._backend._lib.DSA_set0_key(
dsa_cdata, pub_key_dup, self._backend._ffi.NULL
)
self._backend.openssl_assert(res == 1)
evp_pkey = self._backend._dsa_cdata_to_evp_pkey(dsa_cdata)
return _DSAPublicKey(self._backend, dsa_cdata, evp_pkey)
def parameters(self) -> dsa.DSAParameters:
dsa_cdata = self._backend._lib.DSAparams_dup(self._dsa_cdata)
self._backend.openssl_assert(dsa_cdata != self._backend._ffi.NULL)
dsa_cdata = self._backend._ffi.gc(
dsa_cdata, self._backend._lib.DSA_free
)
return _DSAParameters(self._backend, dsa_cdata)
def private_bytes(
self,
encoding: serialization.Encoding,
format: serialization.PrivateFormat,
encryption_algorithm: serialization.KeySerializationEncryption,
) -> bytes:
return self._backend._private_key_bytes(
encoding,
format,
encryption_algorithm,
self,
self._evp_pkey,
self._dsa_cdata,
)
def sign(
self,
data: bytes,
algorithm: typing.Union[asym_utils.Prehashed, hashes.HashAlgorithm],
) -> bytes:
data, algorithm = _calculate_digest_and_algorithm(
self._backend, data, algorithm
)
return _dsa_sig_sign(self._backend, self, data)
class _DSAPublicKey(dsa.DSAPublicKey):
def __init__(self, backend, dsa_cdata, evp_pkey):
self._backend = backend
self._dsa_cdata = dsa_cdata
self._evp_pkey = evp_pkey
p = self._backend._ffi.new("BIGNUM **")
self._backend._lib.DSA_get0_pqg(
dsa_cdata, p, self._backend._ffi.NULL, self._backend._ffi.NULL
)
self._backend.openssl_assert(p[0] != backend._ffi.NULL)
self._key_size = self._backend._lib.BN_num_bits(p[0])
key_size = utils.read_only_property("_key_size")
def verifier(
self,
signature: bytes,
signature_algorithm: hashes.HashAlgorithm,
) -> AsymmetricVerificationContext:
_warn_sign_verify_deprecated()
utils._check_bytes("signature", signature)
_check_not_prehashed(signature_algorithm)
return _DSAVerificationContext(
self._backend, self, signature, signature_algorithm
)
def public_numbers(self) -> dsa.DSAPublicNumbers:
p = self._backend._ffi.new("BIGNUM **")
q = self._backend._ffi.new("BIGNUM **")
g = self._backend._ffi.new("BIGNUM **")
pub_key = self._backend._ffi.new("BIGNUM **")
self._backend._lib.DSA_get0_pqg(self._dsa_cdata, p, q, g)
self._backend.openssl_assert(p[0] != self._backend._ffi.NULL)
self._backend.openssl_assert(q[0] != self._backend._ffi.NULL)
self._backend.openssl_assert(g[0] != self._backend._ffi.NULL)
self._backend._lib.DSA_get0_key(
self._dsa_cdata, pub_key, self._backend._ffi.NULL
)
self._backend.openssl_assert(pub_key[0] != self._backend._ffi.NULL)
return dsa.DSAPublicNumbers(
parameter_numbers=dsa.DSAParameterNumbers(
p=self._backend._bn_to_int(p[0]),
q=self._backend._bn_to_int(q[0]),
g=self._backend._bn_to_int(g[0]),
),
y=self._backend._bn_to_int(pub_key[0]),
)
def parameters(self) -> dsa.DSAParameters:
dsa_cdata = self._backend._lib.DSAparams_dup(self._dsa_cdata)
dsa_cdata = self._backend._ffi.gc(
dsa_cdata, self._backend._lib.DSA_free
)
return _DSAParameters(self._backend, dsa_cdata)
def public_bytes(
self,
encoding: serialization.Encoding,
format: serialization.PublicFormat,
) -> bytes:
return self._backend._public_key_bytes(
encoding, format, self, self._evp_pkey, None
)
def verify(
self,
signature: bytes,
data: bytes,
algorithm: typing.Union[asym_utils.Prehashed, hashes.HashAlgorithm],
) -> None:
data, algorithm = _calculate_digest_and_algorithm(
self._backend, data, algorithm
)
return _dsa_sig_verify(self._backend, self, signature, data)

View File

@@ -0,0 +1,367 @@
# This file is dual licensed under the terms of the Apache License, Version
# 2.0, and the BSD License. See the LICENSE file in the root of this repository
# for complete details.
from cryptography import utils
from cryptography.exceptions import (
InvalidSignature,
UnsupportedAlgorithm,
_Reasons,
)
from cryptography.hazmat.backends.openssl.utils import (
_calculate_digest_and_algorithm,
_check_not_prehashed,
_evp_pkey_derive,
_warn_sign_verify_deprecated,
)
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import (
AsymmetricSignatureContext,
AsymmetricVerificationContext,
ec,
)
def _check_signature_algorithm(
signature_algorithm: ec.EllipticCurveSignatureAlgorithm,
):
if not isinstance(signature_algorithm, ec.ECDSA):
raise UnsupportedAlgorithm(
"Unsupported elliptic curve signature algorithm.",
_Reasons.UNSUPPORTED_PUBLIC_KEY_ALGORITHM,
)
def _ec_key_curve_sn(backend, ec_key):
group = backend._lib.EC_KEY_get0_group(ec_key)
backend.openssl_assert(group != backend._ffi.NULL)
nid = backend._lib.EC_GROUP_get_curve_name(group)
# The following check is to find EC keys with unnamed curves and raise
# an error for now.
if nid == backend._lib.NID_undef:
raise NotImplementedError(
"ECDSA keys with unnamed curves are unsupported at this time"
)
# This is like the above check, but it also catches the case where you
# explicitly encoded a curve with the same parameters as a named curve.
# Don't do that.
if (
not backend._lib.CRYPTOGRAPHY_IS_LIBRESSL
and backend._lib.EC_GROUP_get_asn1_flag(group) == 0
):
raise NotImplementedError(
"ECDSA keys with unnamed curves are unsupported at this time"
)
curve_name = backend._lib.OBJ_nid2sn(nid)
backend.openssl_assert(curve_name != backend._ffi.NULL)
sn = backend._ffi.string(curve_name).decode("ascii")
return sn
def _mark_asn1_named_ec_curve(backend, ec_cdata):
"""
Set the named curve flag on the EC_KEY. This causes OpenSSL to
serialize EC keys along with their curve OID which makes
deserialization easier.
"""
backend._lib.EC_KEY_set_asn1_flag(
ec_cdata, backend._lib.OPENSSL_EC_NAMED_CURVE
)
def _sn_to_elliptic_curve(backend, sn):
try:
return ec._CURVE_TYPES[sn]()
except KeyError:
raise UnsupportedAlgorithm(
"{} is not a supported elliptic curve".format(sn),
_Reasons.UNSUPPORTED_ELLIPTIC_CURVE,
)
def _ecdsa_sig_sign(backend, private_key, data):
max_size = backend._lib.ECDSA_size(private_key._ec_key)
backend.openssl_assert(max_size > 0)
sigbuf = backend._ffi.new("unsigned char[]", max_size)
siglen_ptr = backend._ffi.new("unsigned int[]", 1)
res = backend._lib.ECDSA_sign(
0, data, len(data), sigbuf, siglen_ptr, private_key._ec_key
)
backend.openssl_assert(res == 1)
return backend._ffi.buffer(sigbuf)[: siglen_ptr[0]]
def _ecdsa_sig_verify(backend, public_key, signature, data):
res = backend._lib.ECDSA_verify(
0, data, len(data), signature, len(signature), public_key._ec_key
)
if res != 1:
backend._consume_errors()
raise InvalidSignature
class _ECDSASignatureContext(AsymmetricSignatureContext):
def __init__(
self,
backend,
private_key: ec.EllipticCurvePrivateKey,
algorithm: hashes.HashAlgorithm,
):
self._backend = backend
self._private_key = private_key
self._digest = hashes.Hash(algorithm, backend)
def update(self, data: bytes) -> None:
self._digest.update(data)
def finalize(self) -> bytes:
digest = self._digest.finalize()
return _ecdsa_sig_sign(self._backend, self._private_key, digest)
class _ECDSAVerificationContext(AsymmetricVerificationContext):
def __init__(
self,
backend,
public_key: ec.EllipticCurvePublicKey,
signature: bytes,
algorithm: hashes.HashAlgorithm,
):
self._backend = backend
self._public_key = public_key
self._signature = signature
self._digest = hashes.Hash(algorithm, backend)
def update(self, data: bytes) -> None:
self._digest.update(data)
def verify(self) -> None:
digest = self._digest.finalize()
_ecdsa_sig_verify(
self._backend, self._public_key, self._signature, digest
)
class _EllipticCurvePrivateKey(ec.EllipticCurvePrivateKey):
def __init__(self, backend, ec_key_cdata, evp_pkey):
self._backend = backend
self._ec_key = ec_key_cdata
self._evp_pkey = evp_pkey
sn = _ec_key_curve_sn(backend, ec_key_cdata)
self._curve = _sn_to_elliptic_curve(backend, sn)
_mark_asn1_named_ec_curve(backend, ec_key_cdata)
curve = utils.read_only_property("_curve")
@property
def key_size(self) -> int:
return self.curve.key_size
def signer(
self, signature_algorithm: ec.EllipticCurveSignatureAlgorithm
) -> AsymmetricSignatureContext:
_warn_sign_verify_deprecated()
_check_signature_algorithm(signature_algorithm)
_check_not_prehashed(signature_algorithm.algorithm)
# This assert is to help mypy realize what type this object holds
assert isinstance(signature_algorithm.algorithm, hashes.HashAlgorithm)
return _ECDSASignatureContext(
self._backend, self, signature_algorithm.algorithm
)
def exchange(
self, algorithm: ec.ECDH, peer_public_key: ec.EllipticCurvePublicKey
) -> bytes:
if not (
self._backend.elliptic_curve_exchange_algorithm_supported(
algorithm, self.curve
)
):
raise UnsupportedAlgorithm(
"This backend does not support the ECDH algorithm.",
_Reasons.UNSUPPORTED_EXCHANGE_ALGORITHM,
)
if peer_public_key.curve.name != self.curve.name:
raise ValueError(
"peer_public_key and self are not on the same curve"
)
return _evp_pkey_derive(self._backend, self._evp_pkey, peer_public_key)
def public_key(self) -> ec.EllipticCurvePublicKey:
group = self._backend._lib.EC_KEY_get0_group(self._ec_key)
self._backend.openssl_assert(group != self._backend._ffi.NULL)
curve_nid = self._backend._lib.EC_GROUP_get_curve_name(group)
public_ec_key = self._backend._ec_key_new_by_curve_nid(curve_nid)
point = self._backend._lib.EC_KEY_get0_public_key(self._ec_key)
self._backend.openssl_assert(point != self._backend._ffi.NULL)
res = self._backend._lib.EC_KEY_set_public_key(public_ec_key, point)
self._backend.openssl_assert(res == 1)
evp_pkey = self._backend._ec_cdata_to_evp_pkey(public_ec_key)
return _EllipticCurvePublicKey(self._backend, public_ec_key, evp_pkey)
def private_numbers(self) -> ec.EllipticCurvePrivateNumbers:
bn = self._backend._lib.EC_KEY_get0_private_key(self._ec_key)
private_value = self._backend._bn_to_int(bn)
return ec.EllipticCurvePrivateNumbers(
private_value=private_value,
public_numbers=self.public_key().public_numbers(),
)
def private_bytes(
self,
encoding: serialization.Encoding,
format: serialization.PrivateFormat,
encryption_algorithm: serialization.KeySerializationEncryption,
) -> bytes:
return self._backend._private_key_bytes(
encoding,
format,
encryption_algorithm,
self,
self._evp_pkey,
self._ec_key,
)
def sign(
self,
data: bytes,
signature_algorithm: ec.EllipticCurveSignatureAlgorithm,
) -> bytes:
_check_signature_algorithm(signature_algorithm)
data, algorithm = _calculate_digest_and_algorithm(
self._backend,
data,
signature_algorithm._algorithm, # type: ignore[attr-defined]
)
return _ecdsa_sig_sign(self._backend, self, data)
class _EllipticCurvePublicKey(ec.EllipticCurvePublicKey):
def __init__(self, backend, ec_key_cdata, evp_pkey):
self._backend = backend
self._ec_key = ec_key_cdata
self._evp_pkey = evp_pkey
sn = _ec_key_curve_sn(backend, ec_key_cdata)
self._curve = _sn_to_elliptic_curve(backend, sn)
_mark_asn1_named_ec_curve(backend, ec_key_cdata)
curve = utils.read_only_property("_curve")
@property
def key_size(self) -> int:
return self.curve.key_size
def verifier(
self,
signature: bytes,
signature_algorithm: ec.EllipticCurveSignatureAlgorithm,
) -> AsymmetricVerificationContext:
_warn_sign_verify_deprecated()
utils._check_bytes("signature", signature)
_check_signature_algorithm(signature_algorithm)
_check_not_prehashed(signature_algorithm.algorithm)
# This assert is to help mypy realize what type this object holds
assert isinstance(signature_algorithm.algorithm, hashes.HashAlgorithm)
return _ECDSAVerificationContext(
self._backend, self, signature, signature_algorithm.algorithm
)
def public_numbers(self) -> ec.EllipticCurvePublicNumbers:
get_func, group = self._backend._ec_key_determine_group_get_func(
self._ec_key
)
point = self._backend._lib.EC_KEY_get0_public_key(self._ec_key)
self._backend.openssl_assert(point != self._backend._ffi.NULL)
with self._backend._tmp_bn_ctx() as bn_ctx:
bn_x = self._backend._lib.BN_CTX_get(bn_ctx)
bn_y = self._backend._lib.BN_CTX_get(bn_ctx)
res = get_func(group, point, bn_x, bn_y, bn_ctx)
self._backend.openssl_assert(res == 1)
x = self._backend._bn_to_int(bn_x)
y = self._backend._bn_to_int(bn_y)
return ec.EllipticCurvePublicNumbers(x=x, y=y, curve=self._curve)
def _encode_point(self, format: serialization.PublicFormat) -> bytes:
if format is serialization.PublicFormat.CompressedPoint:
conversion = self._backend._lib.POINT_CONVERSION_COMPRESSED
else:
assert format is serialization.PublicFormat.UncompressedPoint
conversion = self._backend._lib.POINT_CONVERSION_UNCOMPRESSED
group = self._backend._lib.EC_KEY_get0_group(self._ec_key)
self._backend.openssl_assert(group != self._backend._ffi.NULL)
point = self._backend._lib.EC_KEY_get0_public_key(self._ec_key)
self._backend.openssl_assert(point != self._backend._ffi.NULL)
with self._backend._tmp_bn_ctx() as bn_ctx:
buflen = self._backend._lib.EC_POINT_point2oct(
group, point, conversion, self._backend._ffi.NULL, 0, bn_ctx
)
self._backend.openssl_assert(buflen > 0)
buf = self._backend._ffi.new("char[]", buflen)
res = self._backend._lib.EC_POINT_point2oct(
group, point, conversion, buf, buflen, bn_ctx
)
self._backend.openssl_assert(buflen == res)
return self._backend._ffi.buffer(buf)[:]
def public_bytes(
self,
encoding: serialization.Encoding,
format: serialization.PublicFormat,
) -> bytes:
if (
encoding is serialization.Encoding.X962
or format is serialization.PublicFormat.CompressedPoint
or format is serialization.PublicFormat.UncompressedPoint
):
if encoding is not serialization.Encoding.X962 or format not in (
serialization.PublicFormat.CompressedPoint,
serialization.PublicFormat.UncompressedPoint,
):
raise ValueError(
"X962 encoding must be used with CompressedPoint or "
"UncompressedPoint format"
)
return self._encode_point(format)
else:
return self._backend._public_key_bytes(
encoding, format, self, self._evp_pkey, None
)
def verify(
self,
signature: bytes,
data: bytes,
signature_algorithm: ec.EllipticCurveSignatureAlgorithm,
) -> None:
_check_signature_algorithm(signature_algorithm)
data, algorithm = _calculate_digest_and_algorithm(
self._backend,
data,
signature_algorithm._algorithm, # type: ignore[attr-defined]
)
_ecdsa_sig_verify(self._backend, self, signature, data)

View File

@@ -0,0 +1,151 @@
# This file is dual licensed under the terms of the Apache License, Version
# 2.0, and the BSD License. See the LICENSE file in the root of this repository
# for complete details.
from cryptography import exceptions
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric.ed25519 import (
Ed25519PrivateKey,
Ed25519PublicKey,
_ED25519_KEY_SIZE,
_ED25519_SIG_SIZE,
)
class _Ed25519PublicKey(Ed25519PublicKey):
def __init__(self, backend, evp_pkey):
self._backend = backend
self._evp_pkey = evp_pkey
def public_bytes(
self,
encoding: serialization.Encoding,
format: serialization.PublicFormat,
) -> bytes:
if (
encoding is serialization.Encoding.Raw
or format is serialization.PublicFormat.Raw
):
if (
encoding is not serialization.Encoding.Raw
or format is not serialization.PublicFormat.Raw
):
raise ValueError(
"When using Raw both encoding and format must be Raw"
)
return self._raw_public_bytes()
return self._backend._public_key_bytes(
encoding, format, self, self._evp_pkey, None
)
def _raw_public_bytes(self) -> bytes:
buf = self._backend._ffi.new("unsigned char []", _ED25519_KEY_SIZE)
buflen = self._backend._ffi.new("size_t *", _ED25519_KEY_SIZE)
res = self._backend._lib.EVP_PKEY_get_raw_public_key(
self._evp_pkey, buf, buflen
)
self._backend.openssl_assert(res == 1)
self._backend.openssl_assert(buflen[0] == _ED25519_KEY_SIZE)
return self._backend._ffi.buffer(buf, _ED25519_KEY_SIZE)[:]
def verify(self, signature: bytes, data: bytes) -> None:
evp_md_ctx = self._backend._lib.EVP_MD_CTX_new()
self._backend.openssl_assert(evp_md_ctx != self._backend._ffi.NULL)
evp_md_ctx = self._backend._ffi.gc(
evp_md_ctx, self._backend._lib.EVP_MD_CTX_free
)
res = self._backend._lib.EVP_DigestVerifyInit(
evp_md_ctx,
self._backend._ffi.NULL,
self._backend._ffi.NULL,
self._backend._ffi.NULL,
self._evp_pkey,
)
self._backend.openssl_assert(res == 1)
res = self._backend._lib.EVP_DigestVerify(
evp_md_ctx, signature, len(signature), data, len(data)
)
if res != 1:
self._backend._consume_errors()
raise exceptions.InvalidSignature
class _Ed25519PrivateKey(Ed25519PrivateKey):
def __init__(self, backend, evp_pkey):
self._backend = backend
self._evp_pkey = evp_pkey
def public_key(self) -> Ed25519PublicKey:
buf = self._backend._ffi.new("unsigned char []", _ED25519_KEY_SIZE)
buflen = self._backend._ffi.new("size_t *", _ED25519_KEY_SIZE)
res = self._backend._lib.EVP_PKEY_get_raw_public_key(
self._evp_pkey, buf, buflen
)
self._backend.openssl_assert(res == 1)
self._backend.openssl_assert(buflen[0] == _ED25519_KEY_SIZE)
public_bytes = self._backend._ffi.buffer(buf)[:]
return self._backend.ed25519_load_public_bytes(public_bytes)
def sign(self, data: bytes) -> bytes:
evp_md_ctx = self._backend._lib.EVP_MD_CTX_new()
self._backend.openssl_assert(evp_md_ctx != self._backend._ffi.NULL)
evp_md_ctx = self._backend._ffi.gc(
evp_md_ctx, self._backend._lib.EVP_MD_CTX_free
)
res = self._backend._lib.EVP_DigestSignInit(
evp_md_ctx,
self._backend._ffi.NULL,
self._backend._ffi.NULL,
self._backend._ffi.NULL,
self._evp_pkey,
)
self._backend.openssl_assert(res == 1)
buf = self._backend._ffi.new("unsigned char[]", _ED25519_SIG_SIZE)
buflen = self._backend._ffi.new("size_t *", len(buf))
res = self._backend._lib.EVP_DigestSign(
evp_md_ctx, buf, buflen, data, len(data)
)
self._backend.openssl_assert(res == 1)
self._backend.openssl_assert(buflen[0] == _ED25519_SIG_SIZE)
return self._backend._ffi.buffer(buf, buflen[0])[:]
def private_bytes(
self,
encoding: serialization.Encoding,
format: serialization.PrivateFormat,
encryption_algorithm: serialization.KeySerializationEncryption,
) -> bytes:
if (
encoding is serialization.Encoding.Raw
or format is serialization.PublicFormat.Raw
):
if (
format is not serialization.PrivateFormat.Raw
or encoding is not serialization.Encoding.Raw
or not isinstance(
encryption_algorithm, serialization.NoEncryption
)
):
raise ValueError(
"When using Raw both encoding and format must be Raw "
"and encryption_algorithm must be NoEncryption()"
)
return self._raw_private_bytes()
return self._backend._private_key_bytes(
encoding, format, encryption_algorithm, self, self._evp_pkey, None
)
def _raw_private_bytes(self) -> bytes:
buf = self._backend._ffi.new("unsigned char []", _ED25519_KEY_SIZE)
buflen = self._backend._ffi.new("size_t *", _ED25519_KEY_SIZE)
res = self._backend._lib.EVP_PKEY_get_raw_private_key(
self._evp_pkey, buf, buflen
)
self._backend.openssl_assert(res == 1)
self._backend.openssl_assert(buflen[0] == _ED25519_KEY_SIZE)
return self._backend._ffi.buffer(buf, _ED25519_KEY_SIZE)[:]

View File

@@ -0,0 +1,152 @@
# This file is dual licensed under the terms of the Apache License, Version
# 2.0, and the BSD License. See the LICENSE file in the root of this repository
# for complete details.
from cryptography import exceptions
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric.ed448 import (
Ed448PrivateKey,
Ed448PublicKey,
)
_ED448_KEY_SIZE = 57
_ED448_SIG_SIZE = 114
class _Ed448PublicKey(Ed448PublicKey):
def __init__(self, backend, evp_pkey):
self._backend = backend
self._evp_pkey = evp_pkey
def public_bytes(
self,
encoding: serialization.Encoding,
format: serialization.PublicFormat,
) -> bytes:
if (
encoding is serialization.Encoding.Raw
or format is serialization.PublicFormat.Raw
):
if (
encoding is not serialization.Encoding.Raw
or format is not serialization.PublicFormat.Raw
):
raise ValueError(
"When using Raw both encoding and format must be Raw"
)
return self._raw_public_bytes()
return self._backend._public_key_bytes(
encoding, format, self, self._evp_pkey, None
)
def _raw_public_bytes(self) -> bytes:
buf = self._backend._ffi.new("unsigned char []", _ED448_KEY_SIZE)
buflen = self._backend._ffi.new("size_t *", _ED448_KEY_SIZE)
res = self._backend._lib.EVP_PKEY_get_raw_public_key(
self._evp_pkey, buf, buflen
)
self._backend.openssl_assert(res == 1)
self._backend.openssl_assert(buflen[0] == _ED448_KEY_SIZE)
return self._backend._ffi.buffer(buf, _ED448_KEY_SIZE)[:]
def verify(self, signature: bytes, data: bytes) -> None:
evp_md_ctx = self._backend._lib.EVP_MD_CTX_new()
self._backend.openssl_assert(evp_md_ctx != self._backend._ffi.NULL)
evp_md_ctx = self._backend._ffi.gc(
evp_md_ctx, self._backend._lib.EVP_MD_CTX_free
)
res = self._backend._lib.EVP_DigestVerifyInit(
evp_md_ctx,
self._backend._ffi.NULL,
self._backend._ffi.NULL,
self._backend._ffi.NULL,
self._evp_pkey,
)
self._backend.openssl_assert(res == 1)
res = self._backend._lib.EVP_DigestVerify(
evp_md_ctx, signature, len(signature), data, len(data)
)
if res != 1:
self._backend._consume_errors()
raise exceptions.InvalidSignature
class _Ed448PrivateKey(Ed448PrivateKey):
def __init__(self, backend, evp_pkey):
self._backend = backend
self._evp_pkey = evp_pkey
def public_key(self) -> Ed448PublicKey:
buf = self._backend._ffi.new("unsigned char []", _ED448_KEY_SIZE)
buflen = self._backend._ffi.new("size_t *", _ED448_KEY_SIZE)
res = self._backend._lib.EVP_PKEY_get_raw_public_key(
self._evp_pkey, buf, buflen
)
self._backend.openssl_assert(res == 1)
self._backend.openssl_assert(buflen[0] == _ED448_KEY_SIZE)
public_bytes = self._backend._ffi.buffer(buf)[:]
return self._backend.ed448_load_public_bytes(public_bytes)
def sign(self, data: bytes) -> bytes:
evp_md_ctx = self._backend._lib.EVP_MD_CTX_new()
self._backend.openssl_assert(evp_md_ctx != self._backend._ffi.NULL)
evp_md_ctx = self._backend._ffi.gc(
evp_md_ctx, self._backend._lib.EVP_MD_CTX_free
)
res = self._backend._lib.EVP_DigestSignInit(
evp_md_ctx,
self._backend._ffi.NULL,
self._backend._ffi.NULL,
self._backend._ffi.NULL,
self._evp_pkey,
)
self._backend.openssl_assert(res == 1)
buf = self._backend._ffi.new("unsigned char[]", _ED448_SIG_SIZE)
buflen = self._backend._ffi.new("size_t *", len(buf))
res = self._backend._lib.EVP_DigestSign(
evp_md_ctx, buf, buflen, data, len(data)
)
self._backend.openssl_assert(res == 1)
self._backend.openssl_assert(buflen[0] == _ED448_SIG_SIZE)
return self._backend._ffi.buffer(buf, buflen[0])[:]
def private_bytes(
self,
encoding: serialization.Encoding,
format: serialization.PrivateFormat,
encryption_algorithm: serialization.KeySerializationEncryption,
) -> bytes:
if (
encoding is serialization.Encoding.Raw
or format is serialization.PublicFormat.Raw
):
if (
format is not serialization.PrivateFormat.Raw
or encoding is not serialization.Encoding.Raw
or not isinstance(
encryption_algorithm, serialization.NoEncryption
)
):
raise ValueError(
"When using Raw both encoding and format must be Raw "
"and encryption_algorithm must be NoEncryption()"
)
return self._raw_private_bytes()
return self._backend._private_key_bytes(
encoding, format, encryption_algorithm, self, self._evp_pkey, None
)
def _raw_private_bytes(self) -> bytes:
buf = self._backend._ffi.new("unsigned char []", _ED448_KEY_SIZE)
buflen = self._backend._ffi.new("size_t *", _ED448_KEY_SIZE)
res = self._backend._lib.EVP_PKEY_get_raw_private_key(
self._evp_pkey, buf, buflen
)
self._backend.openssl_assert(res == 1)
self._backend.openssl_assert(buflen[0] == _ED448_KEY_SIZE)
return self._backend._ffi.buffer(buf, _ED448_KEY_SIZE)[:]

View File

@@ -0,0 +1,654 @@
# This file is dual licensed under the terms of the Apache License, Version
# 2.0, and the BSD License. See the LICENSE file in the root of this repository
# for complete details.
import calendar
import ipaddress
from cryptography import utils, x509
from cryptography.hazmat.backends.openssl.decode_asn1 import (
_CRL_ENTRY_REASON_ENUM_TO_CODE,
_DISTPOINT_TYPE_FULLNAME,
_DISTPOINT_TYPE_RELATIVENAME,
)
from cryptography.x509.name import _ASN1Type
from cryptography.x509.oid import (
CRLEntryExtensionOID,
ExtensionOID,
OCSPExtensionOID,
)
def _encode_asn1_int(backend, x):
"""
Converts a python integer to an ASN1_INTEGER. The returned ASN1_INTEGER
will not be garbage collected (to support adding them to structs that take
ownership of the object). Be sure to register it for GC if it will be
discarded after use.
"""
# Convert Python integer to OpenSSL "bignum" in case value exceeds
# machine's native integer limits (note: `int_to_bn` doesn't automatically
# GC).
i = backend._int_to_bn(x)
i = backend._ffi.gc(i, backend._lib.BN_free)
# Wrap in an ASN.1 integer. Don't GC -- as documented.
i = backend._lib.BN_to_ASN1_INTEGER(i, backend._ffi.NULL)
backend.openssl_assert(i != backend._ffi.NULL)
return i
def _encode_asn1_int_gc(backend, x):
i = _encode_asn1_int(backend, x)
i = backend._ffi.gc(i, backend._lib.ASN1_INTEGER_free)
return i
def _encode_asn1_str(backend, data):
"""
Create an ASN1_OCTET_STRING from a Python byte string.
"""
s = backend._lib.ASN1_OCTET_STRING_new()
res = backend._lib.ASN1_OCTET_STRING_set(s, data, len(data))
backend.openssl_assert(res == 1)
return s
def _encode_asn1_utf8_str(backend, string):
"""
Create an ASN1_UTF8STRING from a Python unicode string.
This object will be an ASN1_STRING with UTF8 type in OpenSSL and
can be decoded with ASN1_STRING_to_UTF8.
"""
s = backend._lib.ASN1_UTF8STRING_new()
res = backend._lib.ASN1_STRING_set(
s, string.encode("utf8"), len(string.encode("utf8"))
)
backend.openssl_assert(res == 1)
return s
def _encode_asn1_str_gc(backend, data):
s = _encode_asn1_str(backend, data)
s = backend._ffi.gc(s, backend._lib.ASN1_OCTET_STRING_free)
return s
def _encode_inhibit_any_policy(backend, inhibit_any_policy):
return _encode_asn1_int_gc(backend, inhibit_any_policy.skip_certs)
def _encode_name(backend, name):
"""
The X509_NAME created will not be gc'd. Use _encode_name_gc if needed.
"""
subject = backend._lib.X509_NAME_new()
for rdn in name.rdns:
set_flag = 0 # indicate whether to add to last RDN or create new RDN
for attribute in rdn:
name_entry = _encode_name_entry(backend, attribute)
# X509_NAME_add_entry dups the object so we need to gc this copy
name_entry = backend._ffi.gc(
name_entry, backend._lib.X509_NAME_ENTRY_free
)
res = backend._lib.X509_NAME_add_entry(
subject, name_entry, -1, set_flag
)
backend.openssl_assert(res == 1)
set_flag = -1
return subject
def _encode_name_gc(backend, attributes):
subject = _encode_name(backend, attributes)
subject = backend._ffi.gc(subject, backend._lib.X509_NAME_free)
return subject
def _encode_sk_name_entry(backend, attributes):
"""
The sk_X509_NAME_ENTRY created will not be gc'd.
"""
stack = backend._lib.sk_X509_NAME_ENTRY_new_null()
for attribute in attributes:
name_entry = _encode_name_entry(backend, attribute)
res = backend._lib.sk_X509_NAME_ENTRY_push(stack, name_entry)
backend.openssl_assert(res >= 1)
return stack
def _encode_name_entry(backend, attribute):
if attribute._type is _ASN1Type.BMPString:
value = attribute.value.encode("utf_16_be")
elif attribute._type is _ASN1Type.UniversalString:
value = attribute.value.encode("utf_32_be")
else:
value = attribute.value.encode("utf8")
obj = _txt2obj_gc(backend, attribute.oid.dotted_string)
name_entry = backend._lib.X509_NAME_ENTRY_create_by_OBJ(
backend._ffi.NULL, obj, attribute._type.value, value, len(value)
)
return name_entry
def _encode_crl_number_delta_crl_indicator(backend, ext):
return _encode_asn1_int_gc(backend, ext.crl_number)
def _encode_issuing_dist_point(backend, ext):
idp = backend._lib.ISSUING_DIST_POINT_new()
backend.openssl_assert(idp != backend._ffi.NULL)
idp = backend._ffi.gc(idp, backend._lib.ISSUING_DIST_POINT_free)
idp.onlyuser = 255 if ext.only_contains_user_certs else 0
idp.onlyCA = 255 if ext.only_contains_ca_certs else 0
idp.indirectCRL = 255 if ext.indirect_crl else 0
idp.onlyattr = 255 if ext.only_contains_attribute_certs else 0
if ext.only_some_reasons:
idp.onlysomereasons = _encode_reasonflags(
backend, ext.only_some_reasons
)
if ext.full_name:
idp.distpoint = _encode_full_name(backend, ext.full_name)
if ext.relative_name:
idp.distpoint = _encode_relative_name(backend, ext.relative_name)
return idp
def _encode_crl_reason(backend, crl_reason):
asn1enum = backend._lib.ASN1_ENUMERATED_new()
backend.openssl_assert(asn1enum != backend._ffi.NULL)
asn1enum = backend._ffi.gc(asn1enum, backend._lib.ASN1_ENUMERATED_free)
res = backend._lib.ASN1_ENUMERATED_set(
asn1enum, _CRL_ENTRY_REASON_ENUM_TO_CODE[crl_reason.reason]
)
backend.openssl_assert(res == 1)
return asn1enum
def _encode_invalidity_date(backend, invalidity_date):
time = backend._lib.ASN1_GENERALIZEDTIME_set(
backend._ffi.NULL,
calendar.timegm(invalidity_date.invalidity_date.timetuple()),
)
backend.openssl_assert(time != backend._ffi.NULL)
time = backend._ffi.gc(time, backend._lib.ASN1_GENERALIZEDTIME_free)
return time
def _encode_certificate_policies(backend, certificate_policies):
cp = backend._lib.sk_POLICYINFO_new_null()
backend.openssl_assert(cp != backend._ffi.NULL)
cp = backend._ffi.gc(cp, backend._lib.sk_POLICYINFO_free)
for policy_info in certificate_policies:
pi = backend._lib.POLICYINFO_new()
backend.openssl_assert(pi != backend._ffi.NULL)
res = backend._lib.sk_POLICYINFO_push(cp, pi)
backend.openssl_assert(res >= 1)
oid = _txt2obj(backend, policy_info.policy_identifier.dotted_string)
pi.policyid = oid
if policy_info.policy_qualifiers:
pqis = backend._lib.sk_POLICYQUALINFO_new_null()
backend.openssl_assert(pqis != backend._ffi.NULL)
for qualifier in policy_info.policy_qualifiers:
pqi = backend._lib.POLICYQUALINFO_new()
backend.openssl_assert(pqi != backend._ffi.NULL)
res = backend._lib.sk_POLICYQUALINFO_push(pqis, pqi)
backend.openssl_assert(res >= 1)
if isinstance(qualifier, str):
pqi.pqualid = _txt2obj(
backend, x509.OID_CPS_QUALIFIER.dotted_string
)
pqi.d.cpsuri = _encode_asn1_str(
backend,
qualifier.encode("ascii"),
)
else:
assert isinstance(qualifier, x509.UserNotice)
pqi.pqualid = _txt2obj(
backend, x509.OID_CPS_USER_NOTICE.dotted_string
)
un = backend._lib.USERNOTICE_new()
backend.openssl_assert(un != backend._ffi.NULL)
pqi.d.usernotice = un
if qualifier.explicit_text:
un.exptext = _encode_asn1_utf8_str(
backend, qualifier.explicit_text
)
un.noticeref = _encode_notice_reference(
backend, qualifier.notice_reference
)
pi.qualifiers = pqis
return cp
def _encode_notice_reference(backend, notice):
if notice is None:
return backend._ffi.NULL
else:
nr = backend._lib.NOTICEREF_new()
backend.openssl_assert(nr != backend._ffi.NULL)
# organization is a required field
nr.organization = _encode_asn1_utf8_str(backend, notice.organization)
notice_stack = backend._lib.sk_ASN1_INTEGER_new_null()
nr.noticenos = notice_stack
for number in notice.notice_numbers:
num = _encode_asn1_int(backend, number)
res = backend._lib.sk_ASN1_INTEGER_push(notice_stack, num)
backend.openssl_assert(res >= 1)
return nr
def _txt2obj(backend, name):
"""
Converts a Python string with an ASN.1 object ID in dotted form to a
ASN1_OBJECT.
"""
name = name.encode("ascii")
obj = backend._lib.OBJ_txt2obj(name, 1)
backend.openssl_assert(obj != backend._ffi.NULL)
return obj
def _txt2obj_gc(backend, name):
obj = _txt2obj(backend, name)
obj = backend._ffi.gc(obj, backend._lib.ASN1_OBJECT_free)
return obj
def _encode_ocsp_nocheck(backend, ext):
# Doesn't need to be GC'd
return backend._lib.ASN1_NULL_new()
def _encode_key_usage(backend, key_usage):
set_bit = backend._lib.ASN1_BIT_STRING_set_bit
ku = backend._lib.ASN1_BIT_STRING_new()
ku = backend._ffi.gc(ku, backend._lib.ASN1_BIT_STRING_free)
res = set_bit(ku, 0, key_usage.digital_signature)
backend.openssl_assert(res == 1)
res = set_bit(ku, 1, key_usage.content_commitment)
backend.openssl_assert(res == 1)
res = set_bit(ku, 2, key_usage.key_encipherment)
backend.openssl_assert(res == 1)
res = set_bit(ku, 3, key_usage.data_encipherment)
backend.openssl_assert(res == 1)
res = set_bit(ku, 4, key_usage.key_agreement)
backend.openssl_assert(res == 1)
res = set_bit(ku, 5, key_usage.key_cert_sign)
backend.openssl_assert(res == 1)
res = set_bit(ku, 6, key_usage.crl_sign)
backend.openssl_assert(res == 1)
if key_usage.key_agreement:
res = set_bit(ku, 7, key_usage.encipher_only)
backend.openssl_assert(res == 1)
res = set_bit(ku, 8, key_usage.decipher_only)
backend.openssl_assert(res == 1)
else:
res = set_bit(ku, 7, 0)
backend.openssl_assert(res == 1)
res = set_bit(ku, 8, 0)
backend.openssl_assert(res == 1)
return ku
def _encode_authority_key_identifier(backend, authority_keyid):
akid = backend._lib.AUTHORITY_KEYID_new()
backend.openssl_assert(akid != backend._ffi.NULL)
akid = backend._ffi.gc(akid, backend._lib.AUTHORITY_KEYID_free)
if authority_keyid.key_identifier is not None:
akid.keyid = _encode_asn1_str(
backend,
authority_keyid.key_identifier,
)
if authority_keyid.authority_cert_issuer is not None:
akid.issuer = _encode_general_names(
backend, authority_keyid.authority_cert_issuer
)
if authority_keyid.authority_cert_serial_number is not None:
akid.serial = _encode_asn1_int(
backend, authority_keyid.authority_cert_serial_number
)
return akid
def _encode_basic_constraints(backend, basic_constraints):
constraints = backend._lib.BASIC_CONSTRAINTS_new()
constraints = backend._ffi.gc(
constraints, backend._lib.BASIC_CONSTRAINTS_free
)
constraints.ca = 255 if basic_constraints.ca else 0
if basic_constraints.ca and basic_constraints.path_length is not None:
constraints.pathlen = _encode_asn1_int(
backend, basic_constraints.path_length
)
return constraints
def _encode_information_access(backend, info_access):
aia = backend._lib.sk_ACCESS_DESCRIPTION_new_null()
backend.openssl_assert(aia != backend._ffi.NULL)
aia = backend._ffi.gc(
aia,
lambda x: backend._lib.sk_ACCESS_DESCRIPTION_pop_free(
x,
backend._ffi.addressof(
backend._lib._original_lib, "ACCESS_DESCRIPTION_free"
),
),
)
for access_description in info_access:
ad = backend._lib.ACCESS_DESCRIPTION_new()
method = _txt2obj(
backend, access_description.access_method.dotted_string
)
_encode_general_name_preallocated(
backend, access_description.access_location, ad.location
)
ad.method = method
res = backend._lib.sk_ACCESS_DESCRIPTION_push(aia, ad)
backend.openssl_assert(res >= 1)
return aia
def _encode_general_names(backend, names):
general_names = backend._lib.GENERAL_NAMES_new()
backend.openssl_assert(general_names != backend._ffi.NULL)
for name in names:
gn = _encode_general_name(backend, name)
res = backend._lib.sk_GENERAL_NAME_push(general_names, gn)
backend.openssl_assert(res != 0)
return general_names
def _encode_alt_name(backend, san):
general_names = _encode_general_names(backend, san)
general_names = backend._ffi.gc(
general_names, backend._lib.GENERAL_NAMES_free
)
return general_names
def _encode_subject_key_identifier(backend, ski):
return _encode_asn1_str_gc(backend, ski.digest)
def _encode_general_name(backend, name):
gn = backend._lib.GENERAL_NAME_new()
_encode_general_name_preallocated(backend, name, gn)
return gn
def _encode_general_name_preallocated(backend, name, gn):
if isinstance(name, x509.DNSName):
backend.openssl_assert(gn != backend._ffi.NULL)
gn.type = backend._lib.GEN_DNS
ia5 = backend._lib.ASN1_IA5STRING_new()
backend.openssl_assert(ia5 != backend._ffi.NULL)
# ia5strings are supposed to be ITU T.50 but to allow round-tripping
# of broken certs that encode utf8 we'll encode utf8 here too.
value = name.value.encode("utf8")
res = backend._lib.ASN1_STRING_set(ia5, value, len(value))
backend.openssl_assert(res == 1)
gn.d.dNSName = ia5
elif isinstance(name, x509.RegisteredID):
backend.openssl_assert(gn != backend._ffi.NULL)
gn.type = backend._lib.GEN_RID
obj = backend._lib.OBJ_txt2obj(
name.value.dotted_string.encode("ascii"), 1
)
backend.openssl_assert(obj != backend._ffi.NULL)
gn.d.registeredID = obj
elif isinstance(name, x509.DirectoryName):
backend.openssl_assert(gn != backend._ffi.NULL)
dir_name = _encode_name(backend, name.value)
gn.type = backend._lib.GEN_DIRNAME
gn.d.directoryName = dir_name
elif isinstance(name, x509.IPAddress):
backend.openssl_assert(gn != backend._ffi.NULL)
if isinstance(name.value, ipaddress.IPv4Network):
packed = name.value.network_address.packed + utils.int_to_bytes(
((1 << 32) - name.value.num_addresses), 4
)
elif isinstance(name.value, ipaddress.IPv6Network):
packed = name.value.network_address.packed + utils.int_to_bytes(
(1 << 128) - name.value.num_addresses, 16
)
else:
packed = name.value.packed
ipaddr = _encode_asn1_str(backend, packed)
gn.type = backend._lib.GEN_IPADD
gn.d.iPAddress = ipaddr
elif isinstance(name, x509.OtherName):
backend.openssl_assert(gn != backend._ffi.NULL)
other_name = backend._lib.OTHERNAME_new()
backend.openssl_assert(other_name != backend._ffi.NULL)
type_id = backend._lib.OBJ_txt2obj(
name.type_id.dotted_string.encode("ascii"), 1
)
backend.openssl_assert(type_id != backend._ffi.NULL)
data = backend._ffi.new("unsigned char[]", name.value)
data_ptr_ptr = backend._ffi.new("unsigned char **")
data_ptr_ptr[0] = data
value = backend._lib.d2i_ASN1_TYPE(
backend._ffi.NULL, data_ptr_ptr, len(name.value)
)
if value == backend._ffi.NULL:
backend._consume_errors()
raise ValueError("Invalid ASN.1 data")
other_name.type_id = type_id
other_name.value = value
gn.type = backend._lib.GEN_OTHERNAME
gn.d.otherName = other_name
elif isinstance(name, x509.RFC822Name):
backend.openssl_assert(gn != backend._ffi.NULL)
# ia5strings are supposed to be ITU T.50 but to allow round-tripping
# of broken certs that encode utf8 we'll encode utf8 here too.
data = name.value.encode("utf8")
asn1_str = _encode_asn1_str(backend, data)
gn.type = backend._lib.GEN_EMAIL
gn.d.rfc822Name = asn1_str
elif isinstance(name, x509.UniformResourceIdentifier):
backend.openssl_assert(gn != backend._ffi.NULL)
# ia5strings are supposed to be ITU T.50 but to allow round-tripping
# of broken certs that encode utf8 we'll encode utf8 here too.
data = name.value.encode("utf8")
asn1_str = _encode_asn1_str(backend, data)
gn.type = backend._lib.GEN_URI
gn.d.uniformResourceIdentifier = asn1_str
else:
raise ValueError("{} is an unknown GeneralName type".format(name))
def _encode_extended_key_usage(backend, extended_key_usage):
eku = backend._lib.sk_ASN1_OBJECT_new_null()
eku = backend._ffi.gc(eku, backend._lib.sk_ASN1_OBJECT_free)
for oid in extended_key_usage:
obj = _txt2obj(backend, oid.dotted_string)
res = backend._lib.sk_ASN1_OBJECT_push(eku, obj)
backend.openssl_assert(res >= 1)
return eku
_CRLREASONFLAGS = {
x509.ReasonFlags.key_compromise: 1,
x509.ReasonFlags.ca_compromise: 2,
x509.ReasonFlags.affiliation_changed: 3,
x509.ReasonFlags.superseded: 4,
x509.ReasonFlags.cessation_of_operation: 5,
x509.ReasonFlags.certificate_hold: 6,
x509.ReasonFlags.privilege_withdrawn: 7,
x509.ReasonFlags.aa_compromise: 8,
}
def _encode_reasonflags(backend, reasons):
bitmask = backend._lib.ASN1_BIT_STRING_new()
backend.openssl_assert(bitmask != backend._ffi.NULL)
for reason in reasons:
res = backend._lib.ASN1_BIT_STRING_set_bit(
bitmask, _CRLREASONFLAGS[reason], 1
)
backend.openssl_assert(res == 1)
return bitmask
def _encode_full_name(backend, full_name):
dpn = backend._lib.DIST_POINT_NAME_new()
backend.openssl_assert(dpn != backend._ffi.NULL)
dpn.type = _DISTPOINT_TYPE_FULLNAME
dpn.name.fullname = _encode_general_names(backend, full_name)
return dpn
def _encode_relative_name(backend, relative_name):
dpn = backend._lib.DIST_POINT_NAME_new()
backend.openssl_assert(dpn != backend._ffi.NULL)
dpn.type = _DISTPOINT_TYPE_RELATIVENAME
dpn.name.relativename = _encode_sk_name_entry(backend, relative_name)
return dpn
def _encode_cdps_freshest_crl(backend, cdps):
cdp = backend._lib.sk_DIST_POINT_new_null()
cdp = backend._ffi.gc(cdp, backend._lib.sk_DIST_POINT_free)
for point in cdps:
dp = backend._lib.DIST_POINT_new()
backend.openssl_assert(dp != backend._ffi.NULL)
if point.reasons:
dp.reasons = _encode_reasonflags(backend, point.reasons)
if point.full_name:
dp.distpoint = _encode_full_name(backend, point.full_name)
if point.relative_name:
dp.distpoint = _encode_relative_name(backend, point.relative_name)
if point.crl_issuer:
dp.CRLissuer = _encode_general_names(backend, point.crl_issuer)
res = backend._lib.sk_DIST_POINT_push(cdp, dp)
backend.openssl_assert(res >= 1)
return cdp
def _encode_name_constraints(backend, name_constraints):
nc = backend._lib.NAME_CONSTRAINTS_new()
backend.openssl_assert(nc != backend._ffi.NULL)
nc = backend._ffi.gc(nc, backend._lib.NAME_CONSTRAINTS_free)
permitted = _encode_general_subtree(
backend, name_constraints.permitted_subtrees
)
nc.permittedSubtrees = permitted
excluded = _encode_general_subtree(
backend, name_constraints.excluded_subtrees
)
nc.excludedSubtrees = excluded
return nc
def _encode_policy_constraints(backend, policy_constraints):
pc = backend._lib.POLICY_CONSTRAINTS_new()
backend.openssl_assert(pc != backend._ffi.NULL)
pc = backend._ffi.gc(pc, backend._lib.POLICY_CONSTRAINTS_free)
if policy_constraints.require_explicit_policy is not None:
pc.requireExplicitPolicy = _encode_asn1_int(
backend, policy_constraints.require_explicit_policy
)
if policy_constraints.inhibit_policy_mapping is not None:
pc.inhibitPolicyMapping = _encode_asn1_int(
backend, policy_constraints.inhibit_policy_mapping
)
return pc
def _encode_general_subtree(backend, subtrees):
if subtrees is None:
return backend._ffi.NULL
else:
general_subtrees = backend._lib.sk_GENERAL_SUBTREE_new_null()
for name in subtrees:
gs = backend._lib.GENERAL_SUBTREE_new()
gs.base = _encode_general_name(backend, name)
res = backend._lib.sk_GENERAL_SUBTREE_push(general_subtrees, gs)
backend.openssl_assert(res >= 1)
return general_subtrees
def _encode_nonce(backend, nonce):
return _encode_asn1_str_gc(backend, nonce.nonce)
_EXTENSION_ENCODE_HANDLERS = {
ExtensionOID.BASIC_CONSTRAINTS: _encode_basic_constraints,
ExtensionOID.SUBJECT_KEY_IDENTIFIER: _encode_subject_key_identifier,
ExtensionOID.KEY_USAGE: _encode_key_usage,
ExtensionOID.SUBJECT_ALTERNATIVE_NAME: _encode_alt_name,
ExtensionOID.ISSUER_ALTERNATIVE_NAME: _encode_alt_name,
ExtensionOID.EXTENDED_KEY_USAGE: _encode_extended_key_usage,
ExtensionOID.AUTHORITY_KEY_IDENTIFIER: _encode_authority_key_identifier,
ExtensionOID.CERTIFICATE_POLICIES: _encode_certificate_policies,
ExtensionOID.AUTHORITY_INFORMATION_ACCESS: _encode_information_access,
ExtensionOID.SUBJECT_INFORMATION_ACCESS: _encode_information_access,
ExtensionOID.CRL_DISTRIBUTION_POINTS: _encode_cdps_freshest_crl,
ExtensionOID.FRESHEST_CRL: _encode_cdps_freshest_crl,
ExtensionOID.INHIBIT_ANY_POLICY: _encode_inhibit_any_policy,
ExtensionOID.OCSP_NO_CHECK: _encode_ocsp_nocheck,
ExtensionOID.NAME_CONSTRAINTS: _encode_name_constraints,
ExtensionOID.POLICY_CONSTRAINTS: _encode_policy_constraints,
}
_CRL_EXTENSION_ENCODE_HANDLERS = {
ExtensionOID.ISSUER_ALTERNATIVE_NAME: _encode_alt_name,
ExtensionOID.AUTHORITY_KEY_IDENTIFIER: _encode_authority_key_identifier,
ExtensionOID.AUTHORITY_INFORMATION_ACCESS: _encode_information_access,
ExtensionOID.CRL_NUMBER: _encode_crl_number_delta_crl_indicator,
ExtensionOID.DELTA_CRL_INDICATOR: _encode_crl_number_delta_crl_indicator,
ExtensionOID.ISSUING_DISTRIBUTION_POINT: _encode_issuing_dist_point,
ExtensionOID.FRESHEST_CRL: _encode_cdps_freshest_crl,
}
_CRL_ENTRY_EXTENSION_ENCODE_HANDLERS = {
CRLEntryExtensionOID.CERTIFICATE_ISSUER: _encode_alt_name,
CRLEntryExtensionOID.CRL_REASON: _encode_crl_reason,
CRLEntryExtensionOID.INVALIDITY_DATE: _encode_invalidity_date,
}
_OCSP_REQUEST_EXTENSION_ENCODE_HANDLERS = {
OCSPExtensionOID.NONCE: _encode_nonce,
}
_OCSP_BASICRESP_EXTENSION_ENCODE_HANDLERS = {
OCSPExtensionOID.NONCE: _encode_nonce,
}

View File

@@ -0,0 +1,80 @@
# This file is dual licensed under the terms of the Apache License, Version
# 2.0, and the BSD License. See the LICENSE file in the root of this repository
# for complete details.
from cryptography.exceptions import UnsupportedAlgorithm, _Reasons
from cryptography.hazmat.primitives import hashes
class _HashContext(hashes.HashContext):
def __init__(self, backend, algorithm: hashes.HashAlgorithm, ctx=None):
self._algorithm = algorithm
self._backend = backend
if ctx is None:
ctx = self._backend._lib.EVP_MD_CTX_new()
ctx = self._backend._ffi.gc(
ctx, self._backend._lib.EVP_MD_CTX_free
)
evp_md = self._backend._evp_md_from_algorithm(algorithm)
if evp_md == self._backend._ffi.NULL:
raise UnsupportedAlgorithm(
"{} is not a supported hash on this backend.".format(
algorithm.name
),
_Reasons.UNSUPPORTED_HASH,
)
res = self._backend._lib.EVP_DigestInit_ex(
ctx, evp_md, self._backend._ffi.NULL
)
self._backend.openssl_assert(res != 0)
self._ctx = ctx
@property
def algorithm(self) -> hashes.HashAlgorithm:
return self._algorithm
def copy(self) -> "_HashContext":
copied_ctx = self._backend._lib.EVP_MD_CTX_new()
copied_ctx = self._backend._ffi.gc(
copied_ctx, self._backend._lib.EVP_MD_CTX_free
)
res = self._backend._lib.EVP_MD_CTX_copy_ex(copied_ctx, self._ctx)
self._backend.openssl_assert(res != 0)
return _HashContext(self._backend, self.algorithm, ctx=copied_ctx)
def update(self, data: bytes) -> None:
data_ptr = self._backend._ffi.from_buffer(data)
res = self._backend._lib.EVP_DigestUpdate(
self._ctx, data_ptr, len(data)
)
self._backend.openssl_assert(res != 0)
def finalize(self) -> bytes:
if isinstance(self.algorithm, hashes.ExtendableOutputFunction):
# extendable output functions use a different finalize
return self._finalize_xof()
else:
buf = self._backend._ffi.new(
"unsigned char[]", self._backend._lib.EVP_MAX_MD_SIZE
)
outlen = self._backend._ffi.new("unsigned int *")
res = self._backend._lib.EVP_DigestFinal_ex(self._ctx, buf, outlen)
self._backend.openssl_assert(res != 0)
self._backend.openssl_assert(
outlen[0] == self.algorithm.digest_size
)
return self._backend._ffi.buffer(buf)[: outlen[0]]
def _finalize_xof(self) -> bytes:
buf = self._backend._ffi.new(
"unsigned char[]", self.algorithm.digest_size
)
res = self._backend._lib.EVP_DigestFinalXOF(
self._ctx, buf, self.algorithm.digest_size
)
self._backend.openssl_assert(res != 0)
return self._backend._ffi.buffer(buf)[: self.algorithm.digest_size]

View File

@@ -0,0 +1,76 @@
# This file is dual licensed under the terms of the Apache License, Version
# 2.0, and the BSD License. See the LICENSE file in the root of this repository
# for complete details.
from cryptography.exceptions import (
InvalidSignature,
UnsupportedAlgorithm,
_Reasons,
)
from cryptography.hazmat.primitives import constant_time, hashes
class _HMACContext(hashes.HashContext):
def __init__(
self, backend, key: bytes, algorithm: hashes.HashAlgorithm, ctx=None
):
self._algorithm = algorithm
self._backend = backend
if ctx is None:
ctx = self._backend._lib.HMAC_CTX_new()
self._backend.openssl_assert(ctx != self._backend._ffi.NULL)
ctx = self._backend._ffi.gc(ctx, self._backend._lib.HMAC_CTX_free)
evp_md = self._backend._evp_md_from_algorithm(algorithm)
if evp_md == self._backend._ffi.NULL:
raise UnsupportedAlgorithm(
"{} is not a supported hash on this backend".format(
algorithm.name
),
_Reasons.UNSUPPORTED_HASH,
)
key_ptr = self._backend._ffi.from_buffer(key)
res = self._backend._lib.HMAC_Init_ex(
ctx, key_ptr, len(key), evp_md, self._backend._ffi.NULL
)
self._backend.openssl_assert(res != 0)
self._ctx = ctx
self._key = key
@property
def algorithm(self) -> hashes.HashAlgorithm:
return self._algorithm
def copy(self) -> "_HMACContext":
copied_ctx = self._backend._lib.HMAC_CTX_new()
self._backend.openssl_assert(copied_ctx != self._backend._ffi.NULL)
copied_ctx = self._backend._ffi.gc(
copied_ctx, self._backend._lib.HMAC_CTX_free
)
res = self._backend._lib.HMAC_CTX_copy(copied_ctx, self._ctx)
self._backend.openssl_assert(res != 0)
return _HMACContext(
self._backend, self._key, self.algorithm, ctx=copied_ctx
)
def update(self, data: bytes) -> None:
data_ptr = self._backend._ffi.from_buffer(data)
res = self._backend._lib.HMAC_Update(self._ctx, data_ptr, len(data))
self._backend.openssl_assert(res != 0)
def finalize(self) -> bytes:
buf = self._backend._ffi.new(
"unsigned char[]", self._backend._lib.EVP_MAX_MD_SIZE
)
outlen = self._backend._ffi.new("unsigned int *")
res = self._backend._lib.HMAC_Final(self._ctx, buf, outlen)
self._backend.openssl_assert(res != 0)
self._backend.openssl_assert(outlen[0] == self.algorithm.digest_size)
return self._backend._ffi.buffer(buf)[: outlen[0]]
def verify(self, signature: bytes) -> None:
digest = self.finalize()
if not constant_time.bytes_eq(digest, signature):
raise InvalidSignature("Signature did not match digest.")

View File

@@ -0,0 +1,63 @@
# This file is dual licensed under the terms of the Apache License, Version
# 2.0, and the BSD License. See the LICENSE file in the root of this repository
# for complete details.
from cryptography.exceptions import InvalidSignature
from cryptography.hazmat.primitives import constant_time
_POLY1305_TAG_SIZE = 16
_POLY1305_KEY_SIZE = 32
class _Poly1305Context(object):
def __init__(self, backend, key):
self._backend = backend
key_ptr = self._backend._ffi.from_buffer(key)
# This function copies the key into OpenSSL-owned memory so we don't
# need to retain it ourselves
evp_pkey = self._backend._lib.EVP_PKEY_new_raw_private_key(
self._backend._lib.NID_poly1305,
self._backend._ffi.NULL,
key_ptr,
len(key),
)
self._backend.openssl_assert(evp_pkey != self._backend._ffi.NULL)
self._evp_pkey = self._backend._ffi.gc(
evp_pkey, self._backend._lib.EVP_PKEY_free
)
ctx = self._backend._lib.EVP_MD_CTX_new()
self._backend.openssl_assert(ctx != self._backend._ffi.NULL)
self._ctx = self._backend._ffi.gc(
ctx, self._backend._lib.EVP_MD_CTX_free
)
res = self._backend._lib.EVP_DigestSignInit(
self._ctx,
self._backend._ffi.NULL,
self._backend._ffi.NULL,
self._backend._ffi.NULL,
self._evp_pkey,
)
self._backend.openssl_assert(res == 1)
def update(self, data):
data_ptr = self._backend._ffi.from_buffer(data)
res = self._backend._lib.EVP_DigestSignUpdate(
self._ctx, data_ptr, len(data)
)
self._backend.openssl_assert(res != 0)
def finalize(self):
buf = self._backend._ffi.new("unsigned char[]", _POLY1305_TAG_SIZE)
outlen = self._backend._ffi.new("size_t *")
res = self._backend._lib.EVP_DigestSignFinal(self._ctx, buf, outlen)
self._backend.openssl_assert(res != 0)
self._backend.openssl_assert(outlen[0] == _POLY1305_TAG_SIZE)
return self._backend._ffi.buffer(buf)[: outlen[0]]
def verify(self, tag):
mac = self.finalize()
if not constant_time.bytes_eq(mac, tag):
raise InvalidSignature("Value did not match computed tag.")

View File

@@ -0,0 +1,581 @@
# This file is dual licensed under the terms of the Apache License, Version
# 2.0, and the BSD License. See the LICENSE file in the root of this repository
# for complete details.
import typing
from cryptography import utils
from cryptography.exceptions import (
InvalidSignature,
UnsupportedAlgorithm,
_Reasons,
)
from cryptography.hazmat.backends.openssl.utils import (
_calculate_digest_and_algorithm,
_check_not_prehashed,
_warn_sign_verify_deprecated,
)
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import (
AsymmetricSignatureContext,
AsymmetricVerificationContext,
utils as asym_utils,
)
from cryptography.hazmat.primitives.asymmetric.padding import (
AsymmetricPadding,
MGF1,
OAEP,
PKCS1v15,
PSS,
calculate_max_pss_salt_length,
)
from cryptography.hazmat.primitives.asymmetric.rsa import (
RSAPrivateKey,
RSAPrivateNumbers,
RSAPublicKey,
RSAPublicNumbers,
)
def _get_rsa_pss_salt_length(
pss: PSS,
key: typing.Union[RSAPrivateKey, RSAPublicKey],
hash_algorithm: hashes.HashAlgorithm,
) -> int:
salt = pss._salt_length
if salt is MGF1.MAX_LENGTH or salt is PSS.MAX_LENGTH:
return calculate_max_pss_salt_length(key, hash_algorithm)
else:
return salt
def _enc_dec_rsa(
backend,
key: typing.Union["_RSAPrivateKey", "_RSAPublicKey"],
data: bytes,
padding: AsymmetricPadding,
) -> bytes:
if not isinstance(padding, AsymmetricPadding):
raise TypeError("Padding must be an instance of AsymmetricPadding.")
if isinstance(padding, PKCS1v15):
padding_enum = backend._lib.RSA_PKCS1_PADDING
elif isinstance(padding, OAEP):
padding_enum = backend._lib.RSA_PKCS1_OAEP_PADDING
if not isinstance(padding._mgf, MGF1):
raise UnsupportedAlgorithm(
"Only MGF1 is supported by this backend.",
_Reasons.UNSUPPORTED_MGF,
)
if not backend.rsa_padding_supported(padding):
raise UnsupportedAlgorithm(
"This combination of padding and hash algorithm is not "
"supported by this backend.",
_Reasons.UNSUPPORTED_PADDING,
)
else:
raise UnsupportedAlgorithm(
"{} is not supported by this backend.".format(padding.name),
_Reasons.UNSUPPORTED_PADDING,
)
return _enc_dec_rsa_pkey_ctx(backend, key, data, padding_enum, padding)
def _enc_dec_rsa_pkey_ctx(
backend,
key: typing.Union["_RSAPrivateKey", "_RSAPublicKey"],
data: bytes,
padding_enum: int,
padding: AsymmetricPadding,
) -> bytes:
if isinstance(key, _RSAPublicKey):
init = backend._lib.EVP_PKEY_encrypt_init
crypt = backend._lib.EVP_PKEY_encrypt
else:
init = backend._lib.EVP_PKEY_decrypt_init
crypt = backend._lib.EVP_PKEY_decrypt
pkey_ctx = backend._lib.EVP_PKEY_CTX_new(key._evp_pkey, backend._ffi.NULL)
backend.openssl_assert(pkey_ctx != backend._ffi.NULL)
pkey_ctx = backend._ffi.gc(pkey_ctx, backend._lib.EVP_PKEY_CTX_free)
res = init(pkey_ctx)
backend.openssl_assert(res == 1)
res = backend._lib.EVP_PKEY_CTX_set_rsa_padding(pkey_ctx, padding_enum)
backend.openssl_assert(res > 0)
buf_size = backend._lib.EVP_PKEY_size(key._evp_pkey)
backend.openssl_assert(buf_size > 0)
if isinstance(padding, OAEP) and backend._lib.Cryptography_HAS_RSA_OAEP_MD:
mgf1_md = backend._evp_md_non_null_from_algorithm(
padding._mgf._algorithm
)
res = backend._lib.EVP_PKEY_CTX_set_rsa_mgf1_md(pkey_ctx, mgf1_md)
backend.openssl_assert(res > 0)
oaep_md = backend._evp_md_non_null_from_algorithm(padding._algorithm)
res = backend._lib.EVP_PKEY_CTX_set_rsa_oaep_md(pkey_ctx, oaep_md)
backend.openssl_assert(res > 0)
if (
isinstance(padding, OAEP)
and padding._label is not None
and len(padding._label) > 0
):
# set0_rsa_oaep_label takes ownership of the char * so we need to
# copy it into some new memory
labelptr = backend._lib.OPENSSL_malloc(len(padding._label))
backend.openssl_assert(labelptr != backend._ffi.NULL)
backend._ffi.memmove(labelptr, padding._label, len(padding._label))
res = backend._lib.EVP_PKEY_CTX_set0_rsa_oaep_label(
pkey_ctx, labelptr, len(padding._label)
)
backend.openssl_assert(res == 1)
outlen = backend._ffi.new("size_t *", buf_size)
buf = backend._ffi.new("unsigned char[]", buf_size)
# Everything from this line onwards is written with the goal of being as
# constant-time as is practical given the constraints of Python and our
# API. See Bleichenbacher's '98 attack on RSA, and its many many variants.
# As such, you should not attempt to change this (particularly to "clean it
# up") without understanding why it was written this way (see
# Chesterton's Fence), and without measuring to verify you have not
# introduced observable time differences.
res = crypt(pkey_ctx, buf, outlen, data, len(data))
resbuf = backend._ffi.buffer(buf)[: outlen[0]]
backend._lib.ERR_clear_error()
if res <= 0:
raise ValueError("Encryption/decryption failed.")
return resbuf
def _rsa_sig_determine_padding(backend, key, padding, algorithm):
if not isinstance(padding, AsymmetricPadding):
raise TypeError("Expected provider of AsymmetricPadding.")
pkey_size = backend._lib.EVP_PKEY_size(key._evp_pkey)
backend.openssl_assert(pkey_size > 0)
if isinstance(padding, PKCS1v15):
# Hash algorithm is ignored for PKCS1v15-padding, may be None.
padding_enum = backend._lib.RSA_PKCS1_PADDING
elif isinstance(padding, PSS):
if not isinstance(padding._mgf, MGF1):
raise UnsupportedAlgorithm(
"Only MGF1 is supported by this backend.",
_Reasons.UNSUPPORTED_MGF,
)
# PSS padding requires a hash algorithm
if not isinstance(algorithm, hashes.HashAlgorithm):
raise TypeError("Expected instance of hashes.HashAlgorithm.")
# Size of key in bytes - 2 is the maximum
# PSS signature length (salt length is checked later)
if pkey_size - algorithm.digest_size - 2 < 0:
raise ValueError(
"Digest too large for key size. Use a larger "
"key or different digest."
)
padding_enum = backend._lib.RSA_PKCS1_PSS_PADDING
else:
raise UnsupportedAlgorithm(
"{} is not supported by this backend.".format(padding.name),
_Reasons.UNSUPPORTED_PADDING,
)
return padding_enum
# Hash algorithm can be absent (None) to initialize the context without setting
# any message digest algorithm. This is currently only valid for the PKCS1v15
# padding type, where it means that the signature data is encoded/decoded
# as provided, without being wrapped in a DigestInfo structure.
def _rsa_sig_setup(backend, padding, algorithm, key, init_func):
padding_enum = _rsa_sig_determine_padding(backend, key, padding, algorithm)
pkey_ctx = backend._lib.EVP_PKEY_CTX_new(key._evp_pkey, backend._ffi.NULL)
backend.openssl_assert(pkey_ctx != backend._ffi.NULL)
pkey_ctx = backend._ffi.gc(pkey_ctx, backend._lib.EVP_PKEY_CTX_free)
res = init_func(pkey_ctx)
backend.openssl_assert(res == 1)
if algorithm is not None:
evp_md = backend._evp_md_non_null_from_algorithm(algorithm)
res = backend._lib.EVP_PKEY_CTX_set_signature_md(pkey_ctx, evp_md)
if res == 0:
backend._consume_errors()
raise UnsupportedAlgorithm(
"{} is not supported by this backend for RSA signing.".format(
algorithm.name
),
_Reasons.UNSUPPORTED_HASH,
)
res = backend._lib.EVP_PKEY_CTX_set_rsa_padding(pkey_ctx, padding_enum)
if res <= 0:
backend._consume_errors()
raise UnsupportedAlgorithm(
"{} is not supported for the RSA signature operation.".format(
padding.name
),
_Reasons.UNSUPPORTED_PADDING,
)
if isinstance(padding, PSS):
res = backend._lib.EVP_PKEY_CTX_set_rsa_pss_saltlen(
pkey_ctx, _get_rsa_pss_salt_length(padding, key, algorithm)
)
backend.openssl_assert(res > 0)
mgf1_md = backend._evp_md_non_null_from_algorithm(
padding._mgf._algorithm
)
res = backend._lib.EVP_PKEY_CTX_set_rsa_mgf1_md(pkey_ctx, mgf1_md)
backend.openssl_assert(res > 0)
return pkey_ctx
def _rsa_sig_sign(backend, padding, algorithm, private_key, data):
pkey_ctx = _rsa_sig_setup(
backend,
padding,
algorithm,
private_key,
backend._lib.EVP_PKEY_sign_init,
)
buflen = backend._ffi.new("size_t *")
res = backend._lib.EVP_PKEY_sign(
pkey_ctx, backend._ffi.NULL, buflen, data, len(data)
)
backend.openssl_assert(res == 1)
buf = backend._ffi.new("unsigned char[]", buflen[0])
res = backend._lib.EVP_PKEY_sign(pkey_ctx, buf, buflen, data, len(data))
if res != 1:
errors = backend._consume_errors_with_text()
raise ValueError(
"Digest or salt length too long for key size. Use a larger key "
"or shorter salt length if you are specifying a PSS salt",
errors,
)
return backend._ffi.buffer(buf)[:]
def _rsa_sig_verify(backend, padding, algorithm, public_key, signature, data):
pkey_ctx = _rsa_sig_setup(
backend,
padding,
algorithm,
public_key,
backend._lib.EVP_PKEY_verify_init,
)
res = backend._lib.EVP_PKEY_verify(
pkey_ctx, signature, len(signature), data, len(data)
)
# The previous call can return negative numbers in the event of an
# error. This is not a signature failure but we need to fail if it
# occurs.
backend.openssl_assert(res >= 0)
if res == 0:
backend._consume_errors()
raise InvalidSignature
def _rsa_sig_recover(backend, padding, algorithm, public_key, signature):
pkey_ctx = _rsa_sig_setup(
backend,
padding,
algorithm,
public_key,
backend._lib.EVP_PKEY_verify_recover_init,
)
# Attempt to keep the rest of the code in this function as constant/time
# as possible. See the comment in _enc_dec_rsa_pkey_ctx. Note that the
# buflen parameter is used even though its value may be undefined in the
# error case. Due to the tolerant nature of Python slicing this does not
# trigger any exceptions.
maxlen = backend._lib.EVP_PKEY_size(public_key._evp_pkey)
backend.openssl_assert(maxlen > 0)
buf = backend._ffi.new("unsigned char[]", maxlen)
buflen = backend._ffi.new("size_t *", maxlen)
res = backend._lib.EVP_PKEY_verify_recover(
pkey_ctx, buf, buflen, signature, len(signature)
)
resbuf = backend._ffi.buffer(buf)[: buflen[0]]
backend._lib.ERR_clear_error()
# Assume that all parameter errors are handled during the setup phase and
# any error here is due to invalid signature.
if res != 1:
raise InvalidSignature
return resbuf
class _RSASignatureContext(AsymmetricSignatureContext):
def __init__(
self,
backend,
private_key: RSAPrivateKey,
padding: AsymmetricPadding,
algorithm: hashes.HashAlgorithm,
):
self._backend = backend
self._private_key = private_key
# We now call _rsa_sig_determine_padding in _rsa_sig_setup. However
# we need to make a pointless call to it here so we maintain the
# API of erroring on init with this context if the values are invalid.
_rsa_sig_determine_padding(backend, private_key, padding, algorithm)
self._padding = padding
self._algorithm = algorithm
self._hash_ctx = hashes.Hash(self._algorithm, self._backend)
def update(self, data: bytes) -> None:
self._hash_ctx.update(data)
def finalize(self) -> bytes:
return _rsa_sig_sign(
self._backend,
self._padding,
self._algorithm,
self._private_key,
self._hash_ctx.finalize(),
)
class _RSAVerificationContext(AsymmetricVerificationContext):
def __init__(
self,
backend,
public_key: RSAPublicKey,
signature: bytes,
padding: AsymmetricPadding,
algorithm: hashes.HashAlgorithm,
):
self._backend = backend
self._public_key = public_key
self._signature = signature
self._padding = padding
# We now call _rsa_sig_determine_padding in _rsa_sig_setup. However
# we need to make a pointless call to it here so we maintain the
# API of erroring on init with this context if the values are invalid.
_rsa_sig_determine_padding(backend, public_key, padding, algorithm)
padding = padding
self._algorithm = algorithm
self._hash_ctx = hashes.Hash(self._algorithm, self._backend)
def update(self, data: bytes) -> None:
self._hash_ctx.update(data)
def verify(self) -> None:
return _rsa_sig_verify(
self._backend,
self._padding,
self._algorithm,
self._public_key,
self._signature,
self._hash_ctx.finalize(),
)
class _RSAPrivateKey(RSAPrivateKey):
def __init__(self, backend, rsa_cdata, evp_pkey, _skip_check_key):
# RSA_check_key is slower in OpenSSL 3.0.0 due to improved
# primality checking. In normal use this is unlikely to be a problem
# since users don't load new keys constantly, but for TESTING we've
# added an init arg that allows skipping the checks. You should not
# use this in production code unless you understand the consequences.
if not _skip_check_key:
res = backend._lib.RSA_check_key(rsa_cdata)
if res != 1:
errors = backend._consume_errors_with_text()
raise ValueError("Invalid private key", errors)
# Blinding is on by default in many versions of OpenSSL, but let's
# just be conservative here.
res = backend._lib.RSA_blinding_on(rsa_cdata, backend._ffi.NULL)
backend.openssl_assert(res == 1)
self._backend = backend
self._rsa_cdata = rsa_cdata
self._evp_pkey = evp_pkey
n = self._backend._ffi.new("BIGNUM **")
self._backend._lib.RSA_get0_key(
self._rsa_cdata,
n,
self._backend._ffi.NULL,
self._backend._ffi.NULL,
)
self._backend.openssl_assert(n[0] != self._backend._ffi.NULL)
self._key_size = self._backend._lib.BN_num_bits(n[0])
key_size = utils.read_only_property("_key_size")
def signer(
self, padding: AsymmetricPadding, algorithm: hashes.HashAlgorithm
) -> AsymmetricSignatureContext:
_warn_sign_verify_deprecated()
_check_not_prehashed(algorithm)
return _RSASignatureContext(self._backend, self, padding, algorithm)
def decrypt(self, ciphertext: bytes, padding: AsymmetricPadding) -> bytes:
key_size_bytes = (self.key_size + 7) // 8
if key_size_bytes != len(ciphertext):
raise ValueError("Ciphertext length must be equal to key size.")
return _enc_dec_rsa(self._backend, self, ciphertext, padding)
def public_key(self) -> RSAPublicKey:
ctx = self._backend._lib.RSAPublicKey_dup(self._rsa_cdata)
self._backend.openssl_assert(ctx != self._backend._ffi.NULL)
ctx = self._backend._ffi.gc(ctx, self._backend._lib.RSA_free)
evp_pkey = self._backend._rsa_cdata_to_evp_pkey(ctx)
return _RSAPublicKey(self._backend, ctx, evp_pkey)
def private_numbers(self) -> RSAPrivateNumbers:
n = self._backend._ffi.new("BIGNUM **")
e = self._backend._ffi.new("BIGNUM **")
d = self._backend._ffi.new("BIGNUM **")
p = self._backend._ffi.new("BIGNUM **")
q = self._backend._ffi.new("BIGNUM **")
dmp1 = self._backend._ffi.new("BIGNUM **")
dmq1 = self._backend._ffi.new("BIGNUM **")
iqmp = self._backend._ffi.new("BIGNUM **")
self._backend._lib.RSA_get0_key(self._rsa_cdata, n, e, d)
self._backend.openssl_assert(n[0] != self._backend._ffi.NULL)
self._backend.openssl_assert(e[0] != self._backend._ffi.NULL)
self._backend.openssl_assert(d[0] != self._backend._ffi.NULL)
self._backend._lib.RSA_get0_factors(self._rsa_cdata, p, q)
self._backend.openssl_assert(p[0] != self._backend._ffi.NULL)
self._backend.openssl_assert(q[0] != self._backend._ffi.NULL)
self._backend._lib.RSA_get0_crt_params(
self._rsa_cdata, dmp1, dmq1, iqmp
)
self._backend.openssl_assert(dmp1[0] != self._backend._ffi.NULL)
self._backend.openssl_assert(dmq1[0] != self._backend._ffi.NULL)
self._backend.openssl_assert(iqmp[0] != self._backend._ffi.NULL)
return RSAPrivateNumbers(
p=self._backend._bn_to_int(p[0]),
q=self._backend._bn_to_int(q[0]),
d=self._backend._bn_to_int(d[0]),
dmp1=self._backend._bn_to_int(dmp1[0]),
dmq1=self._backend._bn_to_int(dmq1[0]),
iqmp=self._backend._bn_to_int(iqmp[0]),
public_numbers=RSAPublicNumbers(
e=self._backend._bn_to_int(e[0]),
n=self._backend._bn_to_int(n[0]),
),
)
def private_bytes(
self,
encoding: serialization.Encoding,
format: serialization.PrivateFormat,
encryption_algorithm: serialization.KeySerializationEncryption,
) -> bytes:
return self._backend._private_key_bytes(
encoding,
format,
encryption_algorithm,
self,
self._evp_pkey,
self._rsa_cdata,
)
def sign(
self,
data: bytes,
padding: AsymmetricPadding,
algorithm: typing.Union[asym_utils.Prehashed, hashes.HashAlgorithm],
) -> bytes:
data, algorithm = _calculate_digest_and_algorithm(
self._backend, data, algorithm
)
return _rsa_sig_sign(self._backend, padding, algorithm, self, data)
class _RSAPublicKey(RSAPublicKey):
def __init__(self, backend, rsa_cdata, evp_pkey):
self._backend = backend
self._rsa_cdata = rsa_cdata
self._evp_pkey = evp_pkey
n = self._backend._ffi.new("BIGNUM **")
self._backend._lib.RSA_get0_key(
self._rsa_cdata,
n,
self._backend._ffi.NULL,
self._backend._ffi.NULL,
)
self._backend.openssl_assert(n[0] != self._backend._ffi.NULL)
self._key_size = self._backend._lib.BN_num_bits(n[0])
key_size = utils.read_only_property("_key_size")
def verifier(
self,
signature: bytes,
padding: AsymmetricPadding,
algorithm: hashes.HashAlgorithm,
) -> AsymmetricVerificationContext:
_warn_sign_verify_deprecated()
utils._check_bytes("signature", signature)
_check_not_prehashed(algorithm)
return _RSAVerificationContext(
self._backend, self, signature, padding, algorithm
)
def encrypt(self, plaintext: bytes, padding: AsymmetricPadding) -> bytes:
return _enc_dec_rsa(self._backend, self, plaintext, padding)
def public_numbers(self) -> RSAPublicNumbers:
n = self._backend._ffi.new("BIGNUM **")
e = self._backend._ffi.new("BIGNUM **")
self._backend._lib.RSA_get0_key(
self._rsa_cdata, n, e, self._backend._ffi.NULL
)
self._backend.openssl_assert(n[0] != self._backend._ffi.NULL)
self._backend.openssl_assert(e[0] != self._backend._ffi.NULL)
return RSAPublicNumbers(
e=self._backend._bn_to_int(e[0]),
n=self._backend._bn_to_int(n[0]),
)
def public_bytes(
self,
encoding: serialization.Encoding,
format: serialization.PublicFormat,
) -> bytes:
return self._backend._public_key_bytes(
encoding, format, self, self._evp_pkey, self._rsa_cdata
)
def verify(
self,
signature: bytes,
data: bytes,
padding: AsymmetricPadding,
algorithm: typing.Union[asym_utils.Prehashed, hashes.HashAlgorithm],
) -> None:
data, algorithm = _calculate_digest_and_algorithm(
self._backend, data, algorithm
)
return _rsa_sig_verify(
self._backend, padding, algorithm, self, signature, data
)
def recover_data_from_signature(
self,
signature: bytes,
padding: AsymmetricPadding,
algorithm: typing.Optional[hashes.HashAlgorithm],
) -> bytes:
_check_not_prehashed(algorithm)
return _rsa_sig_recover(
self._backend, padding, algorithm, self, signature
)

View File

@@ -0,0 +1,66 @@
# This file is dual licensed under the terms of the Apache License, Version
# 2.0, and the BSD License. See the LICENSE file in the root of this repository
# for complete details.
import warnings
from cryptography import utils
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.asymmetric.utils import Prehashed
def _evp_pkey_derive(backend, evp_pkey, peer_public_key):
ctx = backend._lib.EVP_PKEY_CTX_new(evp_pkey, backend._ffi.NULL)
backend.openssl_assert(ctx != backend._ffi.NULL)
ctx = backend._ffi.gc(ctx, backend._lib.EVP_PKEY_CTX_free)
res = backend._lib.EVP_PKEY_derive_init(ctx)
backend.openssl_assert(res == 1)
res = backend._lib.EVP_PKEY_derive_set_peer(ctx, peer_public_key._evp_pkey)
backend.openssl_assert(res == 1)
keylen = backend._ffi.new("size_t *")
res = backend._lib.EVP_PKEY_derive(ctx, backend._ffi.NULL, keylen)
backend.openssl_assert(res == 1)
backend.openssl_assert(keylen[0] > 0)
buf = backend._ffi.new("unsigned char[]", keylen[0])
res = backend._lib.EVP_PKEY_derive(ctx, buf, keylen)
if res != 1:
errors_with_text = backend._consume_errors_with_text()
raise ValueError("Error computing shared key.", errors_with_text)
return backend._ffi.buffer(buf, keylen[0])[:]
def _calculate_digest_and_algorithm(backend, data, algorithm):
if not isinstance(algorithm, Prehashed):
hash_ctx = hashes.Hash(algorithm, backend)
hash_ctx.update(data)
data = hash_ctx.finalize()
else:
algorithm = algorithm._algorithm
if len(data) != algorithm.digest_size:
raise ValueError(
"The provided data must be the same length as the hash "
"algorithm's digest size."
)
return (data, algorithm)
def _check_not_prehashed(signature_algorithm):
if isinstance(signature_algorithm, Prehashed):
raise TypeError(
"Prehashed is only supported in the sign and verify methods. "
"It cannot be used with signer, verifier or "
"recover_data_from_signature."
)
def _warn_sign_verify_deprecated():
warnings.warn(
"signer and verifier have been deprecated. Please use sign "
"and verify instead.",
utils.PersistentlyDeprecated2017,
stacklevel=3,
)

View File

@@ -0,0 +1,128 @@
# This file is dual licensed under the terms of the Apache License, Version
# 2.0, and the BSD License. See the LICENSE file in the root of this repository
# for complete details.
from cryptography.hazmat.backends.openssl.utils import _evp_pkey_derive
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric.x25519 import (
X25519PrivateKey,
X25519PublicKey,
)
_X25519_KEY_SIZE = 32
class _X25519PublicKey(X25519PublicKey):
def __init__(self, backend, evp_pkey):
self._backend = backend
self._evp_pkey = evp_pkey
def public_bytes(
self,
encoding: serialization.Encoding,
format: serialization.PublicFormat,
) -> bytes:
if (
encoding is serialization.Encoding.Raw
or format is serialization.PublicFormat.Raw
):
if (
encoding is not serialization.Encoding.Raw
or format is not serialization.PublicFormat.Raw
):
raise ValueError(
"When using Raw both encoding and format must be Raw"
)
return self._raw_public_bytes()
return self._backend._public_key_bytes(
encoding, format, self, self._evp_pkey, None
)
def _raw_public_bytes(self) -> bytes:
ucharpp = self._backend._ffi.new("unsigned char **")
res = self._backend._lib.EVP_PKEY_get1_tls_encodedpoint(
self._evp_pkey, ucharpp
)
self._backend.openssl_assert(res == 32)
self._backend.openssl_assert(ucharpp[0] != self._backend._ffi.NULL)
data = self._backend._ffi.gc(
ucharpp[0], self._backend._lib.OPENSSL_free
)
return self._backend._ffi.buffer(data, res)[:]
class _X25519PrivateKey(X25519PrivateKey):
def __init__(self, backend, evp_pkey):
self._backend = backend
self._evp_pkey = evp_pkey
def public_key(self) -> X25519PublicKey:
bio = self._backend._create_mem_bio_gc()
res = self._backend._lib.i2d_PUBKEY_bio(bio, self._evp_pkey)
self._backend.openssl_assert(res == 1)
evp_pkey = self._backend._lib.d2i_PUBKEY_bio(
bio, self._backend._ffi.NULL
)
self._backend.openssl_assert(evp_pkey != self._backend._ffi.NULL)
evp_pkey = self._backend._ffi.gc(
evp_pkey, self._backend._lib.EVP_PKEY_free
)
return _X25519PublicKey(self._backend, evp_pkey)
def exchange(self, peer_public_key: X25519PublicKey) -> bytes:
if not isinstance(peer_public_key, X25519PublicKey):
raise TypeError("peer_public_key must be X25519PublicKey.")
return _evp_pkey_derive(self._backend, self._evp_pkey, peer_public_key)
def private_bytes(
self,
encoding: serialization.Encoding,
format: serialization.PrivateFormat,
encryption_algorithm: serialization.KeySerializationEncryption,
) -> bytes:
if (
encoding is serialization.Encoding.Raw
or format is serialization.PublicFormat.Raw
):
if (
format is not serialization.PrivateFormat.Raw
or encoding is not serialization.Encoding.Raw
or not isinstance(
encryption_algorithm, serialization.NoEncryption
)
):
raise ValueError(
"When using Raw both encoding and format must be Raw "
"and encryption_algorithm must be NoEncryption()"
)
return self._raw_private_bytes()
return self._backend._private_key_bytes(
encoding, format, encryption_algorithm, self, self._evp_pkey, None
)
def _raw_private_bytes(self) -> bytes:
# When we drop support for CRYPTOGRAPHY_OPENSSL_LESS_THAN_111 we can
# switch this to EVP_PKEY_new_raw_private_key
# The trick we use here is serializing to a PKCS8 key and just
# using the last 32 bytes, which is the key itself.
bio = self._backend._create_mem_bio_gc()
res = self._backend._lib.i2d_PKCS8PrivateKey_bio(
bio,
self._evp_pkey,
self._backend._ffi.NULL,
self._backend._ffi.NULL,
0,
self._backend._ffi.NULL,
self._backend._ffi.NULL,
)
self._backend.openssl_assert(res == 1)
pkcs8 = self._backend._read_mem_bio(bio)
self._backend.openssl_assert(len(pkcs8) == 48)
return pkcs8[-_X25519_KEY_SIZE:]

View File

@@ -0,0 +1,112 @@
# This file is dual licensed under the terms of the Apache License, Version
# 2.0, and the BSD License. See the LICENSE file in the root of this repository
# for complete details.
from cryptography.hazmat.backends.openssl.utils import _evp_pkey_derive
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric.x448 import (
X448PrivateKey,
X448PublicKey,
)
_X448_KEY_SIZE = 56
class _X448PublicKey(X448PublicKey):
def __init__(self, backend, evp_pkey):
self._backend = backend
self._evp_pkey = evp_pkey
def public_bytes(
self,
encoding: serialization.Encoding,
format: serialization.PublicFormat,
) -> bytes:
if (
encoding is serialization.Encoding.Raw
or format is serialization.PublicFormat.Raw
):
if (
encoding is not serialization.Encoding.Raw
or format is not serialization.PublicFormat.Raw
):
raise ValueError(
"When using Raw both encoding and format must be Raw"
)
return self._raw_public_bytes()
return self._backend._public_key_bytes(
encoding, format, self, self._evp_pkey, None
)
def _raw_public_bytes(self) -> bytes:
buf = self._backend._ffi.new("unsigned char []", _X448_KEY_SIZE)
buflen = self._backend._ffi.new("size_t *", _X448_KEY_SIZE)
res = self._backend._lib.EVP_PKEY_get_raw_public_key(
self._evp_pkey, buf, buflen
)
self._backend.openssl_assert(res == 1)
self._backend.openssl_assert(buflen[0] == _X448_KEY_SIZE)
return self._backend._ffi.buffer(buf, _X448_KEY_SIZE)[:]
class _X448PrivateKey(X448PrivateKey):
def __init__(self, backend, evp_pkey):
self._backend = backend
self._evp_pkey = evp_pkey
def public_key(self) -> X448PublicKey:
buf = self._backend._ffi.new("unsigned char []", _X448_KEY_SIZE)
buflen = self._backend._ffi.new("size_t *", _X448_KEY_SIZE)
res = self._backend._lib.EVP_PKEY_get_raw_public_key(
self._evp_pkey, buf, buflen
)
self._backend.openssl_assert(res == 1)
self._backend.openssl_assert(buflen[0] == _X448_KEY_SIZE)
return self._backend.x448_load_public_bytes(buf)
def exchange(self, peer_public_key: X448PublicKey) -> bytes:
if not isinstance(peer_public_key, X448PublicKey):
raise TypeError("peer_public_key must be X448PublicKey.")
return _evp_pkey_derive(self._backend, self._evp_pkey, peer_public_key)
def private_bytes(
self,
encoding: serialization.Encoding,
format: serialization.PrivateFormat,
encryption_algorithm: serialization.KeySerializationEncryption,
) -> bytes:
if (
encoding is serialization.Encoding.Raw
or format is serialization.PublicFormat.Raw
):
if (
format is not serialization.PrivateFormat.Raw
or encoding is not serialization.Encoding.Raw
or not isinstance(
encryption_algorithm, serialization.NoEncryption
)
):
raise ValueError(
"When using Raw both encoding and format must be Raw "
"and encryption_algorithm must be NoEncryption()"
)
return self._raw_private_bytes()
return self._backend._private_key_bytes(
encoding, format, encryption_algorithm, self, self._evp_pkey, None
)
def _raw_private_bytes(self) -> bytes:
buf = self._backend._ffi.new("unsigned char []", _X448_KEY_SIZE)
buflen = self._backend._ffi.new("size_t *", _X448_KEY_SIZE)
res = self._backend._lib.EVP_PKEY_get_raw_private_key(
self._evp_pkey, buf, buflen
)
self._backend.openssl_assert(res == 1)
self._backend.openssl_assert(buflen[0] == _X448_KEY_SIZE)
return self._backend._ffi.buffer(buf, _X448_KEY_SIZE)[:]

View File

@@ -0,0 +1,70 @@
# This file is dual licensed under the terms of the Apache License, Version
# 2.0, and the BSD License. See the LICENSE file in the root of this repository
# for complete details.
import datetime
import warnings
from cryptography import utils, x509
# This exists for pyOpenSSL compatibility and SHOULD NOT BE USED
# WE WILL REMOVE THIS VERY SOON.
def _Certificate(backend, x509) -> x509.Certificate: # noqa: N802
warnings.warn(
"This version of cryptography contains a temporary pyOpenSSL "
"fallback path. Upgrade pyOpenSSL now.",
utils.DeprecatedIn35,
)
return backend._ossl2cert(x509)
# This exists for pyOpenSSL compatibility and SHOULD NOT BE USED
# WE WILL REMOVE THIS VERY SOON.
def _CertificateSigningRequest( # noqa: N802
backend, x509_req
) -> x509.CertificateSigningRequest:
warnings.warn(
"This version of cryptography contains a temporary pyOpenSSL "
"fallback path. Upgrade pyOpenSSL now.",
utils.DeprecatedIn35,
)
return backend._ossl2csr(x509_req)
# This exists for pyOpenSSL compatibility and SHOULD NOT BE USED
# WE WILL REMOVE THIS VERY SOON.
def _CertificateRevocationList( # noqa: N802
backend, x509_crl
) -> x509.CertificateRevocationList:
warnings.warn(
"This version of cryptography contains a temporary pyOpenSSL "
"fallback path. Upgrade pyOpenSSL now.",
utils.DeprecatedIn35,
)
return backend._ossl2crl(x509_crl)
class _RawRevokedCertificate(x509.RevokedCertificate):
def __init__(
self,
serial_number: int,
revocation_date: datetime.datetime,
extensions: x509.Extensions,
):
self._serial_number = serial_number
self._revocation_date = revocation_date
self._extensions = extensions
@property
def serial_number(self) -> int:
return self._serial_number
@property
def revocation_date(self) -> datetime.datetime:
return self._revocation_date
@property
def extensions(self) -> x509.Extensions:
return self._extensions