Python源码示例:Cryptodome.Random.new()
示例1
def encrypt_credential(raw):
    """
    Encrypt a text value with AES in CBC mode and return it as Base64 text.

    The value is UTF-8 encoded, padded to ``__BLOCK_SIZE__`` and encrypted
    with the key returned by ``get_crypt_key()`` under a newly generated IV.
    The IV is placed in front of the ciphertext before Base64 encoding.

    :param raw: Text to be encrypted
    :type raw: str
    :returns: string -- Base64 text of the IV followed by the ciphertext
    """
    # pylint: disable=invalid-name,import-error
    from base64 import b64encode
    try:  # The crypto package depends on the library installed (see Wiki)
        from Crypto import Random
        from Crypto.Cipher import AES
        from Crypto.Util import Padding
    except ImportError:
        from Cryptodome import Random
        from Cryptodome.Cipher import AES
        from Cryptodome.Util import Padding
    padded = bytes(Padding.pad(data_to_pad=raw.encode('utf-8'), block_size=__BLOCK_SIZE__))
    init_vector = Random.new().read(AES.block_size)
    encryptor = AES.new(get_crypt_key(), AES.MODE_CBC, init_vector)
    payload = init_vector + encryptor.encrypt(padded)
    return b64encode(payload).decode('utf-8')
示例2
def decrypt_credential(enc, secret=None):
    """
    Decrypt a Base64 value whose decoded form is an IV followed by ciphertext.

    The first ``AES.block_size`` decoded bytes are used as the CBC IV; the
    remainder is decrypted and unpadded with ``__BLOCK_SIZE__``.

    :param enc: Base64 data to be decrypted
    :param secret: Optional key; when falsy, ``get_crypt_key()`` is used
    :returns: The unpadded plaintext exactly as ``Padding.unpad`` returns it
    """
    # pylint: disable=invalid-name,import-error
    from base64 import b64decode
    try:  # The crypto package depends on the library installed (see Wiki)
        from Crypto.Cipher import AES
        from Crypto.Util import Padding
    except ImportError:
        from Cryptodome.Cipher import AES
        from Cryptodome.Util import Padding
    blob = b64decode(enc)
    init_vector = blob[:AES.block_size]
    decryptor = AES.new(secret or get_crypt_key(), AES.MODE_CBC, init_vector)
    padded_plaintext = decryptor.decrypt(blob[AES.block_size:])
    return Padding.unpad(padded_data=padded_plaintext, block_size=__BLOCK_SIZE__)
示例3
def getRandomInteger(N, randfunc=None):
    """getRandomInteger(N:int, randfunc:callable):long
    Return a random number with at most N bits.

    ``randfunc`` is called with a byte count and must return that many
    random bytes. When it is omitted, ``Random.new().read`` is used.

    This function is for internal use only and may be renamed or removed in
    the future.
    """
    if randfunc is None:
        _import_Random()
        randfunc = Random.new().read

    # Whole bytes are drawn first; the leftover high bits are drawn second.
    random_bytes = randfunc(N >> 3)
    leftover_bits = N % 8
    if leftover_bits:
        # Keep only the top `leftover_bits` bits of one extra random byte.
        top = ord(randfunc(1)) >> (8 - leftover_bits)
        random_bytes = bchr(top) + random_bytes
    return bytes_to_long(random_bytes)
示例4
def testEncrypt1(self):
    """Verify encryption using all test vectors in ``self._testData``.

    As used below, each vector provides: ``test[0]`` a mapping with hex
    strings under ``'n'`` and ``'e'``, ``test[1]`` the plaintext,
    ``test[2]`` the expected ciphertext, ``test[3]`` the bytes handed out
    by the deterministic RNG, and ``test[4]`` the second argument to
    ``PKCS.new`` (presumably the hash to use -- confirm against the vectors).
    """
    # RNG that takes its random numbers from a pool given at initialization.
    # Defined once, outside the loop, because it does not depend on the vector.
    class randGen:
        def __init__(self, data):
            self.data = data
            self.idx = 0

        def __call__(self, N):
            # The end of the slice must be relative to the cursor. An
            # absolute end index (data[idx:N]) returns short or empty data
            # on every call after the first.
            r = self.data[self.idx:self.idx + N]
            self.idx += N
            return r

    for test in self._testData:
        # Build the key
        comps = [int(rws(test[0][x]), 16) for x in ('n', 'e')]
        key = RSA.construct(comps)
        # The real test
        cipher = PKCS.new(key, test[4], randfunc=randGen(t2b(test[3])))
        ct = cipher.encrypt(t2b(test[1]))
        self.assertEqual(ct, t2b(test[2]))
示例5
def testEncryptDecrypt2(self):
    """Round-trip a random 40-byte message through OAEP with several hashes.

    For each hash module the test also checks that ``encrypt()`` requested
    exactly ``digest_size`` random bytes from the supplied RNG.
    """
    global asked

    def counting_rng(N):
        # Record how many bytes were requested in the module-level counter,
        # then delegate to the real RNG.
        global asked
        asked += N
        return self.rng(N)

    # Verify that OAEP is friendly to all hashes
    for hash_module in (MD2, MD5, SHA1, SHA256, RIPEMD160):
        asked = 0  # reset the counter for this hash
        plaintext = self.rng(40)
        cipher = PKCS.new(self.key1024, hash_module, randfunc=counting_rng)
        ciphertext = cipher.encrypt(plaintext)
        self.assertEqual(cipher.decrypt(ciphertext), plaintext)
        self.assertEqual(asked, hash_module.digest_size)
示例6
def testEncrypt1(self):
    """Check encryption against every vector in ``self._testData``.

    As used below, each vector provides: ``vector[0]`` key material accepted
    by ``RSA.importKey``, ``vector[1]`` the plaintext, ``vector[2]`` the
    expected ciphertext and ``vector[3]`` the bytes the deterministic RNG
    hands out.
    """
    class PooledRandom:
        """Callable that replays consecutive slices of a fixed byte pool."""

        def __init__(self, data):
            self.data = data
            self.idx = 0

        def __call__(self, N):
            end = self.idx + N
            chunk = self.data[self.idx:end]
            self.idx = end
            return chunk

    for vector in self._testData:
        key = RSA.importKey(vector[0])
        rng = PooledRandom(t2b(vector[3]))
        cipher = PKCS.new(key, randfunc=rng)
        ciphertext = cipher.encrypt(b(vector[1]))
        self.assertEqual(ciphertext, t2b(vector[2]))
示例7
def generate_iv() -> bytes:
    """Return ``AES.block_size`` bytes read from a new ``Random`` generator."""
    rng = Random.new()
    return rng.read(AES.block_size)
示例8
def aes_gcm_encrypt_with_iv(plain_text: bytes, hdr: bytes, key: bytes, iv: bytes):
    """Encrypt with AES in GCM mode using a caller-supplied nonce.

    ``hdr`` is passed to ``cipher.update`` before encryption.

    :param plain_text: Data to encrypt
    :param hdr: Header bytes fed to the cipher via ``update``
    :param key: AES key
    :param iv: Value used as the GCM nonce
    :returns: ``(mac_tag, cipher_text)``
    """
    gcm = AES.new(key=key, mode=AES.MODE_GCM, nonce=iv)
    gcm.update(hdr)
    encrypted, tag = gcm.encrypt_and_digest(plain_text)
    return tag, encrypted
示例9
def aes_gcm_decrypt_with_iv(cipher_text: bytes, hdr: bytes, mac_tag: bytes, key: bytes, iv: bytes) -> bytes:
    """Decrypt and verify AES-GCM data using a caller-supplied nonce.

    ``hdr`` is passed to ``cipher.update`` before decryption.

    :param cipher_text: Data to decrypt
    :param hdr: Header bytes fed to the cipher via ``update``
    :param mac_tag: Tag checked by ``decrypt_and_verify``
    :param key: AES key
    :param iv: Value used as the GCM nonce
    :returns: The plaintext, or ``b""`` when ``decrypt_and_verify`` raises
        ``ValueError`` or ``KeyError``
    """
    gcm = AES.new(key=key, mode=AES.MODE_GCM, nonce=iv)
    gcm.update(hdr)
    try:
        return gcm.decrypt_and_verify(cipher_text, mac_tag)
    except (ValueError, KeyError):
        # Failure is reported to the caller as an empty result, not an exception.
        return b""
示例10
def aes_gcm_encrypt(plain_text: bytes, hdr: bytes, key: bytes):
    """Encrypt with AES in GCM mode, letting the cipher choose its nonce.

    ``hdr`` is passed to ``cipher.update`` before encryption.

    :param plain_text: Data to encrypt
    :param hdr: Header bytes fed to the cipher via ``update``
    :param key: AES key
    :returns: ``(nonce, mac_tag, cipher_text)`` where ``nonce`` is read from
        the cipher object
    """
    gcm = AES.new(key=key, mode=AES.MODE_GCM)
    gcm.update(hdr)
    encrypted, tag = gcm.encrypt_and_digest(plain_text)
    return gcm.nonce, tag, encrypted
示例11
def aes_gcm_decrypt(cipher_text: bytes, hdr: bytes, nonce: bytes, mac_tag: bytes, key: bytes):
    """Decrypt and verify AES-GCM data.

    ``hdr`` is passed to ``cipher.update`` before decryption.

    :param cipher_text: Data to decrypt
    :param hdr: Header bytes fed to the cipher via ``update``
    :param nonce: GCM nonce used for encryption
    :param mac_tag: Tag checked by ``decrypt_and_verify``
    :param key: AES key
    :returns: The plaintext, or ``b""`` when ``decrypt_and_verify`` raises
        ``ValueError`` or ``KeyError``
    """
    gcm = AES.new(key=key, mode=AES.MODE_GCM, nonce=nonce)
    gcm.update(hdr)
    try:
        return gcm.decrypt_and_verify(cipher_text, mac_tag)
    except (ValueError, KeyError):
        # Failure is reported to the caller as an empty result, not an exception.
        return b""
示例12
def aes_ctr_decrypt(cipher_text: bytes, nonce: bytes, key: bytes):
    """Decrypt ``cipher_text`` with AES in CTR mode.

    :param cipher_text: Data to decrypt
    :param nonce: Nonce passed to the CTR cipher
    :param key: AES key
    :returns: The result of ``cipher.decrypt``
    """
    ctr = AES.new(key=key, mode=AES.MODE_CTR, nonce=nonce)
    return ctr.decrypt(cipher_text)
示例13
def aes_cbc_encrypt(plain_text: bytes, key: bytes, iv: bytes = b''):
    """Pad ``plain_text`` to the AES block size and encrypt it in CBC mode.

    :param plain_text: Data to encrypt
    :param key: AES key
    :param iv: IV to use; when empty, one is taken from
        ``AESHandler.generate_iv()``
    :returns: ``(iv, cipher_text)`` where ``iv`` is read back from the cipher
    """
    chosen_iv = iv if len(iv) else AESHandler.generate_iv()
    cbc = AES.new(key=key, mode=AES.MODE_CBC, iv=chosen_iv)
    padded = pad(plain_text, AES.block_size)
    return cbc.IV, cbc.encrypt(padded)
示例14
def aes_cbc_decrypt(cipher_text: bytes, iv: bytes, key: bytes):
    """Decrypt ``cipher_text`` with AES in CBC mode and strip PKCS#7 padding.

    :param cipher_text: Data to decrypt
    :param iv: IV used for encryption
    :param key: AES key
    :returns: The unpadded plaintext
    """
    cbc = AES.new(key=key, mode=AES.MODE_CBC, iv=iv)
    padded = cbc.decrypt(cipher_text)
    return unpad(padded, AES.block_size, style='pkcs7')
示例15
def test_aes_gcm_with_iv(self):
    """Encrypting then decrypting with the same IV returns the plaintext."""
    key, hdr = b'Sixteen byte key', b'To your eyes only'
    plain_text = b'Attack at dawn'
    iv = Random.new().read(AES.block_size)
    mac, cipher_text = AESHandler.aes_gcm_encrypt_with_iv(plain_text, hdr, key, iv)
    recovered = AESHandler.aes_gcm_decrypt_with_iv(cipher_text, hdr, mac, key, iv)
    self.assertEqual(plain_text, recovered)
示例16
def test_aes_gcm_with_iv_wrong_iv(self):
    """Decrypting with a different random IV must not return the plaintext."""
    key, hdr = b'Sixteen byte key', b'To your eyes only'
    plain_text = b'Attack at dawn'
    encrypt_iv = Random.new().read(AES.block_size)
    mac, cipher_text = AESHandler.aes_gcm_encrypt_with_iv(plain_text, hdr, key, encrypt_iv)
    # Draw a second, independent IV for the decryption side.
    decrypt_iv = Random.new().read(AES.block_size)
    recovered = AESHandler.aes_gcm_decrypt_with_iv(cipher_text, hdr, mac, key, decrypt_iv)
    self.assertNotEqual(plain_text, recovered)
示例17
def encrypt(key, raw):
    """Encrypt ``raw`` with AES in CBC mode under a freshly generated IV.

    :param key: AES key; a ``str`` is treated as Base64 text and decoded
    :param raw: Data to encrypt; must be exactly of type ``bytes``
    :returns: The IV followed by the ciphertext of the padded data
    """
    assert type(raw) is bytes, "input data is bytes"
    key_bytes = b64decode(key.encode()) if isinstance(key, str) else key
    padded = pad(raw, AES.block_size)
    init_vector = Random.new().read(AES.block_size)
    encryptor = AES.new(key_bytes, AES.MODE_CBC, init_vector)
    return init_vector + encryptor.encrypt(padded)
示例18
def decrypt(key, enc):
    """Decrypt data laid out as an IV followed by AES-CBC ciphertext.

    :param key: AES key; a ``str`` is treated as Base64 text and decoded
    :param enc: Data to decrypt; must be exactly of type ``bytes``
    :returns: The unpadded plaintext
    :raises ValueError: when the unpadded result is empty
    """
    assert type(enc) is bytes, 'Encrypt data is bytes'
    key_bytes = b64decode(key.encode()) if isinstance(key, str) else key
    init_vector, ciphertext = enc[:AES.block_size], enc[AES.block_size:]
    decryptor = AES.new(key_bytes, AES.MODE_CBC, init_vector)
    plaintext = unpad(decryptor.decrypt(ciphertext), AES.block_size)
    if not plaintext:
        raise ValueError("AES decryption error, not correct key.")
    return plaintext
示例19
def compute_cert_verify(self, message, sig=Sig_PKCS1_v1_5, digest=SHA256):
    """Sign the digest of ``message`` with the private key in ``self.asym_keystore``.

    :param message: Data to hash with ``digest.new``
    :param sig: Signature scheme module exposing ``new(key)``
    :param digest: Hash module exposing ``new(data)``
    :returns: The value returned by the signer's ``sign``
    :raises RuntimeError: when no private key is available
    """
    if self.asym_keystore.private is None:
        raise RuntimeError("Cannot sign, missing private key. Did you install an ASYM keystore?")
    signer = sig.new(self.asym_keystore.private)
    message_hash = digest.new(message)
    return signer.sign(message_hash)
示例20
def __generate_secrets(self):
    """Install key material from ``self.sec_params`` and build the crypto contexts.

    A client or server symmetric keystore is replaced only while it is
    still a ``tlsk.EmptySymKeyStore``. The master secret is copied from
    ``self.sec_params``, then one crypto context per side is created.
    """
    client_ctx = self.client_ctx
    if isinstance(client_ctx.sym_keystore, tlsk.EmptySymKeyStore):
        client_ctx.sym_keystore = self.sec_params.client_keystore

    server_ctx = self.server_ctx
    if isinstance(server_ctx.sym_keystore, tlsk.EmptySymKeyStore):
        server_ctx.sym_keystore = self.sec_params.server_keystore

    self.master_secret = self.sec_params.master_secret

    # Retrieve ciphers used for client/server encryption and decryption
    ctx_factory = CryptoContextFactory(self)
    self.client_ctx.crypto_ctx = ctx_factory.new(self.client_ctx)
    self.server_ctx.crypto_ctx = ctx_factory.new(self.server_ctx)
示例21
def get_encrypted_pms(self, pms=None):
    """Encrypt a premaster secret with the server's public key (PKCS#1 v1.5).

    The result is also stored on ``self.encrypted_premaster_secret``.

    :param pms: Secret to encrypt; when falsy, ``self.premaster_secret`` is used
    :returns: The encrypted premaster secret
    :raises ValueError: when the server keystore has no public key
    """
    secret = pms or self.premaster_secret
    server_public_key = self.server_ctx.asym_keystore.public
    if server_public_key is None:
        raise ValueError("Cannot calculate encrypted MS. No server certificate found in connection")
    self.encrypted_premaster_secret = PKCS1_v1_5.new(server_public_key).encrypt(secret)
    return self.encrypted_premaster_secret
示例22
def _derive_finished(self, secret, hash_):
    """Return the HMAC of ``hash_`` keyed with ``secret``, using ``self.prf.digest``."""
    mac = HMAC.new(secret, hash_, digestmod=self.prf.digest)
    return mac.digest()
示例23
def get_verify_data(self, data=None):
    """Compute the Finished verify data for the negotiated TLS version.

    For TLS 1.3 and above the value comes from ``derive_client_finished()``
    or ``derive_server_finished()`` depending on ``self.client``; ``data``
    is not used on that path. For lower versions, 12 bytes are requested
    from ``self.prf`` using the master secret, a client/server label and a
    digest of the handshake data.

    :param data: Optional handshake data to digest; when ``None`` the
        messages from ``_walk_handshake_msgs()`` are used
    :returns: The verify data
    """
    if self.negotiated.version >= tls.TLSVersion.TLS_1_3:
        if self.client:
            return self.derive_client_finished()
        return self.derive_server_finished()

    label = TLSPRF.TLS_MD_CLIENT_FINISH_CONST if self.client else TLSPRF.TLS_MD_SERVER_FINISH_CONST

    if data is not None:
        verify_data = [data]
    else:
        verify_data = []
        for handshake in self._walk_handshake_msgs():
            if not handshake.haslayer(tls.TLSFinished):
                verify_data.append(str(handshake))
                continue
            # Special case of encrypted handshake. Remove crypto material to compute verify_data
            # NOTE(review): formatting packed bytes with %s looks like it relies on
            # Python 2 str semantics -- confirm the target interpreter.
            verify_data.append("%s%s%s" % (chr(handshake.type), struct.pack(">I", handshake.length)[1:],
                                           handshake[tls.TLSFinished].data))

    transcript = "".join(verify_data)
    if self.negotiated.version == tls.TLSVersion.TLS_1_2:
        seed = self.prf.digest.new(transcript).digest()
    else:
        # Other versions below TLS 1.3 use MD5 and SHA digests back to back.
        seed = "%s%s" % (MD5.new(transcript).digest(), SHA.new(transcript).digest())
    return self.prf.get_bytes(self.master_secret, label, seed, num_bytes=12)
示例24
def get_handshake_hash(self, digest, up_to=None, include=True):
    """Hash the handshake messages up to the first one carrying ``up_to``.

    Messages from ``_walk_handshake_msgs()`` are fed, as ``str(message)``,
    into a new hash made with ``digest.new()``. Iteration stops at the first
    message for which ``haslayer(up_to)`` is true.

    :param digest: Hash module or object exposing ``new()``
    :param up_to: Layer passed to ``haslayer`` to find the stopping message
    :param include: Whether the stopping message itself is hashed
    :returns: The digest of the hashed messages
    """
    hasher = digest.new()
    for handshake in self._walk_handshake_msgs():
        is_last = handshake.haslayer(up_to)
        if include or not is_last:
            hasher.update(str(handshake))
        if is_last:
            break
    return hasher.digest()
示例25
def get_client_signed_handshake_hash(self, hash_=None, pre_sign_hook=lambda x: x, sig=Sig_PKCS1_v1_5):
    """Legacy way to get the certificate verify hash. Added sig as last parameter to preserve prior use

    :param hash_: Hash object handed to ``get_handshake_digest``; when
        ``None`` a new ``SHA256`` hash is created for this call
    :param pre_sign_hook: Callable applied to the handshake digest before signing
    :param sig: Signature scheme module exposing ``new(key)``
    :returns: The signature over the (hooked) handshake digest
    :raises RuntimeError: when the client keystore has no private key
    """
    if self.client_ctx.asym_keystore.private is None:
        raise RuntimeError("Missing client private key. Can't sign")
    if hash_ is None:
        # A hash object built in the signature would be created once, at
        # definition time, and shared by every call that omits hash_. A
        # hash object is stateful, so each call gets its own.
        hash_ = SHA256.new()
    msg_hash = self.get_handshake_digest(hash_)
    msg_hash = pre_sign_hook(msg_hash)
    # Will throw exception if we can't sign or if data is larger the modulus
    return sig.new(self.client_ctx.asym_keystore.private).sign(msg_hash)
示例26
def compute_client_cert_verify(self, sig=Sig_PKCS1_v1_5, digest=SHA256, pre_sign_hook=lambda x: x):
    """Compute the client CertificateVerify signature.

    For TLS 1.3 and above the handshake hash is taken up to
    ``tls.TLSCertificateList`` and handed to ``_compute_cert_verify`` with
    the client label. For lower versions the call is delegated to
    ``get_client_signed_handshake_hash``.

    :param sig: Signature scheme module
    :param digest: Hash module
    :param pre_sign_hook: Callable applied before signing
    :returns: The signature
    :raises RuntimeError: on the TLS 1.3 path when ``self.prf`` is ``None``
    """
    if self.negotiated.version >= tls.TLSVersion.TLS_1_3:
        if self.prf is None:
            raise RuntimeError("PRF must be initialized prior to computing TLS 1.3 signature")
        # TODO: calculate handshake hash properly until the second tls.TLSCertificateList for client based certs
        handshake_hash = self.get_handshake_hash(self.prf.digest, tls.TLSCertificateList)
        label = b"TLS 1.3, client CertificateVerify"
        # NOTE(review): self.server_ctx is passed although this is the client
        # signature -- confirm this is intended.
        return self._compute_cert_verify(self.server_ctx, handshake_hash, label, sig, digest, pre_sign_hook)
    return self.get_client_signed_handshake_hash(digest.new(), pre_sign_hook, sig)
示例27
def extract(self, ikm, salt=None):
    """Derive ``self.prk`` as the HMAC of ``ikm`` keyed with ``salt``.

    :param ikm: Input keying material, used as the HMAC message
    :param salt: HMAC key; when ``None``, ``self.digest_size`` zero bytes are used
    :returns: ``self``, so calls can be chained
    """
    hmac_key = b"\x00" * self.digest_size if salt is None else salt
    mac = HMAC.new(hmac_key, msg=ikm, digestmod=self.digest)
    self.prk = mac.digest()
    return self
示例28
def expand(self, len_, info=b"", prk=None):
    """Expand the pseudo-random key into ``len_`` bytes of output.

    L is len_, T is the concatenated blocks in RFC 5869. Each block is the
    HMAC, keyed with ``self.prk``, of the previous block, ``info`` and a
    one-byte counter starting at 1.

    :param len_: Number of output bytes requested
    :param info: Context bytes mixed into every block
    :param prk: Optional pseudo-random key; when given it replaces ``self.prk``
    :returns: The first ``len_`` bytes of the concatenated blocks
    :raises HKDFError: when ``self.prk`` is empty, or when ``len_`` exceeds
        ``255 * self.digest_size``
    """
    if prk is not None:
        self.prk = prk
    if self.prk == b"":
        raise HKDFError("PRK must be derived prior to calling expand")
    max_len = 255 * self.digest_size
    if len_ > max_len:
        raise HKDFError("HKDF can output at max %d bytes, but you asked for %d" % (max_len, len_))
    # Integer ceiling division. math.ceil(len_ / digest_size) floors first
    # wherever "/" is integer division, which produces too few blocks.
    n = (len_ + self.digest_size - 1) // self.digest_size
    block = b""
    blocks = []
    for i in range(1, n + 1):
        # Concatenate directly: "%s" formatting of bytes yields a text
        # string of reprs on Python 3 instead of the raw bytes.
        block = HMAC.new(self.prk, block + info + struct.pack("B", i), digestmod=self.digest).digest()
        blocks.append(block)
    return b"".join(blocks)[:len_]
示例29
def __init_ciphers(self):
    """Create the encryption and decryption ciphers from the symmetric key.

    Both are built by separate calls to ``cipher_type.new`` with the same key.
    """
    def _new_cipher():
        return self.sec_params.cipher_type.new(self.ctx.sym_keystore.key)

    self.enc_cipher = _new_cipher()
    self.dec_cipher = _new_cipher()
示例30
def __init_ciphers(self):
    """Create the encryption and decryption ciphers from key, mode and IV.

    Both are built by separate calls to ``cipher_type.new`` with the same
    key, ``sec_params.cipher_mode`` and keystore IV.
    """
    def _new_cipher():
        return self.sec_params.cipher_type.new(self.ctx.sym_keystore.key,
                                               mode=self.sec_params.cipher_mode,
                                               IV=self.ctx.sym_keystore.iv)

    self.enc_cipher = _new_cipher()
    self.dec_cipher = _new_cipher()