Python源码示例:cryptography.x509.load_pem_x509_certificate()

示例1
def setup_method(self, method):
        super(TestCustodiaIPACertRequests, self).setup_method(method)
        cert = x509.load_pem_x509_certificate(CERT_PEM, default_backend())
        cert_der = cert.public_bytes(serialization.Encoding.DER)
        cert_stripped = base64.b64encode(cert_der)
        ca = x509.load_pem_x509_certificate(CA_PEM, default_backend())
        ca_der = ca.public_bytes(serialization.Encoding.DER)
        self.m_api.Command.cert_request.return_value = {
            u'result': {
                u'subject': 'dummy subject',
                u'request_id': 1,
                u'serial_number': 1,
                u'certificate': cert_stripped,
                u'certificate_chain': (
                    cert_der,
                    ca_der,
                )
            }
        } 
示例2
def create(cls, client, password, cert_data):
        """Create a new certificate."""
        cert = x509.load_pem_x509_certificate(cert_data, default_backend())
        base64_cert = cert.public_bytes(Encoding.PEM).decode('utf-8')
        # STRIP OUT CERT META "-----BEGIN CERTIFICATE-----"
        base64_cert = '\n'.join(base64_cert.split('\n')[1:-2])
        data = {
            'type': 'client',
            'certificate': base64_cert,
            'password': password,
        }
        client.api.certificates.post(json=data)

        # XXX: rockstar (08 Jun 2016) - Please see the open lxd bug here:
        # https://github.com/lxc/lxd/issues/2092
        fingerprint = binascii.hexlify(
            cert.fingerprint(hashes.SHA256())).decode('utf-8')
        return cls.get(client, fingerprint) 
示例3
def create(vek, keySizeBytes, certificatePath):
        #print("VEK: " + str(binascii.hexlify(vek)))
        publicKeyPem = open(certificatePath).read()
        publicKey = RSA.importKey(publicKeyPem)
        # Convert from PEM to DER

        lines = publicKeyPem.replace(" ", '').split()
        publicKeyDer = binascii.a2b_base64(''.join(lines[1:-1]))

        cert = x509.load_pem_x509_certificate(SmartStr(publicKeyPem), default_backend())
        subjectName = cert.subject.rfc4514_string()
        serial = cert.serial_number

        cipher = PKCS1_OAEP.new(key=publicKey, hashAlgo=SHA256, mgfunc=lambda x, y: pss.MGF1(x, y, SHA1))
        wrapped_key = cipher.encrypt(vek)
        #print("WrappedKey: " + str(binascii.hexlify(wrapped_key)))

        return CertEncryptedKeyBag(subjectName, serial, keySizeBytes, wrapped_key) 
示例4
def validate_ca_cert(self, ignored):
        expected = self._get_expected_ca_cert_fingerprint()
        algo, expectedfp = expected.split(':')
        expectedfp = expectedfp.replace(' ', '')
        backend = default_backend()

        with open(self._get_ca_cert_path(), 'r') as f:
            certstr = f.read()
        cert = load_pem_x509_certificate(certstr, backend)
        hasher = getattr(hashes, algo)()
        fpbytes = cert.fingerprint(hasher)
        fp = binascii.hexlify(fpbytes)

        if fp != expectedfp:
            os.unlink(self._get_ca_cert_path())
            self.log.error("Fingerprint of CA cert doesn't match: %s <-> %s"
                           % (fp, expectedfp))
            raise NetworkError("The provider's CA fingerprint doesn't match") 
示例5
def test_generate_cert_key_pair(self):
        cn = 'testCN'
        bit_length = 512

        # Attempt to generate a cert/key pair
        cert_object = self.cert_generator.generate_cert_key_pair(
            cn=cn,
            validity=2 * 365 * 24 * 60 * 60,
            bit_length=bit_length,
            passphrase=self.ca_private_key_passphrase,
            ca_cert=self.ca_certificate,
            ca_key=self.ca_private_key,
            ca_key_pass=self.ca_private_key_passphrase
        )

        # Validate that the cert and key are loadable
        cert = x509.load_pem_x509_certificate(
            data=cert_object.certificate, backend=backends.default_backend())
        self.assertIsNotNone(cert)

        key = serialization.load_pem_private_key(
            data=cert_object.private_key,
            password=cert_object.private_key_passphrase,
            backend=backends.default_backend())
        self.assertIsNotNone(key) 
示例6
def pem_certificate_upload(f):
    """Parse PEM formatted certificate in request data
    
    TODO: form field name option
    """

    @wraps(f)
    def decorator(*args, **kwargs):
        try:
            certificate_data = request.files['file'].read()
            g.certificate = x509.load_pem_x509_certificate(certificate_data, backend=default_backend())
        except UnsupportedAlgorithm as e:
            current_app.logger.info('could not parse PEM certificate data')
            abort(400, 'invalid input data')

        return f(*args, **kwargs)

    return decorator 
示例7
def anchor_certs():
    """Download a list of certificates to trust the MDM

    The response is a JSON array of base64 encoded DER certs as described in the DEP profile creation documentation."""
    anchors = []

    if 'CA_CERTIFICATE' in current_app.config:
        with open(current_app.config['CA_CERTIFICATE'], 'rb') as fd:
            pem_data = fd.read()
            c: x509.Certificate = x509.load_pem_x509_certificate(pem_data, backend=default_backend())
            der = c.public_bytes(Encoding.DER)
            anchors.append(urlsafe_b64encode(der))

    if 'SSL_CERTIFICATE' in current_app.config:
        with open(current_app.config['SSL_CERTIFICATE'], 'rb') as fd:
            pem_data = fd.read()
            c: x509.Certificate = x509.load_pem_x509_certificate(pem_data, backend=default_backend())
            der = c.public_bytes(Encoding.DER)
            anchors.append(urlsafe_b64encode(der))

    return jsonify(anchors) 
示例8
def add_valid_from(apps, schema_editor):
    Certificate = apps.get_model('django_ca', 'Certificate')
    for cert in Certificate.objects.all():
        backend = default_backend()
        pem = x509.load_pem_x509_certificate(force_bytes(cert.pub), backend)
        valid_from = pem.not_valid_before

        if settings.USE_TZ:
            valid_from = timezone.make_aware(valid_from)

        cert.valid_from = valid_from
        cert.save()

    CertificateAuthority = apps.get_model('django_ca', 'CertificateAuthority')
    for cert in CertificateAuthority.objects.all():
        backend = default_backend()
        pem = x509.load_pem_x509_certificate(force_bytes(cert.pub), backend)
        valid_from = pem.not_valid_before

        if settings.USE_TZ:
            valid_from = timezone.make_aware(valid_from)

        cert.valid_from = valid_from
        cert.save() 
示例9
def handle(self, pub, **options):
        pub_data = pub.read()

        try:  # close reader objects (otherwise we get a ResourceWarning)
            pub.close()
        except Exception:  # pragma: no cover
            pass

        # load public key
        try:
            pub_loaded = x509.load_pem_x509_certificate(pub_data, default_backend())
        except Exception:
            try:
                pub_loaded = x509.load_der_x509_certificate(pub_data, default_backend())
            except Exception:
                raise CommandError('Unable to load public key.')

        cert = Certificate(ca=options['ca'])
        cert.x509 = pub_loaded
        cert.save() 
示例10
def _load_pub(data):
    basedir = data.get('basedir', settings.FIXTURES_DIR)
    path = os.path.join(basedir, data['pub_filename'])

    with open(path, 'rb') as stream:
        pem = stream.read().replace(b'\r\n', b'\n')

    pub_data = {
        'pem': pem.decode('utf-8'),
        'parsed': x509.load_pem_x509_certificate(pem, default_backend()),
    }

    if data.get('pub_der_filename'):
        der_path = os.path.join(basedir, data['pub_der_filename'])
        with open(der_path, 'rb') as stream:
            der = stream.read().replace(b'\r\n', b'\n')
        pub_data['der'] = der
        # Failes for alt-extensions since alternative AKI was added
        #pub_data['der_parsed'] = x509.load_der_x509_certificate(der, default_backend()),

    return pub_data 
示例11
def _get_public_tls_parameters(service_certificate_path):
    with open(service_certificate_path, "rb") as pem_file:
        pem_data = pem_file.read()
        cert = x509.load_pem_x509_certificate(pem_data, default_backend())
        private_key = serialization.load_pem_private_key(
            pem_data,
            password=None,
            backend=default_backend())

        key_pem = private_key.private_bytes(
            encoding=serialization.Encoding.PEM,
            format=serialization.PrivateFormat.TraditionalOpenSSL,
            encryption_algorithm=serialization.NoEncryption())
        cert_pem = cert.public_bytes(serialization.Encoding.PEM)
        return {
            'SSLCertificate': cert_pem,
            'SSLKey': key_pem
        } 
示例12
def _get_public_tls_parameters(service_certificate_path):
    with open(service_certificate_path, "rb") as pem_file:
        pem_data = pem_file.read()
        cert = x509.load_pem_x509_certificate(pem_data, default_backend())
        private_key = serialization.load_pem_private_key(
            pem_data,
            password=None,
            backend=default_backend())

        key_pem = private_key.private_bytes(
            encoding=serialization.Encoding.PEM,
            format=serialization.PrivateFormat.TraditionalOpenSSL,
            encryption_algorithm=serialization.NoEncryption())
        cert_pem = cert.public_bytes(serialization.Encoding.PEM)
        return {
            'SSLCertificate': cert_pem,
            'SSLKey': key_pem
        } 
示例13
def _scan_a_cert(id, cert_path, key_path, assigns, is_acme=False):
    with open(cert_path, "rb") as f:
        crt = x509.load_pem_x509_certificate(f.read(), default_backend())
    with open(key_path, "rb") as f:
        key = serialization.load_pem_private_key(
            f.read(),
            password=None,
            backend=default_backend()
        )
    sha1 = binascii.hexlify(crt.fingerprint(hashes.SHA1())).decode()
    md5 = binascii.hexlify(crt.fingerprint(hashes.MD5())).decode()
    sha1 = ":".join([sha1[i:i+2].upper() for i in range(0, len(sha1), 2)])
    md5 = ":".join([md5[i:i+2].upper() for i in range(0, len(md5), 2)])
    kt = "RSA" if isinstance(key.public_key(), rsa.RSAPublicKey) else "DSA"
    common_name = crt.subject.get_attributes_for_oid(NameOID.COMMON_NAME)
    return Certificate(
        id=id, cert_path=cert_path, key_path=key_path, keytype=kt,
        keylength=key.key_size, domain=common_name[0].value,
        assigns=assigns.get(id, []), expiry=crt.not_valid_after, sha1=sha1,
        md5=md5, is_acme=is_acme) 
示例14
def get_certificate(self, kid):
        # retrieve keys from jwks_url
        resp = self.request(self.jwks_url(), method='GET')
        resp.raise_for_status()

        # find the proper key for the kid
        for key in resp.json()['keys']:
            if key['kid'] == kid:
                x5c = key['x5c'][0]
                break
        else:
            raise DecodeError('Cannot find kid={}'.format(kid))

        certificate = '-----BEGIN CERTIFICATE-----\n' \
                      '{}\n' \
                      '-----END CERTIFICATE-----'.format(x5c)

        return load_pem_x509_certificate(certificate.encode(),
                                         default_backend()) 
示例15
def __init__(self, data):
        """
        Cert constructor

        It can handle PEM and DER encoded strings and lists of int bytes.

        :param data: bytes or list of int
        """
        if type(data) == list:
            data = bytes(data)
        if type(data) != bytes:
            raise Exception("data must be bytes or list of int bytes")
        self.__raw_data = data
        if b"-----BEGIN CERTIFICATE-----" in data:
            self.x509 = x509.load_pem_x509_certificate(data, backends.default_backend())
            self.__raw_type = "PEM"
        else:
            self.x509 = x509.load_der_x509_certificate(data, backends.default_backend())
            self.__raw_type = "DER" 
示例16
def fqdns_from_certificate(cert_data):

    try:
        cert = x509.load_pem_x509_certificate(cert_data, default_backend())
    except ValueError:
        pass

    try:
        cert = x509.load_der_x509_certificate(cert_data, default_backend())
    except ValueError:
        raise ValueError("No recognized cert format. Allowed: PEM or DER")

    names = set()
    names.add(cert.subject.get_attributes_for_oid(NameOID.COMMON_NAME)[0].value.lower().rstrip('.'))

    try:
        alt_names = cert.extensions.get_extension_for_class(x509.SubjectAlternativeName)
    except x509.extensions.ExtensionNotFound:
        alt_names = None

    if alt_names:
        for alt_name in alt_names.value.get_values_for_type(x509.DNSName):
            names.add(alt_name.lower().rstrip('.'))

    return list(sorted(names)) 
示例17
def cert_get_names(cert_data):

    try:
        cert = x509.load_pem_x509_certificate(cert_data, default_backend())
    except ValueError:
        pass

    try:
        cert = x509.load_der_x509_certificate(cert_data, default_backend())
    except ValueError:
        raise ValueError("No recognized cert format. Allowed: PEM or DER")

    names = set()
    names.add(cert.subject.get_attributes_for_oid(NameOID.COMMON_NAME)[0].value.lower())

    try:
        alt_names = cert.extensions.get_extension_for_class(x509.SubjectAlternativeName)
    except x509.extensions.ExtensionNotFound:
        alt_names = None

    if alt_names:
        for alt_name in alt_names.value.get_values_for_type(x509.DNSName):
            names.add(alt_name.lower())

    return list(sorted(names)) 
示例18
def _get_rsa_parameters(server_info: ServerConnectivityInfo, openssl_cipher_string: str) -> Optional[RSAPublicNumbers]:
    ssl_connection = server_info.get_preconfigured_tls_connection()
    ssl_connection.ssl_client.set_cipher_list(openssl_cipher_string)
    parsed_cert = None
    try:
        # Perform the SSL handshake
        ssl_connection.connect()
        cert_as_pem = ssl_connection.ssl_client.get_received_chain()[0]
        parsed_cert = load_pem_x509_certificate(cert_as_pem.encode("ascii"), backend=default_backend())
    except ServerRejectedTlsHandshake:
        # Server does not support RSA cipher suites?
        pass
    except ClientCertificateRequested:
        # AD: The server asked for a client cert. We could still retrieve the server certificate, but it is unclear
        # to me if the ROBOT check is supposed to work even if we do not provide a client cert. My guess is that
        # it should not work since it requires completing a full handshake, which we can't without a client cert.
        # Hence, propagate the error to make the check fail.
        raise
    finally:
        ssl_connection.close()

    if parsed_cert:
        public_key = parsed_cert.public_key()
        if isinstance(public_key, RSAPublicKey):
            return public_key.public_numbers()
        else:
            return None
    else:
        return None 
示例19
def _monkeypatch_to_fix_certificate_asdict() -> None:
    # H4ck: monkeypatch the _Certificate class to add __deepcopy__() so that when we call asdict() on a dataclass
    # that contains a _Certificate, asdict() succeeds. Without this, generating JSON for the certinfo scan command
    # will crash because the asdict() function uses deepcopy(), but certificates returned by cryptography.x509
    # don't support it so SSLyze would crash. This class is a workaround to fix JSON output.
    # I opened an issue about it in the cryptography repo at https://github.com/pyca/cryptography/issues/5129
    def _deepcopy_method_for_x509_certificate(inner_self: _Certificate, memo: str) -> x509.Certificate:
        return x509.load_pem_x509_certificate(inner_self.public_bytes(Encoding.PEM), backend=default_backend())

    _Certificate.__deepcopy__ = _deepcopy_method_for_x509_certificate


# Call it on import... hacky but we don't have a choice 
示例20
def _convert_string_output(self, line):
        """
        Converts command output to instance of X.509 Certificate Object.
        """
        if self._regex_helper.search_compiled(OpensslX509TextIn._re_end_certificate, line):
            self.current_ret = load_pem_x509_certificate(data=self._string_output.encode(), backend=default_backend())
            raise ParsingDone

    # unable to load certificate 
示例21
def _get_secret_data(self, secret):
        """Retrieves the secret data.

        Converts the Barbican secret to bytes suitable for a Castellan object.
        If the secret is a public key, private key, or certificate, the secret
        is expected to be in PEM format and will be converted to DER.

        :param secret: the secret from barbican with the payload of data
        :returns: the secret data
        """
        if secret.secret_type == 'public':
            key = serialization.load_pem_public_key(
                secret.payload,
                backend=backends.default_backend())
            return key.public_bytes(
                encoding=serialization.Encoding.DER,
                format=serialization.PublicFormat.SubjectPublicKeyInfo)
        elif secret.secret_type == 'private':
            key = serialization.load_pem_private_key(
                secret.payload,
                backend=backends.default_backend(),
                password=None)
            return key.private_bytes(
                encoding=serialization.Encoding.DER,
                format=serialization.PrivateFormat.PKCS8,
                encryption_algorithm=serialization.NoEncryption())
        elif secret.secret_type == 'certificate':
            cert = cryptography_x509.load_pem_x509_certificate(
                secret.payload,
                backend=backends.default_backend())
            return cert.public_bytes(encoding=serialization.Encoding.DER)
        else:
            return secret.payload 
示例22
def import_from_pem(self, data, password=None):
        """Imports a key from data loaded from a PEM file.
        The key may be encrypted with a password.
        Private keys (PKCS#8 format), public keys, and X509 certificate's
        public keys can be imported with this interface.

        :param data(bytes): The data contained in a PEM file.
        :param password(bytes): An optional password to unwrap the key.
        """

        try:
            key = serialization.load_pem_private_key(
                data, password=password, backend=default_backend())
        except ValueError as e:
            if password is not None:
                raise e
            try:
                key = serialization.load_pem_public_key(
                    data, backend=default_backend())
            except ValueError:
                try:
                    cert = x509.load_pem_x509_certificate(
                        data, backend=default_backend())
                    key = cert.public_key()
                except ValueError:
                    raise e

        self.import_from_pyca(key)
        self._params['kid'] = self.thumbprint() 
示例23
def certificate_needs_renewed(cert_dir):
    with open(os.path.join(cert_dir, 'cert.pem'), 'rb') as cert_file:
        pem_data = cert_file.read()
    cert = x509.load_pem_x509_certificate(pem_data, default_backend())
    not_valid_before = cert.not_valid_before
    not_valid_after = cert.not_valid_after
    return not_valid_after - datetime.datetime.utcnow() < (cert.not_valid_after - cert.not_valid_before)/2 
示例24
def read_cert(cert_dir):
    with open(os.path.join(cert_dir, 'cert.pem'), 'rb') as cert_file:
        pem_data = cert_file.read()
    cert = x509.load_pem_x509_certificate(pem_data, default_backend())
    common_name = [na.value for na in cert.subject if na.oid._dotted_string == "2.5.4.3"][0]
    subject_alternative_names = [ext.value for ext in cert.extensions if ext.oid._dotted_string == "2.5.29.17"][0]
    dns_names = subject_alternative_names.get_values_for_type(x509.DNSName)
    ip_sans = [str(ip) for ip in subject_alternative_names.get_values_for_type(x509.IPAddress)]

    return common_name, dns_names, ip_sans 
示例25
def validate_cert_name(cert_file, robot_name):
    """Validate the name on Vector's certificate against the user-provided name"""
    with open(cert_file, "rb") as f:
        cert_file = f.read()
        cert = x509.load_pem_x509_certificate(cert_file, default_backend())
        for fields in cert.subject:
            current = str(fields.oid)
            if "commonName" in current:
                common_name = fields.value
                if common_name != robot_name:
                    print(colored(" ERROR", "red"))
                    sys.exit("The name of the certificate ({}) does not match the name provided ({}).\n"
                             "Please verify the name, and try again.".format(common_name, robot_name))
                else:
                    return 
示例26
def test_sign_cert(self):
        # Attempt sign a cert
        signed_cert = self.cert_generator.sign_cert(
            csr=self.certificate_signing_request,
            validity=2 * 365 * 24 * 60 * 60,
            ca_cert=self.ca_certificate,
            ca_key=self.ca_private_key,
            ca_key_pass=self.ca_private_key_passphrase,
            ca_digest=self.signing_digest
        )

        self.assertIn("-----BEGIN CERTIFICATE-----",
                      signed_cert.decode('ascii'))

        # Load the cert for specific tests
        cert = x509.load_pem_x509_certificate(
            data=signed_cert, backend=backends.default_backend())

        # Make sure expiry time is accurate
        should_expire = (datetime.datetime.utcnow() +
                         datetime.timedelta(seconds=2 * 365 * 24 * 60 * 60))
        diff = should_expire - cert.not_valid_after
        self.assertLess(diff, datetime.timedelta(seconds=10))

        # Make sure this is a version 3 X509.
        self.assertEqual('v3', cert.version.name)

        # Make sure this cert is marked as Server and Client Cert via the
        # extended Key Usage extension
        self.assertIn(x509.oid.ExtendedKeyUsageOID.SERVER_AUTH,
                      cert.extensions.get_extension_for_class(
                          x509.ExtendedKeyUsage).value._usages)
        self.assertIn(x509.oid.ExtendedKeyUsageOID.CLIENT_AUTH,
                      cert.extensions.get_extension_for_class(
                          x509.ExtendedKeyUsage).value._usages)

        # Make sure this cert can't sign other certs
        self.assertFalse(cert.extensions.get_extension_for_class(
            x509.BasicConstraints).value.ca) 
示例27
def get_host_names(certificate):
    """Extract the host names from the Pem encoded X509 certificate

    :param certificate: A PEM encoded certificate
    :returns: A dictionary containing the following keys:
              ['cn', 'dns_names']
              where 'cn' is the CN from the SubjectName of the
              certificate, and 'dns_names' is a list of dNSNames
              (possibly empty) from the SubjectAltNames of the certificate.
    """
    if isinstance(certificate, str):
        certificate = certificate.encode('utf-8')
    try:
        cert = x509.load_pem_x509_certificate(certificate,
                                              backends.default_backend())
        cn = cert.subject.get_attributes_for_oid(x509.OID_COMMON_NAME)[0]
        host_names = {
            'cn': cn.value.lower(),
            'dns_names': []
        }
        try:
            ext = cert.extensions.get_extension_for_oid(
                x509.OID_SUBJECT_ALTERNATIVE_NAME
            )
            host_names['dns_names'] = ext.value.get_values_for_type(
                x509.DNSName)
        except x509.ExtensionNotFound:
            LOG.debug("%s extension not found",
                      x509.OID_SUBJECT_ALTERNATIVE_NAME)

        return host_names
    except Exception:
        LOG.exception('Unreadable Certificate.')
        raise exceptions.UnreadableCert 
示例28
def _get_x509_from_pem_bytes(certificate_pem):
    """Parse X509 data from a PEM encoded certificate

    :param certificate_pem: Certificate in PEM format
    :returns: crypto high-level x509 data from the PEM string
    """
    if isinstance(certificate_pem, str):
        certificate_pem = certificate_pem.encode('utf-8')
    try:
        x509cert = x509.load_pem_x509_certificate(certificate_pem,
                                                  backends.default_backend())
    except Exception:
        LOG.exception('Unreadable Certificate.')
        raise exceptions.UnreadableCert
    return x509cert 
示例29
def deserialize(self, data):
        cert = x509.load_pem_x509_certificate(data, default_backend())
        key_material = cert.public_key().public_numbers()
        self._e = key_material.e
        self._n = key_material.n 
示例30
def _get_x509_from_pem_bytes(certificate_pem):
    """Parse X509 data from a PEM encoded certificate

    :param certificate_pem: Certificate in PEM format
    :returns: crypto high-level x509 data from the PEM string
    """
    if type(certificate_pem) == six.text_type:
        certificate_pem = certificate_pem.encode('utf-8')
    try:
        x509cert = x509.load_pem_x509_certificate(certificate_pem,
                                                  backends.default_backend())
    except Exception:
        LOG.exception('Unreadable Certificate.')
        raise f5_ex.UnreadableCert
    return x509cert