Python源码示例:hypothesis.strategies.integers()
示例1
def arguments_node(draw, annotated=False):
n = draw(hs.integers(min_value=1, max_value=5))
args = draw(hs.lists(name_node(None), min_size=n, max_size=n))
if annotated:
annotations = draw(hs.lists(name_node(annotation), min_size=n, max_size=n))
else:
annotations = None
node = astroid.Arguments()
node.postinit(
args,
None,
None,
None,
annotations
)
return node
示例2
def _strategy_2d_array(dtype, minval=0, maxval=None, **kwargs):
if 'min_side' in kwargs:
min_side = kwargs.pop('min_side')
else:
min_side = 1
if 'max_side' in kwargs:
max_side = kwargs.pop('max_side')
else:
max_side = None
if dtype is np.int:
elems = st.integers(minval, maxval, **kwargs)
elif dtype is np.float:
elems = st.floats(minval, maxval, **kwargs)
elif dtype is np.str:
elems = st.text(min_size=minval, max_size=maxval, **kwargs)
else:
raise ValueError('no elements strategy for dtype', dtype)
return arrays(dtype, array_shapes(2, 2, min_side, max_side), elements=elems)
示例3
def bits( nbits, signed=False, min_value=None, max_value=None ):
BitsN = mk_bits( nbits )
if (min_value is not None or max_value is not None) and signed:
raise ValueError("bits strategy currently doesn't support setting "
"signedness and min/max value at the same time")
if min_value is None:
min_value = (-(2**(nbits-1))) if signed else 0
if max_value is None:
max_value = (2**(nbits-1)-1) if signed else (2**nbits - 1)
strat = st.booleans() if nbits == 1 else st.integers( min_value, max_value )
@st.composite
def strategy_bits( draw ):
return BitsN( draw( strat ) )
return strategy_bits() # RETURN A STRATEGY INSTEAD OF FUNCTION
#-------------------------------------------------------------------------
# strategies.bitslists
#-------------------------------------------------------------------------
# Return the SearchStrategy for a list of Bits with the support of
# dictionary based min/max value limit
示例4
def same_len_lists(draw, min_value = None, max_value = None):
"""Draw random arrays of equal lengths
One precondition of the list version of spherical() is that its inputs must
be of equal length.
"""
n = draw(integers(min_value = 0, max_value = 50))
fixlen = lists(
floats(
min_value = min_value,
max_value = max_value,
allow_nan = False,
allow_infinity = False
),
min_size = n,
max_size = n,
)
fixnp = fixlen.map(np.array)
return (draw(fixnp), draw(fixnp), draw(fixnp))
示例5
def chromeResponseReceived (reqid, url):
mimeTypeSt = st.one_of (st.none (), st.just ('text/html'))
remoteIpAddressSt = st.one_of (st.none (), st.just ('127.0.0.1'))
protocolSt = st.one_of (st.none (), st.just ('h2'))
statusCodeSt = st.integers (min_value=100, max_value=999)
typeSt = st.sampled_from (['Document', 'Stylesheet', 'Image', 'Media',
'Font', 'Script', 'TextTrack', 'XHR', 'Fetch', 'EventSource',
'WebSocket', 'Manifest', 'SignedExchange', 'Ping',
'CSPViolationReport', 'Other'])
return st.fixed_dictionaries ({
'requestId': reqid,
'timestamp': timestamp,
'type': typeSt,
'response': st.fixed_dictionaries ({
'url': url,
'requestHeaders': chromeHeaders (), # XXX: make this optional
'headers': chromeHeaders (),
'status': statusCodeSt,
'statusText': asciiText,
'mimeType': mimeTypeSt,
'remoteIPAddress': remoteIpAddressSt,
'protocol': protocolSt,
})
})
示例6
def test_padding(ndim: int, data: st.DataObject):
"""Ensure that convolving a padding-only image with a commensurate kernel yields the single entry: 0"""
padding = data.draw(
st.integers(1, 3) | st.tuples(*[st.integers(1, 3)] * ndim), label="padding"
)
x = Tensor(
data.draw(
hnp.arrays(shape=(1, 1) + (0,) * ndim, dtype=float, elements=st.floats()),
label="x",
)
)
pad_tuple = padding if isinstance(padding, tuple) else (padding,) * ndim
kernel = data.draw(
hnp.arrays(
shape=(1, 1) + tuple(2 * p for p in pad_tuple),
dtype=float,
elements=st.floats(allow_nan=False, allow_infinity=False),
)
)
out = conv_nd(x, kernel, padding=padding, stride=1)
assert out.shape == (1,) * x.ndim
assert out.item() == 0.0
out.sum().backward()
assert x.grad.shape == x.shape
示例7
def test_negative_log_likelihood(data: st.DataObject, labels_as_tensor: bool):
s = data.draw(
hnp.arrays(
shape=hnp.array_shapes(max_side=10, min_dims=2, max_dims=2),
dtype=float,
elements=st.floats(-100, 100),
)
)
y_true = data.draw(
hnp.arrays(
shape=(s.shape[0],),
dtype=hnp.integer_dtypes(),
elements=st.integers(min_value=0, max_value=s.shape[1] - 1),
).map(Tensor if labels_as_tensor else lambda x: x)
)
scores = Tensor(s)
nll = negative_log_likelihood(mg.log(mg.nnet.softmax(scores)), y_true)
nll.backward()
cross_entropy_scores = Tensor(s)
ce = softmax_crossentropy(cross_entropy_scores, y_true)
ce.backward()
assert_allclose(nll.data, ce.data, atol=1e-5, rtol=1e-5)
assert_allclose(scores.grad, cross_entropy_scores.grad, atol=1e-5, rtol=1e-5)
示例8
def gen_roll_args(draw, arr):
shift = draw(st.integers() | st.tuples(*(st.integers() for i in arr.shape)))
if arr.ndim:
ax_strat = hnp.valid_tuple_axes(
arr.ndim,
**(
dict(min_size=len(shift), max_size=len(shift))
if isinstance(shift, tuple)
else {}
)
)
axis = draw(st.none() | st.integers(-arr.ndim, arr.ndim - 1) | ax_strat)
else:
axis = None
return dict(shift=shift, axis=axis)
示例9
def test_prefix_complex(data, stuff):
pattern, statements = stuff
quantity = data.draw(integers(min_value=0, max_value=max(0, len(statements) - 1)))
statements = statements[:quantity]
works, unhandled = matches(statements, pattern)
assert works in [partial, success]
示例10
def string_and_substring(draw):
x = draw(text(min_size=2, alphabet=string.printable))
i = draw(integers(min_value=0, max_value=len(x)-2))
j = draw(integers(min_value=i+1, max_value=len(x)-1))
return (x, x[i:j])
示例11
def string_and_not_substring(draw):
x = draw(text(min_size=2, alphabet=string.printable))
i = draw(integers(min_value=1, max_value=len(x)-1))
y = ''.join(sample(x, i))
assume(x.find(y) == -1)
return (x, y)
示例12
def bipartite_graph(draw):
m = draw(st.integers(min_value=1, max_value=4))
n = draw(st.integers(min_value=m, max_value=5))
graph = BipartiteGraph()
for i in range(n):
for j in range(m):
b = draw(st.booleans())
if b:
graph[i, j] = b
return graph
示例13
def sequence_vars(draw):
num_vars = draw(st.integers(min_value=1, max_value=4))
variables = []
for i in range(num_vars):
name = 'var{:d}'.format(i)
count = draw(st.integers(min_value=1, max_value=4))
minimum = draw(st.integers(min_value=0, max_value=2))
variables.append(VariableWithCount(name, count, minimum, None))
return variables
示例14
def _emit_control(self, data, hub, hub_stop_evt, ble, sensor):
async def dummy():
pass
system = await spawn(bricknil.bricknil._run_all(ble, dummy))
while not hub.peripheral_queue:
await sleep(0.1)
#await sleep(3)
port = data.draw(st.integers(0,254))
await hub.peripheral_queue.put( ('attach', (port, sensor.sensor_name)) )
# Now, make sure the sensor sent an activate updates message
if sensor.sensor_name == "Button":
await self._wait_send_message(sensor.send_message, 'Activate button')
else:
await self._wait_send_message(sensor.send_message, 'Activate SENSOR')
# Need to generate a value on the port
# if False:
msg = []
if len(sensor.capabilities) == 1:
# Handle single capability
for cap in sensor.capabilities:
n_datasets, byte_count = sensor.datasets[cap][0:2]
for i in range(n_datasets):
for b in range(byte_count):
msg.append(data.draw(st.integers(0,255)))
msg = bytearray(msg)
await hub.peripheral_queue.put( ('value_change', (port, msg)))
elif len(sensor.capabilities) > 1:
modes = 1
msg.append(modes)
for cap_i, cap in enumerate(sensor.capabilities):
if modes & (1<<cap_i):
n_datasets, byte_count = sensor.datasets[cap][0:2]
for i in range(n_datasets):
for b in range(byte_count):
msg.append(data.draw(st.integers(0,255)))
msg = bytearray(msg)
await hub.peripheral_queue.put( ('value_change', (port, msg)))
await hub_stop_evt.set()
await system.join()
示例15
def test_port_value_message(self, data):
port = data.draw(st.integers(0,255))
width = data.draw(st.integers(1,3))
nbytes = 1<<(width-1)
values = data.draw(st.lists(st.integers(0,255),min_size=nbytes,max_size=nbytes ))
msg_type = 0x45
msg = bytearray([msg_type, port]+values)
l = self.m.parse(self._with_header(msg))
self.hub.peripheral_queue.put.assert_called_with(('value_change', (port,values)))
示例16
def test_attach_message(self, data, port, event):
msg_type = 0x04
msg = bytearray([msg_type, port, event])
if event == 0: #detach
l = self.m.parse(self._with_header(msg))
assert l == f'Detached IO Port:{port}'
elif event == 1: #attach
# Need 10 bytes
#dev_id = data.draw(st.integers(0,255))
dev_id = data.draw(st.sampled_from(sorted(DEVICES.keys())))
fw_version = data.draw(st.lists(st.integers(0,255), min_size=8, max_size=8))
msg = msg + bytearray([dev_id, 0])+ bytearray(fw_version)
l = self.m.parse(self._with_header(msg))
self.hub.peripheral_queue.put.assert_any_call(('update_port', (port, self.m.port_info[port])))
self.hub.peripheral_queue.put.assert_any_call(('port_detected', port))
# ALso need to make sure the port info is added to dispatch
assert self.m.port_info[port]['name'] == DEVICES[dev_id]
elif event == 2: # virtual attach
dev_id = data.draw(st.sampled_from(sorted(DEVICES.keys())))
v_port_a = data.draw(st.integers(0,255))
v_port_b = data.draw(st.integers(0,255))
msg = msg + bytearray([dev_id, 0, v_port_a, v_port_b])
l = self.m.parse(self._with_header(msg))
self.hub.peripheral_queue.put.assert_any_call(('update_port', (port, self.m.port_info[port])))
self.hub.peripheral_queue.put.assert_any_call(('port_detected', port))
assert l == f'Attached VirtualIO Port:{port} {self.m.port_info[port]["name"]} Port A: {v_port_a}, Port B: {v_port_b}'
assert self.m.port_info[port]['virtual'] == (v_port_a, v_port_b)
assert self.m.port_info[port]['name'] == DEVICES[dev_id]
示例17
def test_port_information_message(self, data, port, mode):
msg_type = 0x43
if mode == 1:
capabilities = data.draw(st.integers(0,15)) # bit mask of 4 bits
nmodes = data.draw(st.integers(0,255))
input_modes = [data.draw(st.integers(0,255)), data.draw(st.integers(0,255))]
output_modes = [data.draw(st.integers(0,255)), data.draw(st.integers(0,255))]
msg = bytearray([msg_type, port, mode, capabilities, nmodes]+input_modes+output_modes)
l = self.m.parse(self._with_header(msg))
self.hub.peripheral_queue.put.assert_any_call(('update_port', (port, self.m.port_info[port])))
self.hub.peripheral_queue.put.assert_any_call(('port_info_received', port))
# Make sure the proper capabilities have been set
bitmask = ['output', 'input', 'combinable', 'synchronizable'] # capabilities
for i,cap in enumerate(bitmask):
assert self.m.port_info[port][cap] == capabilities & (1<<i)
for i in range(8):
if input_modes[0] & (1<<i):
assert self.m.port_info[port]['modes'][i]['input']
if input_modes[1] & (1<<i):
assert self.m.port_info[port]['modes'][i+8]['input']
if output_modes[0] & (1<<i):
assert self.m.port_info[port]['modes'][i]['output']
if output_modes[1] & (1<<i):
assert self.m.port_info[port]['modes'][i+8]['output']
elif mode == 2:
# Combination info
# Up to 8x 16-bit words (bitmasks) of combinations possible
ncombos = data.draw(st.integers(0,6)) # how many combos should we allow
combos = data.draw(st.lists(st.integers(0,255), min_size=ncombos*2, max_size=ncombos*2))
msg = bytearray([msg_type, port, mode]+combos+[0,0])
l = self.m.parse(self._with_header(msg))
self.hub.peripheral_queue.put.assert_any_call(('update_port', (port, self.m.port_info[port])))
self.hub.peripheral_queue.put.assert_any_call(('port_combination_info_received', port))
# Assert number of combos
#assert len(combos)/2 == len(self.m.port_info[port]['mode_combinations'])
示例18
def test_port_mode_info_message(self, port, mode, mode_type, data):
msg_type = 0x44
if mode_type == 0:
name = data.draw(st.text(min_size=1, max_size=11))
payload = bytearray(name.encode('utf-8'))
elif mode_type == 1 or mode_type == 2 or mode_type==3:
payload = data.draw(st.lists(st.integers(0,255), min_size=8, max_size=8))
payload = bytearray(payload)
elif mode_type == 4:
name = data.draw(st.text(min_size=1, max_size=5))
payload = bytearray(name.encode('utf-8'))
elif mode_type == 5:
payload = data.draw(st.lists(st.integers(0,255), min_size=2, max_size=2))
payload = bytearray(payload)
elif mode_type == 0x80:
ndatasets = data.draw(st.integers(0,255))
dataset_type = data.draw(st.integers(0,3))
total_figures = data.draw(st.integers(0,255))
decimals = data.draw(st.integers(0,255))
payload = bytearray([ndatasets, dataset_type, total_figures, decimals])
pass
else:
assert False
msg = bytearray([msg_type, port, mode, mode_type]) + payload
self.m.parse(self._with_header(msg))
示例19
def _integer_strategy(
type_str: str, min_value: Optional[int] = None, max_value: Optional[int] = None
) -> SearchStrategy:
min_value, max_value = _check_numeric_bounds(type_str, min_value, max_value, Wei)
return st.integers(min_value=min_value, max_value=max_value)
示例20
def test_tick_add_sub(cls, n, m):
# For all Tick subclasses and all integers n, m, we should have
# tick(n) + tick(m) == tick(n+m)
# tick(n) - tick(m) == tick(n-m)
left = cls(n)
right = cls(m)
expected = cls(n + m)
assert left + right == expected
assert left.apply(right) == expected
expected = cls(n - m)
assert left - right == expected
示例21
def chunked(draw, source):
data = draw(source)
chunk_sizes = [0]
chunk_sizes += draw(
st.lists(st.integers(0, len(data) - 1), unique=True).map(sorted)
)
chunk_sizes += [len(data)]
return [data[u:v] for u, v in zip(chunk_sizes, chunk_sizes[1:])]
示例22
def test_index_not_found_raises_value_error(seq, data):
missing = data.draw(integers())
assume(missing not in seq)
rev = ReversedSequenceView(seq)
with raises(ValueError):
rev.index(missing)
示例23
def byte_arrays_of_floats(draw):
num_items = draw(st.integers(min_value=0, max_value=10))
floats = draw(st.lists(ibm_compatible_floats(),
min_size=num_items,
max_size=num_items))
byte_data = b''.join([ieee2ibm(f) for f in floats])
return (byte_data, num_items)
示例24
def multiline_ascii_encodable_text(min_num_lines, max_num_lines):
"""A Hypothesis strategy to produce a multiline Unicode string.
Args:
min_num_lines: The minimum number of lines in the produced strings.
max_num_lines: The maximum number of lines in the produced strings.
Returns:
A strategy for generating Unicode strings containing only newlines
and characters which are encodable as printable 7-bit ASCII characters.
"""
return integers(min_num_lines, max_num_lines) \
.flatmap(lambda n: lists(integers(*PRINTABLE_ASCII_RANGE), min_size=n, max_size=n)) \
.map(lambda xs: '\n'.join(bytes(x).decode('ascii') for x in xs))
示例25
def spaced_ranges(min_num_ranges, max_num_ranges, min_interval, max_interval):
"""A Hypothesis strategy to produce separated, non-overlapping ranges.
Args:
min_num_ranges: The minimum number of ranges to produce. TODO: Correct?
max_num_ranges: The maximum number of ranges to produce.
min_interval: The minimum interval used for the lengths of the alternating ranges and spaces.
max_interval: The maximum interval used for the lengths of the alternating ranges and spaces.
"""
return integers(min_num_ranges, max_num_ranges) \
.map(lambda n: 2*n) \
.flatmap(lambda n: lists(integers(min_interval, max_interval), min_size=n, max_size=n)) \
.map(list).map(lambda lst: list(accumulate(lst))) \
.map(lambda lst: list(batched(lst, 2))) \
.map(lambda pairs: list(starmap(range, pairs)))
示例26
def header(header_class, **kwargs):
"""Create a strategy for producing headers of a specific class.
Args:
header_class: The type of header to be produced. This class will be
introspected to determine suitable strategies for each named
field.
**kwargs: Any supplied keyword arguments can be used to fix the value
of particular header fields.
"""
field_strategies = {}
for field_name in header_class.ordered_field_names():
if field_name in kwargs:
field_strategy = just(kwargs.pop(field_name))
else:
value_type = getattr(header_class, field_name).value_type
if hasattr(value_type, 'ENUM'):
field_strategy = sampled_from(sorted(value_type.ENUM))
else:
field_strategy = integers(value_type.MINIMUM, value_type.MAXIMUM)
field_strategies[field_name] = field_strategy
if len(kwargs) > 0:
raise TypeError("Unrecognised binary header field names {} for {}".format(
', '.join(kwargs.keys()),
header_class.__name__))
return fixed_dictionaries(field_strategies) \
.map(lambda kw: header_class(**kw))
示例27
def items2d(draw, i_size, j_size):
i_range = draw(ranges(min_size=1, max_size=i_size, min_step_value=1))
j_range = draw(ranges(min_size=1, max_size=j_size, min_step_value=1))
size = len(i_range) * len(j_range)
values = draw(lists(integers(), min_size=size, max_size=size))
items = {(i, j): v for (i, j), v in zip(product(i_range, j_range), values)}
return Items2D(i_range, j_range, items)
示例28
def sequences(draw, elements=None, min_size=None, max_size=None, average_size=None, unique_by=None, unique=False):
"""A Hypthesis strategy to produce arbitrary sequences.
Currently produces list, tuple or deque objects.
"""
seq_type = draw(sampled_from(SEQUENCE_TYPES)) # Work ranges and string and bytes in here
elements = integers() if elements is None else elements
items = draw(lists(elements=elements, min_size=min_size, max_size=max_size,
average_size=average_size, unique_by=unique_by, unique=unique))
return seq_type(items)
示例29
def test_invalid_bytes_per_sample_field_raises_value_error(self, bad_dsf):
assume(all(bad_dsf != f for f in DataSampleFormat)) # IntEnum doesn't support the in operator for integers
fake_header = SimpleNamespace()
fake_header.data_sample_format = bad_dsf
with raises(ValueError):
toolkit.bytes_per_sample(fake_header)
示例30
def test_truncated_read_binary_values_raises_eof_error(self, seg_y_type, num_items, endian, data):
assume(seg_y_type != SegYType.IBM)
ctype = SEG_Y_TYPE_TO_CTYPE[seg_y_type]
c_format = ''.join(map(str, (endian, num_items, ctype)))
py_type = PY_TYPES[seg_y_type]
min_value, max_value = LIMITS[seg_y_type]
items_written = data.draw(st.lists(
elements=NUMBER_STRATEGY[py_type](min_value=min_value, max_value=max_value),
min_size=num_items, max_size=num_items))
buffer = struct.pack(c_format, *items_written)
truncate_pos = data.draw(st.integers(min_value=0, max_value=max(0, len(buffer) - 1)))
truncated_buffer = buffer[:truncate_pos]
with BytesIO(truncated_buffer) as fh:
with raises(EOFError):
toolkit.read_binary_values(fh, 0, seg_y_type, num_items, endian)