Python源码示例:cryptography.x509.load_der_x509_certificate()
示例1
def validate(self, authenticator_data, rp_id_hash, client_data_hash):
    """Verify a FIDO U2F attestation statement.

    Follows https://www.w3.org/TR/webauthn/#fido-u2f-attestation,
    "Verification procedure".

    :param authenticator_data: object exposing ``credential`` (which in turn
        exposes ``id`` and ``public_key.x`` / ``public_key.y``)
    :param rp_id_hash: bytes, concatenated into the signed data
    :param client_data_hash: bytes, concatenated into the signed data
    :returns: whatever ``self.validated_attestation`` returns for a "Basic"
        attestation with an "x5c" trust path
    :raises AssertionError: if a key coordinate is not 32 bytes long or the
        metadata does not list exactly one root certificate
    """
    # Local import keeps this block self-contained.
    import base64

    credential = authenticator_data.credential
    # Check the coordinate sizes before they are used to build the signed data.
    assert len(credential.public_key.x) == 32
    assert len(credential.public_key.y) == 32
    public_key_u2f = b'\x04' + credential.public_key.x + credential.public_key.y
    verification_data = b'\x00' + rp_id_hash + client_data_hash + credential.id + public_key_u2f
    self.cert_public_key.verify(self.signature, verification_data, ec.ECDSA(hashes.SHA256()))
    key_id = x509.SubjectKeyIdentifier.from_public_key(self.cert_public_key).digest.hex()
    att_root_cert_chain = self.metadata_for_key_id(key_id)["attestationRootCertificates"]
    # TODO: implement full cert chain validation
    # See https://cryptography.io/en/latest/x509/reference/#cryptography.x509.Certificate.tbs_certificate_bytes
    # See https://github.com/pyca/cryptography/issues/2381
    # See https://github.com/wbond/certvalidator
    assert len(att_root_cert_chain) == 1
    # The metadata entry is a text string; it has to be base64-decoded to
    # obtain DER. Calling .encode() on it only yields the base64 characters
    # as bytes, which the DER loader cannot parse.
    att_root_der = base64.b64decode(att_root_cert_chain[0])
    att_root_cert = x509.load_der_x509_certificate(att_root_der,
                                                   cryptography.hazmat.backends.default_backend())
    # NOTE(review): PKCS1v15 padding presumes an RSA root key - confirm
    # against the metadata actually in use.
    att_root_cert.public_key().verify(self.att_cert.signature,
                                      self.att_cert.tbs_certificate_bytes,
                                      padding.PKCS1v15(),
                                      self.att_cert.signature_hash_algorithm)
    return self.validated_attestation(type="Basic", trust_path="x5c", credential=credential)
示例2
def request_cert(self, builder, **kwargs):
    """Sign the CSR, submit it, and bundle the private key with the result.

    Returns a ``(response, pem_text)`` tuple where ``pem_text`` joins the
    dumped private key and every returned certificate with newlines.
    """
    csr = self._sign_csr(builder)
    csr_text = csr.public_bytes(serialization.Encoding.PEM)
    if not isinstance(csr_text, six.text_type):
        csr_text = csr_text.decode('ascii')
    response = self._cert_request(csr_text, **kwargs)
    result = response[u'result']
    if self.plugin.chain:
        loaded = tuple(
            x509.load_der_x509_certificate(entry, self.backend)
            for entry in result[u'certificate_chain']
        )
    else:
        # The single certificate arrives as bare base64 (no BEGIN/END armor).
        der_bytes = base64.b64decode(result[u'certificate'])
        loaded = (x509.load_der_x509_certificate(der_bytes, self.backend), )
    parts = [self._dump_privkey(self._privkey)]
    for item in loaded:
        parts.append(self._dump_cert(item))
    return response, '\n'.join(parts)
示例3
def _process_pkcs7_substrate(substrate):
    """Print, as PEM, every certificate carried in a DER PKCS#7 SignedData.

    Raises a bare ``Exception`` when the content type is not signedData.
    """
    content_info, _ = der_decoder.decode(substrate,
                                         asn1Spec=rfc2315.ContentInfo())
    if content_info.getComponentByName('contentType') != rfc2315.signedData:
        raise Exception
    signed_data, _ = der_decoder.decode(
        content_info.getComponentByName('content'),
        asn1Spec=rfc2315.SignedData())
    for entry in signed_data.getComponentByName('certificates'):
        # Re-encode each entry to DER so the x509 loader can parse it.
        parsed = x509.load_der_x509_certificate(der_encoder.encode(entry),
                                                backends.default_backend())
        pem_bytes = parsed.public_bytes(encoding=serialization.Encoding.PEM)
        print(pem_bytes.decode('unicode_escape'), end='')
# Main program code
示例4
def test_xmldsig_interop_TR2012(self):
    """Verify every TR2012 interop signature file.

    Failures are tolerated (printed) for "keyinforeference" files, must be
    ``InvalidCertificate`` for "x509digest" files, and are re-raised otherwise.
    """
    def get_x509_cert(**kwargs):
        # Resolver handed to the verifier: always returns the bundled RSA cert.
        from cryptography.x509 import load_der_x509_certificate
        from OpenSSL.crypto import X509
        cert_path = os.path.join(interop_dir, "TR2012", "rsa-cert.der")
        with open(cert_path, "rb") as cert_file:
            der = cert_file.read()
            return [X509.from_cryptography(load_der_x509_certificate(der, backend=default_backend()))]

    pattern = os.path.join(interop_dir, "TR2012", "signature*.xml")
    for signature_file in glob(pattern):
        print("Verifying", signature_file)
        with open(signature_file, "rb") as handle:
            try:
                sig = handle.read()
                resolver = get_x509_cert if "x509digest" in signature_file else None
                XMLVerifier().verify(sig, require_x509=False, hmac_key="testkey", validate_schema=True,
                                     cert_resolver=resolver)
                # Decoding stays inside the try so a decode error is handled
                # by the same policy as a verification error.
                decoded_sig = sig.decode("utf-8")
            except Exception as e:
                if "keyinforeference" in signature_file:
                    print("Unsupported test case:", type(e), e)
                elif "x509digest" in signature_file:
                    assert isinstance(e, InvalidCertificate)
                else:
                    raise
示例5
def handle(self, pub, **options):
    """Read a certificate from ``pub`` and save it for ``options['ca']``.

    The data is parsed as PEM first and as DER second; ``CommandError`` is
    raised when both attempts fail.
    """
    raw = pub.read()
    try:
        # Close the reader explicitly (otherwise we get a ResourceWarning).
        pub.close()
    except Exception:  # pragma: no cover
        pass

    backend_args = (raw, default_backend)
    try:
        loaded = x509.load_pem_x509_certificate(backend_args[0], backend_args[1]())
    except Exception:
        try:
            loaded = x509.load_der_x509_certificate(backend_args[0], backend_args[1]())
        except Exception:
            raise CommandError('Unable to load public key.')

    record = Certificate(ca=options['ca'])
    record.x509 = loaded
    record.save()
示例6
def _load_pub(data):
    """Load a certificate fixture described by ``data``.

    Returns a dict with ``'pem'`` (text) and ``'parsed'`` (certificate
    object), plus ``'der'`` (bytes) when ``data`` names a DER file.
    """
    fixture_dir = data.get('basedir', settings.FIXTURES_DIR)

    with open(os.path.join(fixture_dir, data['pub_filename']), 'rb') as pem_file:
        pem_bytes = pem_file.read().replace(b'\r\n', b'\n')
    result = {
        'pem': pem_bytes.decode('utf-8'),
        'parsed': x509.load_pem_x509_certificate(pem_bytes, default_backend()),
    }

    if data.get('pub_der_filename'):
        with open(os.path.join(fixture_dir, data['pub_der_filename']), 'rb') as der_file:
            # NOTE(review): the CRLF rewrite is applied to binary DER as well;
            # confirm this is intended for these fixtures.
            result['der'] = der_file.read().replace(b'\r\n', b'\n')
        # The DER payload is deliberately not parsed here: parsing fails for
        # alt-extensions since an alternative AKI was added.

    return result
示例7
def test_BackuprKey_BACKUPKEY_RETRIEVE_BACKUP_KEY_GUID(self):
    """Send a raw BackuprKey request and print fields of the returned cert."""
    dce, _transport = self.connect()

    request = bkrp.BackuprKey()
    request['pguidActionAgent'] = bkrp.BACKUPKEY_RETRIEVE_BACKUP_KEY_GUID
    request['pDataIn'] = NULL
    request['cbDataIn'] = 0
    request['dwParam'] = 0

    resp = dce.request(request)
    resp.dump()

    # ppDataOut is joined into one bytes object before DER parsing.
    der_bytes = b''.join(resp['ppDataOut'])
    cert = x509.load_der_x509_certificate(der_bytes, default_backend())
    for field in ('subject', 'issuer', 'signature'):
        print(getattr(cert, field))
示例8
def test_hBackuprKey_BACKUPKEY_RETRIEVE_BACKUP_KEY_GUID(self):
    """Call the hBackuprKey helper and print fields of the returned cert."""
    dce, _transport = self.connect()

    # NOTE(review): this request object is filled in but never sent; the
    # helper call below is what actually talks to the server.
    request = bkrp.BackuprKey()
    request['pguidActionAgent'] = bkrp.BACKUPKEY_RETRIEVE_BACKUP_KEY_GUID
    request['pDataIn'] = NULL
    request['cbDataIn'] = 0
    request['dwParam'] = 0

    resp = bkrp.hBackuprKey(dce, bkrp.BACKUPKEY_RETRIEVE_BACKUP_KEY_GUID, NULL)
    resp.dump()

    # ppDataOut is joined into one bytes object before DER parsing.
    der_bytes = b''.join(resp['ppDataOut'])
    cert = x509.load_der_x509_certificate(der_bytes, default_backend())
    for field in ('subject', 'issuer', 'signature'):
        print(getattr(cert, field))
示例9
def __init__(self, data):
    """
    Build a Cert from raw certificate material.

    PEM and DER encoded byte strings are accepted, as are lists of int
    byte values (converted to bytes first).

    :param data: bytes or list of int
    """
    if type(data) is list:
        data = bytes(data)
    if type(data) is not bytes:
        raise Exception("data must be bytes or list of int bytes")
    self.__raw_data = data

    # The PEM armor line decides which loader is used.
    looks_like_pem = b"-----BEGIN CERTIFICATE-----" in data
    if looks_like_pem:
        loader = x509.load_pem_x509_certificate
    else:
        loader = x509.load_der_x509_certificate
    self.x509 = loader(data, backends.default_backend())
    self.__raw_type = "PEM" if looks_like_pem else "DER"
示例10
def fqdns_from_certificate(cert_data):
    """Return the sorted, de-duplicated host names found in a certificate.

    Names come from the subject common name (first CN attribute, if any)
    and from DNS entries of the SubjectAlternativeName extension. Each name
    is lower-cased and stripped of trailing dots.

    :param cert_data: certificate bytes, PEM or DER encoded
    :returns: sorted list of names
    :raises ValueError: if the data parses as neither PEM nor DER
    """
    backend = default_backend()
    try:
        cert = x509.load_pem_x509_certificate(cert_data, backend)
    except ValueError:
        # Only fall back to DER when PEM parsing failed. Running the DER
        # parse unconditionally rejected every valid PEM certificate.
        try:
            cert = x509.load_der_x509_certificate(cert_data, backend)
        except ValueError:
            raise ValueError("No recognized cert format. Allowed: PEM or DER")

    names = set()
    # A subject without a common name contributes nothing instead of
    # raising IndexError; only the first CN is used.
    for attribute in cert.subject.get_attributes_for_oid(NameOID.COMMON_NAME)[:1]:
        names.add(attribute.value.lower().rstrip('.'))

    try:
        alt_names = cert.extensions.get_extension_for_class(x509.SubjectAlternativeName)
    except x509.extensions.ExtensionNotFound:
        alt_names = None
    if alt_names:
        for alt_name in alt_names.value.get_values_for_type(x509.DNSName):
            names.add(alt_name.lower().rstrip('.'))
    return sorted(names)
示例11
def cert_get_names(cert_data):
    """Return the sorted, de-duplicated names found in a certificate.

    Names come from the subject common name (first CN attribute, if any)
    and from DNS entries of the SubjectAlternativeName extension, all
    lower-cased.

    :param cert_data: certificate bytes, PEM or DER encoded
    :returns: sorted list of names
    :raises ValueError: if the data parses as neither PEM nor DER
    """
    backend = default_backend()
    try:
        cert = x509.load_pem_x509_certificate(cert_data, backend)
    except ValueError:
        # Only fall back to DER when PEM parsing failed. Running the DER
        # parse unconditionally rejected every valid PEM certificate.
        try:
            cert = x509.load_der_x509_certificate(cert_data, backend)
        except ValueError:
            raise ValueError("No recognized cert format. Allowed: PEM or DER")

    names = set()
    # A subject without a common name contributes nothing instead of
    # raising IndexError; only the first CN is used.
    for attribute in cert.subject.get_attributes_for_oid(NameOID.COMMON_NAME)[:1]:
        names.add(attribute.value.lower())

    try:
        alt_names = cert.extensions.get_extension_for_class(x509.SubjectAlternativeName)
    except x509.extensions.ExtensionNotFound:
        alt_names = None
    if alt_names:
        for alt_name in alt_names.value.get_values_for_type(x509.DNSName):
            names.add(alt_name.lower())
    return sorted(names)
示例12
def metadata_toc(self):
    """Fetch, verify and cache the metadata TOC served at ``self.mds_url``.

    The response body is a JWT. Its signature is checked with the public key
    of the first certificate in the JWT's own ``x5c`` header, and the decoded
    payload is cached in ``self._metadata_toc`` for later calls.

    :returns: the decoded JWT payload
    :raises AssertionError: if the JWT header does not declare ES256
    """
    # Local import keeps this block self-contained.
    import base64

    if self._metadata_toc is None:
        res = requests.get(self.mds_url)
        res.raise_for_status()
        jwt_header = jwt.get_unverified_header(res.content)
        assert jwt_header["alg"] == "ES256"
        # Each "x5c" entry is a base64 (not base64url) encoded DER
        # certificate, so it must be decoded before DER parsing; .encode()
        # would hand the loader the base64 characters themselves.
        cert_der = base64.b64decode(jwt_header["x5c"][0])
        cert = x509.load_der_x509_certificate(cert_der,
                                              cryptography.hazmat.backends.default_backend())
        # NOTE(review): the signing certificate is taken from the token
        # itself and is not chained to any trust anchor here.
        self._metadata_toc = jwt.decode(res.content, key=cert.public_key(), algorithms=["ES256"])
    return self._metadata_toc
示例13
def __init__(self, att_stmt):
    """Keep the attestation statement and parse its single x5c certificate.

    Sets ``att_stmt``, ``att_cert``, ``cert_public_key`` and ``signature``.
    """
    self.att_stmt = att_stmt
    cert_list = self.att_stmt["x5c"]
    # Exactly one certificate is expected in the statement.
    assert len(cert_list) == 1
    backend = cryptography.hazmat.backends.default_backend()
    parsed_cert = x509.load_der_x509_certificate(cert_list[0], backend)
    self.att_cert = parsed_cert
    self.cert_public_key = parsed_cert.public_key()
    self.signature = att_stmt["sig"]
示例14
def _get_normalized_payload(self, encoded_bytes, secret_type):
    """Convert DER encoded key or certificate bytes to PEM.

    Castellan hands over certificates, public keys and private keys as DER
    encoded bytes, whereas Barbican expects them in PEM format. Any other
    secret type is returned untouched.
    """
    pem = serialization.Encoding.PEM

    if secret_type == 'public':
        public_key = serialization.load_der_public_key(
            encoded_bytes,
            backend=backends.default_backend())
        return public_key.public_bytes(
            encoding=pem,
            format=serialization.PublicFormat.SubjectPublicKeyInfo)

    if secret_type == 'private':
        private_key = serialization.load_der_private_key(
            encoded_bytes,
            backend=backends.default_backend(),
            password=None)
        return private_key.private_bytes(
            encoding=pem,
            format=serialization.PrivateFormat.PKCS8,
            encryption_algorithm=serialization.NoEncryption())

    if secret_type == 'certificate':
        certificate = cryptography_x509.load_der_x509_certificate(
            encoded_bytes,
            backend=backends.default_backend())
        return certificate.public_bytes(encoding=pem)

    return encoded_bytes
示例15
def scan(self, offset=0, maxlen=None):
    """Yield ``(hit, kind, data, description)`` for each recognised hit.

    ``kind`` is "X509" or "RSA". ``description`` stays ``None`` unless the
    data could be parsed (a dict of subject attributes for certificates, an
    empty string for keys).
    """
    for hit in super(CertScanner, self).scan(offset=offset, maxlen=maxlen):
        marker = self.address_space.read(hit + 4, 3)
        size = self.profile.Object(
            "unsigned be short", offset=hit+2, vm=self.address_space)
        description = None

        if marker.startswith(b"\x30\x82"):
            data = self.address_space.read(hit, size + 4)
            if x509:
                try:
                    parsed = x509.load_der_x509_certificate(data, default_backend())
                    description = {
                        attr.oid._name: attr.value for attr in parsed.subject}
                except Exception:
                    # Best effort: an unparsable hit is still reported.
                    pass
            yield hit, "X509", data, description

        elif marker.startswith(b"\x02\x01\x00"):
            data = self.address_space.read(hit, size + 4)
            if x509:
                try:
                    # NOTE(review): there is no newline between the base64
                    # body and the END line - confirm the loader accepts it.
                    pem = b"".join([
                        b"-----BEGIN RSA PRIVATE KEY-----\n",
                        base64.b64encode(data),
                        b"-----END RSA PRIVATE KEY-----",
                    ])
                    key = serialization.load_pem_private_key(
                        pem, password=None, backend=default_backend())
                    description = ""
                except Exception:
                    # Best effort: an unparsable hit is still reported.
                    pass
            yield hit, "RSA", data, description
示例16
def verify_hit(self, hit, address_space):
    """Classify one hit and return ``(kind, data, description)``.

    ``kind`` is "X509" or "RSA"; ``(None, None, None)`` is returned when the
    bytes at the hit match neither marker. ``description`` stays ``None``
    unless the data could be parsed.
    """
    marker = address_space.read(hit + 4, 3)
    size = self.profile.Object(
        "unsigned be short", offset=hit+2, vm=address_space)
    description = None

    if marker.startswith(b"\x30\x82"):
        data = address_space.read(hit, size + 4)
        if x509:
            try:
                parsed = x509.load_der_x509_certificate(data, default_backend())
                description = {
                    attr.oid._name: attr.value for attr in parsed.subject}
            except Exception:
                # Best effort: an unparsable hit is still reported.
                pass
        return "X509", data, description

    if marker.startswith(b"\x02\x01\x00"):
        data = address_space.read(hit, size + 4)
        if x509:
            try:
                # NOTE(review): there is no newline between the base64
                # body and the END line - confirm the loader accepts it.
                pem = b"".join([
                    b"-----BEGIN RSA PRIVATE KEY-----\n",
                    base64.b64encode(data),
                    b"-----END RSA PRIVATE KEY-----",
                ])
                key = serialization.load_pem_private_key(
                    pem, password=None, backend=default_backend())
                description = ""
            except Exception:
                # Best effort: an unparsable hit is still reported.
                pass
        return "RSA", data, description

    return None, None, None
示例17
def _get_x509_from_der_bytes(certificate_der):
    """Load an X509 certificate object from DER bytes.

    :param certificate_der: Certificate in DER format
    :returns: the parsed certificate object
    :raises exceptions.UnreadableCert: if loading fails for any reason
    """
    try:
        return x509.load_der_x509_certificate(certificate_der,
                                              backends.default_backend())
    except Exception:
        # Log the underlying failure before raising the domain error.
        LOG.exception('Unreadable Certificate.')
        raise exceptions.UnreadableCert
示例18
def _get_x509_from_der_bytes(certificate_der):
    """Load an X509 certificate object from DER bytes.

    :param certificate_der: Certificate in DER format
    :returns: the parsed certificate object
    :raises f5_ex.UnreadableCert: if loading fails for any reason
    """
    try:
        return x509.load_der_x509_certificate(certificate_der,
                                              backends.default_backend())
    except Exception:
        # Log the underlying failure before raising the domain error.
        LOG.exception('Unreadable Certificate.')
        raise f5_ex.UnreadableCert
示例19
def load_der_certificate(data):
    """
    Parse ``data`` as a DER encoded X.509 certificate and return it.
    """
    backend = default_backend()
    certificate = x509.load_der_x509_certificate(data, backend)
    return certificate
示例20
def _get_certificate_hash(certificate_der):
    """
    Compute the certificate hash used for the tls-server-end-point
    channel binding.

    Per https://tools.ietf.org/html/rfc5929#section-4.1 the hash is
        SHA256 when the certificate's signature hash is MD5 or SHA1
        the certificate's own signature hash otherwise

    :param certificate_der: The byte string of the server's certificate
    :return: The byte string containing the hash of the server's
        certificate, or None (after a warning) when the signature
        algorithm is unsupported
    """
    backend = default_backend()
    parsed = x509.load_der_x509_certificate(certificate_der, backend)

    try:
        signature_hash = parsed.signature_hash_algorithm
    except UnsupportedAlgorithm as ex:
        message = (
            "Failed to get the signature algorithm from the "
            "certificate, unable to pass channel bindings data: %s"
        ) % str(ex)
        warnings.warn(message, UnknownSignatureAlgorithmOID)
        return None

    # md5 and sha1 are replaced by sha256; anything else is used as-is.
    if signature_hash.name in ['md5', 'sha1']:
        chosen = hashes.SHA256()
    else:
        chosen = signature_hash
    hasher = hashes.Hash(chosen, backend)
    hasher.update(certificate_der)
    return hasher.finalize()
示例21
def get_certificate_from_connection(connection):
    """
    Return the peer's X.509 certificate for ``connection``, or None when
    the connection reports no certificate.
    """
    peer_der = connection.getpeercert(binary_form=True)
    if not peer_der:
        return None
    return x509.load_der_x509_certificate(peer_der, backends.default_backend())
示例22
def get_signer_certificate(content, signer):
    """Find the certificate in ``content`` that matches ``signer``'s id.

    A certificate matches when both its serial number and its issuer equal
    those of the signer id. Returns ``(issuer_cn, der_bytes, certificate)``
    for the first match, or None implicitly when nothing matches.
    """
    candidates = content["certificates"]
    signer_id = signer["sid"]
    for candidate in candidates:
        is_match = (
            candidate.chosen.serial_number == signer_id.chosen["serial_number"].native
            and candidate.chosen.issuer == signer_id.chosen["issuer"]
        )
        if not is_match:
            continue
        der_bytes = candidate.dump()
        parsed = x509.load_der_x509_certificate(der_bytes, default_backend())
        # All issuer common-name values, comma separated.
        issuer_cn = ", ".join(
            attribute.value
            for attribute in parsed.issuer.get_attributes_for_oid(x509.oid.NameOID.COMMON_NAME)
        )
        return issuer_cn, der_bytes, parsed
示例23
def ack_certificate_list(request: DBCommand, device: Device, response: dict):
    """Handle a device's reply to the ``CertificateList`` command.

    Existing installed-certificate rows for the device are deleted and one
    new row is added per entry in ``response['CertificateList']``.

    Args:
          request (Command): The command instance that prompted this reply.
          device (Device): The database model of the replying device.
          response (dict): The reply plist data, as a dictionary.
    Returns:
          Nothing.
    """
    for stale in device.installed_certificates:
        db.session.delete(stale)

    entries = response['CertificateList']
    current_app.logger.debug(
        'Received CertificatesList response containing {} certificate(s)'.format(len(entries)))

    for entry in entries:
        installed = InstalledCertificate()
        installed.device = device
        installed.device_udid = device.udid
        installed.x509_cn = entry.get('CommonName', None)
        installed.is_identity = entry.get('IsIdentity', None)
        raw_der = entry['Data']
        parsed = x509.load_der_x509_certificate(raw_der, default_backend())
        installed.fingerprint_sha256 = hexlify(parsed.fingerprint(hashes.SHA256()))
        installed.der_data = raw_der
        db.session.add(installed)

    db.session.commit()
示例24
def from_der(der_data: bytes) -> x509.Certificate:
    """Parse DER bytes into a certificate object."""
    backend = default_backend()
    certificate = x509.load_der_x509_certificate(der_data, backend)
    return certificate
示例25
def create_ssl_context(cert_byes, pk_bytes, password=None,
                       encoding=Encoding.PEM):
    """Build an SSL context from a certificate and its private key.

    :param cert_byes array of bytes containing the certificate, encoded
                     as named by the ``encoding`` parameter
    :param pk_bytes array of bytes containing the private key, encoded
                    as named by the ``encoding`` parameter
    :param password array of bytes holding the passphrase for the private
                    key, or None if the key is unencrypted.
                    Defaults to None.
    :param encoding ``cryptography.hazmat.primitives.serialization.Encoding``
                    value describing how ``cert_byes`` and ``pk_bytes`` are
                    encoded. Either PEM or DER. Defaults to PEM.
    """
    backend = default_backend()

    # Pick the loader pair first, then parse with it.
    if encoding == Encoding.PEM:
        load_cert, load_key = x509.load_pem_x509_certificate, load_pem_private_key
    elif encoding == Encoding.DER:
        load_cert, load_key = x509.load_der_x509_certificate, load_der_private_key
    else:
        raise ValueError('Invalid encoding provided: Must be PEM or DER')

    cert = load_cert(cert_byes, backend)
    key = load_key(pk_bytes, password, backend)
    if not cert or not key:
        raise ValueError('Cert and key could not be parsed from '
                         'provided data')

    check_cert_dates(cert)
    context = PyOpenSSLContext(PROTOCOL)
    context._ctx.use_certificate(X509.from_cryptography(cert))
    context._ctx.use_privatekey(PKey.from_cryptography_key(key))
    return context
示例26
def get_key_from_jwk(self, jwk, alg):
    """Return the key described by ``jwk``.

    When the JWK carries an "x5c" entry, the public key of its first
    certificate is returned. Otherwise the algorithm registered under
    ``alg`` builds the key from the JWK's JSON form.
    """
    if "x5c" not in jwk:
        algorithm = self._algorithms[alg]
        # Awkward: the algorithm wants the JWK serialised back to JSON.
        return algorithm.from_jwk(json.dumps(jwk))
    first_cert_der = base64.b64decode(jwk["x5c"][0])
    certificate = load_der_x509_certificate(first_cert_der, default_backend())
    return certificate.public_key()
示例27
def load_certificate(path):
    """Load the certificate stored at ``path``.

    A ``.pem`` extension selects the PEM loader; any other extension is
    read as DER.
    """
    extension = os.path.splitext(path)[1]
    if extension == ".pem":
        loader = x509.load_pem_x509_certificate
    else:
        loader = x509.load_der_x509_certificate
    with open(path, "rb") as cert_file:
        return loader(cert_file.read(), default_backend())
示例28
def x509_from_der(data):
    """Parse DER bytes into a certificate; falsy input yields None."""
    if data:
        return x509.load_der_x509_certificate(data, default_backend())
    return None
示例29
def _load_keys(self, certificates):
    """Replace ``self.signing_keys`` with the public keys of ``certificates``.

    Each entry is base64 text wrapping a DER certificate. The attribute is
    only assigned after every entry has loaded.
    """
    def public_key_of(encoded):
        logger.debug("Loading public key from certificate: %s", encoded)
        parsed = load_der_x509_certificate(base64.b64decode(encoded), backend)
        return parsed.public_key()

    self.signing_keys = [public_key_of(entry) for entry in certificates]
示例30
def verify_sig(self, encoded_cert):
    """Check the signature of a DER certificate against ``self.ca.cert``.

    The certificate's signature and to-be-signed bytes are passed to
    ``crypto.verify`` with the 'sha256' digest; nothing is returned.
    """
    parsed = x509.load_der_x509_certificate(encoded_cert, default_backend())
    issuer_cert = self.ca.cert
    signature = parsed.signature
    signed_bytes = parsed.tbs_certificate_bytes
    crypto.verify(issuer_cert, signature, signed_bytes, 'sha256')