Python源码示例:cryptography.x509.load_pem_x509_certificate()
示例1
def setup_method(self, method):
super(TestCustodiaIPACertRequests, self).setup_method(method)
cert = x509.load_pem_x509_certificate(CERT_PEM, default_backend())
cert_der = cert.public_bytes(serialization.Encoding.DER)
cert_stripped = base64.b64encode(cert_der)
ca = x509.load_pem_x509_certificate(CA_PEM, default_backend())
ca_der = ca.public_bytes(serialization.Encoding.DER)
self.m_api.Command.cert_request.return_value = {
u'result': {
u'subject': 'dummy subject',
u'request_id': 1,
u'serial_number': 1,
u'certificate': cert_stripped,
u'certificate_chain': (
cert_der,
ca_der,
)
}
}
示例2
def create(cls, client, password, cert_data):
"""Create a new certificate."""
cert = x509.load_pem_x509_certificate(cert_data, default_backend())
base64_cert = cert.public_bytes(Encoding.PEM).decode('utf-8')
# STRIP OUT CERT META "-----BEGIN CERTIFICATE-----"
base64_cert = '\n'.join(base64_cert.split('\n')[1:-2])
data = {
'type': 'client',
'certificate': base64_cert,
'password': password,
}
client.api.certificates.post(json=data)
# XXX: rockstar (08 Jun 2016) - Please see the open lxd bug here:
# https://github.com/lxc/lxd/issues/2092
fingerprint = binascii.hexlify(
cert.fingerprint(hashes.SHA256())).decode('utf-8')
return cls.get(client, fingerprint)
示例3
def create(vek, keySizeBytes, certificatePath):
#print("VEK: " + str(binascii.hexlify(vek)))
publicKeyPem = open(certificatePath).read()
publicKey = RSA.importKey(publicKeyPem)
# Convert from PEM to DER
lines = publicKeyPem.replace(" ", '').split()
publicKeyDer = binascii.a2b_base64(''.join(lines[1:-1]))
cert = x509.load_pem_x509_certificate(SmartStr(publicKeyPem), default_backend())
subjectName = cert.subject.rfc4514_string()
serial = cert.serial_number
cipher = PKCS1_OAEP.new(key=publicKey, hashAlgo=SHA256, mgfunc=lambda x, y: pss.MGF1(x, y, SHA1))
wrapped_key = cipher.encrypt(vek)
#print("WrappedKey: " + str(binascii.hexlify(wrapped_key)))
return CertEncryptedKeyBag(subjectName, serial, keySizeBytes, wrapped_key)
示例4
def validate_ca_cert(self, ignored):
expected = self._get_expected_ca_cert_fingerprint()
algo, expectedfp = expected.split(':')
expectedfp = expectedfp.replace(' ', '')
backend = default_backend()
with open(self._get_ca_cert_path(), 'r') as f:
certstr = f.read()
cert = load_pem_x509_certificate(certstr, backend)
hasher = getattr(hashes, algo)()
fpbytes = cert.fingerprint(hasher)
fp = binascii.hexlify(fpbytes)
if fp != expectedfp:
os.unlink(self._get_ca_cert_path())
self.log.error("Fingerprint of CA cert doesn't match: %s <-> %s"
% (fp, expectedfp))
raise NetworkError("The provider's CA fingerprint doesn't match")
示例5
def test_generate_cert_key_pair(self):
cn = 'testCN'
bit_length = 512
# Attempt to generate a cert/key pair
cert_object = self.cert_generator.generate_cert_key_pair(
cn=cn,
validity=2 * 365 * 24 * 60 * 60,
bit_length=bit_length,
passphrase=self.ca_private_key_passphrase,
ca_cert=self.ca_certificate,
ca_key=self.ca_private_key,
ca_key_pass=self.ca_private_key_passphrase
)
# Validate that the cert and key are loadable
cert = x509.load_pem_x509_certificate(
data=cert_object.certificate, backend=backends.default_backend())
self.assertIsNotNone(cert)
key = serialization.load_pem_private_key(
data=cert_object.private_key,
password=cert_object.private_key_passphrase,
backend=backends.default_backend())
self.assertIsNotNone(key)
示例6
def pem_certificate_upload(f):
"""Parse PEM formatted certificate in request data
TODO: form field name option
"""
@wraps(f)
def decorator(*args, **kwargs):
try:
certificate_data = request.files['file'].read()
g.certificate = x509.load_pem_x509_certificate(certificate_data, backend=default_backend())
except UnsupportedAlgorithm as e:
current_app.logger.info('could not parse PEM certificate data')
abort(400, 'invalid input data')
return f(*args, **kwargs)
return decorator
示例7
def anchor_certs():
"""Download a list of certificates to trust the MDM
The response is a JSON array of base64 encoded DER certs as described in the DEP profile creation documentation."""
anchors = []
if 'CA_CERTIFICATE' in current_app.config:
with open(current_app.config['CA_CERTIFICATE'], 'rb') as fd:
pem_data = fd.read()
c: x509.Certificate = x509.load_pem_x509_certificate(pem_data, backend=default_backend())
der = c.public_bytes(Encoding.DER)
anchors.append(urlsafe_b64encode(der))
if 'SSL_CERTIFICATE' in current_app.config:
with open(current_app.config['SSL_CERTIFICATE'], 'rb') as fd:
pem_data = fd.read()
c: x509.Certificate = x509.load_pem_x509_certificate(pem_data, backend=default_backend())
der = c.public_bytes(Encoding.DER)
anchors.append(urlsafe_b64encode(der))
return jsonify(anchors)
示例8
def add_valid_from(apps, schema_editor):
Certificate = apps.get_model('django_ca', 'Certificate')
for cert in Certificate.objects.all():
backend = default_backend()
pem = x509.load_pem_x509_certificate(force_bytes(cert.pub), backend)
valid_from = pem.not_valid_before
if settings.USE_TZ:
valid_from = timezone.make_aware(valid_from)
cert.valid_from = valid_from
cert.save()
CertificateAuthority = apps.get_model('django_ca', 'CertificateAuthority')
for cert in CertificateAuthority.objects.all():
backend = default_backend()
pem = x509.load_pem_x509_certificate(force_bytes(cert.pub), backend)
valid_from = pem.not_valid_before
if settings.USE_TZ:
valid_from = timezone.make_aware(valid_from)
cert.valid_from = valid_from
cert.save()
示例9
def handle(self, pub, **options):
pub_data = pub.read()
try: # close reader objects (otherwise we get a ResourceWarning)
pub.close()
except Exception: # pragma: no cover
pass
# load public key
try:
pub_loaded = x509.load_pem_x509_certificate(pub_data, default_backend())
except Exception:
try:
pub_loaded = x509.load_der_x509_certificate(pub_data, default_backend())
except Exception:
raise CommandError('Unable to load public key.')
cert = Certificate(ca=options['ca'])
cert.x509 = pub_loaded
cert.save()
示例10
def _load_pub(data):
basedir = data.get('basedir', settings.FIXTURES_DIR)
path = os.path.join(basedir, data['pub_filename'])
with open(path, 'rb') as stream:
pem = stream.read().replace(b'\r\n', b'\n')
pub_data = {
'pem': pem.decode('utf-8'),
'parsed': x509.load_pem_x509_certificate(pem, default_backend()),
}
if data.get('pub_der_filename'):
der_path = os.path.join(basedir, data['pub_der_filename'])
with open(der_path, 'rb') as stream:
der = stream.read().replace(b'\r\n', b'\n')
pub_data['der'] = der
# Failes for alt-extensions since alternative AKI was added
#pub_data['der_parsed'] = x509.load_der_x509_certificate(der, default_backend()),
return pub_data
示例11
def _get_public_tls_parameters(service_certificate_path):
with open(service_certificate_path, "rb") as pem_file:
pem_data = pem_file.read()
cert = x509.load_pem_x509_certificate(pem_data, default_backend())
private_key = serialization.load_pem_private_key(
pem_data,
password=None,
backend=default_backend())
key_pem = private_key.private_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.TraditionalOpenSSL,
encryption_algorithm=serialization.NoEncryption())
cert_pem = cert.public_bytes(serialization.Encoding.PEM)
return {
'SSLCertificate': cert_pem,
'SSLKey': key_pem
}
示例12
def _get_public_tls_parameters(service_certificate_path):
with open(service_certificate_path, "rb") as pem_file:
pem_data = pem_file.read()
cert = x509.load_pem_x509_certificate(pem_data, default_backend())
private_key = serialization.load_pem_private_key(
pem_data,
password=None,
backend=default_backend())
key_pem = private_key.private_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.TraditionalOpenSSL,
encryption_algorithm=serialization.NoEncryption())
cert_pem = cert.public_bytes(serialization.Encoding.PEM)
return {
'SSLCertificate': cert_pem,
'SSLKey': key_pem
}
示例13
def _scan_a_cert(id, cert_path, key_path, assigns, is_acme=False):
with open(cert_path, "rb") as f:
crt = x509.load_pem_x509_certificate(f.read(), default_backend())
with open(key_path, "rb") as f:
key = serialization.load_pem_private_key(
f.read(),
password=None,
backend=default_backend()
)
sha1 = binascii.hexlify(crt.fingerprint(hashes.SHA1())).decode()
md5 = binascii.hexlify(crt.fingerprint(hashes.MD5())).decode()
sha1 = ":".join([sha1[i:i+2].upper() for i in range(0, len(sha1), 2)])
md5 = ":".join([md5[i:i+2].upper() for i in range(0, len(md5), 2)])
kt = "RSA" if isinstance(key.public_key(), rsa.RSAPublicKey) else "DSA"
common_name = crt.subject.get_attributes_for_oid(NameOID.COMMON_NAME)
return Certificate(
id=id, cert_path=cert_path, key_path=key_path, keytype=kt,
keylength=key.key_size, domain=common_name[0].value,
assigns=assigns.get(id, []), expiry=crt.not_valid_after, sha1=sha1,
md5=md5, is_acme=is_acme)
示例14
def get_certificate(self, kid):
# retrieve keys from jwks_url
resp = self.request(self.jwks_url(), method='GET')
resp.raise_for_status()
# find the proper key for the kid
for key in resp.json()['keys']:
if key['kid'] == kid:
x5c = key['x5c'][0]
break
else:
raise DecodeError('Cannot find kid={}'.format(kid))
certificate = '-----BEGIN CERTIFICATE-----\n' \
'{}\n' \
'-----END CERTIFICATE-----'.format(x5c)
return load_pem_x509_certificate(certificate.encode(),
default_backend())
示例15
def __init__(self, data):
"""
Cert constructor
It can handle PEM and DER encoded strings and lists of int bytes.
:param data: bytes or list of int
"""
if type(data) == list:
data = bytes(data)
if type(data) != bytes:
raise Exception("data must be bytes or list of int bytes")
self.__raw_data = data
if b"-----BEGIN CERTIFICATE-----" in data:
self.x509 = x509.load_pem_x509_certificate(data, backends.default_backend())
self.__raw_type = "PEM"
else:
self.x509 = x509.load_der_x509_certificate(data, backends.default_backend())
self.__raw_type = "DER"
示例16
def fqdns_from_certificate(cert_data):
try:
cert = x509.load_pem_x509_certificate(cert_data, default_backend())
except ValueError:
pass
try:
cert = x509.load_der_x509_certificate(cert_data, default_backend())
except ValueError:
raise ValueError("No recognized cert format. Allowed: PEM or DER")
names = set()
names.add(cert.subject.get_attributes_for_oid(NameOID.COMMON_NAME)[0].value.lower().rstrip('.'))
try:
alt_names = cert.extensions.get_extension_for_class(x509.SubjectAlternativeName)
except x509.extensions.ExtensionNotFound:
alt_names = None
if alt_names:
for alt_name in alt_names.value.get_values_for_type(x509.DNSName):
names.add(alt_name.lower().rstrip('.'))
return list(sorted(names))
示例17
def cert_get_names(cert_data):
try:
cert = x509.load_pem_x509_certificate(cert_data, default_backend())
except ValueError:
pass
try:
cert = x509.load_der_x509_certificate(cert_data, default_backend())
except ValueError:
raise ValueError("No recognized cert format. Allowed: PEM or DER")
names = set()
names.add(cert.subject.get_attributes_for_oid(NameOID.COMMON_NAME)[0].value.lower())
try:
alt_names = cert.extensions.get_extension_for_class(x509.SubjectAlternativeName)
except x509.extensions.ExtensionNotFound:
alt_names = None
if alt_names:
for alt_name in alt_names.value.get_values_for_type(x509.DNSName):
names.add(alt_name.lower())
return list(sorted(names))
示例18
def _get_rsa_parameters(server_info: ServerConnectivityInfo, openssl_cipher_string: str) -> Optional[RSAPublicNumbers]:
ssl_connection = server_info.get_preconfigured_tls_connection()
ssl_connection.ssl_client.set_cipher_list(openssl_cipher_string)
parsed_cert = None
try:
# Perform the SSL handshake
ssl_connection.connect()
cert_as_pem = ssl_connection.ssl_client.get_received_chain()[0]
parsed_cert = load_pem_x509_certificate(cert_as_pem.encode("ascii"), backend=default_backend())
except ServerRejectedTlsHandshake:
# Server does not support RSA cipher suites?
pass
except ClientCertificateRequested:
# AD: The server asked for a client cert. We could still retrieve the server certificate, but it is unclear
# to me if the ROBOT check is supposed to work even if we do not provide a client cert. My guess is that
# it should not work since it requires completing a full handshake, which we can't without a client cert.
# Hence, propagate the error to make the check fail.
raise
finally:
ssl_connection.close()
if parsed_cert:
public_key = parsed_cert.public_key()
if isinstance(public_key, RSAPublicKey):
return public_key.public_numbers()
else:
return None
else:
return None
示例19
def _monkeypatch_to_fix_certificate_asdict() -> None:
# H4ck: monkeypatch the _Certificate class to add __deepcopy__() so that when we call asdict() on a dataclass
# that contains a _Certificate, asdict() succeeds. Without this, generating JSON for the certinfo scan command
# will crash because the asdict() function uses deepcopy(), but certificates returned by cryptography.x509
# don't support it so SSLyze would crash. This class is a workaround to fix JSON output.
# I opened an issue about it in the cryptography repo at https://github.com/pyca/cryptography/issues/5129
def _deepcopy_method_for_x509_certificate(inner_self: _Certificate, memo: str) -> x509.Certificate:
return x509.load_pem_x509_certificate(inner_self.public_bytes(Encoding.PEM), backend=default_backend())
_Certificate.__deepcopy__ = _deepcopy_method_for_x509_certificate
# Call it on import... hacky but we don't have a choice
示例20
def _convert_string_output(self, line):
"""
Converts command output to instance of X.509 Certificate Object.
"""
if self._regex_helper.search_compiled(OpensslX509TextIn._re_end_certificate, line):
self.current_ret = load_pem_x509_certificate(data=self._string_output.encode(), backend=default_backend())
raise ParsingDone
# unable to load certificate
示例21
def _get_secret_data(self, secret):
"""Retrieves the secret data.
Converts the Barbican secret to bytes suitable for a Castellan object.
If the secret is a public key, private key, or certificate, the secret
is expected to be in PEM format and will be converted to DER.
:param secret: the secret from barbican with the payload of data
:returns: the secret data
"""
if secret.secret_type == 'public':
key = serialization.load_pem_public_key(
secret.payload,
backend=backends.default_backend())
return key.public_bytes(
encoding=serialization.Encoding.DER,
format=serialization.PublicFormat.SubjectPublicKeyInfo)
elif secret.secret_type == 'private':
key = serialization.load_pem_private_key(
secret.payload,
backend=backends.default_backend(),
password=None)
return key.private_bytes(
encoding=serialization.Encoding.DER,
format=serialization.PrivateFormat.PKCS8,
encryption_algorithm=serialization.NoEncryption())
elif secret.secret_type == 'certificate':
cert = cryptography_x509.load_pem_x509_certificate(
secret.payload,
backend=backends.default_backend())
return cert.public_bytes(encoding=serialization.Encoding.DER)
else:
return secret.payload
示例22
def import_from_pem(self, data, password=None):
"""Imports a key from data loaded from a PEM file.
The key may be encrypted with a password.
Private keys (PKCS#8 format), public keys, and X509 certificate's
public keys can be imported with this interface.
:param data(bytes): The data contained in a PEM file.
:param password(bytes): An optional password to unwrap the key.
"""
try:
key = serialization.load_pem_private_key(
data, password=password, backend=default_backend())
except ValueError as e:
if password is not None:
raise e
try:
key = serialization.load_pem_public_key(
data, backend=default_backend())
except ValueError:
try:
cert = x509.load_pem_x509_certificate(
data, backend=default_backend())
key = cert.public_key()
except ValueError:
raise e
self.import_from_pyca(key)
self._params['kid'] = self.thumbprint()
示例23
def certificate_needs_renewed(cert_dir):
with open(os.path.join(cert_dir, 'cert.pem'), 'rb') as cert_file:
pem_data = cert_file.read()
cert = x509.load_pem_x509_certificate(pem_data, default_backend())
not_valid_before = cert.not_valid_before
not_valid_after = cert.not_valid_after
return not_valid_after - datetime.datetime.utcnow() < (cert.not_valid_after - cert.not_valid_before)/2
示例24
def read_cert(cert_dir):
with open(os.path.join(cert_dir, 'cert.pem'), 'rb') as cert_file:
pem_data = cert_file.read()
cert = x509.load_pem_x509_certificate(pem_data, default_backend())
common_name = [na.value for na in cert.subject if na.oid._dotted_string == "2.5.4.3"][0]
subject_alternative_names = [ext.value for ext in cert.extensions if ext.oid._dotted_string == "2.5.29.17"][0]
dns_names = subject_alternative_names.get_values_for_type(x509.DNSName)
ip_sans = [str(ip) for ip in subject_alternative_names.get_values_for_type(x509.IPAddress)]
return common_name, dns_names, ip_sans
示例25
def validate_cert_name(cert_file, robot_name):
"""Validate the name on Vector's certificate against the user-provided name"""
with open(cert_file, "rb") as f:
cert_file = f.read()
cert = x509.load_pem_x509_certificate(cert_file, default_backend())
for fields in cert.subject:
current = str(fields.oid)
if "commonName" in current:
common_name = fields.value
if common_name != robot_name:
print(colored(" ERROR", "red"))
sys.exit("The name of the certificate ({}) does not match the name provided ({}).\n"
"Please verify the name, and try again.".format(common_name, robot_name))
else:
return
示例26
def test_sign_cert(self):
# Attempt sign a cert
signed_cert = self.cert_generator.sign_cert(
csr=self.certificate_signing_request,
validity=2 * 365 * 24 * 60 * 60,
ca_cert=self.ca_certificate,
ca_key=self.ca_private_key,
ca_key_pass=self.ca_private_key_passphrase,
ca_digest=self.signing_digest
)
self.assertIn("-----BEGIN CERTIFICATE-----",
signed_cert.decode('ascii'))
# Load the cert for specific tests
cert = x509.load_pem_x509_certificate(
data=signed_cert, backend=backends.default_backend())
# Make sure expiry time is accurate
should_expire = (datetime.datetime.utcnow() +
datetime.timedelta(seconds=2 * 365 * 24 * 60 * 60))
diff = should_expire - cert.not_valid_after
self.assertLess(diff, datetime.timedelta(seconds=10))
# Make sure this is a version 3 X509.
self.assertEqual('v3', cert.version.name)
# Make sure this cert is marked as Server and Client Cert via the
# extended Key Usage extension
self.assertIn(x509.oid.ExtendedKeyUsageOID.SERVER_AUTH,
cert.extensions.get_extension_for_class(
x509.ExtendedKeyUsage).value._usages)
self.assertIn(x509.oid.ExtendedKeyUsageOID.CLIENT_AUTH,
cert.extensions.get_extension_for_class(
x509.ExtendedKeyUsage).value._usages)
# Make sure this cert can't sign other certs
self.assertFalse(cert.extensions.get_extension_for_class(
x509.BasicConstraints).value.ca)
示例27
def get_host_names(certificate):
"""Extract the host names from the Pem encoded X509 certificate
:param certificate: A PEM encoded certificate
:returns: A dictionary containing the following keys:
['cn', 'dns_names']
where 'cn' is the CN from the SubjectName of the
certificate, and 'dns_names' is a list of dNSNames
(possibly empty) from the SubjectAltNames of the certificate.
"""
if isinstance(certificate, str):
certificate = certificate.encode('utf-8')
try:
cert = x509.load_pem_x509_certificate(certificate,
backends.default_backend())
cn = cert.subject.get_attributes_for_oid(x509.OID_COMMON_NAME)[0]
host_names = {
'cn': cn.value.lower(),
'dns_names': []
}
try:
ext = cert.extensions.get_extension_for_oid(
x509.OID_SUBJECT_ALTERNATIVE_NAME
)
host_names['dns_names'] = ext.value.get_values_for_type(
x509.DNSName)
except x509.ExtensionNotFound:
LOG.debug("%s extension not found",
x509.OID_SUBJECT_ALTERNATIVE_NAME)
return host_names
except Exception:
LOG.exception('Unreadable Certificate.')
raise exceptions.UnreadableCert
示例28
def _get_x509_from_pem_bytes(certificate_pem):
"""Parse X509 data from a PEM encoded certificate
:param certificate_pem: Certificate in PEM format
:returns: crypto high-level x509 data from the PEM string
"""
if isinstance(certificate_pem, str):
certificate_pem = certificate_pem.encode('utf-8')
try:
x509cert = x509.load_pem_x509_certificate(certificate_pem,
backends.default_backend())
except Exception:
LOG.exception('Unreadable Certificate.')
raise exceptions.UnreadableCert
return x509cert
示例29
def deserialize(self, data):
cert = x509.load_pem_x509_certificate(data, default_backend())
key_material = cert.public_key().public_numbers()
self._e = key_material.e
self._n = key_material.n
示例30
def _get_x509_from_pem_bytes(certificate_pem):
"""Parse X509 data from a PEM encoded certificate
:param certificate_pem: Certificate in PEM format
:returns: crypto high-level x509 data from the PEM string
"""
if type(certificate_pem) == six.text_type:
certificate_pem = certificate_pem.encode('utf-8')
try:
x509cert = x509.load_pem_x509_certificate(certificate_pem,
backends.default_backend())
except Exception:
LOG.exception('Unreadable Certificate.')
raise f5_ex.UnreadableCert
return x509cert