Python源码示例:cryptography.x509.DNSName()
示例1
def build_csr(self, hostname, **kwargs):
    """Build an unsigned CSR builder for a TLS server certificate.

    The subject is ``CN=<hostname>, O=<realm>``, where the realm is read
    from ``self.plugin.ipa.env.realm``. Three extensions are attached:
    BasicConstraints (``ca=False``, critical), ExtendedKeyUsage containing
    ``TLS_SERVERAUTH`` (critical) and a SubjectAlternativeName holding
    ``hostname`` as a DNS name (non-critical).

    :param hostname: host name used for both the common name and the SAN.
    :param kwargs: accepted for signature compatibility; not used here.
    :return: the populated ``x509.CertificateSigningRequestBuilder``
        (signing is left to the caller).
    """
    realm = self.plugin.ipa.env.realm
    subject = x509.Name([
        x509.NameAttribute(oid.NameOID.COMMON_NAME, hostname),
        x509.NameAttribute(oid.NameOID.ORGANIZATION_NAME, realm),
    ])
    # Builder methods return a new builder instead of mutating in place, so
    # each call has to be chained onto the previous result. The earlier code
    # restarted from the subject-only builder and lost BasicConstraints.
    return (
        x509.CertificateSigningRequestBuilder()
        .subject_name(subject)
        .add_extension(
            x509.BasicConstraints(ca=False, path_length=None),
            critical=True,
        )
        .add_extension(
            x509.ExtendedKeyUsage([TLS_SERVERAUTH]),
            critical=True,
        )
        .add_extension(
            x509.SubjectAlternativeName([x509.DNSName(hostname)]),
            critical=False,
        )
    )
# pylint: disable=arguments-differ
示例2
def extract_dns_subject_alternative_names(certificate: x509.Certificate) -> List[str]:
    """Return every DNS entry of the certificate's Subject Alternative Name extension.

    An empty list is returned when the extension is absent or when it
    appears more than once in the certificate.
    """
    try:
        san_extension = certificate.extensions.get_extension_for_oid(
            ExtensionOID.SUBJECT_ALTERNATIVE_NAME
        )
    except (ExtensionNotFound, DuplicateExtension):
        # ExtensionNotFound: the certificate simply has no SAN.
        # DuplicateExtension: fix for https://github.com/nabla-c0d3/sslyze/issues/420
        # Not sure how browsers behave in this case, but a duplicate extension
        # makes the certificate invalid, so no SANs are returned (likely to
        # make hostname validation fail, which is fine).
        return []

    san_value = cast(x509.SubjectAlternativeName, san_extension.value)
    return san_value.get_values_for_type(DNSName)
示例3
def create_csr(key, domains, must_staple=False):
    """
    Build and sign a CSR for ``domains`` with ``key``.

    The first domain becomes the subject common name and every domain is
    listed as a DNS subject alternative name. When ``must_staple`` is true a
    TLSFeature ``status_request`` extension is added as well. The request is
    signed with SHA-256 and handed to ``export_csr_for_acme``, whose result
    is returned (the original docstring describes it as DER).
    """
    assert domains
    subject = x509.Name([x509.NameAttribute(NameOID.COMMON_NAME, domains[0])])
    alt_names = x509.SubjectAlternativeName(
        [x509.DNSName(entry) for entry in domains]
    )

    builder = x509.CertificateSigningRequestBuilder()
    builder = builder.subject_name(subject)
    builder = builder.add_extension(alt_names, critical=False)
    if must_staple:
        staple_feature = x509.TLSFeature(
            features=[x509.TLSFeatureType.status_request]
        )
        builder = builder.add_extension(staple_feature, critical=False)

    signed_request = builder.sign(key, hashes.SHA256(), default_backend())
    return export_csr_for_acme(signed_request)
示例4
def match(self, value):
    """Check that certificate ``value`` is valid for ``self.name``.

    The DNS names of the certificate's SubjectAlternativeName extension are
    turned into ``DNSPattern`` objects and verified against a ``DNS_ID``
    built from ``self.name``.

    :return: a ``Mismatch`` when verification fails, otherwise ``None``.
    """
    # This is somewhat terrible. Probably can be better after
    # pyca/service_identity#14 is resolved.
    san_extension = value.extensions.get_extension_for_oid(
        ExtensionOID.SUBJECT_ALTERNATIVE_NAME)
    dns_names = san_extension.value.get_values_for_type(x509.DNSName)
    cert_patterns = [
        DNSPattern(dns_name.encode('utf-8')) for dns_name in dns_names
    ]
    required_ids = [DNS_ID(self.name)]
    try:
        verify_service_identity(
            cert_patterns=cert_patterns,
            obligatory_ids=required_ids,
            optional_ids=[],
        )
    except VerificationError:
        return Mismatch(
            '{!r} is not valid for {!r}'.format(value, self.name))
    return None
示例5
def test_certificate_input_with_extensions(client, authority):
    """Loading a payload that carries several extensions yields no errors."""
    from lemur.certificates.schemas import CertificateInputSchema

    extensions = {
        "keyUsage": {"digital_signature": True},
        "extendedKeyUsage": {
            "useClientAuthentication": True,
            "useServerAuthentication": True,
        },
        "subjectKeyIdentifier": {"includeSKI": True},
        "subAltNames": {
            "names": [{"nameType": "DNSName", "value": "test.example.com"}]
        },
    }
    payload = {
        "commonName": "test.example.com",
        "owner": "jim@example.com",
        "authority": {"id": authority.id},
        "description": "testtestest",
        "extensions": extensions,
        "dnsProvider": None,
    }

    data, errors = CertificateInputSchema().load(payload)
    assert not errors
示例6
def test_certificate_allowed_names(client, authority, session, logged_in_user):
    """Test for allowed CN and SAN values."""
    from lemur.certificates.schemas import CertificateInputSchema

    san_entries = [
        {"nameType": "DNSName", "value": "allowed.example.com"},
        {"nameType": "IPAddress", "value": "127.0.0.1"},
    ]
    payload = {
        "commonName": "Names with spaces are not checked",
        "owner": "jim@example.com",
        "authority": {"id": authority.id},
        "description": "testtestest",
        "validityStart": "2020-01-01T00:00:00",
        "validityEnd": "2020-01-01T00:00:01",
        "extensions": {"subAltNames": {"names": san_entries}},
        "dnsProvider": None,
    }

    data, errors = CertificateInputSchema().load(payload)
    assert not errors
示例7
def test_csr_disallowed_san(client, logged_in_user):
    """SAN name is disallowed by LEMUR_WHITELISTED_DOMAINS."""
    from lemur.common import validators

    disallowed_san = x509.SubjectAlternativeName(
        [x509.DNSName("evilhacker.org")]
    )
    request, pkey = create_csr(
        common_name="CN with spaces isn't a domain and is thus allowed",
        owner="joe@example.com",
        key_type="RSA2048",
        extensions={"sub_alt_names": {"names": disallowed_san}},
    )

    with pytest.raises(ValidationError) as err:
        validators.csr(request)

    # Checked after the block so the assertion actually runs once the
    # expected exception has been captured.
    expected_prefix = "Domain evilhacker.org does not match whitelisted domain patterns"
    assert str(err.value).startswith(expected_prefix)
示例8
def domains(cert):
    """
    Collect the DNS names listed in a certificate's subjectAltName extension.

    If the extension is missing, the exception is reported to Sentry (unless
    the ``LOG_SSL_SUBJ_ALT_NAME_ERRORS`` config value is falsy) and an empty
    list is returned. Any other failure is also reported to Sentry and
    likewise yields whatever was collected so far. The common name is NOT
    used as a fallback.

    :param cert: certificate whose extensions are inspected
    :return: List of domains
    """
    names = []
    try:
        ext = cert.extensions.get_extension_for_oid(x509.OID_SUBJECT_ALTERNATIVE_NAME)
        names.extend(ext.value.get_values_for_type(x509.DNSName))
    except x509.ExtensionNotFound:
        # A certificate without SANs is common enough that reporting it can
        # be switched off through configuration.
        if current_app.config.get("LOG_SSL_SUBJ_ALT_NAME_ERRORS", True):
            sentry.captureException()
    except Exception:
        # Best effort: report the problem but never fail the caller.
        sentry.captureException()
    return names
示例9
def sub_alt_type(alt_type):
    """
    Determines if the specified subject alternate type is valid.

    The comparison against the known type names is case-insensitive.

    :param alt_type: name of the subject alternative name type to check
    :return: None when the type is valid
    :raises ValidationError: when ``alt_type`` is not one of the known types
    """
    valid_types = [
        "DNSName",
        "IPAddress",
        "uniFormResourceIdentifier",
        "directoryName",
        "rfc822Name",
        "registrationID",
        "otherName",
        "x400Address",
        "EDIPartyName",
    ]
    if alt_type.lower() not in {a_type.lower() for a_type in valid_types}:
        # Report the offending value; the previous code formatted the
        # builtin ``type`` here, producing "<class 'type'>" in the message.
        raise ValidationError(
            "Invalid SubAltName Type: {0} choose from {1}".format(
                alt_type, ",".join(valid_types)
            )
        )
示例10
def test_map_fields_with_validity_years(mock_current_app):
    """map_fields builds the expected payload when validity_years is given."""
    mock_current_app.config.get = Mock(side_effect=config_mock)

    with patch('lemur.plugins.lemur_digicert.plugin.signature_hash') as mock_signature_hash:
        mock_signature_hash.return_value = "sha256"

        names = [u"one.example.com", u"two.example.com", u"three.example.com"]
        san_entries = [x509.DNSName(x) for x in names]
        options = {
            "common_name": "example.com",
            "owner": "bob@example.com",
            "description": "test certificate",
            "extensions": {"sub_alt_names": {"names": san_entries}},
            "validity_years": 2
        }

        expected_certificate = {
            "csr": CSR_STR,
            "common_name": "example.com",
            "dns_names": names,
            "signature_hash": "sha256",
        }
        expected = {
            "certificate": expected_certificate,
            "organization": {"id": 111111},
            "validity_years": 2,
        }

        # Must run while signature_hash is still patched.
        assert expected == plugin.map_fields(options, CSR_STR)
示例11
def _dnsname_to_stdlib(name):
    """Convert a DNSName SubjectAlternativeName value to the standard library's form.

    Cryptography produces a dNSName as a unicode string that was idna-decoded
    from ASCII bytes. The string is idna-encoded to get those bytes back, and
    on Python 3 the bytes are then decoded as UTF-8 (the stdlib uses
    PyUnicode_FromStringAndSize on it, which decodes via UTF-8).

    Notes:
        The result type depends on the Python version: text on Python 3,
        the encoded bytes otherwise.
    """
    def _encode_hostname(hostname):
        """Borrowed wholesale from the Python Cryptography Project.

        `idna.encode` cannot be called directly because it can explode for
        wildcard names, so a leading '*.' or '.' is split off, encoded as
        ASCII and re-attached in front of the encoded remainder.
        """
        import idna

        leading = next(
            (prefix for prefix in ('*.', '.') if hostname.startswith(prefix)),
            None,
        )
        if leading is None:
            return idna.encode(hostname)
        remainder = hostname[len(leading):]
        return leading.encode('ascii') + idna.encode(remainder)

    encoded = _encode_hostname(name)
    if sys.version_info >= (3, 0):
        return encoded.decode('utf-8')
    return encoded
示例12
def read_cert(cert_dir):
    """Read ``cert.pem`` from ``cert_dir`` and extract its names.

    :param cert_dir: directory containing a PEM certificate named ``cert.pem``.
    :return: tuple ``(common_name, dns_names, ip_sans)`` where ``common_name``
        is the value of the first subject attribute with OID 2.5.4.3,
        ``dns_names`` are the DNSName values of the first extension with OID
        2.5.29.17 and ``ip_sans`` are that extension's IPAddress values
        converted with ``str``.
    :raises IndexError: when the certificate has no such subject attribute
        or no such extension.
    """
    with open(os.path.join(cert_dir, 'cert.pem'), 'rb') as cert_file:
        pem_data = cert_file.read()
    cert = x509.load_pem_x509_certificate(pem_data, default_backend())

    # Compare through the public ``dotted_string`` property; the private
    # ``_dotted_string`` attribute is an implementation detail.
    common_name = [
        na.value for na in cert.subject if na.oid.dotted_string == "2.5.4.3"
    ][0]
    subject_alternative_names = [
        ext.value for ext in cert.extensions if ext.oid.dotted_string == "2.5.29.17"
    ][0]
    dns_names = subject_alternative_names.get_values_for_type(x509.DNSName)
    ip_sans = [
        str(ip)
        for ip in subject_alternative_names.get_values_for_type(x509.IPAddress)
    ]
    return common_name, dns_names, ip_sans
示例13
def service_dns(service_name, namespace, domain):
    """Return DNSName entries for a service, from most to least qualified.

    The names are ``<service>.<namespace>.svc.<domain>``,
    ``<service>.<namespace>.svc``, ``<service>.<namespace>`` and ``<service>``.
    """
    hostnames = (
        f'{service_name}.{namespace}.svc.{domain}',
        f'{service_name}.{namespace}.svc',
        f'{service_name}.{namespace}',
        f'{service_name}',
    )
    return [x509.DNSName(hostname) for hostname in hostnames]
示例14
def pod_dns(pod_ip, namespace, domain):
    """Return DNSName entries for a pod address.

    Dots in ``pod_ip`` are replaced by dashes, giving
    ``<dashed-ip>.<namespace>.pod.<domain>`` and ``<dashed-ip>.<namespace>.pod``.
    """
    hostnames = (
        f'{pod_ip.replace(".", "-")}.{namespace}.pod.{domain}',
        f'{pod_ip.replace(".", "-")}.{namespace}.pod',
    )
    return [x509.DNSName(hostname) for hostname in hostnames]
示例15
def headless_dns(hostname, subdomain, namespace, domain):
    """Return DNSName entries for a host under a subdomain, most qualified first.

    The names run from ``<hostname>.<subdomain>.<namespace>.svc.<domain>``
    down to the bare ``<hostname>``.
    """
    candidates = (
        f'{hostname}.{subdomain}.{namespace}.svc.{domain}',
        f'{hostname}.{subdomain}.{namespace}.svc',
        f'{hostname}.{subdomain}.{namespace}',
        f'{hostname}.{subdomain}',
        f'{hostname}',
    )
    return [x509.DNSName(candidate) for candidate in candidates]
示例16
def _generate_csr(cls, cn, private_key, passphrase=None):
    """Create a PEM-encoded CSR for ``cn`` signed with ``private_key``.

    :param cn: used as the subject common name and as the single DNS
        subject alternative name.
    :param private_key: PEM-encoded private key data.
    :param passphrase: password for ``private_key``, or None.
    :return: the signed request serialized with PEM encoding.

    The request carries critical BasicConstraints (``ca=False``) and KeyUsage
    extensions plus a non-critical SubjectAlternativeName. The signing digest
    is looked up on ``hashes`` by the upper-cased name configured in
    ``CONF.certificates.signing_digest``.
    """
    signing_key = serialization.load_pem_private_key(
        data=private_key,
        password=passphrase,
        backend=backends.default_backend(),
    )

    subject = x509.Name([
        x509.NameAttribute(x509.oid.NameOID.COMMON_NAME, cn),
    ])
    builder = x509.CertificateSigningRequestBuilder().subject_name(subject)

    constraints = x509.BasicConstraints(ca=False, path_length=None)
    builder = builder.add_extension(constraints, critical=True)

    key_usage = x509.KeyUsage(
        digital_signature=True,
        key_encipherment=True,
        data_encipherment=True,
        key_agreement=True,
        content_commitment=False,
        key_cert_sign=False,
        crl_sign=False,
        encipher_only=False,
        decipher_only=False,
    )
    builder = builder.add_extension(key_usage, critical=True)

    alt_names = x509.SubjectAlternativeName([x509.DNSName(cn)])
    builder = builder.add_extension(alt_names, critical=False)

    digest_factory = getattr(hashes, CONF.certificates.signing_digest.upper())
    request = builder.sign(
        signing_key, digest_factory(), backends.default_backend())
    return request.public_bytes(serialization.Encoding.PEM)
示例17
def get_host_names(certificate):
    """Extract the host names from a PEM encoded X509 certificate.

    :param certificate: A PEM encoded certificate (``str`` input is encoded
                        as UTF-8 before parsing)
    :returns: A dictionary with the keys ``'cn'`` and ``'dns_names'``:
              ``'cn'`` is the lower-cased CN from the SubjectName of the
              certificate and ``'dns_names'`` is the list of dNSNames
              (possibly empty) from its SubjectAltNames.
    :raises exceptions.UnreadableCert: when anything goes wrong while
                                       reading the certificate
    """
    pem = certificate.encode('utf-8') if isinstance(certificate, str) else certificate
    try:
        parsed = x509.load_pem_x509_certificate(pem, backends.default_backend())
        common_name = parsed.subject.get_attributes_for_oid(
            x509.OID_COMMON_NAME)[0]
        result = {'cn': common_name.value.lower(), 'dns_names': []}
        try:
            san_extension = parsed.extensions.get_extension_for_oid(
                x509.OID_SUBJECT_ALTERNATIVE_NAME)
            result['dns_names'] = san_extension.value.get_values_for_type(
                x509.DNSName)
        except x509.ExtensionNotFound:
            # A certificate without SANs is fine; 'dns_names' stays empty.
            LOG.debug("%s extension not found",
                      x509.OID_SUBJECT_ALTERNATIVE_NAME)
        return result
    except Exception:
        LOG.exception('Unreadable Certificate.')
        raise exceptions.UnreadableCert
示例18
def certificate_template(
    subject: x509.name.Name,
    issuer: x509.name.Name,
    public_key: x509.name.Name,
    certauthority: bool = False,
) -> x509.base.CertificateBuilder:
    """Return an unsigned certificate builder pre-filled with common fields.

    The builder gets the given subject, issuer and public key, a random
    serial number, a validity window starting now, a critical
    SubjectAlternativeName for ``localhost`` and critical BasicConstraints
    whose ``ca`` flag mirrors ``certauthority``.

    :param subject: subject name of the certificate.
    :param issuer: issuer name of the certificate.
    :param public_key: passed to ``CertificateBuilder.public_key``.
        NOTE(review): the ``x509.name.Name`` annotation looks wrong for a
        public key; confirm the intended type.
    :param certauthority: when true the certificate is valid for ten years
        (365 * 10 days) and marked as a CA; otherwise it is valid for 7 days.
    :return: the configured builder; signing is left to the caller.
    """
    # Take the current time once, as a timezone-aware UTC value, so both
    # validity bounds derive from the same instant and the deprecated
    # ``datetime.utcnow()`` is avoided.
    now = datetime.datetime.now(datetime.timezone.utc)
    if certauthority:
        lifetime = datetime.timedelta(days=365 * 10)
    else:  # shorter valid length for on-the-fly certificates
        lifetime = datetime.timedelta(days=7)
    return (
        x509.CertificateBuilder()
        .subject_name(subject)
        .issuer_name(issuer)
        .public_key(public_key)
        .serial_number(x509.random_serial_number())
        .not_valid_before(now)
        .not_valid_after(now + lifetime)
        .add_extension(
            x509.SubjectAlternativeName([x509.DNSName("localhost")]), critical=True
        )
        .add_extension(
            x509.BasicConstraints(ca=certauthority, path_length=None), critical=True
        )
    )
示例19
def subject_alt_name(self):
    """Return a SubjectAlternativeName built from ``self.ip_addr_list``.

    Every item is converted with ``text_type`` and wrapped in a DNSName.
    """
    entries = []
    for addr in self.ip_addr_list:
        entries.append(x509.DNSName(text_type(addr)))
    return x509.SubjectAlternativeName(entries)
示例20
def get_certificate_domains(cert):
    """
    Return the DNS names of the first Subject Alternative Name extension
    found in the specified certificate, or an empty list when it has none.
    """
    san = next(
        (
            extension.value
            for extension in cert.extensions
            if isinstance(extension.value, x509.SubjectAlternativeName)
        ),
        None,
    )
    if san is None:
        return []
    return san.get_values_for_type(x509.DNSName)
示例21
def generate_tls_sni_01_cert(server_name, key_type=u'rsa',
                             _generate_private_key=None):
    """
    Generate a certificate/key pair for responding to a tls-sni-01 challenge.

    The certificate is self-signed (subject and issuer are both
    ``CN=acme.invalid``), valid from one hour before to one hour after the
    current time, and carries ``server_name`` as its only DNS subject
    alternative name.

    :param str server_name: The SAN the certificate should have.
    :param str key_type: The type of key to generate; usually not necessary.
    :param _generate_private_key: optional replacement for
        ``generate_private_key``; called with ``key_type``.

    :rtype: ``Tuple[`~cryptography.x509.Certificate`, PrivateKey]``
    :return: A tuple of the certificate and private key.
    """
    from datetime import timezone

    key = (_generate_private_key or generate_private_key)(key_type)
    name = x509.Name([
        x509.NameAttribute(NameOID.COMMON_NAME, u'acme.invalid')])
    # Certificate validity times are interpreted as UTC. A naive local
    # ``datetime.now()`` would shift the window by the local UTC offset and
    # could yield a certificate that is not yet valid, so use an aware UTC
    # timestamp taken once.
    now = datetime.now(timezone.utc)
    cert = (
        x509.CertificateBuilder()
        .subject_name(name)
        .issuer_name(name)
        .not_valid_before(now - timedelta(seconds=3600))
        .not_valid_after(now + timedelta(seconds=3600))
        .serial_number(int(uuid.uuid4()))
        .public_key(key.public_key())
        .add_extension(
            x509.SubjectAlternativeName([x509.DNSName(server_name)]),
            critical=False)
        .sign(
            private_key=key,
            algorithm=hashes.SHA256(),
            backend=default_backend())
    )
    return (cert, key)
示例22
def csr_for_names(names, key):
    """
    Generate a certificate signing request for the given names and private key.

    The first name is used as the subject common name unless it is longer
    than 64 characters, in which case the placeholder
    ``san.too.long.invalid`` is used. All names are included as DNS
    subjectAltName entries and the request is signed with SHA-256.

    ..  seealso:: `acme.client.Client.request_issuance`

    ..  seealso:: `generate_private_key`

    :param ``List[str]``: One or more names (subjectAltName) for which to
        request a certificate.
    :param key: A Cryptography private key object.

    :raises ValueError: when ``names`` is empty.
    :rtype: `cryptography.x509.CertificateSigningRequest`
    :return: The certificate request message.
    """
    if len(names) == 0:
        raise ValueError('Must have at least one name')

    first_name = names[0]
    common_name = u'san.too.long.invalid' if len(first_name) > 64 else first_name

    subject = x509.Name([
        x509.NameAttribute(NameOID.COMMON_NAME, common_name)])
    alt_names = x509.SubjectAlternativeName(
        [x509.DNSName(entry) for entry in names])

    builder = x509.CertificateSigningRequestBuilder().subject_name(subject)
    builder = builder.add_extension(alt_names, critical=False)
    return builder.sign(key, hashes.SHA256(), default_backend())
示例23
def test_get_certificate_primitives(certificate):
    """get_certificate_primitives returns 26 entries for the fixture certificate."""
    from lemur.certificates.service import get_certificate_primitives

    # NOTE(review): ``names`` is never read afterwards; it is kept because
    # building it still constructs a DNSName from every domain name.
    names = []
    for domain in certificate.domains:
        names.append(x509.DNSName(domain.name))

    frozen_day = datetime.date(year=2016, month=10, day=30)
    with freeze_time(frozen_day):
        primitives = get_certificate_primitives(certificate)
        assert len(primitives) == 26
示例24
def test_certificate_input_schema_parse_csr(authority):
    """SANs embedded in a supplied CSR show up in the loaded extensions."""
    from lemur.certificates.schemas import CertificateInputSchema

    test_san_dns = "foobar.com"
    san = x509.SubjectAlternativeName([x509.DNSName(test_san_dns)])
    csr, private_key = create_csr(
        owner="joe@example.com",
        common_name="ACommonName",
        organization="test",
        organizational_unit="Meters",
        country="NL",
        state="Noord-Holland",
        location="Amsterdam",
        key_type="RSA2048",
        extensions={"sub_alt_names": {"names": san}},
    )

    payload = {
        "commonName": "test.example.com",
        "owner": "jim@example.com",
        "authority": {"id": authority.id},
        "description": "testtestest",
        "csr": csr,
        "dnsProvider": None,
    }
    data, errors = CertificateInputSchema().load(payload)

    parsed_names = data["extensions"]["sub_alt_names"]["names"]
    for parsed in parsed_names:
        assert parsed.value == test_san_dns
    assert not errors
示例25
def test_certificate_disallowed_names(client, authority, session, logged_in_user):
    """The CN and SAN are disallowed by LEMUR_WHITELISTED_DOMAINS."""
    from lemur.certificates.schemas import CertificateInputSchema

    san_entries = [
        {"nameType": "DNSName", "value": "allowed.example.com"},
        {"nameType": "DNSName", "value": "evilhacker.org"},
    ]
    payload = {
        "commonName": "*.example.com",
        "owner": "jim@example.com",
        "authority": {"id": authority.id},
        "description": "testtestest",
        "validityStart": "2020-01-01T00:00:00",
        "validityEnd": "2020-01-01T00:00:01",
        "extensions": {"subAltNames": {"names": san_entries}},
        "dnsProvider": None,
    }

    data, errors = CertificateInputSchema().load(payload)

    cn_message = errors["common_name"][0]
    assert cn_message.startswith(
        "Domain *.example.com does not match whitelisted domain patterns"
    )
    san_message = errors["extensions"]["sub_alt_names"]["names"][0]
    assert san_message.startswith(
        "Domain evilhacker.org does not match whitelisted domain patterns"
    )
示例26
def test_create_csr():
    """create_csr returns a CSR and a private key, with and without SAN extensions."""
    subject_fields = dict(
        owner="joe@example.com",
        common_name="ACommonName",
        organization="test",
        organizational_unit="Meters",
        country="US",
        state="CA",
        location="Here",
        key_type="RSA2048",
    )

    # Without any extensions.
    csr, private_key = create_csr(**subject_fields)
    assert csr
    assert private_key

    # Same subject, this time with a subject alternative name.
    san = x509.SubjectAlternativeName([x509.DNSName("AnotherCommonName")])
    extensions = {"sub_alt_names": {"names": san}}
    csr, private_key = create_csr(extensions=extensions, **subject_fields)
    assert csr
    assert private_key
示例27
def _serialize(self, value, attr, obj):
general_names = []
name_type = None
if value:
for name in value._general_names:
value = name.value
if isinstance(name, x509.DNSName):
name_type = "DNSName"
elif isinstance(name, x509.IPAddress):
if isinstance(value, ipaddress.IPv4Network):
name_type = "IPNetwork"
else:
name_type = "IPAddress"
value = str(value)
elif isinstance(name, x509.UniformResourceIdentifier):
name_type = "uniformResourceIdentifier"
elif isinstance(name, x509.DirectoryName):
name_type = "directoryName"
elif isinstance(name, x509.RFC822Name):
name_type = "rfc822Name"
elif isinstance(name, x509.RegisteredID):
name_type = "registeredID"
value = value.dotted_string
else:
current_app.logger.warning(
"Unknown SubAltName type: {name}".format(name=name)
)
continue
general_names.append({"nameType": name_type, "value": value})
return general_names
示例28
def get_additional_names(options):
    """
    Return a list of strings to be added to a SAN certificates.

    Only ``x509.DNSName`` entries contribute; their ``value`` is collected.
    The subject alternative names are read from
    ``options["extensions"]["sub_alt_names"]``, which may be either a mapping
    with a ``"names"`` entry or directly an iterable of general names.

    :param options: mapping of certificate options; a missing or empty
        ``"extensions"`` / ``"sub_alt_names"`` entry yields an empty list.
    :return: list of DNS name strings
    """
    extensions = options.get("extensions")
    if not extensions:
        return []

    sans = extensions.get("sub_alt_names") or []
    if isinstance(sans, dict):
        # Iterating the mapping itself would only yield its keys, so no DNS
        # name would ever be found; the entries live under "names".
        sans = sans.get("names") or []

    return [san.value for san in sans if isinstance(san, x509.DNSName)]
示例29
def test_get_domains_multiple(self, mock_current_app):
    """get_domains returns the common name followed by each distinct SAN."""
    common_name = "test.netflix.net"
    san_entries = [DNSName("test2.netflix.net"), DNSName("test3.netflix.net")]
    options = {
        "common_name": common_name,
        "extensions": {"sub_alt_names": {"names": san_entries}},
    }

    result = self.acme.get_domains(options)

    expected = [options["common_name"], "test2.netflix.net", "test3.netflix.net"]
    self.assertEqual(result, expected)
示例30
def test_get_domains_san(self, mock_current_app):
    """A SAN equal to the common name is not listed a second time."""
    common_name = "test.netflix.net"
    san_entries = [DNSName("test.netflix.net"), DNSName("test2.netflix.net")]
    options = {
        "common_name": common_name,
        "extensions": {"sub_alt_names": {"names": san_entries}},
    }

    result = self.acme.get_domains(options)

    expected = [options["common_name"], "test2.netflix.net"]
    self.assertEqual(result, expected)