Python源码示例:cryptography.x509.load_der_x509_certificate()

示例1
def validate(self, authenticator_data, rp_id_hash, client_data_hash):
        """Verify a FIDO U2F attestation statement for a credential.

        Follows the "Verification procedure" at
        https://www.w3.org/TR/webauthn/#fido-u2f-attestation: the attestation
        signature is checked with the attestation certificate's public key,
        then the attestation certificate's own signature is checked with the
        public key of the single root certificate found in the metadata for
        the certificate's subject key identifier.

        :param authenticator_data: object whose ``credential`` attribute
            provides ``id`` and ``public_key`` (with ``x`` and ``y``).
        :param rp_id_hash: bytes concatenated into the signed data.
        :param client_data_hash: bytes concatenated into the signed data.
        :returns: the value of ``self.validated_attestation(...)`` with
            ``type="Basic"`` and ``trust_path="x5c"``.
        :raises AssertionError: if a public key coordinate is not 32 bytes
            long or the metadata does not list exactly one root certificate.
        """
        import base64  # function-scope import: only the root certificate needs it

        # See https://www.w3.org/TR/webauthn/#fido-u2f-attestation, "Verification procedure"
        credential = authenticator_data.credential
        public_key_u2f = b'\x04' + credential.public_key.x + credential.public_key.y
        verification_data = b'\x00' + rp_id_hash + client_data_hash + credential.id + public_key_u2f
        assert len(credential.public_key.x) == 32
        assert len(credential.public_key.y) == 32
        self.cert_public_key.verify(self.signature, verification_data, ec.ECDSA(hashes.SHA256()))
        key_id = x509.SubjectKeyIdentifier.from_public_key(self.cert_public_key).digest.hex()
        att_root_cert_chain = self.metadata_for_key_id(key_id)["attestationRootCertificates"]

        # TODO: implement full cert chain validation
        # See https://cryptography.io/en/latest/x509/reference/#cryptography.x509.Certificate.tbs_certificate_bytes
        # See https://github.com/pyca/cryptography/issues/2381
        # See https://github.com/wbond/certvalidator
        assert len(att_root_cert_chain) == 1
        # The metadata entry is text, so it must be base64-decoded before it
        # can be parsed as DER; merely encoding the text to bytes does not
        # produce a DER document.
        # NOTE(review): assumes the entry is standard base64 as in FIDO
        # metadata statements - confirm against metadata_for_key_id().
        att_root_cert_der = base64.b64decode(att_root_cert_chain[0])
        att_root_cert = x509.load_der_x509_certificate(att_root_cert_der,
                                                       cryptography.hazmat.backends.default_backend())
        # NOTE(review): PKCS1v15 padding implies the root key is RSA.
        att_root_cert.public_key().verify(self.att_cert.signature,
                                          self.att_cert.tbs_certificate_bytes,
                                          padding.PKCS1v15(),
                                          self.att_cert.signature_hash_algorithm)
        return self.validated_attestation(type="Basic", trust_path="x5c", credential=credential)
示例2
def request_cert(self, builder, **kwargs):
        """Sign a CSR, submit it and return the response plus a PEM bundle.

        The CSR is serialised as PEM text and passed, together with
        ``kwargs``, to ``self._cert_request``. Depending on
        ``self.plugin.chain`` either the whole ``certificate_chain`` or the
        single base64 encoded ``certificate`` of the result is loaded.

        :param builder: CSR builder handed to ``self._sign_csr``.
        :returns: tuple ``(response, pem)`` where ``pem`` is the dumped
            private key followed by the dumped certificates, joined by
            newlines.
        """
        csr = self._sign_csr(builder)
        csr_text = csr.public_bytes(serialization.Encoding.PEM)
        if not isinstance(csr_text, six.text_type):
            csr_text = csr_text.decode('ascii')

        response = self._cert_request(csr_text, **kwargs)

        if self.plugin.chain:
            loaded = tuple(
                x509.load_der_x509_certificate(der, self.backend)
                for der in response[u'result'][u'certificate_chain']
            )
        else:
            # The certificate is bare base64, without BEGIN/END armor lines.
            der = base64.b64decode(response[u'result'][u'certificate'])
            loaded = (x509.load_der_x509_certificate(der, self.backend), )

        parts = [self._dump_privkey(self._privkey)]
        for entry in loaded:
            parts.append(self._dump_cert(entry))
        return response, '\n'.join(parts)
示例3
def _process_pkcs7_substrate(substrate):
    """Print every certificate of a DER encoded PKCS#7 SignedData as PEM.

    :param substrate: DER bytes decoded as an ``rfc2315.ContentInfo``.
    :raises Exception: if the content type is not ``rfc2315.signedData``.
    """
    outer, _ = der_decoder.decode(substrate, asn1Spec=rfc2315.ContentInfo())

    if outer.getComponentByName('contentType') != rfc2315.signedData:
        raise Exception

    signed_data, _ = der_decoder.decode(
        outer.getComponentByName('content'), asn1Spec=rfc2315.SignedData())

    for entry in signed_data.getComponentByName('certificates'):
        # Re-encode the decoded entry so it can be parsed as a certificate.
        der_bytes = der_encoder.encode(entry)
        parsed = x509.load_der_x509_certificate(der_bytes,
                                                backends.default_backend())
        pem_bytes = parsed.public_bytes(encoding=serialization.Encoding.PEM)
        print(pem_bytes.decode('unicode_escape'), end='')


# Main program code 
示例4
def test_xmldsig_interop_TR2012(self):
        """Verify every ``signature*.xml`` file of the TR2012 interop set.

        Files whose name contains ``keyinforeference`` may fail (reported as
        unsupported); files whose name contains ``x509digest`` may only fail
        with ``InvalidCertificate``; any other failure is re-raised.
        """
        tr2012_dir = os.path.join(interop_dir, "TR2012")

        def get_x509_cert(**kwargs):
            # Resolver returning the RSA test certificate as a pyOpenSSL object.
            from cryptography.x509 import load_der_x509_certificate
            from OpenSSL.crypto import X509
            with open(os.path.join(tr2012_dir, "rsa-cert.der"), "rb") as fh:
                return [X509.from_cryptography(load_der_x509_certificate(fh.read(), backend=default_backend()))]

        for signature_file in glob(os.path.join(tr2012_dir, "signature*.xml")):
            print("Verifying", signature_file)
            # Only the x509digest cases need the certificate resolver.
            resolver = get_x509_cert if "x509digest" in signature_file else None
            with open(signature_file, "rb") as fh:
                try:
                    sig = fh.read()
                    XMLVerifier().verify(sig, require_x509=False, hmac_key="testkey", validate_schema=True,
                                         cert_resolver=resolver)
                    # The decoded text is unused; the call only checks that
                    # the signature file is valid UTF-8.
                    sig.decode("utf-8")
                except Exception as exc:
                    if "keyinforeference" in signature_file:
                        print("Unsupported test case:", type(exc), exc)
                    elif "x509digest" in signature_file:
                        assert isinstance(exc, InvalidCertificate)
                    else:
                        raise
示例5
def handle(self, pub, **options):
        """Read a certificate from ``pub`` and save it for the given CA.

        The data is parsed as PEM first and as DER if that fails.

        :param pub: readable object providing the certificate data.
        :param options: must contain ``'ca'``, passed to ``Certificate``.
        :raises CommandError: if the data parses neither as PEM nor as DER.
        """
        raw = pub.read()

        # Close the reader explicitly, otherwise a ResourceWarning is emitted.
        try:
            pub.close()
        except Exception:  # pragma: no cover
            pass

        # PEM is attempted first, DER is the fallback.
        try:
            parsed = x509.load_pem_x509_certificate(raw, default_backend())
        except Exception:
            try:
                parsed = x509.load_der_x509_certificate(raw, default_backend())
            except Exception:
                raise CommandError('Unable to load public key.')

        record = Certificate(ca=options['ca'])
        record.x509 = parsed
        record.save()
示例6
def _load_pub(data):
    """Load the certificate fixture files described by ``data``.

    :param data: mapping with ``'pub_filename'`` and optionally
        ``'basedir'`` (defaults to ``settings.FIXTURES_DIR``) and
        ``'pub_der_filename'``.
    :returns: dict with ``'pem'`` (text), ``'parsed'`` (certificate loaded
        from the PEM data) and, when a DER file name is given, ``'der'``
        (the raw bytes of that file).
    """
    basedir = data.get('basedir', settings.FIXTURES_DIR)
    path = os.path.join(basedir, data['pub_filename'])

    with open(path, 'rb') as stream:
        # PEM is text, so normalising Windows line endings is harmless.
        pem = stream.read().replace(b'\r\n', b'\n')

    pub_data = {
        'pem': pem.decode('utf-8'),
        'parsed': x509.load_pem_x509_certificate(pem, default_backend()),
    }

    if data.get('pub_der_filename'):
        der_path = os.path.join(basedir, data['pub_der_filename'])
        with open(der_path, 'rb') as stream:
            # DER is binary: a 0x0D 0x0A byte pair is payload, not a line
            # ending, so the bytes must be kept exactly as stored.
            pub_data['der'] = stream.read()
        # The DER data is deliberately not parsed here: parsing fails for
        # alt-extensions since the alternative AKI was added.

    return pub_data
示例7
def test_BackuprKey_BACKUPKEY_RETRIEVE_BACKUP_KEY_GUID(self):
        """Send a raw BackuprKey request and print the returned certificate.

        The ``ppDataOut`` field of the response is joined into one byte
        string and parsed as a DER certificate.
        """
        dce, rpctransport = self.connect()

        req = bkrp.BackuprKey()
        req['pguidActionAgent'] = bkrp.BACKUPKEY_RETRIEVE_BACKUP_KEY_GUID
        req['pDataIn'] = NULL
        req['cbDataIn'] = 0
        req['dwParam'] = 0

        reply = dce.request(req)
        reply.dump()

        der = b''.join(reply['ppDataOut'])
        parsed = x509.load_der_x509_certificate(der, default_backend())

        print(parsed.subject)
        print(parsed.issuer)
        print(parsed.signature)
示例8
def test_hBackuprKey_BACKUPKEY_RETRIEVE_BACKUP_KEY_GUID(self):
        """Call the ``hBackuprKey`` helper and print the returned certificate.

        The ``ppDataOut`` field of the response is joined into one byte
        string and parsed as a DER certificate. The helper is called
        directly, so no hand-built request object is needed.
        """
        dce, rpctransport = self.connect()

        resp = bkrp.hBackuprKey(dce, bkrp.BACKUPKEY_RETRIEVE_BACKUP_KEY_GUID, NULL)

        resp.dump()

        cert = x509.load_der_x509_certificate(b''.join(resp['ppDataOut']), default_backend())

        print(cert.subject)
        print(cert.issuer)
        print(cert.signature)
示例9
def __init__(self, data):
        """
        Cert constructor

        It can handle PEM and DER encoded strings and lists of int bytes.
        Data containing the ``-----BEGIN CERTIFICATE-----`` marker is parsed
        as PEM, anything else as DER.

        :param data: bytes or list of int
        :raises TypeError: if data is neither bytes nor a list
        """
        # isinstance also accepts subclasses of list and bytes.
        if isinstance(data, list):
            data = bytes(data)
        if not isinstance(data, bytes):
            raise TypeError("data must be bytes or list of int bytes")
        self.__raw_data = data
        if b"-----BEGIN CERTIFICATE-----" in data:
            self.x509 = x509.load_pem_x509_certificate(data, backends.default_backend())
            self.__raw_type = "PEM"
        else:
            self.x509 = x509.load_der_x509_certificate(data, backends.default_backend())
            self.__raw_type = "DER"
示例10
def fqdns_from_certificate(cert_data):
    """Return the sorted DNS names found in a PEM or DER certificate.

    The names are the first subject common name (if the subject has one)
    and every DNS entry of the subject alternative name extension, all
    lower-cased and stripped of trailing dots.

    :param cert_data: certificate bytes, PEM or DER encoded.
    :returns: sorted list of unique names.
    :raises ValueError: if the data parses neither as PEM nor as DER.
    """
    try:
        cert = x509.load_pem_x509_certificate(cert_data, default_backend())
    except ValueError:
        # Only fall back to DER when PEM parsing failed; trying DER
        # unconditionally would reject every valid PEM certificate.
        try:
            cert = x509.load_der_x509_certificate(cert_data, default_backend())
        except ValueError:
            raise ValueError("No recognized cert format. Allowed: PEM or DER")

    names = set()
    # A subject without a common name is valid; skip it instead of failing.
    common_names = cert.subject.get_attributes_for_oid(NameOID.COMMON_NAME)
    if common_names:
        names.add(common_names[0].value.lower().rstrip('.'))

    try:
        alt_names = cert.extensions.get_extension_for_class(x509.SubjectAlternativeName)
    except x509.extensions.ExtensionNotFound:
        alt_names = None

    if alt_names:
        for alt_name in alt_names.value.get_values_for_type(x509.DNSName):
            names.add(alt_name.lower().rstrip('.'))

    return sorted(names)
示例11
def cert_get_names(cert_data):
    """Return the sorted names found in a PEM or DER certificate.

    The names are the first subject common name (if the subject has one)
    and every DNS entry of the subject alternative name extension, all
    lower-cased.

    :param cert_data: certificate bytes, PEM or DER encoded.
    :returns: sorted list of unique names.
    :raises ValueError: if the data parses neither as PEM nor as DER.
    """
    try:
        cert = x509.load_pem_x509_certificate(cert_data, default_backend())
    except ValueError:
        # Only fall back to DER when PEM parsing failed; trying DER
        # unconditionally would reject every valid PEM certificate.
        try:
            cert = x509.load_der_x509_certificate(cert_data, default_backend())
        except ValueError:
            raise ValueError("No recognized cert format. Allowed: PEM or DER")

    names = set()
    # A subject without a common name is valid; skip it instead of failing.
    common_names = cert.subject.get_attributes_for_oid(NameOID.COMMON_NAME)
    if common_names:
        names.add(common_names[0].value.lower())

    try:
        alt_names = cert.extensions.get_extension_for_class(x509.SubjectAlternativeName)
    except x509.extensions.ExtensionNotFound:
        alt_names = None

    if alt_names:
        for alt_name in alt_names.value.get_values_for_type(x509.DNSName):
            names.add(alt_name.lower())

    return sorted(names)
示例12
def metadata_toc(self):
        """Return the decoded metadata TOC, downloading it on first use.

        The JWT fetched from ``self.mds_url`` is decoded with the public key
        of the first certificate of its ``x5c`` header and cached in
        ``self._metadata_toc``; later calls return the cached value.

        :returns: the decoded JWT payload.
        :raises AssertionError: if the JWT header algorithm is not ES256.
        """
        if self._metadata_toc is None:
            import base64  # function-scope import: only needed on the first fetch

            res = requests.get(self.mds_url)
            res.raise_for_status()
            jwt_header = jwt.get_unverified_header(res.content)
            assert jwt_header["alg"] == "ES256"
            # RFC 7515 section 4.1.6: every "x5c" entry is a base64 encoded
            # DER certificate, so it has to be decoded before parsing.
            der_cert = base64.b64decode(jwt_header["x5c"][0])
            cert = x509.load_der_x509_certificate(der_cert,
                                                  cryptography.hazmat.backends.default_backend())
            # NOTE(review): the verification key comes from the token's own
            # header and is not validated against a trust anchor here.
            self._metadata_toc = jwt.decode(res.content, key=cert.public_key(), algorithms=["ES256"])
        return self._metadata_toc
示例13
def __init__(self, att_stmt):
        """Store the attestation statement and load its single certificate.

        :param att_stmt: mapping with ``"x5c"`` (a one element sequence
            holding a DER certificate) and ``"sig"``.
        :raises AssertionError: if ``"x5c"`` does not hold exactly one entry.
        """
        self.att_stmt = att_stmt
        cert_chain = self.att_stmt["x5c"]
        assert len(cert_chain) == 1
        self.att_cert = x509.load_der_x509_certificate(
            cert_chain[0], cryptography.hazmat.backends.default_backend())
        self.cert_public_key = self.att_cert.public_key()
        self.signature = att_stmt["sig"]
示例14
def _get_normalized_payload(self, encoded_bytes, secret_type):
        """Convert DER encoded keys and certificates to PEM.

        Barbican expects certificates, public keys, and private keys in PEM
        format, whereas Castellan hands these objects over as DER encoded
        bytes.

        :param encoded_bytes: the payload bytes.
        :param secret_type: ``'public'``, ``'private'`` or ``'certificate'``
            trigger a conversion; any other value returns the payload
            unchanged.
        :returns: the PEM encoded payload, or ``encoded_bytes`` as given.
        """
        if secret_type == 'public':
            public_key = serialization.load_der_public_key(
                encoded_bytes,
                backend=backends.default_backend())
            return public_key.public_bytes(
                encoding=serialization.Encoding.PEM,
                format=serialization.PublicFormat.SubjectPublicKeyInfo)

        if secret_type == 'private':
            private_key = serialization.load_der_private_key(
                encoded_bytes,
                backend=backends.default_backend(),
                password=None)
            return private_key.private_bytes(
                encoding=serialization.Encoding.PEM,
                format=serialization.PrivateFormat.PKCS8,
                encryption_algorithm=serialization.NoEncryption())

        if secret_type == 'certificate':
            certificate = cryptography_x509.load_der_x509_certificate(
                encoded_bytes,
                backend=backends.default_backend())
            return certificate.public_bytes(
                encoding=serialization.Encoding.PEM)

        # Every other secret type is passed through untouched.
        return encoded_bytes
示例15
def scan(self, offset=0, maxlen=None):
        """Yield certificate and RSA private key candidates.

        For every hit of the parent scanner, the three bytes at ``hit + 4``
        select the candidate kind and the big-endian unsigned short at
        ``hit + 2`` gives the size; ``size + 4`` bytes starting at ``hit``
        are read as the candidate data.

        :param offset: passed through to the parent scanner.
        :param maxlen: passed through to the parent scanner.
        :returns: generator of ``(hit, kind, data, description)`` where
            ``kind`` is ``"X509"`` or ``"RSA"``. ``description`` is a dict
            of subject attributes for a parsable certificate, ``""`` for a
            parsable key and ``None`` when parsing failed or ``x509`` is
            falsy.
        """
        for hit in super(CertScanner, self).scan(offset=offset, maxlen=maxlen):
            signature = self.address_space.read(hit + 4, 3)
            size = self.profile.Object(
                "unsigned be short", offset=hit+2, vm=self.address_space)
            description = None

            if signature.startswith(b"\x30\x82"):
                data = self.address_space.read(hit, size + 4)
                if x509:
                    try:
                        cert = x509.load_der_x509_certificate(data, default_backend())
                        description = dict((
                            attr.oid._name, attr.value) for attr in cert.subject)
                    except Exception:
                        # Best effort: scanned memory is often not a real
                        # certificate, the hit is still reported.
                        pass

                yield hit, "X509", data, description

            elif signature.startswith(b"\x02\x01\x00"):
                data = self.address_space.read(hit, size + 4)
                if x509:
                    try:
                        # The END footer must start on its own line,
                        # otherwise line-based PEM readers reject the armor.
                        pem = (b"-----BEGIN RSA PRIVATE KEY-----\n" +
                               base64.b64encode(data) +
                               b"\n-----END RSA PRIVATE KEY-----\n")
                        # Loading only checks that the key parses.
                        serialization.load_pem_private_key(
                            pem, password=None, backend=default_backend())
                        description = ""
                    except Exception:
                        # Best effort, see above.
                        pass

                yield hit, "RSA", data, description
示例16
def verify_hit(self, hit, address_space):
        """Classify a scanner hit as a certificate or an RSA private key.

        The three bytes at ``hit + 4`` select the candidate kind and the
        big-endian unsigned short at ``hit + 2`` gives the size;
        ``size + 4`` bytes starting at ``hit`` are read as the candidate
        data.

        :param hit: offset of the candidate in ``address_space``.
        :param address_space: object providing ``read(offset, length)``.
        :returns: ``(kind, data, description)`` where ``kind`` is ``"X509"``
            or ``"RSA"``, or ``(None, None, None)`` if the hit matches
            neither. ``description`` is a dict of subject attributes for a
            parsable certificate, ``""`` for a parsable key and ``None``
            when parsing failed or ``x509`` is falsy.
        """
        signature = address_space.read(hit + 4, 3)
        size = self.profile.Object(
            "unsigned be short", offset=hit+2, vm=address_space)
        description = None

        if signature.startswith(b"\x30\x82"):
            data = address_space.read(hit, size + 4)
            if x509:
                try:
                    cert = x509.load_der_x509_certificate(data, default_backend())
                    description = dict((
                        attr.oid._name, attr.value) for attr in cert.subject)
                except Exception:
                    # Best effort: scanned memory is often not a real
                    # certificate, the hit is still reported.
                    pass

            return "X509", data, description

        elif signature.startswith(b"\x02\x01\x00"):
            data = address_space.read(hit, size + 4)
            if x509:
                try:
                    # The END footer must start on its own line, otherwise
                    # line-based PEM readers reject the armor.
                    pem = (b"-----BEGIN RSA PRIVATE KEY-----\n" +
                           base64.b64encode(data) +
                           b"\n-----END RSA PRIVATE KEY-----\n")
                    # Loading only checks that the key parses.
                    serialization.load_pem_private_key(
                        pem, password=None, backend=default_backend())
                    description = ""
                except Exception:
                    # Best effort, see above.
                    pass

            return "RSA", data, description

        return None, None, None
示例17
def _get_x509_from_der_bytes(certificate_der):
    """Load an X.509 certificate object from DER bytes.

    :param certificate_der: Certificate in DER format
    :returns: the certificate object parsed from the DER data
    :raises exceptions.UnreadableCert: if loading fails for any reason;
        the original error is logged with its traceback
    """
    try:
        return x509.load_der_x509_certificate(certificate_der,
                                              backends.default_backend())
    except Exception:
        LOG.exception('Unreadable Certificate.')
        raise exceptions.UnreadableCert
示例18
def _get_x509_from_der_bytes(certificate_der):
    """Load an X.509 certificate object from DER bytes.

    :param certificate_der: Certificate in DER format
    :returns: the certificate object parsed from the DER data
    :raises f5_ex.UnreadableCert: if loading fails for any reason; the
        original error is logged with its traceback
    """
    try:
        return x509.load_der_x509_certificate(certificate_der,
                                              backends.default_backend())
    except Exception:
        LOG.exception('Unreadable Certificate.')
        raise f5_ex.UnreadableCert
示例19
def load_der_certificate(data):
    """
    Parse DER bytes into an X.509 certificate using the default backend.
    """
    loader = x509.load_der_x509_certificate
    return loader(data, default_backend())
示例20
def _get_certificate_hash(certificate_der):
        """
        Compute the server certificate hash for the tls-server-end-point
        channel binding.

        According to https://tools.ietf.org/html/rfc5929#section-4.1 the
        hash function is
            SHA256 if the certificate's signature hash is MD5 or SHA1
            the certificate's own signature hash function otherwise

        :param certificate_der: The byte string of the server's certificate
        :return: The byte string containing the hash of the server's
            certificate, or None (after emitting a warning) if the
            signature hash algorithm of the certificate is unsupported
        """
        backend = default_backend()
        cert = x509.load_der_x509_certificate(certificate_der, backend)

        try:
            cert_algorithm = cert.signature_hash_algorithm
        except UnsupportedAlgorithm as ex:
            warnings.warn(
                "Failed to get the signature algorithm from the "
                "certificate, unable to pass channel bindings data: %s"
                % str(ex),
                UnknownSignatureAlgorithmOID)
            return None

        # md5 and sha1 are replaced by sha256, any other algorithm is used
        # as found in the certificate
        if cert_algorithm.name in ['md5', 'sha1']:
            chosen_algorithm = hashes.SHA256()
        else:
            chosen_algorithm = cert_algorithm

        hasher = hashes.Hash(chosen_algorithm, backend)
        hasher.update(certificate_der)
        return hasher.finalize()
示例21
def get_certificate_from_connection(connection):
    """
    Return the peer certificate of a connection as an X.509 object.

    The certificate is requested in binary form from
    ``connection.getpeercert``; ``None`` is returned when the connection
    provides no certificate.
    """
    der = connection.getpeercert(binary_form=True)
    if not der:
        return None
    return x509.load_der_x509_certificate(der, backends.default_backend())
示例22
def get_signer_certificate(content, signer):
    """Find the certificate in ``content`` that belongs to ``signer``.

    A certificate matches when its serial number and issuer equal those
    of the signer's ``sid``.

    :param content: mapping with a ``"certificates"`` iterable.
    :param signer: mapping with a ``"sid"`` entry.
    :returns: tuple ``(issuer common names joined by ", ", DER bytes,
        parsed certificate)`` for the first match, or ``None`` if no
        certificate matches.
    """
    candidates = content["certificates"]
    signer_id = signer["sid"]
    for candidate in candidates:
        if not (candidate.chosen.serial_number == signer_id.chosen["serial_number"].native):
            continue
        if not (candidate.chosen.issuer == signer_id.chosen["issuer"]):
            continue
        der_bytes = candidate.dump()
        parsed = x509.load_der_x509_certificate(der_bytes, default_backend())
        issuer_attributes = parsed.issuer.get_attributes_for_oid(x509.oid.NameOID.COMMON_NAME)
        issuer_cn = ", ".join(attribute.value for attribute in issuer_attributes)
        return issuer_cn, der_bytes, parsed
    return None
示例23
def ack_certificate_list(request: DBCommand, device: Device, response: dict):
    """Acknowledge a response to the ``CertificateList`` command.

    All certificates currently recorded for the device are deleted and
    replaced by one record per entry of the response, then the session is
    committed.

    Args:
        request (DBCommand): An instance of the command that prompted the device to come back with this request.
        device (Device): The database model of the device responding.
        response (dict): The response plist data, as a dictionary.

    Returns:
        void: Nothing is returned but this behaviour is subject to change.
    """
    for stale in device.installed_certificates:
        db.session.delete(stale)

    reported = response['CertificateList']
    current_app.logger.debug(
        'Received CertificatesList response containing {} certificate(s)'.format(len(reported)))

    for entry in reported:
        record = InstalledCertificate()
        record.device = device
        record.device_udid = device.udid

        record.x509_cn = entry.get('CommonName', None)
        record.is_identity = entry.get('IsIdentity', None)

        # The DER data is parsed only to compute its SHA-256 fingerprint.
        der_data = entry['Data']
        parsed = x509.load_der_x509_certificate(der_data, default_backend())
        record.fingerprint_sha256 = hexlify(parsed.fingerprint(hashes.SHA256()))
        record.der_data = der_data

        db.session.add(record)

    db.session.commit()
示例24
def from_der(der_data: bytes) -> x509.Certificate:
    """Parse DER bytes into a certificate using the default backend."""
    loader = x509.load_der_x509_certificate
    return loader(der_data, default_backend())
示例25
def create_ssl_context(cert_byes, pk_bytes, password=None,
                       encoding=Encoding.PEM):
    """Build an SSL context from a certificate and its private key.

    :param cert_byes array of bytes containing the cert encoded
           using the method supplied in the ``encoding`` parameter
    :param pk_bytes array of bytes containing the private key encoded
           using the method supplied in the ``encoding`` parameter
    :param password array of bytes containing the passphrase to be used
           with the supplied private key. None if unencrypted.
           Defaults to None.
    :param encoding ``cryptography.hazmat.primitives.serialization.Encoding``
            details the encoding method used on the ``cert_byes`` and
            ``pk_bytes`` parameters. Can be either PEM or DER.
            Defaults to PEM.
    :raises ValueError if the encoding is neither PEM nor DER, or if the
            loaded cert or key is falsy.
    """
    backend = default_backend()

    if encoding == Encoding.PEM:
        load_cert = x509.load_pem_x509_certificate
        load_key = load_pem_private_key
    elif encoding == Encoding.DER:
        load_cert = x509.load_der_x509_certificate
        load_key = load_der_private_key
    else:
        raise ValueError('Invalid encoding provided: Must be PEM or DER')

    cert = load_cert(cert_byes, backend)
    key = load_key(pk_bytes, password, backend)

    if not (cert and key):
        raise ValueError('Cert and key could not be parsed from '
                         'provided data')
    check_cert_dates(cert)
    context = PyOpenSSLContext(PROTOCOL)
    context._ctx.use_certificate(X509.from_cryptography(cert))
    context._ctx.use_privatekey(PKey.from_cryptography_key(key))
    return context
示例26
def get_key_from_jwk(self, jwk, alg):
        """Return the verification key described by a JWK.

        If the JWK carries an ``x5c`` entry, the public key of its first
        (base64 encoded DER) certificate is returned. Otherwise the
        algorithm registered under ``alg`` builds the key from the JWK.

        :param jwk: the JWK as a dict.
        :param alg: key into ``self._algorithms``.
        """
        if "x5c" not in jwk:
            algorithm = self._algorithms[alg]
            # Awkward: from_jwk is handed the JWK re-serialised as JSON text.
            return algorithm.from_jwk(json.dumps(jwk))

        leaf_der = base64.b64decode(jwk["x5c"][0])
        return load_der_x509_certificate(leaf_der, default_backend()).public_key()
示例27
def load_certificate(path):
    """Load a certificate file, choosing the parser by file extension.

    Files ending in ``.pem`` are parsed as PEM, all others as DER.

    :param path: path of the certificate file.
    :returns: the parsed certificate.
    """
    extension = os.path.splitext(path)[1]
    with open(path, "rb") as handle:
        if extension == ".pem":
            loader = x509.load_pem_x509_certificate
        else:
            loader = x509.load_der_x509_certificate
        return loader(handle.read(), default_backend())
示例28
def x509_from_der(data):
    """Parse DER bytes into a certificate; return None for falsy input."""
    if data:
        return x509.load_der_x509_certificate(data, default_backend())
    return None
示例29
def _load_keys(self, certificates):
        """Replace ``self.signing_keys`` with the keys of the certificates.

        :param certificates: iterable of base64 encoded DER certificates.
            ``self.signing_keys`` is only assigned once every certificate
            has been loaded.
        """
        def _public_key(encoded):
            # Log first, then decode and parse one certificate.
            logger.debug("Loading public key from certificate: %s", encoded)
            parsed = load_der_x509_certificate(base64.b64decode(encoded), backend)
            return parsed.public_key()

        self.signing_keys = [_public_key(encoded) for encoded in certificates]
示例30
def verify_sig(self, encoded_cert):
        """Check the signature of a DER certificate against the CA certificate.

        The certificate's signature and to-be-signed bytes are passed to
        ``crypto.verify`` together with ``self.ca.cert`` and the digest name
        ``'sha256'``. Nothing is returned; a failed check surfaces as
        whatever ``crypto.verify`` raises.

        :param encoded_cert: the certificate as DER bytes.
        """
        parsed = x509.load_der_x509_certificate(encoded_cert, default_backend())

        issuer_cert = self.ca.cert
        signature = parsed.signature
        signed_bytes = parsed.tbs_certificate_bytes
        crypto.verify(issuer_cert, signature, signed_bytes, 'sha256')