Python源码示例:hypothesis.strategies.data()
示例1
def test_run_hub(self, data):
Hub.hubs = []
sensor_name = 'sensor'
sensor = data.draw(st.sampled_from(self.sensor_list))
capabilities = self._draw_capabilities(data, sensor)
hub_type = data.draw(st.sampled_from(self.hub_list))
TestHub, stop_evt = self._get_hub_class(hub_type, sensor, sensor_name, capabilities)
hub = TestHub('test_hub')
# Start the hub
#kernel.run(self._emit_control(TestHub))
with patch('Adafruit_BluefruitLE.get_provider') as ble,\
patch('bricknil.ble_queue.USE_BLEAK', False) as use_bleak:
ble.return_value = MockBLE(hub)
sensor_obj = getattr(hub, sensor_name)
sensor_obj.send_message = Mock(side_effect=coroutine(lambda x,y: "the awaitable should return this"))
kernel.run(self._emit_control, data, hub, stop_evt, ble(), sensor_obj)
#start(system)
示例2
def test_mixture(self, data, versions, digits):
min_version, max_version = versions
min_digits, max_digits = digits
# Check that the our minimum version doesn't have too many digits.
# TODO: Can we remove these assumptions?
assume(len(min_version.split(".")) <= max_digits)
assume(len(max_version.split(".")) <= max_digits)
version = data.draw(
st_version(
min_digits=min_digits,
max_digits=max_digits,
min_version=min_version,
max_version=max_version,
)
)
assert (
self._ver_2_list(min_version)
<= self._ver_2_list(version)
<= self._ver_2_list(max_version)
)
assert min_digits <= len(version.split(".")) <= max_digits
示例3
def test_read_truncated_header_raises_error(self, data, count, encoding, random):
written_headers = data.draw(extended_textual_header(count=count))
with BytesIO() as fh:
for header in written_headers:
for line in header:
fh.write(line.encode(encoding))
fh.seek(0, os.SEEK_END)
length = fh.tell()
truncate_pos = random.randrange(0, length - 1)
truncated_buffer = fh.getbuffer()[:truncate_pos]
with BytesIO(truncated_buffer):
with raises(EOFError):
toolkit.read_extended_headers_counted(fh, count, encoding=encoding)
del truncated_buffer
示例4
def test_no_arbitrary_decoding(self):
"""
``wire_decode`` will not decode classes that are not in
``SERIALIZABLE_CLASSES``.
"""
class Temp(PClass):
"""A class."""
SERIALIZABLE_CLASSES.append(Temp)
def cleanup():
if Temp in SERIALIZABLE_CLASSES:
SERIALIZABLE_CLASSES.remove(Temp)
self.addCleanup(cleanup)
data = wire_encode(Temp())
SERIALIZABLE_CLASSES.remove(Temp)
# Possibly future versions might throw exception, the key point is
# that the returned object is not a Temp instance.
self.assertFalse(isinstance(wire_decode(data), Temp))
示例5
def test_can_create_latest_golden(self):
"""
The latest golden files should be identical to ones generated from
HEAD.
"""
configs_dir = FilePath(__file__).sibling('configurations')
for i, deployment in enumerate(TEST_DEPLOYMENTS, start=1):
encoding = wire_encode(
Configuration(version=_CONFIG_VERSION, deployment=deployment)
)
path = configs_dir.child(
b"configuration_%d_v%d.json" % (i, _CONFIG_VERSION)
)
self.assertEqual(
json.loads(encoding),
json.loads(path.getContent().rstrip()),
"Golden test file %s can not be generated from HEAD. Please "
"review the python files in that directory to re-generate "
"that file if you have intentionally changed the backing test "
"data. You might need to update the model version and write "
"an upgrade test if you are intentionally changing the "
"model." % (path.path,)
)
示例6
def test_padding(ndim: int, data: st.DataObject):
"""Ensure that convolving a padding-only image with a commensurate kernel yields the single entry: 0"""
padding = data.draw(
st.integers(1, 3) | st.tuples(*[st.integers(1, 3)] * ndim), label="padding"
)
x = Tensor(
data.draw(
hnp.arrays(shape=(1, 1) + (0,) * ndim, dtype=float, elements=st.floats()),
label="x",
)
)
pad_tuple = padding if isinstance(padding, tuple) else (padding,) * ndim
kernel = data.draw(
hnp.arrays(
shape=(1, 1) + tuple(2 * p for p in pad_tuple),
dtype=float,
elements=st.floats(allow_nan=False, allow_infinity=False),
)
)
out = conv_nd(x, kernel, padding=padding, stride=1)
assert out.shape == (1,) * x.ndim
assert out.item() == 0.0
out.sum().backward()
assert x.grad.shape == x.shape
示例7
def simple_loss(x1, x2, y, margin):
"""
x1 : mygrad.Tensor, shape=(N, D)
x2 : mygrad.Tensor, shape=(N, D)
y : Union[int, numpy.ndarray, mygrad.Tensor], scalar or shape=(N,)
margin : float
Returns
-------
mygrad.Tensor, shape=()
"""
if isinstance(y, mg.Tensor):
y = y.data
y = np.asarray(y)
if y.ndim:
assert y.size == 1 or len(y) == len(x1)
if x1.ndim == 2:
y = y.reshape(-1, 1)
return mg.mean(mg.maximum(0, margin - y * (x1 - x2)))
示例8
def test_negative_log_likelihood(data: st.DataObject, labels_as_tensor: bool):
s = data.draw(
hnp.arrays(
shape=hnp.array_shapes(max_side=10, min_dims=2, max_dims=2),
dtype=float,
elements=st.floats(-100, 100),
)
)
y_true = data.draw(
hnp.arrays(
shape=(s.shape[0],),
dtype=hnp.integer_dtypes(),
elements=st.integers(min_value=0, max_value=s.shape[1] - 1),
).map(Tensor if labels_as_tensor else lambda x: x)
)
scores = Tensor(s)
nll = negative_log_likelihood(mg.log(mg.nnet.softmax(scores)), y_true)
nll.backward()
cross_entropy_scores = Tensor(s)
ce = softmax_crossentropy(cross_entropy_scores, y_true)
ce.backward()
assert_allclose(nll.data, ce.data, atol=1e-5, rtol=1e-5)
assert_allclose(scores.grad, cross_entropy_scores.grad, atol=1e-5, rtol=1e-5)
示例9
def _draw_capabilities(self, data, sensor):
if len(sensor.allowed_combo) > 0:
# test capabilities 1 by 1,
# or some combination of those in the allowed_combo list
capabilities = data.draw(
st.one_of(
st.lists(st.sampled_from([cap.name for cap in list(sensor.capability)]), min_size=1, max_size=1),
st.lists(st.sampled_from(sensor.capability), min_size=1, max_size=1),
st.lists(st.sampled_from(sensor.allowed_combo), min_size=1, unique=True)
)
)
else:
# if no combos allowed, then just test 1 by 1
capabilities = data.draw(st.lists(st.sampled_from(sensor.capability), min_size=1, max_size=1))
return capabilities
示例10
def test_attach_sensor(self, data):
sensor_name = 'sensor'
sensor = data.draw(st.sampled_from(self.sensor_list))
capabilities = self._draw_capabilities(data, sensor)
hub_type = data.draw(st.sampled_from(self.hub_list))
TestHub, stop_evt = self._get_hub_class(hub_type, sensor, sensor_name, capabilities)
hub = TestHub('testhub')
# Check to make sure we have the peripheral attached
# and the sensor inserted as an attribute
assert sensor_name in hub.peripherals
assert hasattr(hub, sensor_name)
示例11
def _emit_control(self, data, hub, hub_stop_evt, ble, sensor):
async def dummy():
pass
system = await spawn(bricknil.bricknil._run_all(ble, dummy))
while not hub.peripheral_queue:
await sleep(0.1)
#await sleep(3)
port = data.draw(st.integers(0,254))
await hub.peripheral_queue.put( ('attach', (port, sensor.sensor_name)) )
# Now, make sure the sensor sent an activate updates message
if sensor.sensor_name == "Button":
await self._wait_send_message(sensor.send_message, 'Activate button')
else:
await self._wait_send_message(sensor.send_message, 'Activate SENSOR')
# Need to generate a value on the port
# if False:
msg = []
if len(sensor.capabilities) == 1:
# Handle single capability
for cap in sensor.capabilities:
n_datasets, byte_count = sensor.datasets[cap][0:2]
for i in range(n_datasets):
for b in range(byte_count):
msg.append(data.draw(st.integers(0,255)))
msg = bytearray(msg)
await hub.peripheral_queue.put( ('value_change', (port, msg)))
elif len(sensor.capabilities) > 1:
modes = 1
msg.append(modes)
for cap_i, cap in enumerate(sensor.capabilities):
if modes & (1<<cap_i):
n_datasets, byte_count = sensor.datasets[cap][0:2]
for i in range(n_datasets):
for b in range(byte_count):
msg.append(data.draw(st.integers(0,255)))
msg = bytearray(msg)
await hub.peripheral_queue.put( ('value_change', (port, msg)))
await hub_stop_evt.set()
await system.join()
示例12
def test_attach_message(self, data, port, event):
msg_type = 0x04
msg = bytearray([msg_type, port, event])
if event == 0: #detach
l = self.m.parse(self._with_header(msg))
assert l == f'Detached IO Port:{port}'
elif event == 1: #attach
# Need 10 bytes
#dev_id = data.draw(st.integers(0,255))
dev_id = data.draw(st.sampled_from(sorted(DEVICES.keys())))
fw_version = data.draw(st.lists(st.integers(0,255), min_size=8, max_size=8))
msg = msg + bytearray([dev_id, 0])+ bytearray(fw_version)
l = self.m.parse(self._with_header(msg))
self.hub.peripheral_queue.put.assert_any_call(('update_port', (port, self.m.port_info[port])))
self.hub.peripheral_queue.put.assert_any_call(('port_detected', port))
# ALso need to make sure the port info is added to dispatch
assert self.m.port_info[port]['name'] == DEVICES[dev_id]
elif event == 2: # virtual attach
dev_id = data.draw(st.sampled_from(sorted(DEVICES.keys())))
v_port_a = data.draw(st.integers(0,255))
v_port_b = data.draw(st.integers(0,255))
msg = msg + bytearray([dev_id, 0, v_port_a, v_port_b])
l = self.m.parse(self._with_header(msg))
self.hub.peripheral_queue.put.assert_any_call(('update_port', (port, self.m.port_info[port])))
self.hub.peripheral_queue.put.assert_any_call(('port_detected', port))
assert l == f'Attached VirtualIO Port:{port} {self.m.port_info[port]["name"]} Port A: {v_port_a}, Port B: {v_port_b}'
assert self.m.port_info[port]['virtual'] == (v_port_a, v_port_b)
assert self.m.port_info[port]['name'] == DEVICES[dev_id]
示例13
def test_port_information_message(self, data, port, mode):
msg_type = 0x43
if mode == 1:
capabilities = data.draw(st.integers(0,15)) # bit mask of 4 bits
nmodes = data.draw(st.integers(0,255))
input_modes = [data.draw(st.integers(0,255)), data.draw(st.integers(0,255))]
output_modes = [data.draw(st.integers(0,255)), data.draw(st.integers(0,255))]
msg = bytearray([msg_type, port, mode, capabilities, nmodes]+input_modes+output_modes)
l = self.m.parse(self._with_header(msg))
self.hub.peripheral_queue.put.assert_any_call(('update_port', (port, self.m.port_info[port])))
self.hub.peripheral_queue.put.assert_any_call(('port_info_received', port))
# Make sure the proper capabilities have been set
bitmask = ['output', 'input', 'combinable', 'synchronizable'] # capabilities
for i,cap in enumerate(bitmask):
assert self.m.port_info[port][cap] == capabilities & (1<<i)
for i in range(8):
if input_modes[0] & (1<<i):
assert self.m.port_info[port]['modes'][i]['input']
if input_modes[1] & (1<<i):
assert self.m.port_info[port]['modes'][i+8]['input']
if output_modes[0] & (1<<i):
assert self.m.port_info[port]['modes'][i]['output']
if output_modes[1] & (1<<i):
assert self.m.port_info[port]['modes'][i+8]['output']
elif mode == 2:
# Combination info
# Up to 8x 16-bit words (bitmasks) of combinations possible
ncombos = data.draw(st.integers(0,6)) # how many combos should we allow
combos = data.draw(st.lists(st.integers(0,255), min_size=ncombos*2, max_size=ncombos*2))
msg = bytearray([msg_type, port, mode]+combos+[0,0])
l = self.m.parse(self._with_header(msg))
self.hub.peripheral_queue.put.assert_any_call(('update_port', (port, self.m.port_info[port])))
self.hub.peripheral_queue.put.assert_any_call(('port_combination_info_received', port))
# Assert number of combos
#assert len(combos)/2 == len(self.m.port_info[port]['mode_combinations'])
示例14
def test_port_mode_info_message(self, port, mode, mode_type, data):
msg_type = 0x44
if mode_type == 0:
name = data.draw(st.text(min_size=1, max_size=11))
payload = bytearray(name.encode('utf-8'))
elif mode_type == 1 or mode_type == 2 or mode_type==3:
payload = data.draw(st.lists(st.integers(0,255), min_size=8, max_size=8))
payload = bytearray(payload)
elif mode_type == 4:
name = data.draw(st.text(min_size=1, max_size=5))
payload = bytearray(name.encode('utf-8'))
elif mode_type == 5:
payload = data.draw(st.lists(st.integers(0,255), min_size=2, max_size=2))
payload = bytearray(payload)
elif mode_type == 0x80:
ndatasets = data.draw(st.integers(0,255))
dataset_type = data.draw(st.integers(0,3))
total_figures = data.draw(st.integers(0,255))
decimals = data.draw(st.integers(0,255))
payload = bytearray([ndatasets, dataset_type, total_figures, decimals])
pass
else:
assert False
msg = bytearray([msg_type, port, mode, mode_type]) + payload
self.m.parse(self._with_header(msg))
示例15
def test_greater_than_minimum(self, data, min_version):
version = data.draw(st_version(min_version=min_version))
assert self._ver_2_list(version) >= self._ver_2_list(min_version)
示例16
def test_less_than_maximum(self, data, max_version):
version = data.draw(st_version(max_version=max_version))
assert self._ver_2_list(version) <= self._ver_2_list(max_version)
示例17
def test_inbetween_min_and_max(self, data, versions):
min_version, max_version = versions
version = data.draw(
st_version(min_version=min_version, max_version=max_version)
)
assert (
self._ver_2_list(min_version)
<= self._ver_2_list(version)
<= self._ver_2_list(max_version)
)
示例18
def test_produces_with_more_digits_than_min(self, data, min_digits):
version = data.draw(st_version(min_digits=min_digits))
assert len(version.split(".")) >= min_digits
示例19
def test_produces_with_less_digits_than_max(self, data, max_digits):
version = data.draw(st_version(max_digits=max_digits))
assert len(version.split(".")) <= max_digits
示例20
def test_read_header_successfully(self, data, count, encoding):
written_headers = data.draw(extended_textual_header(count=count))
with BytesIO() as fh:
for header in written_headers:
for line in header:
fh.write(line.encode(encoding))
fh.seek(0)
read_headers = toolkit.read_extended_headers_counted(fh, count, encoding=encoding)
assert read_headers == written_headers
示例21
def test_premature_end_text_stanza_raises_value_error(self, data, count_a, count_b, encoding):
headers_a = data.draw(extended_textual_header(count=count_a, end_text_stanza_probability=1.0))
headers_b = data.draw(extended_textual_header(count=count_b))
with BytesIO() as fh:
for header in headers_a:
for line in header:
fh.write(line.encode(encoding))
for header in headers_b:
for line in header:
fh.write(line.encode(encoding))
fh.seek(0)
with raises(ValueError):
toolkit.read_extended_headers_counted(fh, count_a + count_b, encoding=encoding)
示例22
def test_uncounted_headers_are_correctly_detected(self, data, encoding):
binary_header = BinaryReelHeader(num_extended_textual_headers=-1)
written_headers = data.draw(extended_textual_header())
with BytesIO() as fh:
fh.write(b' ' * toolkit.REEL_HEADER_NUM_BYTES)
for header in written_headers:
for line in header:
fh.write(line.encode(encoding))
fh.seek(0)
read_headers = toolkit.read_extended_textual_headers(
fh, binary_header, encoding=encoding)
assert read_headers == written_headers
示例23
def test_counted_headers_are_correctly_detected(self, data, count, encoding):
binary_header = BinaryReelHeader(num_extended_textual_headers=count)
written_headers = data.draw(extended_textual_header(count=count))
with BytesIO() as fh:
fh.write(b' ' * toolkit.REEL_HEADER_NUM_BYTES)
for header in written_headers:
for line in header:
fh.write(line.encode(encoding))
fh.seek(0)
read_headers = toolkit.read_extended_textual_headers(
fh, binary_header, encoding=encoding)
assert read_headers == written_headers
示例24
def test_truncated_read_binary_values_raises_eof_error(self, seg_y_type, num_items, endian, data):
assume(seg_y_type != SegYType.IBM)
ctype = SEG_Y_TYPE_TO_CTYPE[seg_y_type]
c_format = ''.join(map(str, (endian, num_items, ctype)))
py_type = PY_TYPES[seg_y_type]
min_value, max_value = LIMITS[seg_y_type]
items_written = data.draw(st.lists(
elements=NUMBER_STRATEGY[py_type](min_value=min_value, max_value=max_value),
min_size=num_items, max_size=num_items))
buffer = struct.pack(c_format, *items_written)
truncate_pos = data.draw(st.integers(min_value=0, max_value=max(0, len(buffer) - 1)))
truncated_buffer = buffer[:truncate_pos]
with BytesIO(truncated_buffer) as fh:
with raises(EOFError):
toolkit.read_binary_values(fh, 0, seg_y_type, num_items, endian)
示例25
def test_pack_values(self, seg_y_type, num_items, endian, data):
assume(seg_y_type not in {SegYType.FLOAT32, SegYType.IBM})
c_type = SEG_Y_TYPE_TO_CTYPE[seg_y_type]
c_format = ''.join(map(str, (endian, num_items, c_type)))
py_type = PY_TYPES[seg_y_type]
min_value, max_value = LIMITS[seg_y_type]
items_written = tuple(data.draw(st.lists(
elements=NUMBER_STRATEGY[py_type](min_value=min_value, max_value=max_value),
min_size=num_items, max_size=num_items)))
buffer = toolkit.pack_values(items_written, c_type, endian)
items_read = struct.unpack(c_format, buffer)
assert items_written == items_read
示例26
def test_schema_inference(data, schema):
# Test that we can always generate a valid instance
instance = data.draw(from_schema(schema))
assert validate(schema, instance)
# TODO: write a test that shows validate may return False (maybe a unit test!)
##############################################################################
# Metamorphic testing with pathfinding problems
示例27
def test_bfs_finds_shortest_path(graph, data):
# Using label=... is easier to follow than the default "Draw #1" style!
start = data.draw(st.sampled_from(sorted(graph)), label="start")
end = data.draw(st.sampled_from(sorted(graph)), label="end")
path = breadth_first_search(graph, start, end)
midpoint = data.draw(st.sampled_from(path), label="midpoint")
# TODO: your code here!
示例28
def mean(data, as_type=Fraction):
"""Return the mean of the input list, as the given type."""
# This function is a correct implementation of the arithmetic mean,
# so that you can test it according to the metamorphic properties of
# that mathematical equation for integers, floats, and fractions.
assert as_type in (int, float, Fraction), as_type
if as_type == int:
return sum(int(n) for n in data) // len(data) # integer division case
return sum(as_type(n) for n in data) / len(data) # float or Fraction case
# You can use parametrize and given together, but two tips for best results:
# 1. Put @parametrize *outside* @given - it doesn't work properly from the inside
# 2. Use named arguments to @given - avoids confusing or colliding positional arguments
示例29
def test_mean_properties(data, type_, strat):
strat = strat or st.from_type(type_)
values = data.draw(st.lists(strat, min_size=1))
result = mean(values) # already testing no exceptions!
# TODO: property assertions, e.g. bounds on result, etc.
if type_ is Fraction:
assert min(values) <= result <= max(values)
# What constraints make sense for an integer mean?
# TODO: metamorphic test assertions. For example, how should result
# change if you add the mean to values? a number above or below result?
# Remove some elements from values?
示例30
def test_map_odd_numbers(x):
assert x[-1] in "13579"
# Takeaway
# --------
# `.map` permits us to extend Hypothesis' core strategies in powerful
# ways. See that it can be used to affect the individual values being
# produced by a strategy (e.g. mapping integers to even-valued
# integers), as well as to cast the values to a different type (e.g.
# mapping an integer to a string.
#
# If it seems like a data-type is missing from Hypothesis'
# strategies, then it is likely that a simple application of `.map`
# will suffice. E.g. suppose you want a strategy that generate deques,
# then
# `deque_strat = st.lists(...).map(deque)`
# will serve nicely - we don't even need a lambda!
##############################################################################
# Defining recursive data.
# There are a couple of ways to define recursive data with Hypothesis,
# leaning on the fact that strategies are lazily instantiated.
#
# `st.recursive` takes a base strategy, and a function that takes a strategy
# and returns an extended strategy. All good if we want that structure!
# If you want mutual recursion though, or have a complicated kind of data
# (or just limited time in a tutorial), `st.deferred` is the way to go.
#
# The `Record` exercise in pbt-101.py defined JSON using `st.recursive`,
# if you want to compare them, and has some extension problems that you
# could write as tests here instead.
# JSON values are defined as one of null, false, true, a finite number,
# a string, an array of json values, or a dict of string to json values.