Python源码示例:cryptography.x509.DNSName()

示例1
def build_csr(self, hostname, **kwargs):
        """Assemble an unsigned CSR builder for a TLS server certificate.

        The subject carries ``hostname`` as common name and the realm read
        from ``self.plugin.ipa.env.realm`` as organization name.  Three
        extensions are attached: a critical BasicConstraints (``ca=False``),
        a critical ExtendedKeyUsage containing ``TLS_SERVERAUTH`` and a
        non-critical SubjectAlternativeName listing ``hostname`` as DNS name.

        :param hostname: host name used for the CN and the DNS SAN entry.
        :param kwargs: accepted for signature compatibility; unused here.
        :return: the populated ``x509.CertificateSigningRequestBuilder``
            (not yet signed).
        """
        realm = self.plugin.ipa.env.realm
        subject = x509.Name([
            x509.NameAttribute(oid.NameOID.COMMON_NAME, hostname),
            x509.NameAttribute(oid.NameOID.ORGANIZATION_NAME, realm),
        ])
        builder = x509.CertificateSigningRequestBuilder().subject_name(subject)
        # Builder methods return a *new* builder instead of mutating the
        # receiver, so each step has to be chained off the previous result.
        # Calling add_extension() on a stale builder silently drops the
        # extensions added in between (previously BasicConstraints was lost).
        builder = builder.add_extension(
            x509.BasicConstraints(ca=False, path_length=None), critical=True,
        )
        builder = builder.add_extension(
            x509.ExtendedKeyUsage([TLS_SERVERAUTH]), critical=True
        )
        builder = builder.add_extension(
            x509.SubjectAlternativeName([x509.DNSName(hostname)]),
            critical=False
        )
        return builder

    # pylint: disable=arguments-differ 
示例2
def extract_dns_subject_alternative_names(certificate: x509.Certificate) -> List[str]:
    """Return the DNS entries of the certificate's Subject Alternative Name extension.

    An empty list is returned when the extension is absent or when the
    certificate carries it more than once.
    """
    try:
        extension = certificate.extensions.get_extension_for_oid(ExtensionOID.SUBJECT_ALTERNATIVE_NAME)
        san_value = cast(x509.SubjectAlternativeName, extension.value)
        return san_value.get_values_for_type(DNSName)
    except (ExtensionNotFound, DuplicateExtension):
        # ExtensionNotFound: the certificate simply has no SAN extension.
        # DuplicateExtension: fix for https://github.com/nabla-c0d3/sslyze/issues/420
        # Not sure how browsers behave in this case but a duplicate extension makes the
        # certificate invalid, so no SANs are reported (likely to make hostname
        # validation fail, which is fine).
        return []
示例3
def create_csr(key, domains, must_staple=False):
    """
    Build and sign a CSR for ``domains`` and hand it to ``export_csr_for_acme``.

    The first domain becomes the subject common name; every domain is listed
    as a DNS entry in a non-critical SubjectAlternativeName extension.  When
    ``must_staple`` is true a non-critical TLSFeature extension requesting
    ``status_request`` is added as well.  The request is signed with SHA-256.

    :param key: private key used to sign the request.
    :param domains: non-empty sequence of domain names (checked via ``assert``).
    :param must_staple: whether to add the OCSP must-staple TLS feature.
    :return: the result of ``export_csr_for_acme`` (DER according to the
        original description -- the helper is defined elsewhere).
    """
    assert domains
    subject = x509.Name([x509.NameAttribute(NameOID.COMMON_NAME, domains[0])])
    alt_names = x509.SubjectAlternativeName(
        [x509.DNSName(entry) for entry in domains]
    )
    builder = x509.CertificateSigningRequestBuilder()
    builder = builder.subject_name(subject)
    builder = builder.add_extension(alt_names, critical=False)
    if must_staple:
        staple_feature = x509.TLSFeature(features=[x509.TLSFeatureType.status_request])
        builder = builder.add_extension(staple_feature, critical=False)
    signed_request = builder.sign(key, hashes.SHA256(), default_backend())
    return export_csr_for_acme(signed_request)
示例4
def match(self, value):
        """Check that certificate ``value`` is valid for ``self.name``.

        The DNS names of the certificate's SubjectAlternativeName extension
        are turned into ``DNSPattern`` objects and verified against a
        ``DNS_ID`` built from ``self.name``.

        :param value: certificate object exposing ``extensions``.
        :return: ``None`` when verification succeeds, otherwise a
            ``Mismatch`` describing the failure.
        """
        # This is somewhat terrible. Probably can be better after
        # pyca/service_identity#14 is resolved.
        san_extension = value.extensions.get_extension_for_oid(
            ExtensionOID.SUBJECT_ALTERNATIVE_NAME)
        dns_names = san_extension.value.get_values_for_type(x509.DNSName)
        cert_patterns = []
        for dns_name in dns_names:
            cert_patterns.append(DNSPattern(dns_name.encode('utf-8')))
        expected_ids = [DNS_ID(self.name)]
        try:
            verify_service_identity(
                cert_patterns=cert_patterns,
                obligatory_ids=expected_ids,
                optional_ids=[],
            )
        except VerificationError:
            message = '{!r} is not valid for {!r}'.format(value, self.name)
            return Mismatch(message)
        return None
示例5
def test_certificate_input_with_extensions(client, authority):
    """Loading a payload that carries several extensions yields no schema errors."""
    from lemur.certificates.schemas import CertificateInputSchema

    requested_extensions = {
        "keyUsage": {"digital_signature": True},
        "extendedKeyUsage": {
            "useClientAuthentication": True,
            "useServerAuthentication": True,
        },
        "subjectKeyIdentifier": {"includeSKI": True},
        "subAltNames": {
            "names": [{"nameType": "DNSName", "value": "test.example.com"}]
        },
    }
    payload = {
        "commonName": "test.example.com",
        "owner": "jim@example.com",
        "authority": {"id": authority.id},
        "description": "testtestest",
        "extensions": requested_extensions,
        "dnsProvider": None,
    }

    _, errors = CertificateInputSchema().load(payload)
    assert not errors
示例6
def test_certificate_allowed_names(client, authority, session, logged_in_user):
    """Test for allowed CN and SAN values."""
    from lemur.certificates.schemas import CertificateInputSchema

    san_entries = [
        {"nameType": "DNSName", "value": "allowed.example.com"},
        {"nameType": "IPAddress", "value": "127.0.0.1"},
    ]
    payload = {
        "commonName": "Names with spaces are not checked",
        "owner": "jim@example.com",
        "authority": {"id": authority.id},
        "description": "testtestest",
        "validityStart": "2020-01-01T00:00:00",
        "validityEnd": "2020-01-01T00:00:01",
        "extensions": {"subAltNames": {"names": san_entries}},
        "dnsProvider": None,
    }

    _, errors = CertificateInputSchema().load(payload)
    assert not errors
示例7
def test_csr_disallowed_san(client, logged_in_user):
    """SAN name is disallowed by LEMUR_WHITELISTED_DOMAINS."""
    from lemur.common import validators

    forbidden_san = x509.SubjectAlternativeName([x509.DNSName("evilhacker.org")])
    request, _pkey = create_csr(
        common_name="CN with spaces isn't a domain and is thus allowed",
        owner="joe@example.com",
        key_type="RSA2048",
        extensions={"sub_alt_names": {"names": forbidden_san}},
    )

    # The CSR validator is expected to reject the SAN, not the common name.
    expected_prefix = "Domain evilhacker.org does not match whitelisted domain patterns"
    with pytest.raises(ValidationError) as err:
        validators.csr(request)
    assert str(err.value).startswith(expected_prefix)
示例8
def domains(cert):
    """
    Collect the DNS names listed in a certificate's subjectAltName extension.

    If the extension is missing, or reading it fails for any other reason,
    an empty list is returned; the common name is NOT used as a fallback.
    A missing extension is reported to sentry unless the
    ``LOG_SSL_SUBJ_ALT_NAME_ERRORS`` config flag is falsy (it defaults to
    ``True``); any other exception is always reported.

    :param cert: certificate object exposing ``extensions``
    :return: List of domains (possibly empty)
    """
    names = []
    try:
        ext = cert.extensions.get_extension_for_oid(x509.OID_SUBJECT_ALTERNATIVE_NAME)
        names.extend(ext.value.get_values_for_type(x509.DNSName))
    except x509.ExtensionNotFound:
        if current_app.config.get("LOG_SSL_SUBJ_ALT_NAME_ERRORS", True):
            sentry.captureException()
    except Exception:
        # Deliberate best-effort: report the failure and fall through to
        # returning whatever has been collected so far.
        sentry.captureException()

    return names
示例9
def sub_alt_type(alt_type):
    """
    Determines if the specified subject alternate type is valid.

    The comparison against the list of known types is case-insensitive.

    :param alt_type: name of the subject alternative name type to check
    :return: ``None`` when the type is accepted
    :raises ValidationError: when ``alt_type`` is not one of the known types
    """
    valid_types = [
        "DNSName",
        "IPAddress",
        "uniFormResourceIdentifier",
        "directoryName",
        "rfc822Name",
        "registrationID",
        "otherName",
        "x400Address",
        "EDIPartyName",
    ]
    accepted = {a_type.lower() for a_type in valid_types}
    if alt_type.lower() not in accepted:
        # Report the offending value itself; the builtin ``type`` was
        # previously interpolated here by mistake.
        raise ValidationError(
            "Invalid SubAltName Type: {0} choose from {1}".format(
                alt_type, ",".join(valid_types)
            )
        )
示例10
def test_map_fields_with_validity_years(mock_current_app):
    """``plugin.map_fields`` output for options that specify ``validity_years``."""
    mock_current_app.config.get = Mock(side_effect=config_mock)

    names = [u"one.example.com", u"two.example.com", u"three.example.com"]
    san_entries = [x509.DNSName(x) for x in names]
    options = {
        "common_name": "example.com",
        "owner": "bob@example.com",
        "description": "test certificate",
        "extensions": {"sub_alt_names": {"names": san_entries}},
        "validity_years": 2
    }
    expected = {
        "certificate": {
            "csr": CSR_STR,
            "common_name": "example.com",
            "dns_names": names,
            "signature_hash": "sha256",
        },
        "organization": {"id": 111111},
        "validity_years": 2,
    }

    # signature_hash is patched so the expected hash value is fixed.
    with patch('lemur.plugins.lemur_digicert.plugin.signature_hash') as mock_signature_hash:
        mock_signature_hash.return_value = "sha256"
        assert expected == plugin.map_fields(options, CSR_STR)
示例11
def _dnsname_to_stdlib(name):
    """Convert a DNSName SubjectAlternativeName value to the standard library's form.

    Cryptography hands back a dNSName as a unicode string that was
    idna-decoded from ASCII bytes.  The string is idna-encoded again here and,
    on Python 3, decoded via UTF-8 so the result is text (the stdlib uses
    PyUnicode_FromStringAndSize on it, which decodes via UTF-8).

    Notes:
        This depends on the Python version's standard library.
    """

    def idna_encode(hostname):
        """Borrowed wholesale from the Python Cryptography Project.

        Calling `idna.encode` directly is not safe: it can explode for
        wildcard names.  A leading ``*.`` or ``.`` is therefore split off,
        encoded as ASCII, and re-attached to the idna-encoded remainder.
        """
        import idna

        for wildcard in ('*.', '.'):
            if hostname.startswith(wildcard):
                remainder = hostname[len(wildcard):]
                return wildcard.encode('ascii') + idna.encode(remainder)
        return idna.encode(hostname)

    encoded = idna_encode(name)
    return encoded.decode('utf-8') if sys.version_info >= (3, 0) else encoded
示例12
def read_cert(cert_dir):
    """Load ``cert.pem`` from ``cert_dir`` and extract its names.

    :param cert_dir: directory containing a PEM encoded ``cert.pem``.
    :return: tuple ``(common_name, dns_names, ip_sans)`` where ``common_name``
        is the first subject attribute with OID 2.5.4.3, ``dns_names`` are the
        DNS entries of the first extension with OID 2.5.29.17 and ``ip_sans``
        are that extension's IP address entries converted with ``str``.
    :raises IndexError: if the certificate has no common name or no
        extension with OID 2.5.29.17.
    """
    with open(os.path.join(cert_dir, 'cert.pem'), 'rb') as cert_file:
        pem_data = cert_file.read()
    cert = x509.load_pem_x509_certificate(pem_data, default_backend())
    # Compare OIDs through the public ``dotted_string`` property rather than
    # the private ``_dotted_string`` attribute, which is an implementation
    # detail of ObjectIdentifier and not part of its documented interface.
    common_name = [na.value for na in cert.subject if na.oid.dotted_string == "2.5.4.3"][0]
    subject_alternative_names = [ext.value for ext in cert.extensions if ext.oid.dotted_string == "2.5.29.17"][0]
    dns_names = subject_alternative_names.get_values_for_type(x509.DNSName)
    ip_sans = [str(ip) for ip in subject_alternative_names.get_values_for_type(x509.IPAddress)]

    return common_name, dns_names, ip_sans
示例13
def service_dns(service_name, namespace, domain):
    """Return the DNS names for a service, from fully qualified to bare name.

    The list contains, in order: ``<service>.<namespace>.svc.<domain>``,
    ``<service>.<namespace>.svc``, ``<service>.<namespace>`` and ``<service>``,
    each wrapped in ``x509.DNSName``.
    """
    namespaced = f'{service_name}.{namespace}'
    hostnames = (
        f'{namespaced}.svc.{domain}',
        f'{namespaced}.svc',
        namespaced,
        f'{service_name}',
    )
    return [x509.DNSName(hostname) for hostname in hostnames]
示例14
def pod_dns(pod_ip, namespace, domain):
    """Return the DNS names for a pod, derived from its IP address.

    Dots in ``pod_ip`` are replaced by dashes; the result is
    ``<dashed-ip>.<namespace>.pod.<domain>`` followed by
    ``<dashed-ip>.<namespace>.pod``, each wrapped in ``x509.DNSName``.
    """
    dashed_ip = pod_ip.replace(".", "-")
    pod_host = f'{dashed_ip}.{namespace}.pod'
    return [
        x509.DNSName(f'{pod_host}.{domain}'),
        x509.DNSName(pod_host),
    ]
示例15
def headless_dns(hostname, subdomain, namespace, domain):
    """Return the DNS names for a host behind a subdomain, most specific first.

    The list contains, in order:
    ``<hostname>.<subdomain>.<namespace>.svc.<domain>``,
    ``<hostname>.<subdomain>.<namespace>.svc``,
    ``<hostname>.<subdomain>.<namespace>``, ``<hostname>.<subdomain>`` and
    ``<hostname>``, each wrapped in ``x509.DNSName``.
    """
    short = f'{hostname}.{subdomain}'
    namespaced = f'{short}.{namespace}'
    candidates = (
        f'{namespaced}.svc.{domain}',
        f'{namespaced}.svc',
        namespaced,
        short,
        f'{hostname}',
    )
    return [x509.DNSName(candidate) for candidate in candidates]
示例16
def _generate_csr(cls, cn, private_key, passphrase=None):
        """Create a PEM encoded CSR for ``cn`` signed with ``private_key``.

        The request has ``cn`` as subject common name and as the only DNS
        entry of a non-critical SubjectAlternativeName extension.  It also
        carries critical BasicConstraints (``ca=False``) and critical
        KeyUsage extensions.  The digest is looked up on ``hashes`` by the
        upper-cased ``CONF.certificates.signing_digest`` setting.

        :param cn: common name / DNS name to request.
        :param private_key: PEM encoded private key data.
        :param passphrase: password protecting ``private_key``, if any.
        :return: the signed request serialized with ``Encoding.PEM``.
        """
        signing_key = serialization.load_pem_private_key(
            data=private_key, password=passphrase,
            backend=backends.default_backend())

        subject = x509.Name([
            x509.NameAttribute(x509.oid.NameOID.COMMON_NAME, cn),
        ])
        request = x509.CertificateSigningRequestBuilder().subject_name(subject)

        basic_constraints = x509.BasicConstraints(ca=False, path_length=None)
        request = request.add_extension(basic_constraints, critical=True)

        key_usage = x509.KeyUsage(
            digital_signature=True,
            key_encipherment=True,
            data_encipherment=True,
            key_agreement=True,
            content_commitment=False,
            key_cert_sign=False,
            crl_sign=False,
            encipher_only=False,
            decipher_only=False
        )
        request = request.add_extension(key_usage, critical=True)

        alt_names = x509.SubjectAlternativeName([x509.DNSName(cn)])
        request = request.add_extension(alt_names, critical=False)

        digest_cls = getattr(hashes, CONF.certificates.signing_digest.upper())
        signed = request.sign(
            signing_key, digest_cls(), backends.default_backend())
        return signed.public_bytes(serialization.Encoding.PEM)
示例17
def get_host_names(certificate):
    """Extract the host names from the Pem encoded X509 certificate

    :param certificate: A PEM encoded certificate (``str`` input is encoded
                        as UTF-8 before parsing)
    :returns: A dictionary containing the following keys:
              ['cn', 'dns_names']
              where 'cn' is the lower-cased CN from the SubjectName of the
              certificate, and 'dns_names' is a list of dNSNames
              (possibly empty) from the SubjectAltNames of the certificate.
    :raises exceptions.UnreadableCert: if anything goes wrong while parsing
              the certificate or reading its subject.
    """
    if isinstance(certificate, str):
        certificate = certificate.encode('utf-8')
    try:
        cert = x509.load_pem_x509_certificate(certificate,
                                              backends.default_backend())
        common_name = cert.subject.get_attributes_for_oid(
            x509.OID_COMMON_NAME)[0]
        cn_lower = common_name.value.lower()
        try:
            san = cert.extensions.get_extension_for_oid(
                x509.OID_SUBJECT_ALTERNATIVE_NAME
            )
            dns_names = san.value.get_values_for_type(x509.DNSName)
        except x509.ExtensionNotFound:
            # A certificate without SANs is fine; report an empty list.
            LOG.debug("%s extension not found",
                      x509.OID_SUBJECT_ALTERNATIVE_NAME)
            dns_names = []

        return {'cn': cn_lower, 'dns_names': dns_names}
    except Exception:
        LOG.exception('Unreadable Certificate.')
        raise exceptions.UnreadableCert
示例18
def certificate_template(
    subject: x509.name.Name,
    issuer: x509.name.Name,
    public_key: x509.name.Name,
    certauthority: bool = False,
) -> x509.base.CertificateBuilder:
    """Return an unsigned certificate builder pre-filled with common fields.

    The builder gets a random serial number, a validity window starting now
    (UTC) and lasting ten years for a certificate authority or seven days
    otherwise, a critical SubjectAlternativeName for ``localhost`` and a
    critical BasicConstraints extension whose ``ca`` flag is ``certauthority``.

    NOTE(review): ``public_key`` is annotated as ``x509.name.Name`` but is
    passed to ``CertificateBuilder.public_key`` -- the annotation looks
    wrong; confirm the intended key type.
    """
    # Shorter valid length for on-the-fly (non-CA) certificates.
    lifetime_days = 365 * 10 if certauthority else 7
    not_valid_after = datetime.datetime.utcnow() + datetime.timedelta(days=lifetime_days)

    builder = x509.CertificateBuilder()
    builder = builder.subject_name(subject)
    builder = builder.issuer_name(issuer)
    builder = builder.public_key(public_key)
    builder = builder.serial_number(x509.random_serial_number())
    builder = builder.not_valid_before(datetime.datetime.utcnow())
    builder = builder.not_valid_after(not_valid_after)
    builder = builder.add_extension(
        x509.SubjectAlternativeName([x509.DNSName("localhost")]), critical=True
    )
    builder = builder.add_extension(
        x509.BasicConstraints(ca=certauthority, path_length=None), critical=True
    )
    return builder
示例19
def subject_alt_name(self):
        """Build a SubjectAlternativeName from ``self.ip_addr_list``.

        Each entry is converted with ``text_type`` and wrapped in
        ``x509.DNSName``.

        NOTE(review): the entries come from an attribute named
        ``ip_addr_list`` yet are emitted as DNS names rather than
        ``x509.IPAddress`` -- presumably intentional; confirm with callers.
        """
        dns_entries = []
        for addr in self.ip_addr_list:
            dns_entries.append(x509.DNSName(text_type(addr)))
        return x509.SubjectAlternativeName(dns_entries)
示例20
def get_certificate_domains(cert):
    """
    Return the DNS names of the first Subject Alternative Name extension
    found in ``cert``, or an empty list if the certificate has none.
    """
    san = next(
        (
            extension.value
            for extension in cert.extensions
            if isinstance(extension.value, x509.SubjectAlternativeName)
        ),
        None,
    )
    if san is None:
        return []
    return san.get_values_for_type(x509.DNSName)
示例21
def generate_tls_sni_01_cert(server_name, key_type=u'rsa',
                             _generate_private_key=None):
    """
    Generate a certificate/key pair for responding to a tls-sni-01 challenge.

    The certificate is self-signed (subject and issuer are both
    ``CN=acme.invalid``), valid from one hour ago until one hour from now,
    and signed with SHA-256.

    :param str server_name: The SAN the certificate should have.
    :param str key_type: The type of key to generate; usually not necessary.
    :param _generate_private_key: Optional replacement for
        ``generate_private_key``; called with ``key_type``.

    :rtype: ``Tuple[`~cryptography.x509.Certificate`, PrivateKey]``
    :return: A tuple of the certificate and private key.
    """
    make_key = _generate_private_key or generate_private_key
    key = make_key(key_type)

    subject = x509.Name([
        x509.NameAttribute(NameOID.COMMON_NAME, u'acme.invalid')])

    builder = x509.CertificateBuilder()
    builder = builder.subject_name(subject)
    builder = builder.issuer_name(subject)
    builder = builder.not_valid_before(datetime.now() - timedelta(seconds=3600))
    builder = builder.not_valid_after(datetime.now() + timedelta(seconds=3600))
    builder = builder.serial_number(int(uuid.uuid4()))
    builder = builder.public_key(key.public_key())
    builder = builder.add_extension(
        x509.SubjectAlternativeName([x509.DNSName(server_name)]),
        critical=False)

    cert = builder.sign(
        private_key=key,
        algorithm=hashes.SHA256(),
        backend=default_backend())
    return (cert, key)
示例22
def csr_for_names(names, key):
    """
    Generate a certificate signing request for the given names and private key.

    The first name is used as the subject common name unless it is longer
    than 64 characters, in which case ``san.too.long.invalid`` is used
    instead.  All names are listed in a non-critical subjectAltName
    extension and the request is signed with SHA-256.

    ..  seealso:: `acme.client.Client.request_issuance`

    ..  seealso:: `generate_private_key`

    :param names: ``List[str]`` of one or more names (subjectAltName) for
        which to request a certificate.
    :param key: A Cryptography private key object.

    :raises ValueError: If ``names`` is empty.

    :rtype: `cryptography.x509.CertificateSigningRequest`
    :return: The certificate request message.
    """
    if len(names) == 0:
        raise ValueError('Must have at least one name')

    primary = names[0]
    common_name = primary if len(primary) <= 64 else u'san.too.long.invalid'

    subject = x509.Name([
        x509.NameAttribute(NameOID.COMMON_NAME, common_name)])
    alt_names = x509.SubjectAlternativeName(
        [x509.DNSName(name) for name in names])

    builder = x509.CertificateSigningRequestBuilder()
    builder = builder.subject_name(subject)
    builder = builder.add_extension(alt_names, critical=False)
    return builder.sign(key, hashes.SHA256(), default_backend())
示例23
def test_get_certificate_primitives(certificate):
    """``get_certificate_primitives`` yields 26 entries for the fixture certificate."""
    from lemur.certificates.service import get_certificate_primitives

    # Built but not asserted on; kept so the fixture's domains are still read
    # before the primitives are computed.
    names = []
    for domain in certificate.domains:
        names.append(x509.DNSName(domain.name))

    frozen_day = datetime.date(year=2016, month=10, day=30)
    with freeze_time(frozen_day):
        primitives = get_certificate_primitives(certificate)
        assert len(primitives) == 26
示例24
def test_certificate_input_schema_parse_csr(authority):
    """SAN entries embedded in a submitted CSR show up in the loaded schema data."""
    from lemur.certificates.schemas import CertificateInputSchema

    test_san_dns = "foobar.com"
    requested_san = x509.SubjectAlternativeName([x509.DNSName(test_san_dns)])
    csr, _private_key = create_csr(
        owner="joe@example.com",
        common_name="ACommonName",
        organization="test",
        organizational_unit="Meters",
        country="NL",
        state="Noord-Holland",
        location="Amsterdam",
        key_type="RSA2048",
        extensions={"sub_alt_names": {"names": requested_san}},
    )

    payload = {
        "commonName": "test.example.com",
        "owner": "jim@example.com",
        "authority": {"id": authority.id},
        "description": "testtestest",
        "csr": csr,
        "dnsProvider": None,
    }

    data, errors = CertificateInputSchema().load(payload)

    # Every SAN parsed out of the CSR must be the one that was requested.
    parsed_names = data["extensions"]["sub_alt_names"]["names"]
    for san in parsed_names:
        assert san.value == test_san_dns
    assert not errors
示例25
def test_certificate_disallowed_names(client, authority, session, logged_in_user):
    """The CN and SAN are disallowed by LEMUR_WHITELISTED_DOMAINS."""
    from lemur.certificates.schemas import CertificateInputSchema

    san_entries = [
        {"nameType": "DNSName", "value": "allowed.example.com"},
        {"nameType": "DNSName", "value": "evilhacker.org"},
    ]
    payload = {
        "commonName": "*.example.com",
        "owner": "jim@example.com",
        "authority": {"id": authority.id},
        "description": "testtestest",
        "validityStart": "2020-01-01T00:00:00",
        "validityEnd": "2020-01-01T00:00:01",
        "extensions": {"subAltNames": {"names": san_entries}},
        "dnsProvider": None,
    }

    _, errors = CertificateInputSchema().load(payload)

    cn_error = errors["common_name"][0]
    assert cn_error.startswith(
        "Domain *.example.com does not match whitelisted domain patterns"
    )
    san_error = errors["extensions"]["sub_alt_names"]["names"][0]
    assert san_error.startswith(
        "Domain evilhacker.org does not match whitelisted domain patterns"
    )
示例26
def test_create_csr():
    """``create_csr`` returns a CSR and a private key, with and without SAN extensions."""
    subject_fields = {
        "owner": "joe@example.com",
        "common_name": "ACommonName",
        "organization": "test",
        "organizational_unit": "Meters",
        "country": "US",
        "state": "CA",
        "location": "Here",
    }

    # Without extensions.
    csr, private_key = create_csr(**subject_fields, key_type="RSA2048")
    assert csr
    assert private_key

    # With a subject alternative name extension.
    extensions = {
        "sub_alt_names": {
            "names": x509.SubjectAlternativeName([x509.DNSName("AnotherCommonName")])
        }
    }
    csr, private_key = create_csr(
        **subject_fields, extensions=extensions, key_type="RSA2048"
    )
    assert csr
    assert private_key
示例27
def _serialize(self, value, attr, obj):
        """Convert a collection of general names into a list of plain dicts.

        Each supported entry of ``value._general_names`` becomes
        ``{"nameType": <type label>, "value": <value>}``.  IP entries are
        stringified and labelled ``"IPNetwork"`` when the value is an IPv4 or
        IPv6 network, ``"IPAddress"`` otherwise.  Registered IDs are reported
        by their dotted string.  Entries of an unknown type are logged with a
        warning and skipped.

        :param value: object exposing ``_general_names``; falsy values yield
            an empty list.
        :param attr: unused here.
        :param obj: unused here.
        :return: list of ``{"nameType": ..., "value": ...}`` dicts.
        """
        general_names = []
        if not value:
            return general_names

        for name in value._general_names:
            name_value = name.value

            if isinstance(name, x509.DNSName):
                name_type = "DNSName"

            elif isinstance(name, x509.IPAddress):
                # Both network flavours must be recognised; checking only
                # IPv4Network mislabelled IPv6 networks as plain addresses.
                is_network = isinstance(
                    name_value, (ipaddress.IPv4Network, ipaddress.IPv6Network)
                )
                name_type = "IPNetwork" if is_network else "IPAddress"
                name_value = str(name_value)

            elif isinstance(name, x509.UniformResourceIdentifier):
                name_type = "uniformResourceIdentifier"

            elif isinstance(name, x509.DirectoryName):
                name_type = "directoryName"

            elif isinstance(name, x509.RFC822Name):
                name_type = "rfc822Name"

            elif isinstance(name, x509.RegisteredID):
                name_type = "registeredID"
                name_value = name_value.dotted_string

            else:
                current_app.logger.warning(
                    "Unknown SubAltName type: {name}".format(name=name)
                )
                continue

            general_names.append({"nameType": name_type, "value": name_value})

        return general_names
示例28
def get_additional_names(options):
    """
    Return a list of strings to be added to a SAN certificates.

    Only ``x509.DNSName`` entries are considered; their ``value`` is
    collected.  The names are read from
    ``options["extensions"]["sub_alt_names"]["names"]``.  For backward
    compatibility a ``sub_alt_names`` value that is not a dict is iterated
    directly.

    :param options: certificate options; a missing or empty ``extensions``
        entry yields an empty list.
    :return: list of DNS name strings (possibly empty)
    """
    extensions = options.get("extensions")
    if not extensions:
        return []

    sub_alt_names = extensions.get("sub_alt_names") or {}
    if isinstance(sub_alt_names, dict):
        # Iterating the dict itself would only yield its keys (e.g. the
        # string "names"), never the general names stored under "names".
        general_names = sub_alt_names.get("names") or ()
    else:
        general_names = sub_alt_names

    return [san.value for san in general_names if isinstance(san, x509.DNSName)]
示例29
def test_get_domains_multiple(self, mock_current_app):
        """``get_domains`` returns the common name followed by every distinct SAN."""
        san_entries = [DNSName("test2.netflix.net"), DNSName("test3.netflix.net")]
        options = {
            "common_name": "test.netflix.net",
            "extensions": {"sub_alt_names": {"names": san_entries}},
        }

        result = self.acme.get_domains(options)

        expected = [options["common_name"], "test2.netflix.net", "test3.netflix.net"]
        self.assertEqual(result, expected)
示例30
def test_get_domains_san(self, mock_current_app):
        """A SAN equal to the common name is not repeated in ``get_domains`` output."""
        san_entries = [DNSName("test.netflix.net"), DNSName("test2.netflix.net")]
        options = {
            "common_name": "test.netflix.net",
            "extensions": {"sub_alt_names": {"names": san_entries}},
        }

        result = self.acme.get_domains(options)

        expected = [options["common_name"], "test2.netflix.net"]
        self.assertEqual(result, expected)