Python源码示例:cryptography.x509.Name()

示例1
def build_csr(self, hostname, **kwargs):
        """Assemble an unsigned CSR builder for a TLS server certificate.

        The subject is built from two attributes: the common name
        (``hostname``) and the organization name (the realm read from
        ``self.plugin.ipa.env.realm``).  Three extensions are attached:

        * ``BasicConstraints(ca=False)``, critical
        * ``ExtendedKeyUsage([TLS_SERVERAUTH])``, critical
        * ``SubjectAlternativeName`` with ``hostname`` as its only DNS
          entry, non-critical

        :param hostname: host name used for the common name and the SAN.
        :param kwargs: accepted but not used by this method.
        :returns: the ``x509.CertificateSigningRequestBuilder``; signing is
            left to the caller.
        """
        realm = self.plugin.ipa.env.realm
        subject = x509.Name([
            x509.NameAttribute(oid.NameOID.COMMON_NAME, hostname),
            x509.NameAttribute(oid.NameOID.ORGANIZATION_NAME, realm),
        ])
        builder = x509.CertificateSigningRequestBuilder()
        builder = builder.subject_name(subject)
        # Each builder method returns a *new* builder, so every call has to
        # be made on the previous result.  The earlier code added the
        # ExtendedKeyUsage extension to the builder from before
        # BasicConstraints was added, which silently dropped BasicConstraints.
        builder = builder.add_extension(
            x509.BasicConstraints(ca=False, path_length=None), critical=True,
        )
        builder = builder.add_extension(
            x509.ExtendedKeyUsage([TLS_SERVERAUTH]), critical=True
        )
        builder = builder.add_extension(
            x509.SubjectAlternativeName([x509.DNSName(hostname)]),
            critical=False
        )
        return builder

    # pylint: disable=arguments-differ 
示例2
def extract_dns_subject_alternative_names(certificate: x509.Certificate) -> List[str]:
    """Return the DNS values of the certificate's Subject Alternative Name extension.

    An empty list is returned when the extension is missing or when looking it up reports a duplicate extension.
    """
    try:
        extension = certificate.extensions.get_extension_for_oid(ExtensionOID.SUBJECT_ALTERNATIVE_NAME)
        alt_names = cast(x509.SubjectAlternativeName, extension.value)
        return alt_names.get_values_for_type(DNSName)
    except (ExtensionNotFound, DuplicateExtension):
        # ExtensionNotFound: the certificate simply has no SAN extension.
        # DuplicateExtension: fix for https://github.com/nabla-c0d3/sslyze/issues/420
        # A duplicate extension makes the certificate invalid, so no SANs are reported (hostname validation
        # is then likely to fail, which is acceptable).
        return []
示例3
def generate_csr(common_name, dnsnames, ips, keysize):
    """Generate a fresh RSA key and a CSR signed with it.

    The CSR subject contains only ``common_name``.  ``dnsnames + ips`` is
    passed as the entries of a non-critical SubjectAlternativeName
    extension, and the request is signed with SHA-256.

    :param common_name: value of the subject's common name attribute.
    :param dnsnames: SAN entries, concatenated with ``ips``.
    :param ips: SAN entries, concatenated after ``dnsnames``.
    :param keysize: RSA key size in bits.
    :returns: ``(key_pem, csr_pem)`` - the unencrypted private key in
        TraditionalOpenSSL PEM form and the CSR in PEM form.
    """
    private_key = rsa.generate_private_key(
        public_exponent=65537, key_size=keysize, backend=default_backend()
    )
    key_pem = private_key.private_bytes(
        encoding=serialization.Encoding.PEM,
        format=serialization.PrivateFormat.TraditionalOpenSSL,
        encryption_algorithm=serialization.NoEncryption(),
    )

    subject = x509.Name([x509.NameAttribute(NameOID.COMMON_NAME, common_name)])
    alt_names = x509.SubjectAlternativeName(dnsnames + ips)
    request = (
        x509.CertificateSigningRequestBuilder()
        .subject_name(subject)
        .add_extension(alt_names, critical=False)
        .sign(private_key, hashes.SHA256(), default_backend())
    )

    return key_pem, request.public_bytes(serialization.Encoding.PEM)
示例4
def test_hostnameIsIndicated(self):
        """
        When a C{hostname} is given to L{CertificateOptions}, the U{Server
        Name Indication
        <https://en.wikipedia.org/wiki/Server_Name_Indication>} field seen by
        the server carries that same host name.
        """
        receivedNames = []

        def recordServerName(connection):
            # Invoked by the server context once the client's SNI arrives.
            receivedNames.append(connection.get_servername().decode("ascii"))

        def installCallback(context):
            context.set_tlsext_servername_callback(recordServerName)

        # Only the side effect of the handshake matters; the returned
        # protocols and pump are not inspected.
        _clientProto, _serverProto, _pump = self.serviceIdentitySetup(
            u"valid.example.com", u"valid.example.com", installCallback
        )
        self.assertEqual(receivedNames, [u"valid.example.com"])
示例5
def _decode_x509_name(backend, x509_name):
    """Convert the ``X509_NAME`` handle ``x509_name`` into an ``x509.Name``.

    Entries are visited in index order.  Consecutive entries that report
    the same set id (via ``Cryptography_X509_NAME_ENTRY_set``) are collected
    into one set, and each set becomes one ``RelativeDistinguishedName``.
    """
    lib = backend._lib
    rdn_sets = []
    last_set_id = -1
    for index in range(lib.X509_NAME_entry_count(x509_name)):
        entry = lib.X509_NAME_get_entry(x509_name, index)
        decoded = _decode_x509_name_entry(backend, entry)
        current_set_id = lib.Cryptography_X509_NAME_ENTRY_set(entry)
        if current_set_id == last_set_id:
            # Same RDN as the previous entry: extend the current group.
            rdn_sets[-1].add(decoded)
        else:
            rdn_sets.append({decoded})
        last_set_id = current_set_id

    return x509.Name(
        x509.RelativeDistinguishedName(members) for members in rdn_sets
    )
示例6
def _decode_x509_name(backend, x509_name):
    """Build an ``x509.Name`` from the entries of ``x509_name``.

    Each entry is decoded with ``_decode_x509_name_entry``.  An entry whose
    set id equals that of the entry before it joins that entry's group;
    any other entry starts a new group.  Every group is wrapped in a
    ``RelativeDistinguishedName``.
    """
    entry_total = backend._lib.X509_NAME_entry_count(x509_name)
    groups = []
    previous_id = -1
    for position in range(entry_total):
        raw_entry = backend._lib.X509_NAME_get_entry(x509_name, position)
        name_attribute = _decode_x509_name_entry(backend, raw_entry)
        this_id = backend._lib.Cryptography_X509_NAME_ENTRY_set(raw_entry)
        starts_new_rdn = this_id != previous_id
        if starts_new_rdn:
            groups.append({name_attribute})
        else:
            # Belongs to the same RDN as the previous entry.
            groups[-1].add(name_attribute)
        previous_id = this_id

    return x509.Name(x509.RelativeDistinguishedName(group) for group in groups)
示例7
def _decode_x509_name(backend, x509_name):
    """Translate ``x509_name`` into an ``x509.Name`` made of RDN groups.

    The entries are read by index through ``backend._lib``.  Neighbouring
    entries that share a set id end up in the same
    ``RelativeDistinguishedName``; a change of set id opens a new one.
    """
    lib = backend._lib
    grouped = []
    seen_set_id = -1
    for idx in range(lib.X509_NAME_entry_count(x509_name)):
        entry = lib.X509_NAME_get_entry(x509_name, idx)
        decoded_attribute = _decode_x509_name_entry(backend, entry)
        entry_set_id = lib.Cryptography_X509_NAME_ENTRY_set(entry)
        if entry_set_id == seen_set_id:
            # Still inside the RDN started by an earlier entry.
            grouped[-1].add(decoded_attribute)
        else:
            grouped.append({decoded_attribute})
        seen_set_id = entry_set_id

    return x509.Name(
        x509.RelativeDistinguishedName(attribute_set)
        for attribute_set in grouped
    )
示例8
def _decode_x509_name(backend, x509_name):
    """Decode every entry of ``x509_name`` and return them as an ``x509.Name``.

    Attributes are grouped by the set id reported for their entry: an entry
    with the same set id as its predecessor is added to the predecessor's
    set, otherwise a new set is started.  One ``RelativeDistinguishedName``
    is created per set.
    """
    number_of_entries = backend._lib.X509_NAME_entry_count(x509_name)
    rdns = []
    prior_set = -1
    for i in range(number_of_entries):
        name_entry = backend._lib.X509_NAME_get_entry(x509_name, i)
        attr = _decode_x509_name_entry(backend, name_entry)
        current_set = backend._lib.Cryptography_X509_NAME_ENTRY_set(name_entry)
        if current_set == prior_set:
            # Same RDN as the previous entry.
            rdns[-1].add(attr)
        else:
            rdns.append({attr})
        prior_set = current_set

    return x509.Name(x509.RelativeDistinguishedName(rdn) for rdn in rdns)
示例9
def create_csr(key, domains, must_staple=False):
    """
    Build a CSR for ``domains``, sign it with ``key`` and export it.

    The first domain becomes the subject's common name and every domain is
    listed as a DNS entry of a non-critical SubjectAlternativeName
    extension.  With ``must_staple`` set, a non-critical TLSFeature
    extension requesting ``status_request`` is added as well.  The request
    is signed with SHA-256 and handed to ``export_csr_for_acme``, whose
    result is returned (described by the original author as DER format).

    ``domains`` must be non-empty.
    """
    assert domains
    subject = x509.Name([x509.NameAttribute(NameOID.COMMON_NAME, domains[0])])
    alt_names = x509.SubjectAlternativeName(
        [x509.DNSName(entry) for entry in domains]
    )

    builder = x509.CertificateSigningRequestBuilder()
    builder = builder.subject_name(subject)
    builder = builder.add_extension(alt_names, critical=False)
    if must_staple:
        staple_feature = x509.TLSFeature(
            features=[x509.TLSFeatureType.status_request]
        )
        builder = builder.add_extension(staple_feature, critical=False)

    signed_request = builder.sign(key, hashes.SHA256(), default_backend())
    return export_csr_for_acme(signed_request)
示例10
def test_hostnameIsIndicated(self):
        """
        When a C{hostname} is given to L{CertificateOptions}, the U{Server
        Name Indication
        <https://en.wikipedia.org/wiki/Server_Name_Indication>} field seen by
        the server carries that same host name.
        """
        receivedNames = []

        def recordServerName(connection):
            # Invoked by the server context once the client's SNI arrives.
            receivedNames.append(connection.get_servername().decode("ascii"))

        def installCallback(context):
            context.set_tlsext_servername_callback(recordServerName)

        # Only the side effect of the handshake matters; none of the five
        # returned objects is inspected.
        (_clientProto, _serverProto,
         _clientWrapped, _serverWrapped, _pump) = self.serviceIdentitySetup(
            u"valid.example.com", u"valid.example.com", installCallback
        )
        self.assertEqual(receivedNames, [u"valid.example.com"])
示例11
def _decode_x509_name(backend, x509_name):
    """Return an ``x509.Name`` holding the decoded entries of ``x509_name``.

    The set id of each entry (``Cryptography_X509_NAME_ENTRY_set``) decides
    the grouping: a run of consecutive entries with an identical set id
    forms one ``RelativeDistinguishedName``.
    """
    lib = backend._lib
    collected = []
    prev_id = -1
    for n in range(lib.X509_NAME_entry_count(x509_name)):
        entry = lib.X509_NAME_get_entry(x509_name, n)
        attribute = _decode_x509_name_entry(backend, entry)
        entry_id = lib.Cryptography_X509_NAME_ENTRY_set(entry)
        if entry_id == prev_id:
            # Same RDN as the previous entry.
            collected[-1].add(attribute)
        else:
            collected.append({attribute})
        prev_id = entry_id

    return x509.Name(
        x509.RelativeDistinguishedName(bucket) for bucket in collected
    )
示例12
def create_self_signed_certificate(subject_name, private_key, days_valid=365):
    """Create a certificate that is its own issuer and sign it with ``private_key``.

    The name used for both subject and issuer has the organization
    ``"Test, Inc."`` and ``subject_name`` as common name.  The certificate
    gets a random serial number, a critical ``BasicConstraints(ca=True)``
    extension and is valid from the current UTC time for ``days_valid``
    days.  SHA-256 is used for the signature.

    :param subject_name: common name of the certificate.
    :param private_key: key that supplies the public key and signs.
    :param days_valid: validity period in days.
    :returns: the signed certificate.
    """
    name = x509.Name([
        x509.NameAttribute(x509.NameOID.ORGANIZATION_NAME, u"Test, Inc."),
        x509.NameAttribute(x509.NameOID.COMMON_NAME, subject_name)
    ])

    builder = x509.CertificateBuilder()
    builder = builder.subject_name(name)
    builder = builder.issuer_name(name)
    builder = builder.public_key(private_key.public_key())
    builder = builder.serial_number(x509.random_serial_number())
    builder = builder.add_extension(
        x509.BasicConstraints(ca=True, path_length=None), critical=True
    )
    builder = builder.not_valid_before(datetime.datetime.utcnow())
    builder = builder.not_valid_after(
        datetime.datetime.utcnow() + datetime.timedelta(days=days_valid)
    )

    return builder.sign(private_key, hashes.SHA256(), backends.default_backend())
示例13
def from_crypto_type(cls, certificate: x509.Certificate, certtype: CertificateType):
        """Create an instance of ``cls`` populated from a parsed certificate.

        Fields copied from ``certificate``: PEM encoding, validity bounds,
        SHA-256 fingerprint and the serial number (stored as a string).
        ``discriminator`` is taken from ``certtype.value``.  ``x509_cn`` is
        set to the first common name of the subject; when the subject has no
        common name the attribute is left untouched.

        :param certificate: the certificate to read from.
        :param certtype: certificate type whose ``value`` becomes the
            discriminator.
        :returns: the new ``cls`` instance.
        """
        m = cls()
        m.pem_data = certificate.public_bytes(serialization.Encoding.PEM)
        m.not_after = certificate.not_valid_after
        m.not_before = certificate.not_valid_before
        m.fingerprint = certificate.fingerprint(hashes.SHA256())
        m.discriminator = certtype.value
        # The serial used to be assigned twice (int, then str); only the
        # final string form is kept.
        m.serial = str(certificate.serial_number)

        subject: x509.Name = certificate.subject
        cns = subject.get_attributes_for_oid(NameOID.COMMON_NAME)
        # get_attributes_for_oid returns an empty list (never None) when the
        # subject has no CN, so test for emptiness to avoid an IndexError.
        if cns:
            m.x509_cn = cns[0].value

        return m
示例14
def from_crypto(cls, csr: x509.CertificateSigningRequest):
        """Create an instance of ``cls`` populated from a signing request.

        ``pem_data`` holds the PEM encoding of ``csr`` and ``fingerprint``
        is the SHA-256 digest of those PEM bytes.  ``not_before`` is the
        current UTC time and ``not_after`` lies 700 days later.
        ``discriminator`` is ``CertificateType.CSR.value``.  ``x509_cn`` is
        set to the first common name of the subject; when the subject has no
        common name the attribute is left untouched.

        :param csr: the certificate signing request to read from.
        :returns: the new ``cls`` instance.
        """
        m = cls()
        m.pem_data = csr.public_bytes(serialization.Encoding.PEM)
        m.not_before = datetime.datetime.utcnow()
        m.not_after = datetime.datetime.utcnow() + datetime.timedelta(days=700)
        h = hashes.Hash(hashes.SHA256(), default_backend())
        h.update(m.pem_data)
        m.fingerprint = h.finalize()

        m.discriminator = CertificateType.CSR.value

        subject: x509.Name = csr.subject
        cns = subject.get_attributes_for_oid(NameOID.COMMON_NAME)
        # get_attributes_for_oid returns an empty list (never None) when the
        # subject has no CN, so test for emptiness to avoid an IndexError.
        if cns:
            m.x509_cn = cns[0].value

        return m
示例15
def csr(private_key: rsa.RSAPrivateKey) -> x509.CertificateSigningRequest:
    """Return a CSR with a fixed "Commandment" subject, signed by ``private_key`` using SHA-256."""
    subject_fields = [
        (NameOID.COUNTRY_NAME, u"US"),
        (NameOID.STATE_OR_PROVINCE_NAME, u"CA"),
        (NameOID.LOCALITY_NAME, u"San Francisco"),
        (NameOID.ORGANIZATION_NAME, u"Commandment"),
        (NameOID.COMMON_NAME, u"Commandment"),
    ]
    subject = x509.Name(
        [x509.NameAttribute(field, text) for field, text in subject_fields]
    )
    builder = x509.CertificateSigningRequestBuilder().subject_name(subject)
    return builder.sign(private_key, hashes.SHA256(), default_backend())
示例16
def certificate(private_key: rsa.RSAPrivateKey) -> x509.Certificate:
    """Return a self-issued, non-CA certificate signed by ``private_key``.

    Subject and issuer share one fixed name (common name
    ``"CA-CERTIFICATE"``).  The serial number is 1, the validity window runs
    from the current UTC time for 10 days, and a critical
    ``BasicConstraints(ca=False)`` extension is included.  SHA-256 is used
    for the signature.
    """
    name_fields = [
        (NameOID.COUNTRY_NAME, u"US"),
        (NameOID.STATE_OR_PROVINCE_NAME, u"CA"),
        (NameOID.LOCALITY_NAME, u"San Francisco"),
        (NameOID.ORGANIZATION_NAME, u"Commandment"),
        (NameOID.COMMON_NAME, u"CA-CERTIFICATE"),
    ]
    name = x509.Name(
        [x509.NameAttribute(field, text) for field, text in name_fields]
    )

    builder = x509.CertificateBuilder()
    builder = builder.subject_name(name)
    builder = builder.issuer_name(name)
    builder = builder.public_key(private_key.public_key())
    builder = builder.serial_number(1)
    builder = builder.not_valid_before(datetime.datetime.utcnow())
    builder = builder.not_valid_after(
        datetime.datetime.utcnow() + datetime.timedelta(days=10)
    )
    # Second positional argument is ``critical``.
    builder = builder.add_extension(
        x509.BasicConstraints(ca=False, path_length=None), True
    )

    return builder.sign(private_key, hashes.SHA256(), default_backend())
示例17
def ca_certificate(private_key: rsa.RSAPrivateKey) -> x509.Certificate:
    """Return a self-issued CA certificate signed by ``private_key``.

    Subject and issuer share one fixed name (common name
    ``"CA-CERTIFICATE"``).  The serial number is 1, the validity window runs
    from the current UTC time for 10 days, and a critical
    ``BasicConstraints(ca=True)`` extension is included.  SHA-256 is used
    for the signature.
    """
    name_fields = [
        (NameOID.COUNTRY_NAME, u"US"),
        (NameOID.STATE_OR_PROVINCE_NAME, u"CA"),
        (NameOID.LOCALITY_NAME, u"San Francisco"),
        (NameOID.ORGANIZATION_NAME, u"Commandment"),
        (NameOID.COMMON_NAME, u"CA-CERTIFICATE"),
    ]
    name = x509.Name(
        [x509.NameAttribute(field, text) for field, text in name_fields]
    )

    builder = x509.CertificateBuilder()
    builder = builder.serial_number(1)
    builder = builder.issuer_name(name)
    builder = builder.subject_name(name)
    builder = builder.public_key(private_key.public_key())
    builder = builder.not_valid_before(datetime.datetime.utcnow())
    builder = builder.not_valid_after(
        datetime.datetime.utcnow() + datetime.timedelta(days=10)
    )
    # Second positional argument is ``critical``.
    builder = builder.add_extension(
        x509.BasicConstraints(ca=True, path_length=None), True
    )

    return builder.sign(private_key, hashes.SHA256(), default_backend())
示例18
def format_name(subject):
    """Render a subject as a ``/KEY=value/KEY=value`` distinguished-name string.

    ``subject`` is either an iterable of ``(key, value)`` pairs or an
    ``x509.Name``; for the latter each attribute's OID is translated through
    ``OID_NAME_MAPPINGS``.  The order of the components is kept exactly as
    given - no sorting takes place.

    Examples::

        >>> format_name([('CN', 'example.com'), ])
        '/CN=example.com'
        >>> format_name([('CN', 'example.com'), ('O', "My Organization"), ])
        '/CN=example.com/O=My Organization'
    """
    if isinstance(subject, x509.Name):
        pairs = [(OID_NAME_MAPPINGS[attr.oid], attr.value) for attr in subject]
    else:
        pairs = subject

    components = [
        '%s=%s' % (force_text(key), force_text(value)) for key, value in pairs
    ]
    return '/%s' % '/'.join(components)
示例19
def from_protobuf(cls, obj):
        """Build an instance of ``cls`` from the protobuf message ``obj``.

        ``resources`` and every entry of ``files`` are converted with their
        own ``from_protobuf``; ``log_level`` is the enum name looked up for
        ``obj.log_level``.  ``log_config`` and ``security`` are converted
        only when the message has the corresponding field set and are
        ``None`` otherwise.  ``script`` is passed through and ``env`` is
        copied into a plain ``dict``.
        """
        resources = Resources.from_protobuf(obj.resources)
        files = {
            name: File.from_protobuf(entry) for name, entry in obj.files.items()
        }
        log_level = _proto.Log.Level.Name(obj.log_level)

        log_config = None
        if obj.HasField('log_config'):
            log_config = File.from_protobuf(obj.log_config)

        security = None
        if obj.HasField('security'):
            security = Security.from_protobuf(obj.security)

        return cls(
            resources=resources,
            files=files,
            script=obj.script,
            env=dict(obj.env),
            log_level=log_level,
            log_config=log_config,
            security=security,
        )
示例20
def from_protobuf(cls, obj):
        """Build an instance of ``cls`` from the protobuf message ``obj``.

        ``state`` and ``final_status`` are converted by looking up the enum
        name of the protobuf value and passing it to ``ApplicationState`` /
        ``FinalStatus``.  ``usage`` goes through
        ``ResourceUsageReport.from_protobuf`` and the two timestamps through
        ``datetime_from_millis``.  All remaining fields are passed through
        unchanged.
        """
        state_label = _proto.ApplicationState.Type.Name(obj.state)
        state = ApplicationState(state_label)
        status_label = _proto.FinalStatus.Type.Name(obj.final_status)
        final_status = FinalStatus(status_label)

        fields = dict(
            id=obj.id,
            name=obj.name,
            user=obj.user,
            queue=obj.queue,
            tags=obj.tags,
            host=obj.host,
            port=obj.port,
            tracking_url=obj.tracking_url,
            state=state,
            final_status=final_status,
            progress=obj.progress,
            usage=ResourceUsageReport.from_protobuf(obj.usage),
            diagnostics=obj.diagnostics,
            start_time=datetime_from_millis(obj.start_time),
            finish_time=datetime_from_millis(obj.finish_time),
        )
        return cls(**fields)
示例21
def _get_basic_certificate_text(cls, certificate: Certificate) -> List[str]:
        """Return the formatted summary lines for one certificate.

        Every line is produced by ``cls._format_field``.  The fixed part
        covers the SHA1 fingerprint, subject, issuer, serial number, validity
        dates and the public key class name.  A signature algorithm line is
        added when the certificate reports a hash algorithm; key size plus
        curve (EC keys) or key size plus exponent (RSA keys) follow; the DNS
        subject alternative names come last.
        """
        sha1_hex = binascii.hexlify(certificate.fingerprint(hashes.SHA1())).decode("ascii")
        lines = [
            cls._format_field("SHA1 Fingerprint:", sha1_hex),
            cls._format_field("Common Name:", _get_name_as_short_text(certificate.subject)),
            cls._format_field("Issuer:", _get_name_as_short_text(certificate.issuer)),
            cls._format_field("Serial Number:", str(certificate.serial_number)),
            cls._format_field("Not Before:", certificate.not_valid_before.date().isoformat()),
            cls._format_field("Not After:", certificate.not_valid_after.date().isoformat()),
            cls._format_field("Public Key Algorithm:", certificate.public_key().__class__.__name__),
        ]

        # The signature_hash_algorithm can be None if signature did not use separate hash (ED25519, ED448)
        # https://cryptography.io/en/latest/x509/reference/#cryptography.x509.Certificate.signature_hash_algorithm
        hash_algorithm = certificate.signature_hash_algorithm
        if hash_algorithm:
            lines.append(cls._format_field("Signature Algorithm:", hash_algorithm.name))

        # Other key types (DSA? https://github.com/nabla-c0d3/sslyze/issues/314) get no size details.
        key = certificate.public_key()
        if isinstance(key, EllipticCurvePublicKey):
            lines.extend(
                [
                    cls._format_field("Key Size:", str(key.curve.key_size)),
                    cls._format_field("Curve:", str(key.curve.name)),
                ]
            )
        elif isinstance(key, RSAPublicKey):
            lines.extend(
                [
                    cls._format_field("Key Size:", str(key.key_size)),
                    cls._format_field("Exponent:", str(key.public_numbers().e)),  # type: ignore
                ]
            )

        try:
            # Print the SAN extension if there's one
            dns_names = str(extract_dns_subject_alternative_names(certificate))
            lines.append(cls._format_field("DNS Subject Alternative Names:", dns_names))
        except KeyError:
            pass

        return lines
示例22
def _get_name_as_short_text(name_field: x509.Name) -> str:
    """Return a short, user-displayable string for a name field.

    The first common name is used when ``get_common_names`` finds any
    (additional common names are ignored); otherwise the whole field is
    rendered with ``rfc4514_string()``.
    """
    common_names = get_common_names(name_field)
    if not common_names:
        # No CN available: fall back to the complete name.
        return name_field.rfc4514_string()
    # Certificates with multiple CNs are not supported; show the first only.
    return common_names[0]
示例23
def x509_name_to_json(name: x509.Name) -> Dict[str, Any]:
    """Return ``name`` as a plain dict built from ``_X509NameAsJson``.

    Each attribute of the name contributes its OID, value and RFC 4514
    string; the dict also carries the RFC 4514 string of the whole name and
    a ``parsing_error`` of ``None``.
    """
    attributes = [
        _X509NameAttributeAsJson(
            oid=attribute.oid,
            value=attribute.value,
            rfc4514_string=attribute.rfc4514_string(),
        )
        for attribute in name
    ]
    return asdict(
        _X509NameAsJson(
            rfc4514_string=name.rfc4514_string(),
            attributes=attributes,
            parsing_error=None,
        )
    )
示例24
def get_common_names(name_field: x509.Name) -> List[str]:
    """Return the value of every common name attribute found in ``name_field``."""
    cn_attributes = name_field.get_attributes_for_oid(NameOID.COMMON_NAME)
    return [attribute.value for attribute in cn_attributes]
示例25
def _generate_csr(cls, cn, private_key, passphrase=None):
        """Create a PEM-encoded CSR for ``cn`` signed with ``private_key``.

        The subject contains only the common name ``cn``.  Extensions, in
        order: critical ``BasicConstraints(ca=False)``, critical ``KeyUsage``
        (digital signature, key encipherment, data encipherment and key
        agreement enabled) and a non-critical SubjectAlternativeName with
        ``cn`` as DNS name.  The digest class is looked up on ``hashes`` by
        the upper-cased ``CONF.certificates.signing_digest``.

        :param cn: common name, also used as the SAN DNS entry.
        :param private_key: PEM-encoded private key.
        :param passphrase: password for ``private_key``, if any.
        :returns: the signed request as PEM bytes.
        """
        signing_key = serialization.load_pem_private_key(
            data=private_key, password=passphrase,
            backend=backends.default_backend())

        subject = x509.Name([
            x509.NameAttribute(x509.oid.NameOID.COMMON_NAME, cn),
        ])
        constraints = x509.BasicConstraints(ca=False, path_length=None)
        key_usage = x509.KeyUsage(
            digital_signature=True,
            key_encipherment=True,
            data_encipherment=True,
            key_agreement=True,
            content_commitment=False,
            key_cert_sign=False,
            crl_sign=False,
            encipher_only=False,
            decipher_only=False
        )
        alt_names = x509.SubjectAlternativeName([x509.DNSName(cn)])

        builder = (
            x509.CertificateSigningRequestBuilder()
            .subject_name(subject)
            .add_extension(constraints, critical=True)
            .add_extension(key_usage, critical=True)
            .add_extension(alt_names, critical=False)
        )

        digest_class = getattr(hashes, CONF.certificates.signing_digest.upper())
        request = builder.sign(
            signing_key, digest_class(), backends.default_backend())
        return request.public_bytes(serialization.Encoding.PEM)
示例26
def setUp(self):
        """Prepare the CSR and CA key fixtures, then run the parent ``setUp``.

        Sets ``signing_digest``, ``certificate_signing_request`` (PEM CSR
        with common name ``test``), ``ca_key``, ``ca_private_key_passphrase``
        and ``ca_private_key`` (the CA key as passphrase-encrypted PEM).
        """
        self.signing_digest = "sha256"

        # CSR fixture: a throwaway 2048-bit key signs a minimal request.
        request_key = rsa.generate_private_key(
            public_exponent=65537,
            key_size=2048,
            backend=backends.default_backend()
        )
        subject = x509.Name([
            x509.NameAttribute(x509.oid.NameOID.COMMON_NAME, u"test"),
        ])
        request_builder = x509.CertificateSigningRequestBuilder().subject_name(
            subject)
        request = request_builder.sign(
            request_key, hashes.SHA256(), backends.default_backend())
        self.certificate_signing_request = request.public_bytes(
            serialization.Encoding.PEM)

        # CA key fixture, kept both as an object and as encrypted PEM.
        self.ca_key = rsa.generate_private_key(
            public_exponent=65537,
            key_size=2048,
            backend=backends.default_backend()
        )
        self.ca_private_key_passphrase = b"Testing"
        key_encryption = serialization.BestAvailableEncryption(
            self.ca_private_key_passphrase)
        self.ca_private_key = self.ca_key.private_bytes(
            encoding=serialization.Encoding.PEM,
            format=serialization.PrivateFormat.TraditionalOpenSSL,
            encryption_algorithm=key_encryption,
        )

        super(BaseLocalCSRTestCase, self).setUp()
示例27
def setUp(self):
        """Run the parent ``setUp`` and add a self-signed CA certificate.

        Sets ``signing_digest``, ``ca_certificate`` (PEM, serial number 1,
        valid from the current UTC time for two 365-day years, signed with
        ``self.ca_key`` using SHA-256) and ``cert_generator``.

        ``self.ca_key`` is read but not created here - presumably the parent
        ``setUp`` provides it.
        """
        super(TestLocalGenerator, self).setUp()
        self.signing_digest = "sha256"

        # Validity window of the CA certificate.
        valid_from_datetime = datetime.datetime.utcnow()
        valid_until_datetime = datetime.datetime.utcnow() + datetime.timedelta(
            seconds=2 * 365 * 24 * 60 * 60)

        # The same name serves as subject and issuer (self-signed).
        subject_name = x509.Name([
            x509.NameAttribute(x509.oid.NameOID.COUNTRY_NAME, u"US"),
            x509.NameAttribute(x509.oid.NameOID.STATE_OR_PROVINCE_NAME,
                               u"Oregon"),
            x509.NameAttribute(x509.oid.NameOID.LOCALITY_NAME, u"Springfield"),
            x509.NameAttribute(x509.oid.NameOID.ORGANIZATION_NAME,
                               u"Springfield Nuclear Power Plant"),
            x509.NameAttribute(x509.oid.NameOID.COMMON_NAME, u"maggie1"),
        ])

        ca_builder = (
            x509.CertificateBuilder()
            .not_valid_before(valid_from_datetime)
            .not_valid_after(valid_until_datetime)
            .serial_number(1)
            .subject_name(subject_name)
            .issuer_name(subject_name)
            .public_key(self.ca_key.public_key())
        )
        signed_cert = ca_builder.sign(private_key=self.ca_key,
                                      algorithm=hashes.SHA256(),
                                      backend=backends.default_backend())

        self.ca_certificate = signed_cert.public_bytes(
            encoding=serialization.Encoding.PEM)

        self.cert_generator = local_cert_gen.LocalCertGenerator
示例28
def serialize(self,
                  country=u"US",
                  state=u"CA",
                  city=u"San Francisco",
                  company=u"Lokey Examle",
                  common_name=u"example.com"):
        """Return a PEM-encoded CSR signed with this object's private key.

        The key is obtained by loading ``self.to('pem')`` without a
        password.  The five arguments fill the country, state/province,
        locality, organization and common name attributes of the subject.
        The request carries no extensions and is signed with SHA-256.

        NOTE(review): the ``company`` default ``u"Lokey Examle"`` looks like
        a typo of "Lokey Example"; it is kept because it is part of the
        public default.
        """
        private_key = serialization.load_pem_private_key(
            self.to('pem'),
            password=None,
            backend=default_backend())

        subject_fields = [
            (NameOID.COUNTRY_NAME, country),
            (NameOID.STATE_OR_PROVINCE_NAME, state),
            (NameOID.LOCALITY_NAME, city),
            (NameOID.ORGANIZATION_NAME, company),
            (NameOID.COMMON_NAME, common_name),
        ]
        subject = x509.Name(
            [x509.NameAttribute(field, text) for field, text in subject_fields]
        )

        builder = x509.CertificateSigningRequestBuilder().subject_name(subject)
        request = builder.sign(private_key, hashes.SHA256(), default_backend())
        return request.public_bytes(serialization.Encoding.PEM)
示例29
def certificate_template(
    subject: x509.name.Name,
    issuer: x509.name.Name,
    public_key: x509.name.Name,
    certauthority: bool = False,
) -> x509.base.CertificateBuilder:
    """Return an unsigned ``CertificateBuilder`` pre-filled with common settings.

    The builder gets the given subject, issuer and public key, a random
    serial number, a validity window starting at the current UTC time, a
    critical SubjectAlternativeName for ``localhost`` and a critical
    ``BasicConstraints`` whose ``ca`` flag equals ``certauthority``.
    Certificate authorities are valid for ``365 * 10`` days, all other
    (on-the-fly) certificates for 7 days.

    NOTE(review): ``public_key`` is annotated as ``x509.name.Name`` although
    it is handed to ``CertificateBuilder.public_key`` - the annotation looks
    wrong; confirm the intended key type.
    """
    # On-the-fly certificates get a much shorter lifetime than a CA.
    lifetime = datetime.timedelta(days=365 * 10 if certauthority else 7)
    not_valid_after = datetime.datetime.utcnow() + lifetime

    builder = x509.CertificateBuilder()
    builder = builder.subject_name(subject)
    builder = builder.issuer_name(issuer)
    builder = builder.public_key(public_key)
    builder = builder.serial_number(x509.random_serial_number())
    builder = builder.not_valid_before(datetime.datetime.utcnow())
    builder = builder.not_valid_after(not_valid_after)
    builder = builder.add_extension(
        x509.SubjectAlternativeName([x509.DNSName("localhost")]), critical=True
    )
    builder = builder.add_extension(
        x509.BasicConstraints(ca=certauthority, path_length=None), critical=True
    )
    return builder
示例30
def _decode_x509_name(backend, x509_name):
    """Decode every entry of ``x509_name``, in index order, into an ``x509.Name``.

    Unlike a grouping decoder, this variant passes a flat list of decoded
    attributes to ``x509.Name``.
    """
    lib = backend._lib
    entry_total = lib.X509_NAME_entry_count(x509_name)
    return x509.Name([
        _decode_x509_name_entry(backend, lib.X509_NAME_get_entry(x509_name, index))
        for index in range(entry_total)
    ])