Python源码示例:hypothesis.strategies.sampled_from()
示例1
def test_run_hub(self, data):
    """Drive a randomly drawn hub/sensor pairing with the BLE provider mocked.

    A sensor, its capabilities and a hub type are drawn from ``data``; the
    hub built from them is then handed to ``self._emit_control`` through
    ``kernel.run`` while ``Adafruit_BluefruitLE.get_provider`` is patched.
    """
    # Clear the class-level list before building the hub under test.
    Hub.hubs = []

    sensor_name = 'sensor'
    drawn_sensor = data.draw(st.sampled_from(self.sensor_list))
    drawn_caps = self._draw_capabilities(data, drawn_sensor)
    drawn_hub_type = data.draw(st.sampled_from(self.hub_list))
    TestHub, stop_evt = self._get_hub_class(
        drawn_hub_type, drawn_sensor, sensor_name, drawn_caps
    )
    hub = TestHub('test_hub')

    def _canned_reply(x, y):
        return "the awaitable should return this"

    # Start the hub with the provider replaced and USE_BLEAK forced to False.
    with patch('Adafruit_BluefruitLE.get_provider') as ble:
        with patch('bricknil.ble_queue.USE_BLEAK', False):
            ble.return_value = MockBLE(hub)
            sensor_obj = getattr(hub, sensor_name)
            sensor_obj.send_message = Mock(side_effect=coroutine(_canned_reply))
            kernel.run(self._emit_control, data, hub, stop_evt, ble(), sensor_obj)
示例2
def remote_init_st(draw):
    """Hypothesis strategy to generate terraform remote init state.

    Draws, in order: the backend type (currently only ``'s3'``), a lineage,
    the backend configuration for that type, and an 18-digit hash string.
    Returns the assembled state dictionary.
    """
    backend_type = draw(st.sampled_from(['s3']))
    lineage = draw(lineage_st())
    backend_config = draw(get_be_config_st(backend_type)())
    backend_hash = draw(
        st.text(alphabet=list(digits), min_size=18, max_size=18)
    )

    backend = {
        "type": backend_type,
        "config": backend_config,
        "hash": backend_hash,
    }
    root_module = {
        "path": ["root"],
        "outputs": {},
        "resources": {},
        "depends_on": [],
    }
    return {
        "version": 3,
        "serial": 0,
        "lineage": lineage,
        "backend": backend,
        "modules": [root_module],
    }
示例3
def functiondef_node(draw, name=None, annotated=False, returns=False):
    """Build an ``astroid.FunctionDef`` whose body is a single return statement.

    :param draw: Hypothesis draw callable.
    :param name: Function name; drawn from ``valid_identifier()`` when falsy.
    :param annotated: Forwarded to ``arguments_node``.
    :param returns: When true the function returns one of its own arguments,
        otherwise it returns ``const_node(None)``.
    """
    name = name or draw(valid_identifier())
    args = draw(arguments_node(annotated))

    # Pick one (argument, annotation) pair from the drawn arguments.
    candidates = list(zip(args.args, args.annotations))
    arg_node, arg_type_node = draw(hs.sampled_from(candidates))

    returns_node = astroid.Return()
    returned_value = arg_node if returns else const_node(None)
    returns_node.postinit(returned_value)

    node = astroid.FunctionDef(name=name)
    node.parent = astroid.Module('Default', None)
    # The chosen annotation is passed as the fourth positional argument
    # regardless of ``returns``.
    node.postinit(args, [returns_node], None, arg_type_node)
    return node
示例4
def tagged_union_strategy(draw, type_, attr_strategies):
    """
    Create a strategy for building a type with a ``TaggedUnionInvariant``.

    A tag is drawn first; an attribute is then drawn only if it is listed
    for that tag or is not one of the invariant's known attributes.

    :param type_: Type to generate a strategy for.
    :param attr_strategies: Mapping of attributes to strategies to
        generate corresponding attributes.
    :type attr_strategies: ``dict`` mapping ``str`` to ``SearchStrategy``\\ s.
    """
    invariant = type_.__invariant__
    tag = draw(sampled_from(invariant._allowed_tags))

    def applies(attr_name):
        if attr_name in invariant.attributes_for_tag[tag]:
            return True
        return attr_name not in invariant._all_attributes

    attributes = {invariant.tag_attribute: tag}
    attributes.update(
        (attr_name, draw(attr_strategy))
        for attr_name, attr_strategy in attr_strategies.items()
        if applies(attr_name)
    )
    return type_(**attributes)
示例5
def chromeResponseReceived(reqid, url):
    """Return a strategy for dictionaries with a request id, timestamp, type and response.

    ``reqid`` and ``url`` are used as the strategies for the ``requestId``
    and ``response.url`` entries respectively.
    """
    def maybe(value):
        # Either None or exactly ``value``.
        return st.one_of(st.none(), st.just(value))

    resourceTypes = [
        'Document', 'Stylesheet', 'Image', 'Media',
        'Font', 'Script', 'TextTrack', 'XHR', 'Fetch', 'EventSource',
        'WebSocket', 'Manifest', 'SignedExchange', 'Ping',
        'CSPViolationReport', 'Other',
    ]

    responseSt = st.fixed_dictionaries({
        'url': url,
        'requestHeaders': chromeHeaders(),  # XXX: make this optional
        'headers': chromeHeaders(),
        'status': st.integers(min_value=100, max_value=999),
        'statusText': asciiText,
        'mimeType': maybe('text/html'),
        'remoteIPAddress': maybe('127.0.0.1'),
        'protocol': maybe('h2'),
    })
    return st.fixed_dictionaries({
        'requestId': reqid,
        'timestamp': timestamp,
        'type': st.sampled_from(resourceTypes),
        'response': responseSt,
    })
示例6
def numpy_video(
    draw,
    min_length=1,
    max_length=3,
    min_width=1,
    max_width=10,
    min_height=1,
    max_height=10,
    mode=None,
):
    """Draw a ``uint8`` array representing a video.

    The length, height and width are drawn from ``video_shape`` within the
    given bounds. ``mode`` is drawn from ``"RGB"``/``"L"`` when ``None``;
    ``"RGB"`` adds a trailing axis of size 3, any other value adds none.
    """
    shape_st = video_shape(
        min_length, max_length, min_height, max_height, min_width, max_width
    )
    length, height, width = draw(shape_st)

    if mode is None:
        mode = draw(st.sampled_from(["RGB", "L"]))

    # NOTE(review): the axes are ordered (length, width, height), not the
    # more common (length, height, width) -- confirm this is intended.
    shape = (length, width, height)
    if mode == "RGB":
        shape = shape + (3,)
    return draw(arrays(dtype=np.uint8, shape=shape))
示例7
def ds_vars_dims_mixed():
    """Return a strategy of ``(dataset, var/dims pairs, all_dims)`` tuples.

    A variables-to-dimensions mapping is drawn first; the dataset, the
    list of ``(variable, dims)`` pairs and the list of all distinct
    dimensions are then derived from that mapping.
    """
    def build_it(vars_to_dims_):
        # Distinct dimensions across every variable.
        all_dims = list(
            {dim for var_dims in vars_to_dims_.values() for dim in var_dims}
        )
        dataset_st = fixed_datasets(vars_to_dims_)
        dims_st = subset_lists(all_dims)
        var_name_st = sampled_from(list(vars_to_dims_.keys()))
        # Ordered mapping of variable -> dims, flattened into a list of pairs.
        pairs_st = (
            dictionaries(var_name_st, dims_st, dict_class=OrderedDict)
            .map(OrderedDict.items)
            .map(list)
        )
        return tuples(dataset_st, pairs_st, just(all_dims))

    return vars_to_dims_dicts(min_vars=0, min_dims=0).flatmap(build_it)
示例8
def test_change(self, C, data):
    """
    Changes work.
    """
    attributes = fields(C)
    assume(attributes)  # Skip classes with no attributes.

    original = C()
    # Pick an arbitrary subset of the attributes and a new integer for each.
    names = [attribute.name for attribute in attributes]
    chosen_names = data.draw(st.sets(st.sampled_from(names)))
    change_dict = {}
    for name in chosen_names:
        change_dict[name] = data.draw(st.integers())

    with pytest.deprecated_call():
        changed = assoc(original, **change_dict)

    for name, expected in change_dict.items():
        assert getattr(changed, name) == expected
示例9
def test_change(self, C, data):
    """
    Changes work.
    """
    attributes = fields(C)
    assume(attributes)  # Skip classes with no attributes.

    def init_name(attr_name):
        # Private attributes are passed without their underscores,
        # like in `__init__`.
        return attr_name.replace("_", "")

    original = C()
    # Pick an arbitrary subset of the attributes and a new integer for each.
    names = [attribute.name for attribute in attributes]
    chosen_names = data.draw(st.sets(st.sampled_from(names)))
    change_dict = {}
    for name in chosen_names:
        change_dict[init_name(name)] = data.draw(st.integers())

    changed = evolve(original, **change_dict)

    for name in chosen_names:
        assert getattr(changed, name) == change_dict[init_name(name)]
示例10
def urls():
    """
    Strategy for generating urls.

    The path is built from 1 to 10 text segments (each at most 64
    characters, excluding ``/``, ``?``, ``#`` and surrogates) that are
    passed through ``to_text`` and joined with the empty string.
    """
    segment_alphabet = st.characters(
        blacklist_characters="/?#", blacklist_categories=("Cs",)
    )
    segment_st = st.text(max_size=64, alphabet=segment_alphabet)
    path_st = (
        st.lists(segment_st, min_size=1, max_size=10)
        .map(to_text)
        .map("".join)
    )
    return st.builds(
        parsed_url,
        scheme=st.sampled_from(uri_schemes),
        netloc=dns_names(),
        path=path_st,
    )
示例11
def auth_url_strategy():
    """Return a strategy building ``AuthUrl`` objects from scheme, auth, domain, port and path."""
    # taken from the hypothesis provisional url generation strategy
    def url_encode(s):
        encoded = []
        for c in s:
            encoded.append(c if c in URL_SAFE_CHARACTERS else "%%%02X" % ord(c))
        return "".join(encoded)

    def as_auth_prefix(auth):
        return "" if not auth else "{0}@".format(auth)

    def is_real_segment(segment):
        return segment not in ["", ".", ".."]

    # Every scheme except "file" gets "://"; "file" gets its own prefix.
    schemes = [
        "{0}://".format(scheme) for scheme in uri_schemes if scheme != "file"
    ] + ["file:///"]

    auth_st = auth_strings().filter(lambda x: x != ":").map(as_auth_prefix)
    domain_st = domains().filter(lambda x: x != "").map(lambda x: x.lower())
    segment_st = st.text(string.printable).map(url_encode).filter(is_real_segment)

    return st.builds(
        AuthUrl,
        scheme=st.sampled_from(schemes),
        auth=auth_st,
        domain=domain_st,
        port=st.integers(min_value=0, max_value=65535),
        path=st.lists(segment_st).map("/".join),
    )
示例12
def test_structure_simple_from_dict_default(converter, cl_and_vals, data):
    """Test structuring non-nested attrs classes with default value."""
    cl, vals = cl_and_vals
    obj = cl(*vals)

    # Choose a unique subset of the attributes that declare a default.
    defaulted = [attr for attr in fields(cl) if attr.default is not NOTHING]
    to_remove = data.draw(lists(elements=sampled_from(defaulted), unique=True))

    # Reset the chosen attributes on the instance to their default values.
    for attr in to_remove:
        default = attr.default
        value = default.factory() if isinstance(default, Factory) else default
        setattr(obj, attr.name, value)

    # Drop those attributes from the dumped form; structuring the remainder
    # is expected to give back an equal object.
    dumped = asdict(obj)
    for attr in to_remove:
        del dumped[attr.name]

    assert obj == converter.structure(dumped, cl)
示例13
def gen_string(draw: Any) -> Dict[str, Union[str, int]]:
    """Draw a string schema.

    The result always has ``"type": "string"``. A ``pattern`` takes
    precedence over a ``format``; ``minLength``/``maxLength`` are included
    only when drawn, and are swapped if drawn in the wrong order.
    """
    lower = draw(st.none() | st.integers(0, 10))
    upper = draw(st.none() | st.integers(0, 1000))
    if lower is not None and upper is not None:
        if lower > upper:
            lower, upper = upper, lower

    pattern = draw(st.none() | REGEX_PATTERNS)
    format_ = draw(st.none() | st.sampled_from(sorted(STRING_FORMATS)))

    out: Dict[str, Union[str, int]] = {"type": "string"}
    if pattern is not None:
        out["pattern"] = pattern
    elif format_ is not None:
        out["format"] = format_
    for key, bound in (("minLength", lower), ("maxLength", upper)):
        if bound is not None:
            out[key] = bound
    return out
示例14
def _draw_capabilities(self, data, sensor):
    """Draw a non-empty list of capabilities for ``sensor``.

    Without allowed combinations a single capability is drawn. Otherwise
    the result is a single capability name, a single capability, or a
    unique list of entries from ``sensor.allowed_combo``.
    """
    def exactly_one(choices):
        return st.lists(st.sampled_from(choices), min_size=1, max_size=1)

    if len(sensor.allowed_combo) > 0:
        # test capabilities 1 by 1,
        # or some combination of those in the allowed_combo list
        capability_names = [cap.name for cap in list(sensor.capability)]
        strategy = st.one_of(
            exactly_one(capability_names),
            exactly_one(sensor.capability),
            st.lists(st.sampled_from(sensor.allowed_combo), min_size=1, unique=True),
        )
    else:
        # if no combos allowed, then just test 1 by 1
        strategy = exactly_one(sensor.capability)
    return data.draw(strategy)
示例15
def test_attach_sensor(self, data):
    """Check that a drawn sensor ends up attached to a freshly built hub."""
    sensor_name = 'sensor'
    drawn_sensor = data.draw(st.sampled_from(self.sensor_list))
    drawn_caps = self._draw_capabilities(data, drawn_sensor)
    drawn_hub_type = data.draw(st.sampled_from(self.hub_list))
    TestHub, stop_evt = self._get_hub_class(
        drawn_hub_type, drawn_sensor, sensor_name, drawn_caps
    )
    hub = TestHub('testhub')

    # The peripheral must be registered under its name ...
    assert sensor_name in hub.peripherals
    # ... and also be exposed as an attribute of the hub.
    assert hasattr(hub, sensor_name)
示例16
def test_attach_message(self, data, port, event):
    """Parse an attach/detach message (type 0x04) and check its effects.

    ``event`` selects the variant: 0 detach, 1 attach, 2 virtual attach.
    Any other value builds the message but checks nothing.
    """
    msg_type = 0x04
    msg = bytearray([msg_type, port, event])

    def parse(payload):
        return self.m.parse(self._with_header(payload))

    def assert_port_announced():
        put = self.hub.peripheral_queue.put
        put.assert_any_call(('update_port', (port, self.m.port_info[port])))
        put.assert_any_call(('port_detected', port))

    if event == 0:  # detach
        parsed = parse(msg)
        assert parsed == f'Detached IO Port:{port}'
    elif event == 1:  # attach
        # Need 10 bytes
        dev_id = data.draw(st.sampled_from(sorted(DEVICES.keys())))
        fw_version = data.draw(st.lists(st.integers(0,255), min_size=8, max_size=8))
        msg = msg + bytearray([dev_id, 0]) + bytearray(fw_version)
        parse(msg)
        assert_port_announced()
        # Also need to make sure the port info is added to dispatch
        assert self.m.port_info[port]['name'] == DEVICES[dev_id]
    elif event == 2:  # virtual attach
        dev_id = data.draw(st.sampled_from(sorted(DEVICES.keys())))
        v_port_a = data.draw(st.integers(0,255))
        v_port_b = data.draw(st.integers(0,255))
        msg = msg + bytearray([dev_id, 0, v_port_a, v_port_b])
        parsed = parse(msg)
        assert_port_announced()
        assert parsed == f'Attached VirtualIO Port:{port} {self.m.port_info[port]["name"]} Port A: {v_port_a}, Port B: {v_port_b}'
        assert self.m.port_info[port]['virtual'] == (v_port_a, v_port_b)
        assert self.m.port_info[port]['name'] == DEVICES[dev_id]
示例17
def __repr__(self):
    """Render as ``sampled_from(<target>)`` using ``self._repr_target``."""
    return f"sampled_from({self._repr_target})"
示例18
def _address_strategy(length: Optional[int] = None) -> SearchStrategy:
    """Return a deferred strategy sampling from ``network.accounts``.

    The account list is read only when the wrapped callable runs, and is
    truncated to its first ``length`` entries (all of them when ``None``).
    """
    def _sample_accounts():
        available = list(network.accounts)[:length]
        return st.sampled_from(available)

    return _DeferredStrategyRepr(_sample_accounts, "accounts")
示例19
def contract_strategy(contract_name: str) -> SearchStrategy:
    """Return a deferred strategy sampling from the entries of ``proj[contract_name]``.

    The first loaded project whose ``dict()`` contains the name is used.
    ``NameError`` is raised, when the wrapped callable runs, if no loaded
    project has it.
    """
    def _contract_deferred(name):
        owner = next(
            (proj for proj in project.get_loaded_projects() if name in proj.dict()),
            None,
        )
        if owner is None:
            raise NameError(f"Contract '{name}' does not exist in any active projects")
        return st.sampled_from(list(owner[name]))

    return _DeferredStrategyRepr(lambda: _contract_deferred(contract_name), contract_name)
示例20
def test_is_valid_query_strategy():
    """Check that filtering by ``is_valid_query`` leaves only ``{"key": "1"}``."""
    candidates = [{"key": "1"}, {"key": "\udcff"}]
    strategy = strategies.sampled_from(candidates).filter(is_valid_query)

    @given(strategy)
    @settings(max_examples=10)
    def test(value):
        assert value == {"key": "1"}

    # Run the inner property test immediately.
    test()
示例21
def csv_strategy(enum):
    """Return a strategy of comma-joined, non-empty lists of ``enum`` member names."""
    member_names = [item.name for item in enum]
    names_st = st.lists(st.sampled_from(member_names), min_size=1)
    return names_st.map(",".join)
# The following strategies generate CLI parameters, for example "--workers=5" or "--exitfirst"
示例22
def header(header_class, **kwargs):
    """Create a strategy for producing headers of a specific class.

    Args:
        header_class: The type of header to be produced. This class will be
            introspected to determine suitable strategies for each named
            field.

        **kwargs: Any supplied keyword arguments can be used to fix the value
            of particular header fields.

    Raises:
        TypeError: If a keyword argument does not name a field of
            ``header_class``.
    """
    def strategy_for(field_name):
        # Enumerated value types sample from their ENUM; all others draw
        # an integer between the type's MINIMUM and MAXIMUM.
        value_type = getattr(header_class, field_name).value_type
        if hasattr(value_type, 'ENUM'):
            return sampled_from(sorted(value_type.ENUM))
        return integers(value_type.MINIMUM, value_type.MAXIMUM)

    field_strategies = {}
    for field_name in header_class.ordered_field_names():
        if field_name in kwargs:
            field_strategies[field_name] = just(kwargs.pop(field_name))
        else:
            field_strategies[field_name] = strategy_for(field_name)

    # Anything still left in kwargs did not match a field name.
    if len(kwargs) > 0:
        raise TypeError("Unrecognised binary header field names {} for {}".format(
            ', '.join(kwargs.keys()),
            header_class.__name__))

    return fixed_dictionaries(field_strategies).map(
        lambda kw: header_class(**kw)
    )
示例23
def sequences(draw, elements=None, min_size=None, max_size=None, average_size=None, unique_by=None, unique=False):
    """A Hypothesis strategy to produce arbitrary sequences.

    Currently produces list, tuple or deque objects.

    Args:
        draw: Hypothesis draw callable.
        elements: Strategy for the items; defaults to ``integers()``.
        min_size, max_size, average_size: Size hints forwarded to ``lists``
            only when not ``None``.
        unique_by, unique: Forwarded to ``lists`` unchanged.

    Returns:
        The drawn items wrapped in a type sampled from ``SEQUENCE_TYPES``.
    """
    # TODO: work ranges, strings and bytes in here.
    seq_type = draw(sampled_from(SEQUENCE_TYPES))
    if elements is None:
        elements = integers()

    # Forward only the size arguments the caller actually supplied. Newer
    # Hypothesis releases reject ``min_size=None`` and no longer accept
    # ``average_size``, so passing the ``None`` defaults through would fail
    # there; omitting them selects the same defaults on older releases.
    size_kwargs = {
        key: value
        for key, value in (
            ("min_size", min_size),
            ("max_size", max_size),
            ("average_size", average_size),
        )
        if value is not None
    }
    items = draw(lists(elements=elements, unique_by=unique_by, unique=unique,
                       **size_kwargs))
    return seq_type(items)
示例24
def s3_backend_config_st(draw):
    """Hypothesis strategy for s3 backend configuration.

    Returns a dict with ``bucket``, ``encrypt``, ``key`` and ``region``
    entries, plus an optional ``profile`` entry.
    """
    s3_be_dict = {
        'bucket': draw(s3_bucket_name_st()),
        'encrypt': draw(st.sampled_from(['true', 'false'])),
        # Non-empty key of at most 1024 characters, not starting with '/'.
        'key': draw(st.text(
            alphabet=list(ascii_letters + digits + '!-_.*\'()/'),
            min_size=1,
            max_size=1024).filter(lambda x: x[0] not in '/')),
        'region': draw(st.sampled_from(aws_region_list)),
    }
    # Draw the coin flip through Hypothesis rather than the ``random``
    # module, so the choice is recorded with the example and can be
    # replayed and shrunk like every other drawn value.
    if draw(st.booleans()):
        s3_be_dict['profile'] = 'testawsprofile'
    return s3_be_dict
示例25
def remove_strategy(self):
    """Return a strategy of ``("remove", (service_name, address))`` tuples.

    A service name is sampled from ``self.fake.services`` and an address is
    then sampled from that service's entry.
    """
    def address_of(service_name):
        addresses = self.fake.services[service_name]
        return st.tuples(st.just(service_name), st.sampled_from(addresses))

    service_names = list(self.fake.services.keys())
    target_st = st.sampled_from(service_names).flatmap(address_of)
    return st.tuples(st.just("remove"), target_st)
示例26
def steps(self):
    """Return the strategy of possible next steps given the current fake state.

    Starts from ``add_strategy | replace_strategy`` and adds steps that
    target already-known services when there are any.
    """
    result = add_strategy | replace_strategy

    # Replace or add to a known service cluster:
    if self.fake.services:
        known_service = st.sampled_from(list(self.fake.services.keys()))
        replace_known = st.tuples(
            st.just("replace"),
            st.tuples(known_service, st.lists(nice_strings)),
        )
        add_known = st.tuples(
            st.just("add"),
            st.tuples(known_service, nice_strings),
        )
        result = result | replace_known
        result = result | add_known

    # Remove a known address from known cluster:
    if not self.fake.is_empty():
        result = result | self.remove_strategy()
    return result
示例27
def add_bools(list_of_lists):
    """
    Given recursive list that can contain other lists, return tuple of that plus
    a booleans strategy for each list.

    The outermost list counts too, so the second element always holds at
    least one boolean.
    """
    def count_lists(node):
        # One for this list plus one for every list nested beneath it.
        return 1 + sum(
            count_lists(child) for child in node if isinstance(child, list)
        )

    total = count_lists(list_of_lists)
    flags = [st.sampled_from([True, False]) for _ in range(total)]
    return st.tuples(st.just(list_of_lists), st.tuples(*flags))
示例28
def schema_strategy(draw):
    """Draw a schema dict whose only key is a ``type`` sampled from ``SCHEMA_TYPES``."""
    schema_type = draw(st.sampled_from(SCHEMA_TYPES))
    # TODO: generate constraints on number/string/array schemas
    #       (hint: can you design this so they shrink to less constrained?)
    return {"type": schema_type}
示例29
def graphs(
    draw,
    keys=ascii_uppercase,
    allow_self_links=True,
    directed=True,
    force_path=True,
    edge_cost=False,
):
    """Draw a graph as a dict mapping each key to a set of ``(neighbor, cost)``.

    :param draw: Hypothesis draw callable.
    :param keys: Sequence of node identifiers.
    :param allow_self_links: When false, a node is never drawn as its own
        neighbor.
    :param directed: When false, every edge is mirrored in the opposite
        direction with the same cost.
    :param force_path: When true, each node is also linked to the previous
        one in ``keys`` (wrapping around) with cost 1.
    :param edge_cost: When true costs are drawn from 1-10 inclusive,
        otherwise every cost is 1.
    """
    result = {c: set() for c in keys}
    # Cost will always be 1, or 1-10 inclusive if edge_cost is True.
    cost_strat = st.integers(1, 10) if edge_cost else st.just(1)

    for i, c in enumerate(keys):
        # The inner strategy for keys (node identifiers) is a sampled_from,
        # rejecting the current node iff allow_self_links is False.
        # ``c`` is bound as a default so the filter never sees a later
        # iteration's node.
        key_strat = st.sampled_from(keys).filter(
            lambda k, c=c: allow_self_links or k != c
        )
        # Then we choose a set of neighbors for node c, and update the graph:
        neighbors = draw(
            st.sets(st.tuples(key_strat, cost_strat), min_size=1, max_size=4)
        )
        result[c].update(neighbors)
        if not directed:
            # Each neighbor is a (key, cost) tuple: index the graph by the
            # key alone and mirror the edge with the same cost.
            for k, cost in neighbors:
                result[k].add((c, cost))
        if force_path:
            # Ensure that there *is* always a path between any pair of nodes,
            # by linking each node to the next in order. We therefore have a
            # directed ring, plus some number of cross-links.
            previous = keys[i - 1]
            result[c].add((previous, 1))
            if not directed:
                result[previous].add((c, 1))
    return result
示例30
def test_bfs_finds_shortest_path(graph, data):
    """Draw two nodes, search for a path between them and draw a midpoint on it."""
    nodes = sorted(graph)
    # Using label=... is easier to follow than the default "Draw #1" style!
    start = data.draw(st.sampled_from(nodes), label="start")
    end = data.draw(st.sampled_from(nodes), label="end")
    path = breadth_first_search(graph, start, end)
    midpoint = data.draw(st.sampled_from(path), label="midpoint")
    # TODO: your code here!