Python源码示例:cryptography.x509.CertificateBuilder()

示例1
def create_cert_builder(subject, issuer_name, public_key, days=365, is_ca=False):
    """
    Create a builder that can be used for all types of certificates.

    The validity window starts at the current UTC time and lasts ``days`` days.
    ``cryptography`` interprets naive datetimes as UTC, so the start time is
    taken from ``datetime.utcnow()`` instead of the local clock; otherwise the
    certificate could be "not yet valid" in time zones ahead of UTC.

    :param subject: The subject of the certificate.
    :param issuer_name: The name of the issuer.
    :param public_key: The public key of the certificate.
    :param days: The number of days for which the certificate is valid. The default is 1 year or 365 days.
    :param is_ca: Boolean to indicate if a cert is ca or non ca.
    :return: The (still unsigned) certificate builder.
    :rtype: :class:`x509.CertificateBuilder`
    """
    # Read the clock once so the window is exactly ``days`` long.
    not_before = datetime.utcnow()
    not_after = not_before + timedelta(days=days)

    builder = x509.CertificateBuilder()

    builder = builder.subject_name(subject)
    builder = builder.issuer_name(issuer_name)
    builder = builder.public_key(public_key)
    builder = builder.not_valid_before(not_before)
    builder = builder.not_valid_after(not_after)

    # A random UUID4 provides a 128-bit serial number.
    builder = builder.serial_number(int(uuid.uuid4()))
    builder = builder.add_extension(
        x509.BasicConstraints(ca=is_ca, path_length=None), critical=True
    )
    return builder
示例2
def create_self_signed_certificate(subject_name, private_key, days_valid=365):
    """
    Build and sign a self-signed CA certificate.

    The certificate uses the same name as subject and issuer (organization
    "Test, Inc." plus ``subject_name`` as common name), carries a critical
    BasicConstraints extension with ``ca=True`` and is signed with
    ``private_key`` using SHA-256.

    :param subject_name: Common name placed in the subject/issuer.
    :param private_key: Key whose public half is embedded and which signs the cert.
    :param days_valid: Validity period in days, counted from the current UTC time.
    :return: The signed certificate.
    """
    name = x509.Name([
        x509.NameAttribute(x509.NameOID.ORGANIZATION_NAME, u"Test, Inc."),
        x509.NameAttribute(x509.NameOID.COMMON_NAME, subject_name),
    ])

    builder = x509.CertificateBuilder()
    builder = builder.subject_name(name)
    builder = builder.issuer_name(name)
    builder = builder.public_key(private_key.public_key())
    builder = builder.serial_number(x509.random_serial_number())
    builder = builder.add_extension(
        x509.BasicConstraints(ca=True, path_length=None), critical=True
    )
    builder = builder.not_valid_before(datetime.datetime.utcnow())
    builder = builder.not_valid_after(
        datetime.datetime.utcnow() + datetime.timedelta(days=days_valid)
    )

    return builder.sign(private_key, hashes.SHA256(), backends.default_backend())
示例3
def certificate(private_key: rsa.RSAPrivateKey) -> x509.Certificate:
    """Return a self-signed, non-CA certificate for ``private_key``.

    The certificate has a fixed subject/issuer name and serial number 1, is
    valid for 10 days from the current UTC time, carries a critical
    BasicConstraints extension with ``ca=False`` and is signed with SHA-256.
    """
    attributes = [
        x509.NameAttribute(NameOID.COUNTRY_NAME, u"US"),
        x509.NameAttribute(NameOID.STATE_OR_PROVINCE_NAME, u"CA"),
        x509.NameAttribute(NameOID.LOCALITY_NAME, u"San Francisco"),
        x509.NameAttribute(NameOID.ORGANIZATION_NAME, u"Commandment"),
        x509.NameAttribute(NameOID.COMMON_NAME, u"CA-CERTIFICATE"),
    ]
    name = x509.Name(attributes)

    builder = x509.CertificateBuilder()
    builder = builder.subject_name(name)
    builder = builder.issuer_name(name)
    builder = builder.public_key(private_key.public_key())
    builder = builder.serial_number(1)
    builder = builder.not_valid_before(datetime.datetime.utcnow())
    builder = builder.not_valid_after(
        datetime.datetime.utcnow() + datetime.timedelta(days=10)
    )
    # Second positional argument of add_extension is ``critical``.
    builder = builder.add_extension(
        x509.BasicConstraints(ca=False, path_length=None), True
    )

    return builder.sign(private_key, hashes.SHA256(), default_backend())
示例4
def ca_certificate(private_key: rsa.RSAPrivateKey) -> x509.Certificate:
    """Return a self-signed CA certificate for ``private_key``.

    The certificate has a fixed subject/issuer name and serial number 1, is
    valid for 10 days from the current UTC time, carries a critical
    BasicConstraints extension with ``ca=True`` and is signed with SHA-256.
    """
    ca_name = x509.Name([
        x509.NameAttribute(NameOID.COUNTRY_NAME, u"US"),
        x509.NameAttribute(NameOID.STATE_OR_PROVINCE_NAME, u"CA"),
        x509.NameAttribute(NameOID.LOCALITY_NAME, u"San Francisco"),
        x509.NameAttribute(NameOID.ORGANIZATION_NAME, u"Commandment"),
        x509.NameAttribute(NameOID.COMMON_NAME, u"CA-CERTIFICATE"),
    ])

    builder = (
        x509.CertificateBuilder()
        .serial_number(1)
        .issuer_name(ca_name)
        .subject_name(ca_name)
        .public_key(private_key.public_key())
        .not_valid_before(datetime.datetime.utcnow())
        .not_valid_after(datetime.datetime.utcnow() + datetime.timedelta(days=10))
        # Second positional argument of add_extension is ``critical``.
        .add_extension(x509.BasicConstraints(ca=True, path_length=None), True)
    )

    return builder.sign(private_key, hashes.SHA256(), default_backend())
示例5
def sign_cert_builder(cert_builder, private_key, alg=None):
    """
    Turn a CertificateBuilder into a signed certificate.

    Args:
        cert_builder (x509.CertificateBuilder): Certificate configuration that
            should be signed.
        private_key: Key handed to ``cert_builder.sign`` as ``private_key``.
        alg: Hash algorithm used for the signature. Any falsy value
            (including the default ``None``) selects ``hashes.SHA256()``.

    Return:
        x509.Certificate
    """
    if not alg:
        alg = hashes.SHA256()

    signed_certificate = cert_builder.sign(
        private_key=private_key,
        algorithm=alg,
        backend=cryptography_default_backend,
    )
    return signed_certificate
示例6
def sign_cert_builder(cert_builder, private_key, alg=None):
    """
    Sign the certificate described by ``cert_builder``.

    Args:
        cert_builder (x509.CertificateBuilder): Certificate configuration that
            should be signed.
        private_key: Signing key passed through to ``cert_builder.sign``.
        alg: Signature hash algorithm; when falsy (the default ``None``),
            ``hashes.SHA256()`` is used.

    Return:
        x509.Certificate
    """
    digest = alg or hashes.SHA256()
    return cert_builder.sign(
        private_key=private_key,
        algorithm=digest,
        backend=cryptography_default_backend,
    )
示例7
def setUp(self):
        """Prepare a self-signed CA certificate (PEM) and the generator under test.

        Relies on ``self.ca_key`` being available after the parent ``setUp``
        (presumably created there; confirm against the base class).
        """
        super(TestLocalGenerator, self).setUp()
        self.signing_digest = "sha256"

        # Setup CA data

        not_before = datetime.datetime.utcnow()
        # Two years, expressed in seconds.
        lifetime = datetime.timedelta(seconds=2 * 365 * 24 * 60 * 60)
        not_after = datetime.datetime.utcnow() + lifetime

        ca_name = x509.Name([
            x509.NameAttribute(x509.oid.NameOID.COUNTRY_NAME, u"US"),
            x509.NameAttribute(x509.oid.NameOID.STATE_OR_PROVINCE_NAME,
                               u"Oregon"),
            x509.NameAttribute(x509.oid.NameOID.LOCALITY_NAME, u"Springfield"),
            x509.NameAttribute(x509.oid.NameOID.ORGANIZATION_NAME,
                               u"Springfield Nuclear Power Plant"),
            x509.NameAttribute(x509.oid.NameOID.COMMON_NAME, u"maggie1"),
        ])

        # Subject and issuer are identical: the CA certificate is self-signed.
        builder = (
            x509.CertificateBuilder()
            .not_valid_before(not_before)
            .not_valid_after(not_after)
            .serial_number(1)
            .subject_name(ca_name)
            .issuer_name(ca_name)
            .public_key(self.ca_key.public_key())
        )
        ca_cert = builder.sign(private_key=self.ca_key,
                               algorithm=hashes.SHA256(),
                               backend=backends.default_backend())

        self.ca_certificate = ca_cert.public_bytes(
            encoding=serialization.Encoding.PEM)

        self.cert_generator = local_cert_gen.LocalCertGenerator
示例8
def certificate_template(
    subject: x509.name.Name,
    issuer: x509.name.Name,
    public_key: x509.name.Name,
    certauthority: bool = False,
) -> x509.base.CertificateBuilder:
    """Return an unsigned certificate builder for ``localhost``.

    The builder gets a random serial number, a validity window starting at the
    current UTC time, a critical SubjectAlternativeName for ``localhost`` and a
    critical BasicConstraints extension whose ``ca`` flag is ``certauthority``.

    CA certificates are valid for ten years (365 * 10 days); all other
    certificates for seven days.
    """
    # NOTE(review): ``public_key`` is annotated as a Name but is passed to
    # ``CertificateBuilder.public_key`` - the annotation looks wrong; confirm.

    # Shorter valid length for on-the-fly (non-CA) certificates.
    lifetime = (
        datetime.timedelta(days=365 * 10)
        if certauthority
        else datetime.timedelta(days=7)
    )
    not_valid_after = datetime.datetime.utcnow() + lifetime

    builder = x509.CertificateBuilder()
    builder = builder.subject_name(subject)
    builder = builder.issuer_name(issuer)
    builder = builder.public_key(public_key)
    builder = builder.serial_number(x509.random_serial_number())
    builder = builder.not_valid_before(datetime.datetime.utcnow())
    builder = builder.not_valid_after(not_valid_after)
    builder = builder.add_extension(
        x509.SubjectAlternativeName([x509.DNSName("localhost")]), critical=True
    )
    builder = builder.add_extension(
        x509.BasicConstraints(ca=certauthority, path_length=None), critical=True
    )
    return builder
示例9
def test_pfx(_autorestart, _autocmd, _fix_permissions, fake_env, fake_config):
    """Check that deploying produces a non-empty ``cert.pfx`` in the archive.

    Writes a fresh RSA key plus a self-signed certificate (as ``cert.pem`` and
    ``chain.pem``) into the archive directory, runs ``hooks.deploy`` and then
    asserts on the resulting ``cert.pfx`` file. The underscore-prefixed
    parameters are not used in the body (presumably pytest fixtures).
    """
    archive_path = fake_env["archive"]

    private_key = rsa.generate_private_key(
        public_exponent=65537, key_size=2048, backend=default_backend()
    )
    key_pem = private_key.private_bytes(
        encoding=serialization.Encoding.PEM,
        format=serialization.PrivateFormat.TraditionalOpenSSL,
        encryption_algorithm=serialization.NoEncryption(),
    )
    with open(archive_path / "privkey.pem", "wb") as key_file:
        key_file.write(key_pem)

    # Self-signed: subject and issuer share the same name.
    name = x509.Name([x509.NameAttribute(NameOID.COMMON_NAME, u"example.com")])
    builder = x509.CertificateBuilder()
    builder = builder.subject_name(name)
    builder = builder.issuer_name(name)
    builder = builder.public_key(private_key.public_key())
    builder = builder.serial_number(x509.random_serial_number())
    builder = builder.not_valid_before(datetime.datetime.utcnow())
    builder = builder.not_valid_after(
        datetime.datetime.utcnow() + datetime.timedelta(days=10)
    )
    cert = builder.sign(private_key, hashes.SHA256(), default_backend())

    # The same certificate serves as both the leaf and the chain.
    for file_name in ("cert.pem", "chain.pem"):
        with open(archive_path / file_name, "wb") as pem_file:
            pem_file.write(cert.public_bytes(serialization.Encoding.PEM))

    hooks.deploy(config.load(fake_config), LINEAGE)

    pfx_path = archive_path / "cert.pfx"
    assert os.path.exists(pfx_path)
    assert os.stat(pfx_path).st_size != 0
示例10
def issue_certificate(self, ca_pem_key, ca_pem_cert, csr_pem):
        """Issue a certificate for a CSR, signed by the given CA.

        The subject, public key and extensions are taken from the CSR; the
        validity window and serial number come from ``self.not_valid_before_date``,
        ``self.not_valid_after_date`` and ``self.serial_num``. The PEM encoding
        of the result is also stored in ``self.pem_certificate``.

        :param ca_pem_key: PEM text of the CA private key used for signing.
        :param ca_pem_cert: PEM text of the CA certificate.
        :param csr_pem: PEM text of the certificate signing request.
        :return: The signed certificate object.
        """
        ca_cert = self.load_pem(entity_type="certificate", pem_text=ca_pem_cert)
        ca_key = self.load_pem(entity_type="key", pem_text=ca_pem_key)
        csr = self.load_pem(entity_type="csr", pem_text=csr_pem)

        builder = x509.CertificateBuilder(
            # The issuer of the new certificate is the CA itself, i.e. the CA
            # certificate's *subject*. Using ``ca_cert.issuer`` only works for
            # self-signed roots and breaks chaining for intermediate CAs.
            issuer_name=ca_cert.subject,
            subject_name=csr.subject,
            public_key=csr.public_key(),
            not_valid_before=self.not_valid_before_date,
            not_valid_after=self.not_valid_after_date,
            # NOTE(review): every extension requested in the CSR is copied
            # verbatim (including e.g. BasicConstraints) - confirm the CSR is
            # trusted or filter the extensions before issuing.
            extensions=csr.extensions,
            serial_number=self.serial_num,
        )

        certificate = builder.sign(
            private_key=ca_key,
            algorithm=hashes.SHA256(),
            backend=default_backend(),
        )

        self.pem_certificate = certificate.public_bytes(
            encoding=serialization.Encoding.PEM
        )

        return certificate
示例11
def generate_tls_sni_01_cert(server_name, key_type=u'rsa',
                             _generate_private_key=None):
    """
    Generate a certificate/key pair for responding to a tls-sni-01 challenge.

    The certificate is self-signed with the common name ``acme.invalid`` and is
    valid from one hour before to one hour after the current UTC time.

    :param str server_name: The SAN the certificate should have.
    :param str key_type: The type of key to generate; usually not necessary.
    :param _generate_private_key: Optional replacement for the module's
        ``generate_private_key``; called with ``key_type``.

    :rtype: ``Tuple[`~cryptography.x509.Certificate`, PrivateKey]``
    :return: A tuple of the certificate and private key.
    """
    key = (_generate_private_key or generate_private_key)(key_type)
    name = x509.Name([
        x509.NameAttribute(NameOID.COMMON_NAME, u'acme.invalid')])

    # cryptography treats naive datetimes as UTC. With only a one hour margin
    # on each side, local time would yield a certificate that is not yet valid
    # or already expired in any zone more than an hour away from UTC.
    now = datetime.utcnow()
    margin = timedelta(seconds=3600)

    cert = (
        x509.CertificateBuilder()
        .subject_name(name)
        .issuer_name(name)
        .not_valid_before(now - margin)
        .not_valid_after(now + margin)
        .serial_number(int(uuid.uuid4()))
        .public_key(key.public_key())
        .add_extension(
            x509.SubjectAlternativeName([x509.DNSName(server_name)]),
            critical=False)
        .sign(
            private_key=key,
            algorithm=hashes.SHA256(),
            backend=default_backend())
        )
    return (cert, key)
示例12
def _generate_ca_cert(self):
        """
        Generate a CA cert/key.

        Reuses ``self._ca_key`` when one is already set, otherwise creates an
        RSA key. Stores the CA name, the self-signed CA certificate and the
        matching AuthorityKeyIdentifier on ``self``.
        """
        if self._ca_key is None:
            self._ca_key = generate_private_key(u'rsa')
        ca_public_key = self._ca_key.public_key()

        self._ca_name = x509.Name([
            x509.NameAttribute(NameOID.COMMON_NAME, u'ACME Snake Oil CA')])

        builder = x509.CertificateBuilder()
        builder = builder.subject_name(self._ca_name)
        builder = builder.issuer_name(self._ca_name)
        # Backdate by an hour; valid for 3650 days from now.
        builder = builder.not_valid_before(self._now() - timedelta(seconds=3600))
        builder = builder.not_valid_after(self._now() + timedelta(days=3650))
        builder = builder.public_key(ca_public_key)
        builder = builder.serial_number(int(uuid4()))
        builder = builder.add_extension(
            x509.BasicConstraints(ca=True, path_length=0),
            critical=True)
        builder = builder.add_extension(
            x509.SubjectKeyIdentifier.from_public_key(ca_public_key),
            critical=False)

        self._ca_cert = builder.sign(
            private_key=self._ca_key,
            algorithm=hashes.SHA256(),
            backend=default_backend())
        self._ca_aki = x509.AuthorityKeyIdentifier.from_issuer_public_key(
            ca_public_key)
示例13
def request_issuance(self, csr):
        """Issue a certificate for ``csr`` signed by the stored CA key.

        The subject, public key and SubjectAlternativeName extension are taken
        from ``csr.csr``; the certificate is backdated by one hour and valid for
        90 days from ``self._now()``. The DER-encoded certificate is wrapped in
        a ``messages.CertificateResource`` that is delivered through the
        callback chained onto ``self._controller.issue()``.
        """
        request = csr.csr
        # TODO: Only in Cryptography 1.3
        # assert csr.is_signature_valid
        builder = x509.CertificateBuilder()
        builder = builder.subject_name(request.subject)
        builder = builder.issuer_name(self._ca_name)
        builder = builder.not_valid_before(self._now() - timedelta(seconds=3600))
        builder = builder.not_valid_after(self._now() + timedelta(days=90))
        builder = builder.serial_number(int(uuid4()))
        builder = builder.public_key(request.public_key())

        # Copy the requested SAN; the lookup fails if the CSR carries none.
        requested_san = request.extensions.get_extension_for_oid(
            ExtensionOID.SUBJECT_ALTERNATIVE_NAME).value
        builder = builder.add_extension(requested_san, critical=False)
        builder = builder.add_extension(
            x509.SubjectKeyIdentifier.from_public_key(request.public_key()),
            critical=False)
        builder = builder.add_extension(self._ca_aki, critical=False)

        cert = builder.sign(
            private_key=self._ca_key,
            algorithm=hashes.SHA256(),
            backend=default_backend())
        cert_res = messages.CertificateResource(
            body=cert.public_bytes(encoding=serialization.Encoding.DER))
        return self._controller.issue().addCallback(lambda _: cert_res)
示例14
def cert_builder(private_key):
    """Return an unsigned builder for a ``foo.com`` certificate.

    Subject and issuer are both ``CN=foo.com``, the serial number is 1 and the
    validity window is fixed: 2017-12-22 through 2040-01-01. The public key is
    derived from ``private_key``.
    """
    name = x509.Name([x509.NameAttribute(x509.NameOID.COMMON_NAME, "foo.com")])

    builder = x509.CertificateBuilder()
    builder = builder.subject_name(name)
    builder = builder.issuer_name(name)
    builder = builder.serial_number(1)
    builder = builder.public_key(private_key.public_key())
    builder = builder.not_valid_before(datetime.datetime(2017, 12, 22))
    builder = builder.not_valid_after(datetime.datetime(2040, 1, 1))
    return builder
示例15
def create_certificate(subject_name,
                       private_key,
                       signing_certificate,
                       signing_key,
                       days_valid=365,
                       client_auth=False):
    """
    Create a certificate issued by ``signing_certificate``.

    :param subject_name: Common name of the new certificate's subject
        (the organization is always "Test, Inc.").
    :param private_key: Key whose public half is embedded in the certificate.
    :param signing_certificate: Issuer certificate; its subject becomes the
        issuer name of the new certificate.
    :param signing_key: Private key used to sign the new certificate (SHA-256).
    :param days_valid: Validity period in days from the current UTC time.
    :param client_auth: When true, add a critical ExtendedKeyUsage extension
        containing CLIENT_AUTH.
    :return: The signed certificate.
    """
    subject = x509.Name([
        x509.NameAttribute(x509.NameOID.ORGANIZATION_NAME, u"Test, Inc."),
        x509.NameAttribute(x509.NameOID.COMMON_NAME, subject_name),
    ])

    builder = x509.CertificateBuilder()
    builder = builder.subject_name(subject)
    builder = builder.issuer_name(signing_certificate.subject)
    builder = builder.public_key(private_key.public_key())
    builder = builder.serial_number(x509.random_serial_number())
    builder = builder.not_valid_before(datetime.datetime.utcnow())
    builder = builder.not_valid_after(
        datetime.datetime.utcnow() + datetime.timedelta(days=days_valid)
    )

    if client_auth:
        client_usage = x509.ExtendedKeyUsage([x509.ExtendedKeyUsageOID.CLIENT_AUTH])
        builder = builder.add_extension(client_usage, critical=True)

    return builder.sign(signing_key, hashes.SHA256(), backends.default_backend())
示例16
def generate_selfsigned_cert(hostname="/CN=Chia Blockchain CA", key=None):
    """Generate a self-signed CA certificate and return it with its key as PEM text.

    ``hostname`` is used verbatim as the certificate's common name. When ``key``
    is omitted, a new 2048-bit RSA key is generated. The certificate has the
    fixed serial number 1000 and is valid for 10 * 365 days from the current
    UTC time.

    Returns a ``(certificate_pem, private_key_pem)`` tuple of ``str``; the
    private key is not encrypted.
    """
    # Generate our key
    if key is None:
        key = rsa.generate_private_key(
            public_exponent=65537, key_size=2048, backend=default_backend(),
        )

    ca_name = x509.Name([x509.NameAttribute(NameOID.COMMON_NAME, hostname)])

    # ca=True with path_length=0: the cert may sign end-entity certificates
    # but no further CA certificates below it. Added as non-critical.
    constraints = x509.BasicConstraints(ca=True, path_length=0)
    issued_at = datetime.utcnow()

    builder = x509.CertificateBuilder()
    builder = builder.subject_name(ca_name)
    builder = builder.issuer_name(ca_name)
    builder = builder.public_key(key.public_key())
    builder = builder.serial_number(1000)
    builder = builder.not_valid_before(issued_at)
    builder = builder.not_valid_after(issued_at + timedelta(days=10 * 365))
    builder = builder.add_extension(constraints, False)
    cert = builder.sign(key, hashes.SHA256(), default_backend())

    cert_pem = cert.public_bytes(encoding=serialization.Encoding.PEM)
    key_pem = key.private_bytes(
        encoding=serialization.Encoding.PEM,
        format=serialization.PrivateFormat.TraditionalOpenSSL,
        encryption_algorithm=serialization.NoEncryption(),
    )

    return cert_pem.decode(), key_pem.decode()
示例17
def generate_cert(service, namespace, private_key):
    """Create a self-signed certificate for ``<service>.<namespace>.svc``.

    The common name is ``<service>.<namespace>.svc``; the subject alternative
    names are ``<service>``, ``<service>.<namespace>`` and
    ``<service>.<namespace>.svc``. The certificate is marked as a CA
    (non-critical BasicConstraints), valid for 36500 days from the current UTC
    time and signed with ``private_key`` using SHA-256.

    :param service: Service name used in the DNS names.
    :param namespace: Namespace used in the DNS names.
    :param private_key: Key that is certified and that signs the certificate.
    :return: The signed certificate.
    """
    name = x509.Name([
        x509.NameAttribute(x509.oid.NameOID.COMMON_NAME,
                           "{0}.{1}.svc".format(service, namespace))
    ])

    subj_alternative_names = x509.SubjectAlternativeName([
        x509.DNSName("{0}".format(service)),
        x509.DNSName("{0}.{1}".format(service, namespace)),
        x509.DNSName("{0}.{1}.svc".format(service, namespace)),
    ])

    constraints = x509.BasicConstraints(ca=True, path_length=None)

    # cryptography interprets naive datetimes as UTC; local time would make
    # the certificate "not yet valid" in time zones ahead of UTC.
    now = datetime.utcnow()

    cert_builder = x509.CertificateBuilder()
    cert_builder = (cert_builder.subject_name(name)
                                .issuer_name(name)
                                .add_extension(subj_alternative_names, False)
                                .add_extension(constraints, False)
                                .not_valid_before(now)
                                .not_valid_after(now + timedelta(days=36500))
                                .public_key(private_key.public_key())
                                .serial_number(x509.random_serial_number()))

    cert = cert_builder.sign(private_key, hashes.SHA256(), default_backend())
    return cert
示例18
def build_dep_token_certificate(dep_token):
    """Create a fresh RSA key and a short-lived certificate for a DEP token.

    The subject common name is ``zentral-dep-token-<dep_token.pk>`` and the
    issuer common name is ``zentral``. The certificate is valid from one day
    before to two days after ``timezone.now()``, is marked as non-CA and is
    signed with the newly generated key.

    Returns a ``(certificate, private_key)`` tuple.
    """
    private_key = rsa.generate_private_key(
        public_exponent=65537,
        key_size=2048,
        backend=default_backend()
    )

    subject = x509.Name([
        x509.NameAttribute(NameOID.COMMON_NAME, u'zentral-dep-token-{}'.format(dep_token.pk)),
    ])
    issuer = x509.Name([
        x509.NameAttribute(NameOID.COMMON_NAME, u'zentral'),
    ])

    now = timezone.now()
    one_day = datetime.timedelta(days=1)

    builder = (
        x509.CertificateBuilder()
        .subject_name(subject)
        .issuer_name(issuer)
        .not_valid_before(now - one_day)
        .not_valid_after(now + 2 * one_day)
        .serial_number(x509.random_serial_number())
        .public_key(private_key.public_key())
        .add_extension(
            x509.BasicConstraints(ca=False, path_length=None), critical=True,
        )
    )
    certificate = builder.sign(
        private_key=private_key, algorithm=hashes.SHA256(),
        backend=default_backend()
    )
    return certificate, private_key
示例19
def create(cls, common_name: str = 'COMMANDMENT-CA', key_size=2048):
        """Create, persist and return a new certificate authority.

        Generates an RSA key of ``key_size`` bits, builds a self-signed CA
        certificate valid for 365 days from the current UTC time, wraps both
        in their model classes, attaches them to a new ``cls`` instance and
        commits everything through ``db.session``.
        """
        ca = cls()
        ca.common_name = common_name
        ca_name = x509.Name([
            x509.NameAttribute(NameOID.COMMON_NAME, common_name),
            x509.NameAttribute(NameOID.ORGANIZATION_NAME, 'commandment')
        ])

        private_key = rsa.generate_private_key(
            public_exponent=65537,
            key_size=key_size,
            backend=default_backend(),
        )
        ca.rsa_private_key = RSAPrivateKey.from_crypto(private_key)
        db.session.add(ca.rsa_private_key)

        builder = x509.CertificateBuilder()
        builder = builder.subject_name(ca_name)
        builder = builder.issuer_name(ca_name)
        builder = builder.public_key(private_key.public_key())
        builder = builder.serial_number(x509.random_serial_number())
        builder = builder.not_valid_before(datetime.datetime.utcnow())
        builder = builder.not_valid_after(
            datetime.datetime.utcnow() + datetime.timedelta(days=365)
        )
        # Second positional argument of add_extension is ``critical``.
        builder = builder.add_extension(
            x509.BasicConstraints(ca=True, path_length=None), True
        )
        certificate = builder.sign(private_key, hashes.SHA256(), default_backend())

        ca_certificate_model = CACertificate.from_crypto(certificate)
        ca_certificate_model.rsa_private_key = ca.rsa_private_key
        ca.certificate = ca_certificate_model

        db.session.add(ca)
        db.session.commit()

        return ca
示例20
def generate_self_signed_certificate(cn: str) -> (rsa.RSAPrivateKey, x509.Certificate):
    """Generate a new RSA key and a self-signed X.509 certificate for it.

    The certificate uses ``cn`` as common name (organization ``commandment``),
    lists ``cn`` as its only DNS subject alternative name and is valid for
    365 days from the current UTC time.

    Args:
          cn (string): Common name, also used as the DNS SAN entry.

    Returns:
          Tuple of the generated private key and the signed certificate.
    """
    subject = x509.Name([
        x509.NameAttribute(NameOID.COMMON_NAME, cn),
        x509.NameAttribute(NameOID.ORGANIZATION_NAME, 'commandment')
    ])

    private_key = rsa.generate_private_key(
        public_exponent=65537,
        key_size=2048,
        backend=default_backend(),
    )

    builder = x509.CertificateBuilder()
    builder = builder.subject_name(subject)
    builder = builder.issuer_name(subject)
    builder = builder.public_key(private_key.public_key())
    builder = builder.serial_number(x509.random_serial_number())
    builder = builder.not_valid_before(datetime.datetime.utcnow())
    builder = builder.not_valid_after(
        datetime.datetime.utcnow() + datetime.timedelta(days=365)
    )
    # Non-critical SAN (second positional argument is ``critical``).
    builder = builder.add_extension(
        x509.SubjectAlternativeName([DNSName(cn)]), False
    )
    certificate = builder.sign(private_key, hashes.SHA256(), default_backend())

    return private_key, certificate
示例21
def get_cert_builder(expires, serial=None):
    """Get a basic X509 cert builder object.

    The builder only has its validity window and serial number set. The start
    of the window is the current UTC time truncated to the minute.

    .. TODO:: deprecate support for passing datetime as expires

    Parameters
    ----------

    expires : datetime or timedelta
        When this certificate will expire. ``None`` selects
        ``ca_settings.CA_DEFAULT_EXPIRES`` added to the start time, a timedelta
        is added to the start time, and any other value is used after
        truncating its seconds and microseconds.
    serial : int, optional
        Serial number to set for this certificate. When omitted, a value from
        ``x509.random_serial_number()`` is used.
    """
    not_before = datetime.utcnow().replace(second=0, microsecond=0)

    if serial is None:
        serial = x509.random_serial_number()

    if expires is None:
        not_after = not_before + ca_settings.CA_DEFAULT_EXPIRES
    elif isinstance(expires, timedelta):
        not_after = not_before + expires
    else:
        not_after = expires.replace(second=0, microsecond=0)

    return (
        x509.CertificateBuilder()
        .not_valid_before(not_before)
        .not_valid_after(not_after)
        .serial_number(serial)
    )
示例22
def new_credentials(cls):
        """Create a new Security object with a new certificate/key pair.

        A 2048-bit RSA key and a self-signed certificate (common name
        ``skein-internal``, valid for 365 days from the current UTC time) are
        generated and handed to ``cls`` as PEM-encoded bytes.
        """
        from cryptography import x509
        from cryptography.hazmat.backends import default_backend
        from cryptography.hazmat.primitives import hashes
        from cryptography.hazmat.primitives import serialization
        from cryptography.hazmat.primitives.asymmetric import rsa
        from cryptography.x509.oid import NameOID

        key = rsa.generate_private_key(public_exponent=65537,
                                       key_size=2048,
                                       backend=default_backend())
        # Unencrypted PKCS8 PEM.
        key_bytes = key.private_bytes(
            encoding=serialization.Encoding.PEM,
            format=serialization.PrivateFormat.PKCS8,
            encryption_algorithm=serialization.NoEncryption())

        # Self-signed: the same name is both subject and issuer.
        name = x509.Name(
            [x509.NameAttribute(NameOID.COMMON_NAME, u'skein-internal')])
        now = datetime.utcnow()

        builder = x509.CertificateBuilder()
        builder = builder.subject_name(name)
        builder = builder.issuer_name(name)
        builder = builder.public_key(key.public_key())
        builder = builder.serial_number(x509.random_serial_number())
        builder = builder.not_valid_before(now)
        builder = builder.not_valid_after(now + timedelta(days=365))
        cert = builder.sign(key, hashes.SHA256(), default_backend())

        cert_bytes = cert.public_bytes(serialization.Encoding.PEM)

        return cls(cert_bytes=cert_bytes, key_bytes=key_bytes)
示例23
def get_certificate_and_private_key(self):
        """Return a throwaway self-signed certificate and its key, both as PEM bytes.

        The certificate is valid for 10 days from the current UTC time.
        """
        # NOTE(review): exponent 3 with a 1024-bit modulus is weak; presumably
        # chosen to keep test key generation fast - confirm before reuse.
        key = rsa.generate_private_key(public_exponent=3,
                                       key_size=1024,
                                       backend=default_backend())
        name = x509.Name([
            x509.NameAttribute(NameOID.COUNTRY_NAME, u"FI"),
            x509.NameAttribute(NameOID.LOCALITY_NAME, u"Helsinki"),
            x509.NameAttribute(NameOID.ORGANIZATION_NAME, u"Some Company"),
            x509.NameAttribute(NameOID.COMMON_NAME, u"Test Certificate"),
        ])
        # Self-signed: the same name serves as issuer and subject.
        builder = x509.CertificateBuilder(
            issuer_name=name,
            subject_name=name,
            public_key=key.public_key(),
            serial_number=x509.random_serial_number(),
            not_valid_before=datetime.utcnow(),
            not_valid_after=datetime.utcnow() + timedelta(days=10),
        )
        signed = builder.sign(key, hashes.SHA256(), default_backend())

        cert_pem = signed.public_bytes(encoding=serialization.Encoding.PEM)
        key_pem = key.private_bytes(
            encoding=serialization.Encoding.PEM,
            format=serialization.PrivateFormat.TraditionalOpenSSL,
            encryption_algorithm=serialization.NoEncryption())
        return cert_pem, key_pem
示例24
def generate_self_signed_certificate(self, key, cn, validity, san=None):
        """
        Generate and return a self-signed certificate object.

        ``key`` is the key to certify and sign with, ``cn`` the common name
        and ``validity`` the requested lifetime in days, capped at
        ``self.settings['max_validity_days']``. ``san`` is an optional list of
        subject alternative names (as strings); when given, they are added as
        a non-critical SubjectAlternativeName extension.
        """
        lifetime_days = min(validity, self.settings['max_validity_days'])
        name = self.generate_x509_name(cn)

        # Each builder method returns a new builder, so rebind the name at
        # every step. Subject and issuer are the same name (self-signed).
        builder = x509.CertificateBuilder()
        builder = builder.subject_name(name)
        builder = builder.issuer_name(name)
        builder = builder.public_key(key.public_key())
        builder = builder.serial_number(x509.random_serial_number())
        builder = builder.not_valid_before(datetime.datetime.utcnow())
        builder = builder.not_valid_after(
            datetime.datetime.utcnow() + datetime.timedelta(days=lifetime_days)
        )

        if san:
            builder = builder.add_extension(
                x509.SubjectAlternativeName(self.encode_san_dns_names(san)),
                critical=False,
            )

        return builder.sign(key, hashes.SHA256(), default_backend())
示例25
def build_key_and_cert(subject_name, *, ca=False, ca_key=None, issuer_name=''):
    """Generate an EC private key and a certificate for its public key.

    :param subject_name: Subject name of the certificate.
    :param ca: When true, the certificate gets a critical BasicConstraints
        extension with ``ca=True`` and ``path_length=1``; otherwise a
        non-critical one with ``ca=False``.
    :param ca_key: Private key used to sign the certificate. When falsy, the
        newly generated key is used (self-signed).
    :param issuer_name: Issuer name; falls back to ``subject_name`` when falsy.
    :return: Tuple ``(certificate, private_key)``.
    """
    if not issuer_name:
        issuer_name = subject_name

    # DDS-Security section 9.3.1 calls for prime256v1, for which SECP256R1 is an alias.
    # The curve must be passed as an *instance*: newer cryptography releases
    # deprecate and then reject the bare class.
    private_key = ec.generate_private_key(ec.SECP256R1(), cryptography_backend())
    if not ca_key:
        ca_key = private_key

    if ca:
        extension = x509.BasicConstraints(ca=True, path_length=1)
    else:
        extension = x509.BasicConstraints(ca=False, path_length=None)

    utcnow = datetime.datetime.utcnow()
    builder = x509.CertificateBuilder(
        ).issuer_name(
            issuer_name
        ).serial_number(
            x509.random_serial_number()
        ).not_valid_before(
            # Using a day earlier here to prevent Connext (5.3.1) from complaining
            # when extracting it from the permissions file and thinking it's in the future
            # https://github.com/ros2/ci/pull/436#issuecomment-624874296
            utcnow - datetime.timedelta(days=1)
        ).not_valid_after(
            # TODO: This should not be hard-coded
            utcnow + datetime.timedelta(days=3650)
        ).public_key(
            private_key.public_key()
        ).subject_name(
            subject_name
        ).add_extension(
            extension, critical=ca
        )
    cert = builder.sign(ca_key, hashes.SHA256(), cryptography_backend())

    return (cert, private_key)
示例26
def generate_key_and_cert():
    """Generate an RSA key and a matching self-signed certificate.

    Returns a tuple ``(signing_key, signing_cert)`` where ``signing_key`` is
    the private key object and ``signing_cert`` is the DER-encoded certificate
    (bytes). The certificate is valid for 10 days from the current UTC time.
    """
    signing_key = rsa.generate_private_key(
        backend=crypto_default_backend(),
        public_exponent=65537,
        key_size=2048
    )

    # Self-signed: one name is used for both subject and issuer.
    name = x509.Name([
        x509.NameAttribute(NameOID.COUNTRY_NAME, u"US"),
        x509.NameAttribute(NameOID.STATE_OR_PROVINCE_NAME, u"CA"),
        x509.NameAttribute(NameOID.LOCALITY_NAME, u"San Francisco"),
        x509.NameAttribute(NameOID.ORGANIZATION_NAME, u"My Company"),
        x509.NameAttribute(NameOID.COMMON_NAME, u"example.com"),
    ])

    builder = x509.CertificateBuilder()
    builder = builder.subject_name(name)
    builder = builder.issuer_name(name)
    builder = builder.public_key(signing_key.public_key())
    builder = builder.serial_number(x509.random_serial_number())
    builder = builder.not_valid_before(datetime.utcnow())
    # Our certificate will be valid for 10 days
    builder = builder.not_valid_after(datetime.utcnow() + timedelta(days=10))

    # Sign our certificate with our private key
    certificate = builder.sign(
        signing_key, hashes.SHA256(), crypto_default_backend()
    )
    signing_cert = certificate.public_bytes(crypto_serialization.Encoding.DER)
    return signing_key, signing_cert
示例27
def ca_file(tmpdir):
    """
    Create a valid PEM file with a CA certificate and return the path.

    A new self-signed CA certificate for ``pyopenssl.org`` is written to
    ``test.pem`` inside ``tmpdir``; the path is returned ASCII-encoded (bytes).
    """
    key = rsa.generate_private_key(
        public_exponent=65537,
        key_size=2048,
        backend=default_backend()
    )

    # Valid from yesterday until tomorrow, relative to the local clock.
    one_day = datetime.timedelta(1, 0, 0)

    builder = (
        x509.CertificateBuilder()
        .subject_name(x509.Name([
            x509.NameAttribute(NameOID.COMMON_NAME, u"pyopenssl.org"),
        ]))
        .issuer_name(x509.Name([
            x509.NameAttribute(NameOID.COMMON_NAME, u"pyopenssl.org"),
        ]))
        .not_valid_before(datetime.datetime.today() - one_day)
        .not_valid_after(datetime.datetime.today() + one_day)
        .serial_number(int(uuid.uuid4()))
        .public_key(key.public_key())
        .add_extension(
            x509.BasicConstraints(ca=True, path_length=None), critical=True,
        )
    )

    certificate = builder.sign(
        private_key=key, algorithm=hashes.SHA256(),
        backend=default_backend()
    )

    pem_path = tmpdir.join("test.pem")
    pem_path.write_binary(
        certificate.public_bytes(
            encoding=serialization.Encoding.PEM,
        )
    )

    return str(pem_path).encode("ascii")
示例28
def generate_cert(path):
    """Generate a self-signed certificate and key and write them below ``path``.

    The unencrypted private key is written to ``KEY_FILE`` and the certificate
    to ``CERT_FILE`` (both PEM). The certificate has the common name
    ``lxd-image-server.localhost``, the DNS SAN ``lxd.localhost``, the fixed
    serial number 1000 and is valid from 10 days before to 10 * 365 days after
    the current UTC time.
    """
    private_key = rsa.generate_private_key(
        public_exponent=65537,
        key_size=2048,
        backend=default_backend()
    )

    subject = x509.Name([
        x509.NameAttribute(NameOID.COMMON_NAME, u"lxd-image-server.localhost")])

    issued_at = datetime.utcnow()
    # ca=True with path_length=0, added as a non-critical extension.
    constraints = x509.BasicConstraints(ca=True, path_length=0)
    # The address is looked up but not used in the certificate.
    get_address()

    builder = x509.CertificateBuilder()
    builder = builder.subject_name(subject)
    builder = builder.issuer_name(subject)
    builder = builder.public_key(private_key.public_key())
    builder = builder.serial_number(1000)
    builder = builder.not_valid_before(issued_at - timedelta(days=10))
    builder = builder.not_valid_after(issued_at + timedelta(days=10 * 365))
    builder = builder.add_extension(constraints, False)
    builder = builder.add_extension(
        x509.SubjectAlternativeName([x509.DNSName(u"lxd.localhost")]),
        critical=False,
    )
    cert = builder.sign(private_key, hashes.SHA256(), default_backend())

    key_pem = private_key.private_bytes(
        encoding=serialization.Encoding.PEM,
        format=serialization.PrivateFormat.TraditionalOpenSSL,
        encryption_algorithm=serialization.NoEncryption(),
    )
    with open(join(path, KEY_FILE), 'wb') as key_out:
        key_out.write(key_pem)

    with open(join(path, CERT_FILE), 'wb') as cert_out:
        cert_out.write(cert.public_bytes(serialization.Encoding.PEM))
示例29
def create_certificate(self):
        """Create a short-lived mock certificate and put it in the verifier's cache.

        Generates ``self.private_key``, builds a self-signed certificate valid
        from one minute before to one minute after the current UTC time, signs
        it with SHA-1, stores it as ``self.mock_certificate`` and registers it
        in ``self.request_verifier._cert_cache`` under
        ``self.PREPOPULATED_CERT_URL``.
        """
        self.private_key = self.generate_private_key()

        # Self-signed: subject and issuer are the same name object.
        subject = issuer = x509.Name([
            x509.NameAttribute(NameOID.COUNTRY_NAME, u"US"),
            x509.NameAttribute(NameOID.STATE_OR_PROVINCE_NAME, u"WA"),
            x509.NameAttribute(NameOID.LOCALITY_NAME, u"Seattle"),
            x509.NameAttribute(NameOID.ORGANIZATION_NAME, u"Amazon Alexa"),
            x509.NameAttribute(
                NameOID.COMMON_NAME, u"{}".format(self.PREPOPULATED_CERT_URL)),
            ])

        builder = x509.CertificateBuilder()
        builder = builder.subject_name(name=subject)
        builder = builder.issuer_name(name=issuer)
        builder = builder.public_key(key=self.private_key.public_key())
        builder = builder.serial_number(number=x509.random_serial_number())
        builder = builder.not_valid_before(
            time=datetime.utcnow() - timedelta(minutes=1))
        builder = builder.not_valid_after(
            time=datetime.utcnow() + timedelta(minutes=1))
        builder = builder.add_extension(
            extension=x509.SubjectAlternativeName(
                [x509.DNSName(u"{}".format(CERT_CHAIN_DOMAIN))]),
            critical=False)

        self.mock_certificate = builder.sign(
            private_key=self.private_key,
            algorithm=SHA1(),
            backend=default_backend()
        )

        self.request_verifier._cert_cache[
            self.PREPOPULATED_CERT_URL] = self.mock_certificate
示例30
def generate_admin_crt(config, host):
    """Issue an admin certificate for ``host`` signed by the configured CA.

    :param config: Mapping that provides ``CA_CERT`` and ``CA_KEY`` (PEM, text
        or bytes) and optionally ``CERT_ADMIN_EXPIRE`` (validity in days,
        default 1825).
    :param host: IPv4 address; used as the subject common name and as the IP
        subject alternative name.
    :return: Tuple ``(private_key_pem, certificate_pem)`` of bytes; the private
        key is not encrypted.
    :raises Exception: If ``CA_CERT`` or ``CA_KEY`` is missing or empty.
    """
    def _pem_bytes(value):
        # The PEM loaders require bytes. Pass bytes through untouched and
        # encode text; ``str(bytes)`` would yield "b'...'" on Python 3.
        return value if isinstance(value, bytes) else str(value).encode("ascii")

    private_key = generate_key()
    public_key = private_key.public_key()
    one_day = datetime.timedelta(1, 0, 0)
    ca_cert = config.get("CA_CERT", None)
    ca_key = config.get("CA_KEY", None)
    cert_expiration = config.get("CERT_ADMIN_EXPIRE", 1825)
    if not ca_cert or not ca_key:
        raise Exception('CA_CERT or CA_KEY not defined')
    ca_key = serialization.load_pem_private_key(
        _pem_bytes(ca_key), password=None, backend=default_backend())
    ca_cert = x509.load_pem_x509_certificate(
        _pem_bytes(ca_cert), backend=default_backend())
    builder = x509.CertificateBuilder()
    builder = builder.subject_name(x509.Name([
        x509.NameAttribute(NameOID.COMMON_NAME, host),
    ]))
    builder = builder.issuer_name(ca_cert.subject)
    # Backdated by one day relative to the local clock.
    builder = builder.not_valid_before(datetime.datetime.today() - one_day)
    builder = builder.not_valid_after(datetime.datetime.today() + datetime.timedelta(days=cert_expiration))
    builder = builder.serial_number(x509.random_serial_number())
    builder = builder.public_key(public_key)
    builder = builder.add_extension(
        x509.SubjectAlternativeName(
            [x509.IPAddress(ipaddress.IPv4Address(host))]
        ),
        critical=False
    )
    builder = builder.add_extension(
        x509.BasicConstraints(ca=False, path_length=None), critical=True,
    )
    certificate = builder.sign(
        private_key=ca_key, algorithm=hashes.SHA256(),
        backend=default_backend()
    )
    private_key = private_key.private_bytes(
            encoding=serialization.Encoding.PEM,
            format=serialization.PrivateFormat.TraditionalOpenSSL,
            encryption_algorithm=serialization.NoEncryption(),
        )
    certificate = certificate.public_bytes(serialization.Encoding.PEM)
    return private_key, certificate