Python源码示例:cryptography.x509.load_pem_x509_csr()

示例1
def test_generate_csr(self):
        """Check that a generated CSR loads as PEM and carries the requested CN."""
        expected_cn = 'testCN'

        # Produce the CSR using the CA key and passphrase held by the test case.
        pem_csr = self.cert_generator._generate_csr(
            cn=expected_cn,
            private_key=self.ca_private_key,
            passphrase=self.ca_private_key_passphrase
        )

        # Parse the generated PEM back into a CSR object.
        parsed = x509.load_pem_x509_csr(data=pem_csr,
                                        backend=backends.default_backend())

        # The first common-name attribute of the subject must equal the CN
        # that was requested.
        cn_attributes = parsed.subject.get_attributes_for_oid(
            x509.oid.NameOID.COMMON_NAME)
        self.assertEqual(expected_cn, cn_attributes[0].value)
示例2
def get_sans_from_csr(data):
    """
    Extract the SubjectAlternativeName entries of a CSR.

    Entries of every kind are reported; each one is described by the class
    name of the entry object and its value.

    :param data: PEM-encoded CSR as a text string (UTF-8 encoded before parsing)
    :return: list of ``{"nameType": ..., "value": ...}`` dicts, empty when the
        CSR carries no SubjectAlternativeName extension
    :raises ValidationError: if the CSR cannot be loaded
    """
    try:
        parsed_csr = x509.load_pem_x509_csr(data.encode("utf-8"), default_backend())
    except Exception:
        raise ValidationError("CSR presented is not valid.")

    collected = []
    try:
        san_extension = parsed_csr.extensions.get_extension_for_class(
            x509.SubjectAlternativeName
        )
        collected.extend(
            {"nameType": type(entry).__name__, "value": entry.value}
            for entry in san_extension.value
        )
    except x509.ExtensionNotFound:
        # No SubjectAlternativeName extension: nothing to report.
        pass

    return collected
示例3
def test_csr_empty_san(client):
    """An empty "names" list must not yield a CSR with an empty SubjectAltNames extension.

    The Lemur UI always submits this extension even when no alt names are defined.
    """
    empty_san_extensions = {
        "sub_alt_names": {"names": x509.SubjectAlternativeName([])},
    }

    csr_pem, _private_key = create_csr(
        common_name="daniel-san.example.com",
        owner="daniel-san@example.com",
        key_type="RSA2048",
        extensions=empty_san_extensions,
    )

    parsed = x509.load_pem_x509_csr(csr_pem.encode("utf-8"), default_backend())

    # Looking the extension up must fail because it was never added.
    with pytest.raises(x509.ExtensionNotFound):
        parsed.extensions.get_extension_for_class(x509.SubjectAlternativeName)
示例4
def csr_details_view(self, request):
        """Return the subject of the CSR posted in ``request.POST['csr']`` as JSON.

        Raises ``PermissionDenied`` unless the user is staff and has change
        permission. If the CSR cannot be read, an ``HttpResponseBadRequest``
        whose JSON body carries the error message is returned instead.
        """

        # NOTE: is_staff is already assured by ModelAdmin, but just to be sure
        if not (request.user.is_staff and self.has_change_permission(request)):
            raise PermissionDenied

        try:
            pem = force_bytes(request.POST['csr'])
            csr = x509.load_pem_x509_csr(pem, default_backend())
        except Exception as e:
            error_body = json.dumps({
                'message': str(e),
            })
            return HttpResponseBadRequest(error_body, content_type='application/json')

        # Map each subject attribute's OID to its display name.
        subject = {}
        for attribute in csr.subject:
            subject[OID_NAME_MAPPINGS[attribute.oid]] = attribute.value

        payload = json.dumps({
            'subject': subject,
        })
        return HttpResponse(payload, content_type='application/json')
示例5
def load_csr(data):
    """
    Parse a PEM-encoded X.509 certificate signing request.

    :param data: PEM data, passed unchanged to ``x509.load_pem_x509_csr``
    :return: the parsed CSR object
    """
    backend = default_backend()
    return x509.load_pem_x509_csr(data, backend)
示例6
def test_create_basic_csr(client):
    """Build a CSR from a full subject config and check its subject values come from it."""
    alt_names = x509.SubjectAlternativeName(
        [
            x509.DNSName("test.example.com"),
            x509.DNSName("test2.example.com"),
        ]
    )
    csr_config = {
        "common_name": "example.com",
        "organization": "Example, Inc.",
        "organizational_unit": "Operations",
        "country": "US",
        "state": "CA",
        "location": "A place",
        "owner": "joe@example.com",
        "key_type": "RSA2048",
        "extensions": {"names": {"sub_alt_names": alt_names}},
    }
    csr_text, _second = create_csr(**csr_config)

    parsed = x509.load_pem_x509_csr(csr_text.encode("utf-8"), default_backend())

    # Every attribute value in the CSR subject must be one of the configured values.
    configured_values = csr_config.values()
    for attribute in parsed.subject:
        assert attribute.value in configured_values
示例7
def parse_csr(csr):
    """
    Parse a PEM-encoded CSR supplied as text.

    :param csr: PEM CSR as ``str``; it is UTF-8 encoded before loading
    :return: the CSR object produced by ``x509.load_pem_x509_csr``
    """
    assert isinstance(csr, str)

    pem_bytes = csr.encode("utf-8")
    return x509.load_pem_x509_csr(pem_bytes, default_backend())
示例8
def csr(data):
    """
    Validate a PEM-encoded CSR.

    The CSR must load, every subject common name is passed to ``common_name``
    and every DNS SubjectAlternativeName is passed to ``sensitive_domain``.

    :param data: PEM-encoded CSR as a text string (UTF-8 encoded before parsing)
    :return: None
    :raises ValidationError: if the CSR cannot be loaded, or if checking a
        subject common name raises ``ValueError``
    """
    try:
        parsed_csr = x509.load_pem_x509_csr(data.encode("utf-8"), default_backend())
    except Exception:
        raise ValidationError("CSR presented is not valid.")

    # Validate common name and SubjectAltNames
    try:
        cn_attributes = parsed_csr.subject.get_attributes_for_oid(NameOID.COMMON_NAME)
        for attribute in cn_attributes:
            common_name(attribute.value)
    except ValueError as err:
        current_app.logger.info("Error parsing Subject from CSR: %s", err)
        raise ValidationError("Invalid Subject value in supplied CSR")

    try:
        san_extension = parsed_csr.extensions.get_extension_for_class(
            x509.SubjectAlternativeName
        )

        dns_names = san_extension.value.get_values_for_type(x509.DNSName)
        for dns_name in dns_names:
            sensitive_domain(dns_name)
    except x509.ExtensionNotFound:
        # A CSR without the extension has no alternative names to check.
        pass
示例9
def parse_csr(self, csr, csr_format):
        """Return ``csr`` as a CSR object, loading it from PEM or DER if needed.

        An existing ``x509.CertificateSigningRequest`` is returned unchanged.
        Otherwise ``csr_format`` selects the loader.

        :raises ValueError: if ``csr_format`` is neither ``Encoding.PEM`` nor
            ``Encoding.DER``
        """
        if isinstance(csr, x509.CertificateSigningRequest):
            return csr

        if csr_format == Encoding.PEM:
            loader = x509.load_pem_x509_csr
        elif csr_format == Encoding.DER:
            loader = x509.load_der_x509_csr
        else:
            raise ValueError('Unknown CSR format passed: %s' % csr_format)

        return loader(force_bytes(csr), default_backend())
示例10
def _load_csr(data):
    """Read a PEM CSR file and return it in text, parsed and DER form.

    :param data: mapping with a ``'csr_filename'`` entry and an optional
        ``'basedir'`` entry (defaults to ``settings.FIXTURES_DIR``)
    :return: dict with keys ``'pem'`` (decoded file content, stripped),
        ``'parsed'`` (the loaded CSR object) and ``'der'`` (its DER bytes)
    """
    fixture_dir = data.get('basedir', settings.FIXTURES_DIR)
    csr_path = os.path.join(fixture_dir, data['csr_filename'])

    with open(csr_path, 'rb') as stream:
        pem_bytes = stream.read().strip()

    pem_text = pem_bytes.decode('utf-8')
    parsed = x509.load_pem_x509_csr(pem_bytes, default_backend())

    return {
        'pem': pem_text,
        'parsed': parsed,
        'der': parsed.public_bytes(Encoding.DER),
    }
示例11
def decode_csr(self, pem_csr):
        """
        Build a CSR object from a PEM-encoded CSR given as text.

        The text is UTF-8 encoded to bytes before it is loaded.
        """
        encoded = pem_csr.encode(encoding='UTF-8')
        backend = default_backend()
        return x509.load_pem_x509_csr(encoded, backend)
示例12
def load_csr(path):
    """Read the file at ``path`` in binary mode and load it as a PEM CSR."""
    with open(path, 'rb') as csr_file:
        contents = csr_file.read()

    backend = default_backend()
    return x509.load_pem_x509_csr(contents, backend)
示例13
def test_convert_from_cryptography(self):
        """``X509Req.from_cryptography`` must return an ``X509Req`` for a loaded CSR."""
        loaded = x509.load_pem_x509_csr(cleartextCertificateRequestPEM, backend)
        converted = X509Req.from_cryptography(loaded)
        assert isinstance(converted, X509Req)
示例14
def sign(csr, issuer_name, ca_key, ca_key_password=None,
         skip_validation=False):
    """Sign a given csr

    The certificate is valid from one day before the time of signing until
    ``CONF.x509.term_of_validity`` days after it. Both bounds are computed
    in UTC from a single clock reading.

    :param csr: certificate signing request object or pem encoded csr
    :param issuer_name: issuer name
    :param ca_key: private key of CA
    :param ca_key_password: private key password for given ca key
    :param skip_validation: skip csr validation if true
    :returns: generated certificate (PEM encoded, surrounding whitespace
        stripped)
    :raises exception.InvalidCsr: if csr is not a CSR object and cannot be
        loaded as PEM
    """

    ca_key = _load_pem_private_key(ca_key, ca_key_password)

    if not isinstance(issuer_name, six.text_type):
        issuer_name = six.text_type(issuer_name.decode('utf-8'))

    if isinstance(csr, six.text_type):
        csr = six.b(str(csr))
    if not isinstance(csr, x509.CertificateSigningRequest):
        try:
            csr = x509.load_pem_x509_csr(csr, backend=default_backend())
        except ValueError:
            LOG.exception("Received invalid csr %s.", csr)
            raise exception.InvalidCsr(csr=csr)

    term_of_validity = CONF.x509.term_of_validity
    one_day = datetime.timedelta(1, 0, 0)
    expire_after = datetime.timedelta(term_of_validity, 0, 0)
    # cryptography interprets naive datetimes as UTC, so the validity window
    # must be derived from UTC time rather than local time. Reading the clock
    # once keeps both bounds relative to the same instant.
    now = datetime.datetime.utcnow()

    builder = x509.CertificateBuilder()
    builder = builder.subject_name(csr.subject)
    # issuer_name is set as common name
    builder = builder.issuer_name(x509.Name([
        x509.NameAttribute(x509.OID_COMMON_NAME, issuer_name),
    ]))
    # Backdate the start by one day.
    builder = builder.not_valid_before(now - one_day)
    builder = builder.not_valid_after(now + expire_after)
    builder = builder.serial_number(int(uuid.uuid4()))
    builder = builder.public_key(csr.public_key())

    if skip_validation:
        extensions = csr.extensions
    else:
        extensions = validator.filter_extensions(csr.extensions)

    # Copy the (possibly filtered) CSR extensions onto the certificate.
    for extension in extensions:
        builder = builder.add_extension(extension.value,
                                        critical=extension.critical)

    certificate = builder.sign(
        private_key=ca_key, algorithm=hashes.SHA256(),
        backend=default_backend()
    ).public_bytes(serialization.Encoding.PEM).strip()

    return certificate