Python源码示例:hypothesis.strategies.binary()
示例1
def requestResponsePair ():
    """
    Return a strategy that builds populated ``RequestResponsePair`` items.

    Each generated item is initialised from a request drawn from
    ``chromeReqResp ()``; a request body is attached only when the drawn
    ``hasPostData`` flag is true, and a response (plus optional body) only
    when the drawn response is not ``None``.
    """
    def assemble (creq, cresp, hasPostData, reqBody, respBody):
        # Turn the individually drawn pieces into one pair object.
        pair = RequestResponsePair ()
        pair.fromRequestWillBeSent (creq)
        pair.request.hasPostData = hasPostData
        if hasPostData:
            pair.request.body = reqBody
        if cresp is not None:
            pair.fromResponseReceived (cresp)
            # A response body is only attached when a response exists.
            if respBody is not None:
                pair.response.body = respBody
        return pair

    def fromDrawn (reqresp, hasPostData, reqBody, respBody):
        # chromeReqResp () yields an indexable (request, response) value.
        return assemble (reqresp[0], reqresp[1], hasPostData, reqBody, respBody)

    # A body is either absent, unicode text or base64-wrapped bytes.
    bodySt = st.one_of (
        st.none (),
        st.builds (UnicodeBody, st.text ()),
        st.builds (Base64Body.fromBytes, st.binary ()),
    )
    return st.builds (fromDrawn, chromeReqResp (), st.booleans (), bodySt, bodySt)
示例2
def test_signing_root(data, sedes_and_values):
    """
    Check that a signed container's signing root ignores the signature.

    An unsigned hashable value is drawn, a container class with one extra
    trailing ``signature`` field is derived from it, and the signed
    instance's ``signing_root`` must equal the unsigned value's
    ``hash_tree_root``.
    """
    sedes, value_strategy = sedes_and_values
    drawn_value = data.draw(value_strategy)
    unsigned_hashable_value = to_hashable_value(drawn_value, sedes)

    class SignedValueClass(SignedHashableContainer):
        # Same fields as the unsigned value, followed by the signature.
        fields = unsigned_hashable_value._meta.fields + (("signature", bytes96),)

    signature = data.draw(st.binary(min_size=96, max_size=96))

    # Copy every non-signature field over from the unsigned value.
    create_kwargs = {}
    for field_name in SignedValueClass._meta.field_names:
        if field_name == "signature":
            continue
        create_kwargs[field_name] = unsigned_hashable_value[field_name]

    signed_value = SignedValueClass.create(**create_kwargs, signature=signature)
    assert signed_value.signing_root == unsigned_hashable_value.hash_tree_root
示例3
def simple_attrs_with_metadata(draw):
    """
    Create a simple attribute with arbitrary metadata.

    A base attribute is drawn from ``simple_attrs`` and rebuilt through
    ``attr.ib`` with a freshly drawn metadata dict of one to three entries.
    ``draw`` is the callable used to pull values out of strategies
    (presumably supplied by ``@st.composite`` -- the decorator is not
    visible in this snippet).
    """
    base_attr = draw(simple_attrs)

    # Keys and values are drawn from the same mix of scalar types.
    def scalar_st():
        return st.one_of(st.booleans(), st.binary(), st.integers(), st.text())

    metadata = draw(
        st.dictionaries(
            keys=scalar_st(), values=scalar_st(), min_size=1, max_size=3
        )
    )

    # Carry over every setting of the drawn attribute; only metadata is new
    # and the type is explicitly left unset.
    settings = dict(
        default=base_attr._default,
        validator=base_attr._validator,
        repr=base_attr.repr,
        eq=base_attr.eq,
        order=base_attr.order,
        hash=base_attr.hash,
        init=base_attr.init,
        metadata=metadata,
        type=None,
        converter=base_attr.converter,
    )
    return attr.ib(**settings)
示例4
def test_content_disposition_directly(s):
    """Feed binary data straight into rfc6266.parse_headers.

    Parsing the input and asking for the filename may fail with one of the
    listed exception types; any other exception propagates and fails the
    test.
    """
    try:
        rfc6266.parse_headers(s).filename()
    except (SyntaxError, UnicodeDecodeError, rfc6266.Error):
        # These failures are acceptable outcomes for arbitrary input.
        pass
示例5
def _bytes_strategy(
    abi_type: BasicType, min_size: Optional[int] = None, max_size: Optional[int] = None
) -> SearchStrategy:
    """Return a binary strategy matching an ABI ``bytes`` / ``bytesN`` type.

    Args:
        abi_type: Parsed ABI type; ``abi_type.sub`` is the fixed byte width,
            or a falsy value for dynamically sized ``bytes``.
        min_size: Minimum length for dynamic ``bytes``. Defaults to 1.
        max_size: Maximum length for dynamic ``bytes``. Defaults to 64.

    Returns:
        A ``st.binary`` strategy with the resolved size bounds.

    Raises:
        ValueError: If the fixed width is outside ``1..32``.
        TypeError: If a size bound is given for a fixed-width type.
    """
    size = abi_type.sub
    if not size:
        # Compare against None rather than relying on truthiness so that an
        # explicit bound of 0 is honoured instead of being replaced by the
        # default.
        if max_size is None:
            max_size = 64
        if min_size is None:
            # Keep the default lower bound from exceeding an explicit
            # upper bound (e.g. max_size=0).
            min_size = min(1, max_size)
        return st.binary(min_size=min_size, max_size=max_size)
    if not 1 <= size <= 32:
        raise ValueError(f"Invalid type: {abi_type.to_type_str()}")
    if min_size is not None or max_size is not None:
        raise TypeError("Cannot specify size for fixed length bytes strategy")
    return st.binary(min_size=size, max_size=size)
示例6
def init_default_strategies() -> None:
    """Register all default "format" strategies.

    Four formats are registered through ``register_string_format``: raw
    binary, base64-encoded binary text, a basic-auth string built from a
    pair of texts, and a bearer-auth string built from one text.
    """

    def encode_base64(raw: bytes) -> str:
        # Base64-encode the bytes and return them as text.
        return b64encode(raw).decode()

    def make_basic_auth_str(item: Tuple[str, str]) -> str:
        return _basic_auth_str(*item)

    credentials = st.tuples(st.text(), st.text())

    register_string_format("binary", st.binary())
    register_string_format("byte", st.binary().map(encode_base64))
    register_string_format("_basic_auth", credentials.map(make_basic_auth_str))  # type: ignore
    register_string_format("_bearer_auth", st.text().map("Bearer {}".format))
示例7
def line_delimited_data(draw, max_line_size, min_lines=1):
    """
    Draw newline-terminated binary data made of newline-free lines.

    ``max_line_size`` is a strategy; the value drawn from it caps both the
    length of each individual line and the combined size of all lines plus
    one separator byte per line. At least ``min_lines`` lines are drawn and
    the joined result always ends with ``b"\\n"``.
    """
    limit = draw(max_line_size)

    def has_no_newline(chunk):
        return b"\n" not in chunk

    def fits_limit(chunks):
        # Payload bytes plus one newline per line must stay within limit.
        return sum(map(len, chunks)) + len(chunks) <= limit

    line_st = st.binary(min_size=1, max_size=limit).filter(has_no_newline)
    lines = draw(st.lists(line_st, min_size=min_lines).filter(fits_limit))
    return b"\n".join(lines) + b"\n"
示例8
def test_bytes_serialization_decode_many(binary, message_normalizer):
    """With string repr disabled, bytes come back UTF-8 decoded (errors replaced)."""
    expected = binary.decode("utf-8", "replace")
    normalized = message_normalizer(binary, should_repr_strings=False)
    assert normalized == expected
示例9
def test_bytes_serialization_repr_many(binary, message_normalizer):
    """With string repr enabled, bytes come back as their ``repr()``."""
    expected = repr(binary)
    normalized = message_normalizer(binary, should_repr_strings=True)
    assert normalized == expected
示例10
def test_bytes_serialization_decode(message_normalizer):
    """Fixed example: one invalid byte followed by a 4-byte UTF-8 sequence."""
    # b"\x80" is not valid UTF-8 on its own and becomes U+FFFD; the four
    # bytes after it decode to U+1F355.
    payload = b"abc123\x80\xf0\x9f\x8d\x95"
    expected = u"abc123\ufffd\U0001f355"
    assert message_normalizer(payload, should_repr_strings=False) == expected
示例11
def test_bytes_serialization_repr(message_normalizer):
    """Fixed example: with string repr enabled the bytes literal form is returned."""
    payload = b"abc123\x80\xf0\x9f\x8d\x95"
    expected = r"b'abc123\x80\xf0\x9f\x8d\x95'"
    assert message_normalizer(payload, should_repr_strings=True) == expected
示例12
def test_bytes_idval(self) -> None:
    """Check the ids produced by ``_idval`` for parametrized bytes values.

    As the table below shows, ASCII bytes are expected to appear unchanged
    in the id, while every other byte is expected as a ``\\xNN`` escape.
    """
    cases = (
        (b"", ""),
        (b"\xc3\xb4\xff\xe4", "\\xc3\\xb4\\xff\\xe4"),
        (b"ascii", "ascii"),
        ("αρά".encode(), "\\xce\\xb1\\xcf\\x81\\xce\\xac"),
    )
    for raw, expected_id in cases:
        produced = _idval(raw, "a", 6, idfn=None, nodeid=None, config=None)
        assert produced == expected_id
示例13
def event ():
    """
    Return a strategy yielding one of the supported event objects.

    The alternatives are, in order: a controller start, a script, a
    screenshot event, a DOM snapshot event and a request/response pair.
    """
    def toUtf8 (text):
        return text.encode ('utf-8')

    controllerStart = st.builds (ControllerStart, jsonObject ())
    script = st.builds (Script.fromStr, st.text (), st.one_of(st.none (), st.text ()))
    screenshot = st.builds (ScreenshotEvent, urls (), st.integers (), st.binary ())
    # The snapshot payload is drawn as text and encoded to bytes.
    domSnapshot = st.builds (DomSnapshotEvent, urls (), st.builds (toUtf8, st.text ()), viewport())
    return st.one_of (
        controllerStart,
        script,
        screenshot,
        domSnapshot,
        requestResponsePair (),
    )
示例14
def pem_objects(draw):
    """
    Strategy for generating ``pem`` objects.

    Returns a list whose first element is an ``RSAPrivateKey`` and whose
    remaining elements are one or more ``Certificate`` objects. Each one
    wraps non-empty drawn bytes, encoded with ``encodebytes`` and framed by
    the matching BEGIN/END marker lines.
    """
    # The key payload is drawn before the certificate payloads.
    key_payload = draw(s.binary(min_size=1))
    private_key = RSAPrivateKey(
        b'-----BEGIN RSA PRIVATE KEY-----\n'
        + encodebytes(key_payload)
        + b'-----END RSA PRIVATE KEY-----\n'
    )

    cert_payloads = draw(s.lists(s.binary(min_size=1), min_size=1))
    certificates = [
        Certificate(
            b'-----BEGIN CERTIFICATE-----\n'
            + encodebytes(payload)
            + b'-----END CERTIFICATE-----\n'
        )
        for payload in cert_payloads
    ]
    return [private_key, *certificates]
示例15
def chunk_st():
    """Return a strategy for byte strings of exactly 32 bytes."""
    chunk_length = 32
    return st.binary(min_size=chunk_length, max_size=chunk_length)
示例16
def element_st(draw, size=None):
    """Draw a byte string of exactly ``size`` bytes.

    When ``size`` is ``None`` the length itself is first drawn from
    ``element_size_st()``.
    """
    length = draw(element_size_st()) if size is None else size
    return draw(st.binary(min_size=length, max_size=length))
示例17
def bytevector_value_st(size=None):
    """Return a binary strategy for byte-vector values.

    ``size`` is passed through unchanged as the maximum length. The minimum
    length is ``size`` when it is truthy and 1 otherwise.
    """
    lower_bound = size if size else 1
    return st.binary(min_size=lower_bound, max_size=size)
#
# Composite value strategies
#
示例18
def primitives():
    """Return a strategy yielding one value of a primitive type.

    Covered types: integers, non-NaN floats, text, bytes, datetimes, dates,
    times, timedeltas, booleans and ``None``. Datetimes and times take their
    timezone from ``timezones()`` or use none at all.
    """
    alternatives = [
        st.integers(),
        st.floats(allow_nan=False),
        st.text(),
        st.binary(),
        st.datetimes(timezones=st.one_of(timezones(), st.none())),
        st.dates(),
        st.times(timezones=st.one_of(timezones(), st.none())),
        st.timedeltas(),
        st.booleans(),
        st.none(),
    ]
    return st.one_of(*alternatives)
示例19
def file_objects():
    """Hypothesis strategy for generating pre-populated `file objects`.

    Each generated object is an ``io.BytesIO`` built from drawn bytes.
    """
    contents = hy_st.binary()
    return hy_st.builds(io.BytesIO, contents)
示例20
def strategy(self):
    """Return the strategy generating values for this object.

    If ``self._enum`` is set, values are sampled from it; otherwise a
    binary strategy bounded by ``self._min_length`` and
    ``self._max_length`` is returned.
    """
    if self._enum is None:
        return hy_st.binary(
            min_size=self._min_length, max_size=self._max_length
        )
    return hy_st.sampled_from(self._enum)
示例21
def test_can_encode_binary_body_with_header_charset(sample_app):
    """A bytes body is base64-encoded even when Content-Type carries a charset."""
    content_type = 'application/octet-stream; charset=binary'
    response = app.Response(
        status_code=200, body=b'foobar', headers={'Content-Type': content_type}
    )
    encoded_response = response.to_dict(sample_app.api.binary_types)
    # 'Zm9vYmFy' is the base64 encoding of b'foobar'.
    assert encoded_response['body'] == 'Zm9vYmFy'
示例22
def test_handles_binary_responses(body, content_type):
    """A binary response must serialize to a base64-encoded string body."""
    response = Response(body=body, headers={'Content-Type': content_type})
    serialized = response.to_dict(BINARY_TYPES)
    # The response has to be flagged as base64 encoded ...
    assert serialized['isBase64Encoded']
    # ... and its body must be text that base64-decodes back to bytes.
    encoded_body = serialized['body']
    assert isinstance(encoded_body, six.string_types)
    assert isinstance(base64.b64decode(encoded_body), bytes)
示例23
def test_fs_offline_restart_and_rwfile(user_fs_offline_state_machine, alice):
    """
    Stateful test: reads, writes and truncates of one file across restarts.

    Every operation on ``/foo.txt`` is mirrored on a ``FileOracle``; reads
    from the workspace must always match what the oracle returns.
    """
    # Sizes, offsets and lengths all range over the same playground.
    playground_positions = st.integers(min_value=0, max_value=PLAYGROUND_SIZE)
    playground_payloads = st.binary(max_size=PLAYGROUND_SIZE)

    class FSOfflineRestartAndRWFile(user_fs_offline_state_machine):
        @initialize()
        async def init(self):
            # Start from a clean state with a single empty file.
            await self.reset_all()
            self.device = alice
            await self.restart_user_fs(self.device)
            self.wid = await self.user_fs.workspace_create("w")
            self.workspace = self.user_fs.get_workspace(self.wid)
            await self.workspace.touch("/foo.txt")
            self.file_oracle = FileOracle()

        @rule()
        async def restart(self):
            await self.restart_user_fs(self.device)
            # Fetch the workspace again after the restart.
            self.workspace = self.user_fs.get_workspace(self.wid)

        @rule(size=playground_positions, offset=playground_positions)
        async def atomic_read(self, size, offset):
            actual = await self.workspace.read_bytes(
                "/foo.txt", size=size, offset=offset
            )
            expected = self.file_oracle.read(size, offset)
            assert actual == expected

        @rule(offset=playground_positions, content=playground_payloads)
        async def atomic_write(self, offset, content):
            await self.workspace.write_bytes(
                "/foo.txt", data=content, offset=offset, truncate=False
            )
            self.file_oracle.write(offset, content)

        @rule(length=playground_positions)
        async def atomic_truncate(self, length):
            await self.workspace.truncate("/foo.txt", length=length)
            self.file_oracle.truncate(length)

    FSOfflineRestartAndRWFile.run_as_test()
示例24
def test_file_operations(hypothesis_settings, tmpdir):
    """
    Stateful test comparing manifest/storage operations with a real file.

    Reads, writes and resizes are applied both to a ``Storage`` backed
    manifest and to an on-disk oracle file; reads must agree and the
    invariants must hold throughout.
    """
    payloads = strategies.binary(max_size=MAX_SIZE)

    class FileOperations(RuleBasedStateMachine):
        def __init__(self) -> None:
            super().__init__()
            # The oracle is a plain binary file; it is closed in teardown().
            self.oracle = open(tmpdir / "oracle.txt", "w+b")
            self.manifest = LocalFileManifest.new_placeholder(
                parent=EntryID(), blocksize=8
            )
            self.storage = Storage()

        def teardown(self) -> None:
            self.oracle.close()
            self.storage.clear()

        @invariant()
        def integrity(self) -> None:
            self.manifest.assert_integrity()

        @invariant()
        def leaks(self) -> None:
            # The storage must hold exactly the chunks the manifest references.
            referenced_ids = set()
            for block_chunks in self.manifest.blocks:
                referenced_ids.update(chunk.id for chunk in block_chunks)
            assert set(self.storage) == referenced_ids

        @rule(size=size, offset=size)
        def read(self, size: int, offset: int) -> None:
            actual = self.storage.read(self.manifest, size, offset)
            self.oracle.seek(offset)
            assert actual == self.oracle.read(size)

        @rule(content=payloads, offset=size)
        def write(self, content: bytes, offset: int) -> None:
            self.manifest = self.storage.write(self.manifest, content, offset)
            self.oracle.seek(offset)
            self.oracle.write(content)

        @rule(length=size)
        def resize(self, length: int) -> None:
            self.manifest = self.storage.resize(self.manifest, length)
            self.oracle.truncate(length)

        @rule()
        def reshape(self) -> None:
            self.manifest = self.storage.reshape(self.manifest)
            assert self.manifest.is_reshaped()

    run_state_machine_as_test(FileOperations, settings=hypothesis_settings)
示例25
def atoms(which="basic", finite=False):
    """
    Return atomic Python types.

    Args:
        which:
            The category of atomic types to choose.

            'basic':
                Basic Python atomic literals (numbers, strings, bools, None,
                Ellipsis).
            'ordered':
                Exclude complex numbers, None, and Ellipsis since they do not
                have an ordering relation.
            'json':
                Only valid JSON data types (including valid Javascript numeric
                ranges)

            Any other value yields only booleans, text and floats.
        finite:
            Only yield finite numeric values (i.e., no NaN and infinities)

    Returns:
        A ``st.one_of`` strategy over the strategies selected for ``which``.
    """
    is_json = which == "json"
    # JSON has no representation for NaN or infinities, so they are
    # disabled there regardless of ``finite``.
    non_finite_floats = not finite and not is_json

    strategies = [
        st.booleans(),
        st.text(),
        st.floats(allow_nan=non_finite_floats, allow_infinity=non_finite_floats),
    ]
    add = strategies.append

    if which in ("basic", "ordered"):
        add(st.integers())
        # None has no ordering relation, so it is only offered for 'basic';
        # offering it for 'ordered' contradicted the documented contract.
        if which == "basic":
            add(st.just(None))
        add(st.binary())

    if is_json:
        # See also:
        # https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Number/MIN_SAFE_INTEGER
        add(st.integers(min_value=-(2 ** 53 - 1), max_value=2 ** 53 - 1))
        add(st.just(None))

    if which == "basic":
        add(st.just(...))
        add(st.complex_numbers(allow_nan=not finite, allow_infinity=not finite))

    return st.one_of(*strategies)