Python源码示例:hypothesis.strategies.lists()
示例1
def pattern_to_statements(pattern):
if isinstance(pattern, template):
return lists(just(pattern), min_size=1, max_size=1)
rule, value = pattern
if rule == 'sequence':
return tuples(*map(pattern_to_statements, value)).map(unpack_list).map(list)
elif rule == 'alternates':
return one_of(*map(pattern_to_statements, value))
elif rule == 'zeroOrMore':
return lists(pattern_to_statements(value)).map(unpack_list).map(list)
elif rule == 'oneOrMore':
return lists(pattern_to_statements(value), min_size=1).map(unpack_list).map(list)
elif rule == 'optional':
return lists(pattern_to_statements(value), min_size=0, max_size=1).map(unpack_list).map(list)
else:
raise Exception("impossible!", rule)
# this replicates the current scorm pattern, a realistic example of medium
# complexity. Note it has repeated elements, just not in ambiguous ways.
示例2
def _array_strategy(
abi_type: BasicType,
min_length: ArrayLengthType = 1,
max_length: ArrayLengthType = 8,
unique: bool = False,
**kwargs: Any,
) -> SearchStrategy:
if abi_type.arrlist[-1]:
min_len = max_len = abi_type.arrlist[-1][0]
else:
dynamic_len = len([i for i in abi_type.arrlist if not i])
min_len = _get_array_length("min_length", min_length, dynamic_len)
max_len = _get_array_length("max_length", max_length, dynamic_len)
if abi_type.item_type.is_array:
kwargs.update(min_length=min_length, max_length=max_length, unique=unique)
base_strategy = strategy(abi_type.item_type.to_type_str(), **kwargs)
strat = st.lists(base_strategy, min_size=min_len, max_size=max_len, unique=unique)
# swap 'size' for 'length' in the repr
repr_ = "length".join(strat.__repr__().rsplit("size", maxsplit=2))
strat._LazyStrategy__representation = repr_ # type: ignore
return strat
示例3
def arguments_node(draw, annotated=False):
n = draw(hs.integers(min_value=1, max_value=5))
args = draw(hs.lists(name_node(None), min_size=n, max_size=n))
if annotated:
annotations = draw(hs.lists(name_node(annotation), min_size=n, max_size=n))
else:
annotations = None
node = astroid.Arguments()
node.postinit(
args,
None,
None,
None,
annotations
)
return node
示例4
def from_schema(schema):
"""Returns a strategy for objects that match the given schema."""
check_schema(schema)
# TODO: actually handle constraints on number/string/array schemas
return dict(
null=st.none(),
bool=st.booleans(),
number=st.floats(allow_nan=False),
string=st.text(),
array=st.lists(st.nothing()),
)[schema["type"]]
# `@st.composite` is one way to write this - another would be to define a
# bare function, and `return st.one_of(st.none(), st.booleans(), ...)` so
# each strategy can be defined individually. Use whichever seems more
# natural to you - the important thing in tests is usually readability!
示例5
def node_uuid_pool_strategy(draw, min_number_of_nodes=1):
"""
A strategy to create a pool of node uuids.
:param min_number_of_nodes: The minimum number of nodes to create.
:returns: A strategy to create an iterable of node uuids.
"""
max_number_of_nodes = max(min_number_of_nodes, 10)
return draw(
st.lists(
uuids(),
min_size=min_number_of_nodes,
max_size=max_number_of_nodes
)
)
示例6
def same_len_lists(draw, min_value = None, max_value = None):
"""Draw random arrays of equal lengths
One precondition of the list version of spherical() is that its inputs must
be of equal length.
"""
n = draw(integers(min_value = 0, max_value = 50))
fixlen = lists(
floats(
min_value = min_value,
max_value = max_value,
allow_nan = False,
allow_infinity = False
),
min_size = n,
max_size = n,
)
fixnp = fixlen.map(np.array)
return (draw(fixnp), draw(fixnp), draw(fixnp))
示例7
def fixedDicts (fixed, dynamic):
return st.builds (lambda x, y: x.update (y), st.fixed_dictionaries (fixed), st.lists (dynamic))
示例8
def _fuzz_array(
parameter: Dict[str, Any],
required: bool = False,
) -> SearchStrategy:
item = parameter['items']
required = parameter.get('required', required)
# TODO: Handle `oneOf`
strategy = st.lists(
elements=_fuzz_parameter(item, required=required),
min_size=parameter.get(
'minItems',
0 if not required else 1,
),
max_size=parameter.get('maxItems', None),
)
if not required:
return st.one_of(st.none(), strategy)
return strategy
示例9
def action_structures(draw):
"""
A Hypothesis strategy that creates a tree of L{ActionStructure} and
L{unicode}.
"""
tree = draw(st.recursive(labels, st.lists, max_leaves=20))
def to_structure(tree_or_message):
if isinstance(tree_or_message, list):
return ActionStructure(
type=draw(labels),
failed=draw(st.booleans()),
children=[to_structure(o) for o in tree_or_message],
)
else:
return tree_or_message
return to_structure(tree)
示例10
def test_concat_basic(nums: tp.List[int]):
nums_py = list(map(lambda x: x + 1, nums))
nums_py1 = list(map(lambda x: x ** 2, nums_py))
nums_py2 = list(map(lambda x: -x, nums_py))
nums_py = nums_py1 + nums_py2
nums_pl = pl.process.map(lambda x: x + 1, nums)
nums_pl1 = pl.process.map(lambda x: x ** 2, nums_pl)
nums_pl2 = pl.process.map(lambda x: -x, nums_pl)
nums_pl = pl.process.concat([nums_pl1, nums_pl2])
assert sorted(nums_pl) == sorted(nums_py)
# @hp.given(nums=st.lists(st.integers()))
# @hp.settings(max_examples=MAX_EXAMPLES)
示例11
def test_concat_basic_2(nums: tp.List[int]):
nums_py = list(map(lambda x: x + 1, nums))
nums_py1 = list(map(lambda x: x ** 2, nums_py))
nums_py2 = list(map(lambda x: -x, nums_py))
nums_py = nums_py1 + nums_py2
nums_pl = pl.task.map(lambda x: x + 1, nums)
nums_pl1 = pl.task.map(lambda x: x ** 2, nums_pl)
nums_pl2 = pl.task.map(lambda x: -x, nums_pl)
nums_pl = await pl.task.concat([nums_pl1, nums_pl2])
assert sorted(nums_pl) == sorted(nums_py)
# @hp.given(nums=st.lists(st.integers()))
# @hp.settings(max_examples=MAX_EXAMPLES)
示例12
def test_concat_basic(nums: tp.List[int]):
nums_py = list(map(lambda x: x + 1, nums))
nums_py1 = list(map(lambda x: x ** 2, nums_py))
nums_py2 = list(map(lambda x: -x, nums_py))
nums_py = nums_py1 + nums_py2
nums_pl = pl.thread.map(lambda x: x + 1, nums)
nums_pl1 = pl.thread.map(lambda x: x ** 2, nums_pl)
nums_pl2 = pl.thread.map(lambda x: -x, nums_pl)
nums_pl = pl.thread.concat([nums_pl1, nums_pl2])
assert sorted(nums_pl) == sorted(nums_py)
# @hp.given(nums=st.lists(st.integers()))
# @hp.settings(max_examples=MAX_EXAMPLES)
示例13
def func_wrap_strategy(args, func):
min_size = func.arity[0]
max_size = func.arity[1] and func.arity[0] or 4
return st.lists(args, min_size=min_size, max_size=max_size).map(lambda a: func(*a))
示例14
def func_wrap_strategy(args, func):
min_size = func.arity[0]
max_size = func.arity[1] and func.arity[0] or 4
return st.lists(args, min_size=min_size, max_size=max_size).map(lambda a: func(*a))
示例15
def _draw_capabilities(self, data, sensor):
if len(sensor.allowed_combo) > 0:
# test capabilities 1 by 1,
# or some combination of those in the allowed_combo list
capabilities = data.draw(
st.one_of(
st.lists(st.sampled_from([cap.name for cap in list(sensor.capability)]), min_size=1, max_size=1),
st.lists(st.sampled_from(sensor.capability), min_size=1, max_size=1),
st.lists(st.sampled_from(sensor.allowed_combo), min_size=1, unique=True)
)
)
else:
# if no combos allowed, then just test 1 by 1
capabilities = data.draw(st.lists(st.sampled_from(sensor.capability), min_size=1, max_size=1))
return capabilities
示例16
def test_port_value_message(self, data):
port = data.draw(st.integers(0,255))
width = data.draw(st.integers(1,3))
nbytes = 1<<(width-1)
values = data.draw(st.lists(st.integers(0,255),min_size=nbytes,max_size=nbytes ))
msg_type = 0x45
msg = bytearray([msg_type, port]+values)
l = self.m.parse(self._with_header(msg))
self.hub.peripheral_queue.put.assert_called_with(('value_change', (port,values)))
示例17
def test_attach_message(self, data, port, event):
msg_type = 0x04
msg = bytearray([msg_type, port, event])
if event == 0: #detach
l = self.m.parse(self._with_header(msg))
assert l == f'Detached IO Port:{port}'
elif event == 1: #attach
# Need 10 bytes
#dev_id = data.draw(st.integers(0,255))
dev_id = data.draw(st.sampled_from(sorted(DEVICES.keys())))
fw_version = data.draw(st.lists(st.integers(0,255), min_size=8, max_size=8))
msg = msg + bytearray([dev_id, 0])+ bytearray(fw_version)
l = self.m.parse(self._with_header(msg))
self.hub.peripheral_queue.put.assert_any_call(('update_port', (port, self.m.port_info[port])))
self.hub.peripheral_queue.put.assert_any_call(('port_detected', port))
# ALso need to make sure the port info is added to dispatch
assert self.m.port_info[port]['name'] == DEVICES[dev_id]
elif event == 2: # virtual attach
dev_id = data.draw(st.sampled_from(sorted(DEVICES.keys())))
v_port_a = data.draw(st.integers(0,255))
v_port_b = data.draw(st.integers(0,255))
msg = msg + bytearray([dev_id, 0, v_port_a, v_port_b])
l = self.m.parse(self._with_header(msg))
self.hub.peripheral_queue.put.assert_any_call(('update_port', (port, self.m.port_info[port])))
self.hub.peripheral_queue.put.assert_any_call(('port_detected', port))
assert l == f'Attached VirtualIO Port:{port} {self.m.port_info[port]["name"]} Port A: {v_port_a}, Port B: {v_port_b}'
assert self.m.port_info[port]['virtual'] == (v_port_a, v_port_b)
assert self.m.port_info[port]['name'] == DEVICES[dev_id]
示例18
def test_port_information_message(self, data, port, mode):
msg_type = 0x43
if mode == 1:
capabilities = data.draw(st.integers(0,15)) # bit mask of 4 bits
nmodes = data.draw(st.integers(0,255))
input_modes = [data.draw(st.integers(0,255)), data.draw(st.integers(0,255))]
output_modes = [data.draw(st.integers(0,255)), data.draw(st.integers(0,255))]
msg = bytearray([msg_type, port, mode, capabilities, nmodes]+input_modes+output_modes)
l = self.m.parse(self._with_header(msg))
self.hub.peripheral_queue.put.assert_any_call(('update_port', (port, self.m.port_info[port])))
self.hub.peripheral_queue.put.assert_any_call(('port_info_received', port))
# Make sure the proper capabilities have been set
bitmask = ['output', 'input', 'combinable', 'synchronizable'] # capabilities
for i,cap in enumerate(bitmask):
assert self.m.port_info[port][cap] == capabilities & (1<<i)
for i in range(8):
if input_modes[0] & (1<<i):
assert self.m.port_info[port]['modes'][i]['input']
if input_modes[1] & (1<<i):
assert self.m.port_info[port]['modes'][i+8]['input']
if output_modes[0] & (1<<i):
assert self.m.port_info[port]['modes'][i]['output']
if output_modes[1] & (1<<i):
assert self.m.port_info[port]['modes'][i+8]['output']
elif mode == 2:
# Combination info
# Up to 8x 16-bit words (bitmasks) of combinations possible
ncombos = data.draw(st.integers(0,6)) # how many combos should we allow
combos = data.draw(st.lists(st.integers(0,255), min_size=ncombos*2, max_size=ncombos*2))
msg = bytearray([msg_type, port, mode]+combos+[0,0])
l = self.m.parse(self._with_header(msg))
self.hub.peripheral_queue.put.assert_any_call(('update_port', (port, self.m.port_info[port])))
self.hub.peripheral_queue.put.assert_any_call(('port_combination_info_received', port))
# Assert number of combos
#assert len(combos)/2 == len(self.m.port_info[port]['mode_combinations'])
示例19
def test_port_mode_info_message(self, port, mode, mode_type, data):
msg_type = 0x44
if mode_type == 0:
name = data.draw(st.text(min_size=1, max_size=11))
payload = bytearray(name.encode('utf-8'))
elif mode_type == 1 or mode_type == 2 or mode_type==3:
payload = data.draw(st.lists(st.integers(0,255), min_size=8, max_size=8))
payload = bytearray(payload)
elif mode_type == 4:
name = data.draw(st.text(min_size=1, max_size=5))
payload = bytearray(name.encode('utf-8'))
elif mode_type == 5:
payload = data.draw(st.lists(st.integers(0,255), min_size=2, max_size=2))
payload = bytearray(payload)
elif mode_type == 0x80:
ndatasets = data.draw(st.integers(0,255))
dataset_type = data.draw(st.integers(0,3))
total_figures = data.draw(st.integers(0,255))
decimals = data.draw(st.integers(0,255))
payload = bytearray([ndatasets, dataset_type, total_figures, decimals])
pass
else:
assert False
msg = bytearray([msg_type, port, mode, mode_type]) + payload
self.m.parse(self._with_header(msg))
示例20
def csv_strategy(enum):
return st.lists(st.sampled_from([item.name for item in enum]), min_size=1).map(",".join)
# The following strategies generate CLI parameters, for example "--workers=5" or "--exitfirst"
示例21
def list_of_dicts(draw):
return draw(st.lists(dict_str()))
示例22
def line_delimited_data(draw, max_line_size, min_lines=1):
n = draw(max_line_size)
data = st.binary(min_size=1, max_size=n).filter(lambda d: b"\n" not in d)
lines = draw(
st.lists(data, min_size=min_lines).filter(
lambda l: sum(map(len, l)) + len(l) <= n
)
)
return b"\n".join(lines) + b"\n"
示例23
def chunked(draw, source):
data = draw(source)
chunk_sizes = [0]
chunk_sizes += draw(
st.lists(st.integers(0, len(data) - 1), unique=True).map(sorted)
)
chunk_sizes += [len(data)]
return [data[u:v] for u, v in zip(chunk_sizes, chunk_sizes[1:])]
示例24
def byte_arrays_of_floats(draw):
num_items = draw(st.integers(min_value=0, max_value=10))
floats = draw(st.lists(ibm_compatible_floats(),
min_size=num_items,
max_size=num_items))
byte_data = b''.join([ieee2ibm(f) for f in floats])
return (byte_data, num_items)
示例25
def multiline_ascii_encodable_text(min_num_lines, max_num_lines):
"""A Hypothesis strategy to produce a multiline Unicode string.
Args:
min_num_lines: The minimum number of lines in the produced strings.
max_num_lines: The maximum number of lines in the produced strings.
Returns:
A strategy for generating Unicode strings containing only newlines
and characters which are encodable as printable 7-bit ASCII characters.
"""
return integers(min_num_lines, max_num_lines) \
.flatmap(lambda n: lists(integers(*PRINTABLE_ASCII_RANGE), min_size=n, max_size=n)) \
.map(lambda xs: '\n'.join(bytes(x).decode('ascii') for x in xs))
示例26
def spaced_ranges(min_num_ranges, max_num_ranges, min_interval, max_interval):
"""A Hypothesis strategy to produce separated, non-overlapping ranges.
Args:
min_num_ranges: The minimum number of ranges to produce. TODO: Correct?
max_num_ranges: The maximum number of ranges to produce.
min_interval: The minimum interval used for the lengths of the alternating ranges and spaces.
max_interval: The maximum interval used for the lengths of the alternating ranges and spaces.
"""
return integers(min_num_ranges, max_num_ranges) \
.map(lambda n: 2*n) \
.flatmap(lambda n: lists(integers(min_interval, max_interval), min_size=n, max_size=n)) \
.map(list).map(lambda lst: list(accumulate(lst))) \
.map(lambda lst: list(batched(lst, 2))) \
.map(lambda pairs: list(starmap(range, pairs)))
示例27
def items2d(draw, i_size, j_size):
i_range = draw(ranges(min_size=1, max_size=i_size, min_step_value=1))
j_range = draw(ranges(min_size=1, max_size=j_size, min_step_value=1))
size = len(i_range) * len(j_range)
values = draw(lists(integers(), min_size=size, max_size=size))
items = {(i, j): v for (i, j), v in zip(product(i_range, j_range), values)}
return Items2D(i_range, j_range, items)
示例28
def sequences(draw, elements=None, min_size=None, max_size=None, average_size=None, unique_by=None, unique=False):
"""A Hypthesis strategy to produce arbitrary sequences.
Currently produces list, tuple or deque objects.
"""
seq_type = draw(sampled_from(SEQUENCE_TYPES)) # Work ranges and string and bytes in here
elements = integers() if elements is None else elements
items = draw(lists(elements=elements, min_size=min_size, max_size=max_size,
average_size=average_size, unique_by=unique_by, unique=unique))
return seq_type(items)
示例29
def test_truncated_read_binary_values_raises_eof_error(self, seg_y_type, num_items, endian, data):
assume(seg_y_type != SegYType.IBM)
ctype = SEG_Y_TYPE_TO_CTYPE[seg_y_type]
c_format = ''.join(map(str, (endian, num_items, ctype)))
py_type = PY_TYPES[seg_y_type]
min_value, max_value = LIMITS[seg_y_type]
items_written = data.draw(st.lists(
elements=NUMBER_STRATEGY[py_type](min_value=min_value, max_value=max_value),
min_size=num_items, max_size=num_items))
buffer = struct.pack(c_format, *items_written)
truncate_pos = data.draw(st.integers(min_value=0, max_value=max(0, len(buffer) - 1)))
truncated_buffer = buffer[:truncate_pos]
with BytesIO(truncated_buffer) as fh:
with raises(EOFError):
toolkit.read_binary_values(fh, 0, seg_y_type, num_items, endian)
示例30
def test_pack_values(self, seg_y_type, num_items, endian, data):
assume(seg_y_type not in {SegYType.FLOAT32, SegYType.IBM})
c_type = SEG_Y_TYPE_TO_CTYPE[seg_y_type]
c_format = ''.join(map(str, (endian, num_items, c_type)))
py_type = PY_TYPES[seg_y_type]
min_value, max_value = LIMITS[seg_y_type]
items_written = tuple(data.draw(st.lists(
elements=NUMBER_STRATEGY[py_type](min_value=min_value, max_value=max_value),
min_size=num_items, max_size=num_items)))
buffer = toolkit.pack_values(items_written, c_type, endian)
items_read = struct.unpack(c_format, buffer)
assert items_written == items_read