Python源码示例:cryptography.x509.CertificateBuilder()
示例1
def create_cert_builder(subject, issuer_name, public_key, days=365, is_ca=False):
    """
    Create a pre-populated builder that can be used for all types of certificates.

    The validity window starts at the moment of the call and lasts ``days``
    days. Both bounds are derived from one UTC timestamp.

    :param subject: The subject of the certificate.
    :param issuer_name: The name of the issuer.
    :param public_key: The public key of the certificate.
    :param days: The number of days for which the certificate is valid.
        The default is 1 year or 365 days.
    :param is_ca: Boolean to indicate if a cert is ca or non ca.
    :return: The (still unsigned) certificate builder.
    :rtype: :class:`x509.CertificateBuilder`
    """
    # cryptography interprets naive datetimes as UTC, so local wall-clock
    # time (datetime.today()) would shift the validity window by the local
    # UTC offset. Take a single UTC timestamp and derive both bounds from it.
    now = datetime.utcnow()
    return (
        x509.CertificateBuilder()
        .subject_name(subject)
        .issuer_name(issuer_name)
        .public_key(public_key)
        .not_valid_before(now)
        .not_valid_after(now + timedelta(days=days))
        .serial_number(int(uuid.uuid4()))
        .add_extension(
            x509.BasicConstraints(ca=is_ca, path_length=None), critical=True
        )
    )
示例2
def create_self_signed_certificate(subject_name, private_key, days_valid=365):
    """Build and sign a self-signed CA certificate.

    :param subject_name: Common name placed in the subject (and issuer).
    :param private_key: Key whose public half is certified and which also
        signs the certificate.
    :param days_valid: Number of days the certificate stays valid.
    :return: The signed certificate.
    """
    name = x509.Name([
        x509.NameAttribute(x509.NameOID.ORGANIZATION_NAME, u"Test, Inc."),
        x509.NameAttribute(x509.NameOID.COMMON_NAME, subject_name),
    ])

    # Subject and issuer are the same name: the certificate is self-signed.
    builder = x509.CertificateBuilder()
    builder = builder.subject_name(name)
    builder = builder.issuer_name(name)
    builder = builder.public_key(private_key.public_key())
    builder = builder.serial_number(x509.random_serial_number())
    builder = builder.add_extension(
        x509.BasicConstraints(ca=True, path_length=None), critical=True
    )
    builder = builder.not_valid_before(datetime.datetime.utcnow())
    builder = builder.not_valid_after(
        datetime.datetime.utcnow() + datetime.timedelta(days=days_valid)
    )
    return builder.sign(private_key, hashes.SHA256(), backends.default_backend())
示例3
def certificate(private_key: rsa.RSAPrivateKey) -> x509.Certificate:
    """Return a self-signed, non-CA certificate signed with ``private_key``.

    The certificate carries a fixed subject/issuer name, serial number 1 and
    a critical ``BasicConstraints(ca=False)`` extension, and is valid for
    10 days from the moment it is created.
    """
    builder = x509.CertificateBuilder()
    subject = x509.Name([
        x509.NameAttribute(NameOID.COUNTRY_NAME, u"US"),
        x509.NameAttribute(NameOID.STATE_OR_PROVINCE_NAME, u"CA"),
        x509.NameAttribute(NameOID.LOCALITY_NAME, u"San Francisco"),
        x509.NameAttribute(NameOID.ORGANIZATION_NAME, u"Commandment"),
        x509.NameAttribute(NameOID.COMMON_NAME, u"CA-CERTIFICATE"),
    ])

    builder = builder.subject_name(subject)
    builder = builder.issuer_name(subject)
    builder = builder.public_key(private_key.public_key())
    builder = builder.serial_number(1)
    builder = builder.not_valid_before(datetime.datetime.utcnow())
    builder = builder.not_valid_after(
        datetime.datetime.utcnow() + datetime.timedelta(days=10)
    )
    builder = builder.add_extension(
        x509.BasicConstraints(ca=False, path_length=None), True
    )
    return builder.sign(private_key, hashes.SHA256(), default_backend())
示例4
def ca_certificate(private_key: rsa.RSAPrivateKey) -> x509.Certificate:
    """Return a self-signed CA certificate signed with ``private_key``.

    The certificate carries a fixed subject/issuer name, serial number 1 and
    a critical ``BasicConstraints(ca=True)`` extension, and is valid for
    10 days from the moment it is created.
    """
    builder = x509.CertificateBuilder()
    ca_name = x509.Name([
        x509.NameAttribute(NameOID.COUNTRY_NAME, u"US"),
        x509.NameAttribute(NameOID.STATE_OR_PROVINCE_NAME, u"CA"),
        x509.NameAttribute(NameOID.LOCALITY_NAME, u"San Francisco"),
        x509.NameAttribute(NameOID.ORGANIZATION_NAME, u"Commandment"),
        x509.NameAttribute(NameOID.COMMON_NAME, u"CA-CERTIFICATE"),
    ])

    builder = builder.serial_number(1)
    builder = builder.issuer_name(ca_name)
    builder = builder.subject_name(ca_name)
    builder = builder.public_key(private_key.public_key())
    builder = builder.not_valid_before(datetime.datetime.utcnow())
    builder = builder.not_valid_after(
        datetime.datetime.utcnow() + datetime.timedelta(days=10)
    )
    builder = builder.add_extension(
        x509.BasicConstraints(ca=True, path_length=None), True
    )
    return builder.sign(private_key, hashes.SHA256(), default_backend())
示例5
def sign_cert_builder(cert_builder, private_key, alg=None):
    """
    Turn a CertificateBuilder into a signed certificate.

    Args:
        cert_builder (x509.CertificateBuilder): Certificate configuration that
            should be signed.
        private_key: Key handed to ``cert_builder.sign`` to produce the
            signature.
        alg: Hash algorithm used for signing; any falsy value selects
            ``hashes.SHA256()``.

    Return:
        x509.Certificate
    """
    if not alg:
        alg = hashes.SHA256()
    return cert_builder.sign(
        private_key=private_key,
        algorithm=alg,
        backend=cryptography_default_backend,
    )
示例6
def sign_cert_builder(cert_builder, private_key, alg=None):
    """
    Sign the certificate described by a CertificateBuilder.

    Args:
        cert_builder (x509.CertificateBuilder): Certificate configuration that
            should be signed.
        private_key: Signing key, passed through to ``cert_builder.sign``.
        alg: Hash algorithm for the signature. When falsy (the default),
            ``hashes.SHA256()`` is used.

    Return:
        x509.Certificate
    """
    signing_options = {
        "private_key": private_key,
        "algorithm": alg or hashes.SHA256(),
        "backend": cryptography_default_backend,
    }
    return cert_builder.sign(**signing_options)
示例7
def setUp(self):
    """Prepare a self-signed CA certificate (PEM) and the generator under test.

    Reads ``self.ca_key``, which must already be available on the test case.
    """
    super(TestLocalGenerator, self).setUp()
    self.signing_digest = "sha256"

    # Setup CA data
    builder = x509.CertificateBuilder()
    not_before = datetime.datetime.utcnow()
    # Two years, expressed in seconds.
    lifetime = datetime.timedelta(seconds=2 * 365 * 24 * 60 * 60)
    not_after = datetime.datetime.utcnow() + lifetime

    name_oid = x509.oid.NameOID
    ca_name = x509.Name([
        x509.NameAttribute(name_oid.COUNTRY_NAME, u"US"),
        x509.NameAttribute(name_oid.STATE_OR_PROVINCE_NAME, u"Oregon"),
        x509.NameAttribute(name_oid.LOCALITY_NAME, u"Springfield"),
        x509.NameAttribute(name_oid.ORGANIZATION_NAME,
                           u"Springfield Nuclear Power Plant"),
        x509.NameAttribute(name_oid.COMMON_NAME, u"maggie1"),
    ])

    builder = (
        builder
        .not_valid_before(not_before)
        .not_valid_after(not_after)
        .serial_number(1)
        .subject_name(ca_name)
        .issuer_name(ca_name)
        .public_key(self.ca_key.public_key())
    )
    ca = builder.sign(
        private_key=self.ca_key,
        algorithm=hashes.SHA256(),
        backend=backends.default_backend(),
    )

    self.ca_certificate = ca.public_bytes(encoding=serialization.Encoding.PEM)
    self.cert_generator = local_cert_gen.LocalCertGenerator
示例8
def certificate_template(
    subject: x509.name.Name,
    issuer: x509.name.Name,
    public_key: x509.name.Name,
    certauthority: bool = False,
) -> x509.base.CertificateBuilder:
    """Return an unsigned builder for a ``localhost`` certificate.

    The builder is valid from now for ten years when ``certauthority`` is
    true and for seven days otherwise. It carries a critical
    SubjectAlternativeName for ``localhost`` and a critical BasicConstraints
    extension whose ``ca`` flag mirrors ``certauthority``.

    NOTE(review): ``public_key`` is annotated as ``x509.name.Name`` but is
    handed to ``CertificateBuilder.public_key`` -- the annotation looks
    wrong; confirm and correct it at the file level.
    """
    # On-the-fly (non-CA) certificates get a much shorter lifetime.
    lifetime_days = 365 * 10 if certauthority else 7
    expiry = datetime.datetime.utcnow() + datetime.timedelta(days=lifetime_days)

    builder = x509.CertificateBuilder()
    builder = builder.subject_name(subject)
    builder = builder.issuer_name(issuer)
    builder = builder.public_key(public_key)
    builder = builder.serial_number(x509.random_serial_number())
    builder = builder.not_valid_before(datetime.datetime.utcnow())
    builder = builder.not_valid_after(expiry)
    builder = builder.add_extension(
        x509.SubjectAlternativeName([x509.DNSName("localhost")]), critical=True
    )
    builder = builder.add_extension(
        x509.BasicConstraints(ca=certauthority, path_length=None), critical=True
    )
    return builder
示例9
def test_pfx(_autorestart, _autocmd, _fix_permissions, fake_env, fake_config):
    """Deploying a lineage must produce a non-empty ``cert.pfx`` in the archive."""
    archive_path = fake_env["archive"]

    # Fresh key pair, written unencrypted as the lineage's private key.
    key = rsa.generate_private_key(
        public_exponent=65537, key_size=2048, backend=default_backend()
    )
    key_pem = key.private_bytes(
        encoding=serialization.Encoding.PEM,
        format=serialization.PrivateFormat.TraditionalOpenSSL,
        encryption_algorithm=serialization.NoEncryption(),
    )
    with open(archive_path / "privkey.pem", "wb") as f:
        f.write(key_pem)

    # Self-signed certificate for that key: subject and issuer are identical.
    subject = issuer = x509.Name(
        [x509.NameAttribute(NameOID.COMMON_NAME, u"example.com")]
    )
    builder = x509.CertificateBuilder()
    builder = builder.subject_name(subject)
    builder = builder.issuer_name(issuer)
    builder = builder.public_key(key.public_key())
    builder = builder.serial_number(x509.random_serial_number())
    builder = builder.not_valid_before(datetime.datetime.utcnow())
    builder = builder.not_valid_after(
        datetime.datetime.utcnow() + datetime.timedelta(days=10)
    )
    cert = builder.sign(key, hashes.SHA256(), default_backend())

    # The same certificate serves as both the leaf and the chain file.
    for pem_name in ("cert.pem", "chain.pem"):
        with open(archive_path / pem_name, "wb") as f:
            f.write(cert.public_bytes(serialization.Encoding.PEM))

    hooks.deploy(config.load(fake_config), LINEAGE)

    pfx_path = archive_path / "cert.pfx"
    assert os.path.exists(pfx_path)
    assert os.stat(pfx_path).st_size != 0
示例10
def issue_certificate(self, ca_pem_key, ca_pem_cert, csr_pem):
    """Issue a certificate for a CSR, signed by the given CA.

    The subject, public key and extensions are copied from the CSR. The
    validity window and serial number come from ``self.not_valid_before_date``,
    ``self.not_valid_after_date`` and ``self.serial_num``.

    :param ca_pem_key: PEM text of the CA private key used for signing.
    :param ca_pem_cert: PEM text of the CA certificate.
    :param csr_pem: PEM text of the certificate signing request.
    :return: The signed certificate object. Its PEM encoding is also stored
        in ``self.pem_certificate``.
    """
    ca_cert = self.load_pem(entity_type="certificate", pem_text=ca_pem_cert)
    ca_key = self.load_pem(entity_type="key", pem_text=ca_pem_key)
    csr = self.load_pem(entity_type="csr", pem_text=csr_pem)

    builder = x509.CertificateBuilder(
        # The issuer of the new certificate is the signing CA itself, i.e.
        # the CA certificate's *subject*. Using the CA's own issuer only
        # works for self-signed roots and breaks chain building when the
        # signer is an intermediate CA.
        issuer_name=ca_cert.subject,
        subject_name=csr.subject,
        public_key=csr.public_key(),
        not_valid_before=self.not_valid_before_date,
        not_valid_after=self.not_valid_after_date,
        extensions=csr.extensions,
        serial_number=self.serial_num,
    )
    certificate = builder.sign(
        private_key=ca_key,
        algorithm=hashes.SHA256(),
        backend=default_backend(),
    )
    self.pem_certificate = certificate.public_bytes(
        encoding=serialization.Encoding.PEM
    )
    return certificate
示例11
def generate_tls_sni_01_cert(server_name, key_type=u'rsa',
                             _generate_private_key=None):
    """
    Generate a certificate/key pair for responding to a tls-sni-01 challenge.

    The certificate is self-signed and valid from one hour before to one
    hour after the moment of the call (UTC).

    :param str server_name: The SAN the certificate should have.
    :param str key_type: The type of key to generate; usually not necessary.
    :param _generate_private_key: Optional replacement for the key generator;
        called with ``key_type``.
    :rtype: ``Tuple[`~cryptography.x509.Certificate`, PrivateKey]``
    :return: A tuple of the certificate and private key.
    """
    key = (_generate_private_key or generate_private_key)(key_type)
    name = x509.Name([
        x509.NameAttribute(NameOID.COMMON_NAME, u'acme.invalid')])
    # cryptography interprets naive datetimes as UTC. Local time would move
    # the two-hour validity window by the local UTC offset, leaving the
    # certificate expired or not yet valid in most timezones.
    now = datetime.utcnow()
    one_hour = timedelta(seconds=3600)
    cert = (
        x509.CertificateBuilder()
        .subject_name(name)
        .issuer_name(name)
        .not_valid_before(now - one_hour)
        .not_valid_after(now + one_hour)
        .serial_number(int(uuid.uuid4()))
        .public_key(key.public_key())
        .add_extension(
            x509.SubjectAlternativeName([x509.DNSName(server_name)]),
            critical=False)
        .sign(
            private_key=key,
            algorithm=hashes.SHA256(),
            backend=default_backend())
    )
    return (cert, key)
示例12
def _generate_ca_cert(self):
    """
    Generate a CA cert/key.

    Creates ``self._ca_key`` when it is still ``None``, then (re)builds
    ``self._ca_name``, the self-signed ``self._ca_cert`` and the matching
    authority key identifier ``self._ca_aki``.
    """
    if self._ca_key is None:
        self._ca_key = generate_private_key(u'rsa')

    self._ca_name = x509.Name([
        x509.NameAttribute(NameOID.COMMON_NAME, u'ACME Snake Oil CA')])

    builder = x509.CertificateBuilder()
    builder = builder.subject_name(self._ca_name)
    builder = builder.issuer_name(self._ca_name)
    # Backdate by an hour; the certificate then lasts roughly ten years.
    builder = builder.not_valid_before(self._now() - timedelta(seconds=3600))
    builder = builder.not_valid_after(self._now() + timedelta(days=3650))
    builder = builder.public_key(self._ca_key.public_key())
    builder = builder.serial_number(int(uuid4()))
    builder = builder.add_extension(
        x509.BasicConstraints(ca=True, path_length=0), critical=True)
    builder = builder.add_extension(
        x509.SubjectKeyIdentifier.from_public_key(self._ca_key.public_key()),
        critical=False)
    self._ca_cert = builder.sign(
        private_key=self._ca_key,
        algorithm=hashes.SHA256(),
        backend=default_backend())

    self._ca_aki = x509.AuthorityKeyIdentifier.from_issuer_public_key(
        self._ca_key.public_key())
示例13
def request_issuance(self, csr):
    """
    Issue a certificate for the wrapped CSR, signed with the CA key.

    The subject, public key and SubjectAlternativeName extension are copied
    from the request. The certificate is backdated by one hour and valid
    for 90 days. The result of ``self._controller.issue()`` is returned with
    a callback that yields the DER-encoded certificate resource.
    """
    request = csr.csr
    # TODO: Only in Cryptography 1.3
    # assert csr.is_signature_valid
    builder = x509.CertificateBuilder()
    builder = builder.subject_name(request.subject)
    builder = builder.issuer_name(self._ca_name)
    builder = builder.not_valid_before(self._now() - timedelta(seconds=3600))
    builder = builder.not_valid_after(self._now() + timedelta(days=90))
    builder = builder.serial_number(int(uuid4()))
    builder = builder.public_key(request.public_key())

    requested_san = request.extensions.get_extension_for_oid(
        ExtensionOID.SUBJECT_ALTERNATIVE_NAME).value
    builder = builder.add_extension(requested_san, critical=False)
    builder = builder.add_extension(
        x509.SubjectKeyIdentifier.from_public_key(request.public_key()),
        critical=False)
    builder = builder.add_extension(self._ca_aki, critical=False)

    issued = builder.sign(
        private_key=self._ca_key,
        algorithm=hashes.SHA256(),
        backend=default_backend())
    resource = messages.CertificateResource(
        body=issued.public_bytes(encoding=serialization.Encoding.DER))
    return self._controller.issue().addCallback(lambda _: resource)
示例14
def cert_builder(private_key):
    """Return an unsigned builder for a fixed ``foo.com`` certificate.

    Subject and issuer are both ``CN=foo.com``, the serial number is 1 and
    the validity window is fixed (2017-12-22 to 2040-01-01). Only the public
    key depends on ``private_key``.
    """
    def _foo_name():
        return x509.Name(
            [x509.NameAttribute(x509.NameOID.COMMON_NAME, "foo.com")]
        )

    builder = x509.CertificateBuilder()
    builder = builder.subject_name(_foo_name())
    builder = builder.issuer_name(_foo_name())
    builder = builder.serial_number(1)
    builder = builder.public_key(private_key.public_key())
    builder = builder.not_valid_before(datetime.datetime(2017, 12, 22))
    builder = builder.not_valid_after(datetime.datetime(2040, 1, 1))
    return builder
示例15
def create_certificate(subject_name,
                       private_key,
                       signing_certificate,
                       signing_key,
                       days_valid=365,
                       client_auth=False):
    """Create a certificate for ``private_key`` signed by another certificate.

    :param subject_name: Common name for the new certificate's subject.
    :param private_key: Key whose public half is certified.
    :param signing_certificate: Certificate of the signer; its subject
        becomes the issuer of the new certificate.
    :param signing_key: Private key used to sign the new certificate.
    :param days_valid: Number of days the certificate stays valid.
    :param client_auth: When true, add a critical ExtendedKeyUsage extension
        for client authentication.
    :return: The signed certificate.
    """
    name = x509.Name([
        x509.NameAttribute(x509.NameOID.ORGANIZATION_NAME, u"Test, Inc."),
        x509.NameAttribute(x509.NameOID.COMMON_NAME, subject_name),
    ])

    builder = x509.CertificateBuilder()
    builder = builder.subject_name(name)
    builder = builder.issuer_name(signing_certificate.subject)
    builder = builder.public_key(private_key.public_key())
    builder = builder.serial_number(x509.random_serial_number())
    builder = builder.not_valid_before(datetime.datetime.utcnow())
    builder = builder.not_valid_after(
        datetime.datetime.utcnow() + datetime.timedelta(days=days_valid)
    )

    if client_auth:
        client_usage = x509.ExtendedKeyUsage(
            [x509.ExtendedKeyUsageOID.CLIENT_AUTH]
        )
        builder = builder.add_extension(client_usage, critical=True)

    return builder.sign(signing_key, hashes.SHA256(), backends.default_backend())
示例16
def generate_selfsigned_cert(hostname="/CN=Chia Blockchain CA", key=None):
    """Generate a self-signed CA certificate whose common name is ``hostname``.

    :param hostname: Value used as the common name of subject and issuer.
    :param key: RSA private key to certify and sign with. A new 2048-bit key
        is generated when omitted.
    :return: ``(cert_pem, key_pem)`` as text; the key is PEM-encoded in
        TraditionalOpenSSL format without encryption.
    """
    # Generate our key
    if key is None:
        key = rsa.generate_private_key(
            public_exponent=65537, key_size=2048, backend=default_backend(),
        )

    subject = x509.Name([x509.NameAttribute(NameOID.COMMON_NAME, hostname)])

    # path_length=0: this CA may sign end-entity certificates but no
    # further CA certificates.
    ca_constraints = x509.BasicConstraints(ca=True, path_length=0)
    issued_at = datetime.utcnow()

    builder = x509.CertificateBuilder()
    builder = builder.subject_name(subject)
    builder = builder.issuer_name(subject)
    builder = builder.public_key(key.public_key())
    builder = builder.serial_number(1000)
    builder = builder.not_valid_before(issued_at)
    builder = builder.not_valid_after(issued_at + timedelta(days=10 * 365))
    builder = builder.add_extension(ca_constraints, False)
    cert = builder.sign(key, hashes.SHA256(), default_backend())

    cert_pem = cert.public_bytes(encoding=serialization.Encoding.PEM)
    key_pem = key.private_bytes(
        encoding=serialization.Encoding.PEM,
        format=serialization.PrivateFormat.TraditionalOpenSSL,
        encryption_algorithm=serialization.NoEncryption(),
    )
    return cert_pem.decode(), key_pem.decode()
示例17
def generate_cert(service, namespace, private_key):
    """Create a self-signed certificate for an in-cluster service.

    The common name is ``<service>.<namespace>.svc``. The subject alternative
    names cover ``<service>``, ``<service>.<namespace>`` and
    ``<service>.<namespace>.svc``. The certificate is marked as a CA and is
    valid for 36500 days starting now (UTC).

    :param service: Service name used to build the DNS names.
    :param namespace: Namespace used to build the DNS names.
    :param private_key: Key whose public half is certified and which signs
        the certificate.
    :return: The signed certificate.
    """
    name = x509.Name([
        x509.NameAttribute(x509.oid.NameOID.COMMON_NAME,
                           "{0}.{1}.svc".format(service, namespace))
    ])
    subj_alternative_names = x509.SubjectAlternativeName([
        x509.DNSName("{0}".format(service)),
        x509.DNSName("{0}.{1}".format(service, namespace)),
        x509.DNSName("{0}.{1}.svc".format(service, namespace)),
    ])
    constraints = x509.BasicConstraints(ca=True, path_length=None)
    # cryptography interprets naive datetimes as UTC. With local time the
    # certificate would not be valid yet on hosts whose clock is ahead of UTC.
    now = datetime.utcnow()
    cert_builder = (
        x509.CertificateBuilder()
        .subject_name(name)
        .issuer_name(name)
        .add_extension(subj_alternative_names, False)
        .add_extension(constraints, False)
        .not_valid_before(now)
        .not_valid_after(now + timedelta(days=36500))
        .public_key(private_key.public_key())
        .serial_number(x509.random_serial_number())
    )
    return cert_builder.sign(private_key, hashes.SHA256(), default_backend())
示例18
def build_dep_token_certificate(dep_token):
    """Generate a fresh RSA key and a short-lived certificate for a DEP token.

    The subject common name embeds ``dep_token.pk`` and the issuer is
    ``zentral``. The certificate is valid from one day before to two days
    after the current time and is signed with the generated key.

    :return: ``(certificate, private_key)``.
    """
    private_key = rsa.generate_private_key(
        public_exponent=65537,
        key_size=2048,
        backend=default_backend()
    )
    subject = x509.Name([
        x509.NameAttribute(NameOID.COMMON_NAME,
                           u'zentral-dep-token-{}'.format(dep_token.pk)),
    ])
    issuer = x509.Name([
        x509.NameAttribute(NameOID.COMMON_NAME, u'zentral'),
    ])
    now = timezone.now()
    one_day = datetime.timedelta(days=1)

    certificate = (
        x509.CertificateBuilder()
        .subject_name(subject)
        .issuer_name(issuer)
        .not_valid_before(now - one_day)
        .not_valid_after(now + 2 * one_day)
        .serial_number(x509.random_serial_number())
        .public_key(private_key.public_key())
        .add_extension(
            x509.BasicConstraints(ca=False, path_length=None), critical=True,
        )
        .sign(
            private_key=private_key, algorithm=hashes.SHA256(),
            backend=default_backend()
        )
    )
    return certificate, private_key
示例19
def create(cls, common_name: str = 'COMMANDMENT-CA', key_size=2048):
    """Create, persist and return a new self-signed certificate authority.

    A new RSA key of ``key_size`` bits is generated and a CA certificate
    valid for 365 days is signed with it. The key model, certificate model
    and CA are added to ``db.session``, which is then committed.

    :param common_name: Common name of the CA certificate.
    :param key_size: RSA key size in bits.
    :return: The new instance of ``cls``.
    """
    ca = cls()
    ca.common_name = common_name

    ca_name = x509.Name([
        x509.NameAttribute(NameOID.COMMON_NAME, common_name),
        x509.NameAttribute(NameOID.ORGANIZATION_NAME, 'commandment')
    ])

    key = rsa.generate_private_key(
        public_exponent=65537,
        key_size=key_size,
        backend=default_backend(),
    )
    ca.rsa_private_key = RSAPrivateKey.from_crypto(key)
    db.session.add(ca.rsa_private_key)

    builder = x509.CertificateBuilder()
    builder = builder.subject_name(ca_name)
    builder = builder.issuer_name(ca_name)
    builder = builder.public_key(key.public_key())
    builder = builder.serial_number(x509.random_serial_number())
    builder = builder.not_valid_before(datetime.datetime.utcnow())
    builder = builder.not_valid_after(
        datetime.datetime.utcnow() + datetime.timedelta(days=365)
    )
    builder = builder.add_extension(
        x509.BasicConstraints(ca=True, path_length=None), True
    )
    signed = builder.sign(key, hashes.SHA256(), default_backend())

    cert_model = CACertificate.from_crypto(signed)
    cert_model.rsa_private_key = ca.rsa_private_key
    ca.certificate = cert_model

    db.session.add(ca)
    db.session.commit()
    return ca
示例20
def generate_self_signed_certificate(cn: str) -> (rsa.RSAPrivateKey, x509.Certificate):
    """Generate an X.509 Certificate with the given Common Name.

    A new 2048-bit RSA key is created and used to self-sign a certificate
    that is valid for 365 days and lists ``cn`` as a DNS subject alternative
    name.

    Args:
          cn (string): Common name, also used as the DNS SAN entry.

    Returns:
          The generated private key and the signed certificate, in that order.
    """
    subject = x509.Name([
        x509.NameAttribute(NameOID.COMMON_NAME, cn),
        x509.NameAttribute(NameOID.ORGANIZATION_NAME, 'commandment')
    ])
    key = rsa.generate_private_key(
        public_exponent=65537,
        key_size=2048,
        backend=default_backend(),
    )

    builder = x509.CertificateBuilder()
    builder = builder.subject_name(subject)
    builder = builder.issuer_name(subject)
    builder = builder.public_key(key.public_key())
    builder = builder.serial_number(x509.random_serial_number())
    builder = builder.not_valid_before(datetime.datetime.utcnow())
    builder = builder.not_valid_after(
        datetime.datetime.utcnow() + datetime.timedelta(days=365)
    )
    builder = builder.add_extension(
        x509.SubjectAlternativeName([DNSName(cn)]), False
    )
    signed = builder.sign(key, hashes.SHA256(), default_backend())
    return key, signed
示例21
def get_cert_builder(expires, serial=None):
    """Get a basic X509 cert builder object.

    The returned builder only has its validity window and serial number set.
    All timestamps are truncated to whole minutes.

    .. TODO:: deprecate support for passing datetime as expires

    Parameters
    ----------

    expires : datetime or timedelta or None
        When this certificate will expire. A timedelta is added to the
        current UTC time; ``None`` selects ``ca_settings.CA_DEFAULT_EXPIRES``.
    serial : int, optional
        Serial number to set for this certificate. Use :py:func:`~cg:cryptography.x509.random_serial_number`
        to generate such a value. By default, a value will be generated.
    """
    not_before = datetime.utcnow().replace(second=0, microsecond=0)

    if serial is None:
        serial = x509.random_serial_number()

    if expires is None:
        not_after = not_before + ca_settings.CA_DEFAULT_EXPIRES
    elif isinstance(expires, timedelta):
        not_after = not_before + expires
    else:
        # An absolute expiry: only truncate it to the minute.
        not_after = expires.replace(second=0, microsecond=0)

    return (
        x509.CertificateBuilder()
        .not_valid_before(not_before)
        .not_valid_after(not_after)
        .serial_number(serial)
    )
示例22
def new_credentials(cls):
    """Create a new Security object with a new certificate/key pair.

    A 2048-bit RSA key and a self-signed certificate valid for 365 days are
    generated; both are handed to ``cls`` as PEM bytes.
    """
    # cryptography is imported locally, so it is only needed when this
    # function is actually called.
    from cryptography import x509
    from cryptography.hazmat.backends import default_backend
    from cryptography.hazmat.primitives import hashes
    from cryptography.hazmat.primitives import serialization
    from cryptography.hazmat.primitives.asymmetric import rsa
    from cryptography.x509.oid import NameOID

    private_key = rsa.generate_private_key(
        public_exponent=65537,
        key_size=2048,
        backend=default_backend(),
    )
    key_bytes = private_key.private_bytes(
        encoding=serialization.Encoding.PEM,
        format=serialization.PrivateFormat.PKCS8,
        encryption_algorithm=serialization.NoEncryption(),
    )

    # Self-signed: the same name serves as subject and issuer.
    name = x509.Name(
        [x509.NameAttribute(NameOID.COMMON_NAME, u'skein-internal')])
    issued_at = datetime.utcnow()

    builder = x509.CertificateBuilder()
    builder = builder.subject_name(name)
    builder = builder.issuer_name(name)
    builder = builder.public_key(private_key.public_key())
    builder = builder.serial_number(x509.random_serial_number())
    builder = builder.not_valid_before(issued_at)
    builder = builder.not_valid_after(issued_at + timedelta(days=365))
    cert = builder.sign(private_key, hashes.SHA256(), default_backend())

    cert_bytes = cert.public_bytes(serialization.Encoding.PEM)
    return cls(cert_bytes=cert_bytes, key_bytes=key_bytes)
示例23
def get_certificate_and_private_key(self):
    """Return a throwaway self-signed certificate and its key as PEM bytes.

    The certificate is valid for 10 days. The result is the tuple
    ``(cert_pem, key_pem)``.

    NOTE(review): the key uses public exponent 3 and 1024 bits, which is
    weak -- presumably acceptable because this is test data; confirm.
    """
    key = rsa.generate_private_key(
        public_exponent=3,
        key_size=1024,
        backend=default_backend(),
    )
    name = x509.Name([
        x509.NameAttribute(NameOID.COUNTRY_NAME, u"FI"),
        x509.NameAttribute(NameOID.LOCALITY_NAME, u"Helsinki"),
        x509.NameAttribute(NameOID.ORGANIZATION_NAME, u"Some Company"),
        x509.NameAttribute(NameOID.COMMON_NAME, u"Test Certificate"),
    ])

    # Self-signed: the same name is both issuer and subject.
    builder_fields = dict(
        issuer_name=name,
        subject_name=name,
        public_key=key.public_key(),
        serial_number=x509.random_serial_number(),
        not_valid_before=datetime.utcnow(),
        not_valid_after=datetime.utcnow() + timedelta(days=10),
    )
    signed = x509.CertificateBuilder(**builder_fields).sign(
        key, hashes.SHA256(), default_backend())

    cert_pem = signed.public_bytes(encoding=serialization.Encoding.PEM)
    key_pem = key.private_bytes(
        encoding=serialization.Encoding.PEM,
        format=serialization.PrivateFormat.TraditionalOpenSSL,
        encryption_algorithm=serialization.NoEncryption(),
    )
    return cert_pem, key_pem
示例24
def generate_self_signed_certificate(self, key, cn, validity, san=None):
    """
    Generate and return a self-signed certificate object.

    ``key`` is the RSA key to certify and sign with, ``cn`` the common name
    as a string, ``validity`` the lifetime in days (capped at
    ``self.settings['max_validity_days']``) and ``san`` an optional list of
    subject alternative names given as strings.
    """
    lifetime_days = min(validity, self.settings['max_validity_days'])
    name = self.generate_x509_name(cn)

    # Each CertificateBuilder method returns a modified copy of the builder,
    # so the result has to be re-assigned after every step.
    builder = x509.CertificateBuilder()
    builder = builder.subject_name(name)
    builder = builder.issuer_name(name)
    builder = builder.public_key(key.public_key())
    builder = builder.serial_number(x509.random_serial_number())
    builder = builder.not_valid_before(datetime.datetime.utcnow())
    builder = builder.not_valid_after(
        datetime.datetime.utcnow() + datetime.timedelta(days=lifetime_days)
    )

    if san:
        builder = builder.add_extension(
            x509.SubjectAlternativeName(self.encode_san_dns_names(san)),
            critical=False,
        )

    return builder.sign(key, hashes.SHA256(), default_backend())
示例25
def build_key_and_cert(subject_name, *, ca=False, ca_key=None, issuer_name=''):
    """Generate an EC private key and a certificate for its public key.

    :param subject_name: Subject name of the new certificate.
    :param ca: When true, the certificate gets a critical
        ``BasicConstraints(ca=True, path_length=1)`` extension; otherwise a
        non-critical ``BasicConstraints(ca=False)``.
    :param ca_key: Key used to sign the certificate. When omitted, the newly
        generated key is used, i.e. the certificate is self-signed.
    :param issuer_name: Issuer name; falls back to ``subject_name`` when
        falsy.
    :return: ``(cert, private_key)``.
    """
    if not issuer_name:
        issuer_name = subject_name

    # DDS-Security section 9.3.1 calls for prime256v1, for which SECP256R1 is an alias.
    # The curve must be passed as an instance, not as the class itself.
    private_key = ec.generate_private_key(ec.SECP256R1(), cryptography_backend())
    if not ca_key:
        ca_key = private_key

    if ca:
        extension = x509.BasicConstraints(ca=True, path_length=1)
    else:
        extension = x509.BasicConstraints(ca=False, path_length=None)

    utcnow = datetime.datetime.utcnow()
    builder = x509.CertificateBuilder(
        ).issuer_name(
            issuer_name
        ).serial_number(
            x509.random_serial_number()
        ).not_valid_before(
            # Using a day earlier here to prevent Connext (5.3.1) from complaining
            # when extracting it from the permissions file and thinking it's in the future
            # https://github.com/ros2/ci/pull/436#issuecomment-624874296
            utcnow - datetime.timedelta(days=1)
        ).not_valid_after(
            # TODO: This should not be hard-coded
            utcnow + datetime.timedelta(days=3650)
        ).public_key(
            private_key.public_key()
        ).subject_name(
            subject_name
        ).add_extension(
            extension, critical=ca
        )
    cert = builder.sign(ca_key, hashes.SHA256(), cryptography_backend())
    return (cert, private_key)
示例26
def generate_key_and_cert():
    """Generate an RSA key and a self-signed certificate for it.

    Returns:
        tuple: ``(signing_key, signing_cert)`` where ``signing_key`` is the
        2048-bit RSA private key object and ``signing_cert`` is the
        certificate as DER-encoded bytes.
    """
    signing_key = rsa.generate_private_key(
        backend=crypto_default_backend(),
        public_exponent=65537,
        key_size=2048
    )
    # Self-signed: subject and issuer share one name.
    name = x509.Name([
        x509.NameAttribute(NameOID.COUNTRY_NAME, u"US"),
        x509.NameAttribute(NameOID.STATE_OR_PROVINCE_NAME, u"CA"),
        x509.NameAttribute(NameOID.LOCALITY_NAME, u"San Francisco"),
        x509.NameAttribute(NameOID.ORGANIZATION_NAME, u"My Company"),
        x509.NameAttribute(NameOID.COMMON_NAME, u"example.com"),
    ])

    builder = x509.CertificateBuilder()
    builder = builder.subject_name(name)
    builder = builder.issuer_name(name)
    builder = builder.public_key(signing_key.public_key())
    builder = builder.serial_number(x509.random_serial_number())
    builder = builder.not_valid_before(datetime.utcnow())
    # Our certificate will be valid for 10 days
    builder = builder.not_valid_after(datetime.utcnow() + timedelta(days=10))

    # Sign our certificate with our private key
    certificate = builder.sign(
        signing_key, hashes.SHA256(), crypto_default_backend()
    )
    signing_cert = certificate.public_bytes(crypto_serialization.Encoding.DER)
    return signing_key, signing_cert
示例27
def ca_file(tmpdir):
    """
    Create a valid PEM file with CA certificates and return the path.

    A new self-signed CA certificate for ``pyopenssl.org``, valid from one
    day ago until one day from now, is written to ``test.pem`` inside
    ``tmpdir``. The path is returned as ASCII-encoded bytes.
    """
    key = rsa.generate_private_key(
        public_exponent=65537,
        key_size=2048,
        backend=default_backend()
    )

    subject = x509.Name([
        x509.NameAttribute(NameOID.COMMON_NAME, u"pyopenssl.org"),
    ])
    issuer = x509.Name([
        x509.NameAttribute(NameOID.COMMON_NAME, u"pyopenssl.org"),
    ])
    one_day = datetime.timedelta(1, 0, 0)

    certificate = (
        x509.CertificateBuilder()
        .subject_name(subject)
        .issuer_name(issuer)
        .not_valid_before(datetime.datetime.today() - one_day)
        .not_valid_after(datetime.datetime.today() + one_day)
        .serial_number(int(uuid.uuid4()))
        .public_key(key.public_key())
        .add_extension(
            x509.BasicConstraints(ca=True, path_length=None), critical=True,
        )
        .sign(
            private_key=key, algorithm=hashes.SHA256(),
            backend=default_backend()
        )
    )

    pem_path = tmpdir.join("test.pem")
    pem_path.write_binary(
        certificate.public_bytes(encoding=serialization.Encoding.PEM)
    )
    return str(pem_path).encode("ascii")
示例28
def generate_cert(path):
    """Generate a self-signed certificate and key and write them under ``path``.

    The unencrypted private key goes to ``KEY_FILE`` and the certificate to
    ``CERT_FILE``, both PEM-encoded. The certificate is backdated by 10 days
    and lasts ``10 * 365`` days from now.
    """
    private_key = rsa.generate_private_key(
        public_exponent=65537,
        key_size=2048,
        backend=default_backend()
    )
    subject = x509.Name([
        x509.NameAttribute(NameOID.COMMON_NAME, u"lxd-image-server.localhost")])
    issued_at = datetime.utcnow()
    ca_constraints = x509.BasicConstraints(ca=True, path_length=0)
    # NOTE(review): the result is never used; the call is kept because
    # get_address() may have side effects -- confirm and remove if not.
    ip_address = get_address()

    builder = x509.CertificateBuilder()
    builder = builder.subject_name(subject)
    builder = builder.issuer_name(subject)
    builder = builder.public_key(private_key.public_key())
    builder = builder.serial_number(1000)
    builder = builder.not_valid_before(issued_at - timedelta(days=10))
    builder = builder.not_valid_after(issued_at + timedelta(days=10 * 365))
    builder = builder.add_extension(ca_constraints, False)
    builder = builder.add_extension(
        x509.SubjectAlternativeName([x509.DNSName(u"lxd.localhost")]),
        critical=False,
    )
    cert = builder.sign(private_key, hashes.SHA256(), default_backend())

    key_pem = private_key.private_bytes(
        encoding=serialization.Encoding.PEM,
        format=serialization.PrivateFormat.TraditionalOpenSSL,
        encryption_algorithm=serialization.NoEncryption(),
    )
    with open(join(path, KEY_FILE), 'wb') as f:
        f.write(key_pem)

    with open(join(path, CERT_FILE), 'wb') as f:
        f.write(cert.public_bytes(serialization.Encoding.PEM))
示例29
def create_certificate(self):
    """Create a short-lived mock certificate and put it in the verifier cache.

    Generates ``self.private_key``, builds a self-signed certificate valid
    from one minute ago until one minute from now, stores it in
    ``self.mock_certificate`` and registers it in
    ``self.request_verifier._cert_cache`` under ``self.PREPOPULATED_CERT_URL``.
    """
    self.private_key = self.generate_private_key()

    # Self-signed: subject and issuer share one name.
    subject = issuer = x509.Name([
        x509.NameAttribute(NameOID.COUNTRY_NAME, u"US"),
        x509.NameAttribute(NameOID.STATE_OR_PROVINCE_NAME, u"WA"),
        x509.NameAttribute(NameOID.LOCALITY_NAME, u"Seattle"),
        x509.NameAttribute(NameOID.ORGANIZATION_NAME, u"Amazon Alexa"),
        x509.NameAttribute(
            NameOID.COMMON_NAME, u"{}".format(self.PREPOPULATED_CERT_URL)),
    ])

    builder = x509.CertificateBuilder()
    builder = builder.subject_name(name=subject)
    builder = builder.issuer_name(name=issuer)
    builder = builder.public_key(key=self.private_key.public_key())
    builder = builder.serial_number(number=x509.random_serial_number())
    builder = builder.not_valid_before(
        time=datetime.utcnow() - timedelta(minutes=1))
    builder = builder.not_valid_after(
        time=datetime.utcnow() + timedelta(minutes=1))
    builder = builder.add_extension(
        extension=x509.SubjectAlternativeName(
            [x509.DNSName(u"{}".format(CERT_CHAIN_DOMAIN))]),
        critical=False)

    self.mock_certificate = builder.sign(
        private_key=self.private_key,
        algorithm=SHA1(),
        backend=default_backend()
    )

    cache = self.request_verifier._cert_cache
    cache[self.PREPOPULATED_CERT_URL] = self.mock_certificate
示例30
def generate_admin_crt(config, host):
    """Issue an admin certificate for ``host``, signed by the configured CA.

    :param config: Mapping providing ``CA_CERT`` and ``CA_KEY`` (PEM data)
        and optionally ``CERT_ADMIN_EXPIRE`` (validity in days, default 1825).
    :param host: IPv4 address used as common name and as the IP subject
        alternative name.
    :return: ``(private_key_pem, certificate_pem)`` as PEM-encoded bytes.
    :raises Exception: If ``CA_CERT`` or ``CA_KEY`` is missing from ``config``.
    """
    def _pem_bytes(pem):
        # The PEM loaders require bytes. str() alone yields text on
        # Python 3, so encode unless the value already is bytes (which also
        # covers Python 2 ``str``).
        return pem if isinstance(pem, bytes) else str(pem).encode("utf-8")

    private_key = generate_key()
    public_key = private_key.public_key()
    one_day = datetime.timedelta(1, 0, 0)
    ca_cert = config.get("CA_CERT", None)
    ca_key = config.get("CA_KEY", None)
    cert_expiration = config.get("CERT_ADMIN_EXPIRE", 1825)
    if not ca_cert or not ca_key:
        raise Exception('CA_CERT or CA_KEY not defined')

    ca_key = serialization.load_pem_private_key(
        _pem_bytes(ca_key), password=None, backend=default_backend())
    ca_cert = x509.load_pem_x509_certificate(
        _pem_bytes(ca_cert), backend=default_backend())

    builder = x509.CertificateBuilder()
    builder = builder.subject_name(x509.Name([
        x509.NameAttribute(NameOID.COMMON_NAME, host),
    ]))
    builder = builder.issuer_name(ca_cert.subject)
    # Backdated by one day.
    builder = builder.not_valid_before(datetime.datetime.today() - one_day)
    builder = builder.not_valid_after(
        datetime.datetime.today() + datetime.timedelta(days=cert_expiration))
    builder = builder.serial_number(x509.random_serial_number())
    builder = builder.public_key(public_key)
    builder = builder.add_extension(
        x509.SubjectAlternativeName(
            [x509.IPAddress(ipaddress.IPv4Address(host))]
        ),
        critical=False
    )
    builder = builder.add_extension(
        x509.BasicConstraints(ca=False, path_length=None), critical=True,
    )
    certificate = builder.sign(
        private_key=ca_key, algorithm=hashes.SHA256(),
        backend=default_backend()
    )

    private_key = private_key.private_bytes(
        encoding=serialization.Encoding.PEM,
        format=serialization.PrivateFormat.TraditionalOpenSSL,
        encryption_algorithm=serialization.NoEncryption(),
    )
    certificate = certificate.public_bytes(serialization.Encoding.PEM)
    return private_key, certificate