Python源码示例:hypothesis.strategies.SearchStrategy()
示例1
def _fuzz_string(
    parameter: Dict[str, Any],
    required: bool = False,
) -> SearchStrategy:
    """Build a text strategy for a Swagger ``string`` parameter.

    :param parameter: Swagger parameter definition; its ``in`` and
        ``required`` keys are consulted.
    :param required: fallback used when ``parameter`` carries no
        ``required`` key.
    :returns: a hypothesis strategy that produces strings.
    """
    is_header = parameter.get('in', None) == 'header'
    if is_header:
        # RFC 7230 deprecates non-ascii letters in headers, and servers react
        # unpredictably to them. The goal is to send valid requests rather
        # than to break the server, so header values stay within ASCII letters.
        return st.text(alphabet=string.ascii_letters)

    # TODO: Handle a bunch of swagger string formats.
    # https://swagger.io/docs/specification/data-models/data-types/#string
    text_options = {}  # type: Dict[str, Any]

    must_be_present = parameter.get('required', required)
    if must_be_present:
        # A required string must not be empty.
        text_options['min_size'] = 1

    unicode_allowed = get_settings().unicode_enabled
    if not unicode_allowed:
        text_options['alphabet'] = string.printable

    return st.text(**text_options)
示例2
def _exclude_filter(fn: Callable) -> Callable:
    """Decorate a strategy factory so that it accepts an ``exclude`` keyword.

    The returned wrapper forwards all positional and remaining keyword
    arguments to ``fn`` and then post-processes the resulting strategy:

    * ``exclude=None`` (default): the strategy is returned untouched.
    * ``exclude`` is callable: it is handed to ``strategy.filter`` as-is.
    * anything else: treated as a collection of values to reject. A single
      non-iterable value, or a string, is wrapped in a one-element tuple.

    ``functools.wraps`` is applied so the wrapper keeps the name, docstring
    and other metadata of ``fn`` instead of showing up as ``wrapper``.
    """
    # Imported locally so this block does not depend on a module-level import.
    from functools import wraps

    @wraps(fn)
    def wrapper(*args: Tuple, exclude: Any = None, **kwargs: int) -> SearchStrategy:
        strat = fn(*args, **kwargs)
        if exclude is None:
            return strat
        if callable(exclude):
            return strat.filter(exclude)
        # Strings are iterable, but are meant as a single excluded value.
        if not isinstance(exclude, Iterable) or isinstance(exclude, str):
            exclude = (exclude,)
        strat = strat.filter(lambda k: k not in exclude)
        # make the filter repr more readable
        repr_ = strat.__repr__().rsplit(").filter", maxsplit=1)[0]
        strat._cached_repr = f"{repr_}, exclude={exclude})"
        return strat

    return wrapper
示例3
def _array_strategy(
    abi_type: BasicType,
    min_length: ArrayLengthType = 1,
    max_length: ArrayLengthType = 8,
    unique: bool = False,
    **kwargs: Any,
) -> SearchStrategy:
    """Build a list strategy for an ABI array type.

    The last entry of ``abi_type.arrlist`` decides the size of the list: a
    non-empty entry pins both bounds to its first element, otherwise the
    bounds come from ``_get_array_length`` using ``min_length`` /
    ``max_length``. Remaining ``kwargs`` are forwarded to ``strategy`` for
    the item type; when the items are arrays themselves, the length and
    uniqueness settings are forwarded as well.
    """
    outer_dim = abi_type.arrlist[-1]
    if outer_dim:
        lower = upper = outer_dim[0]
    else:
        # Number of empty (falsy) entries in the dimension list.
        dynamic_depth = sum(1 for dim in abi_type.arrlist if not dim)
        lower = _get_array_length("min_length", min_length, dynamic_depth)
        upper = _get_array_length("max_length", max_length, dynamic_depth)

    element_type = abi_type.item_type
    if element_type.is_array:
        kwargs.update(min_length=min_length, max_length=max_length, unique=unique)
    elements = strategy(element_type.to_type_str(), **kwargs)

    list_strategy = st.lists(elements, min_size=lower, max_size=upper, unique=unique)
    # swap 'size' for 'length' in the repr
    pieces = list_strategy.__repr__().rsplit("size", maxsplit=2)
    list_strategy._LazyStrategy__representation = "length".join(pieces)  # type: ignore
    return list_strategy
示例4
def strategy(type_str: str, **kwargs: Any) -> SearchStrategy:
    """Return a hypothesis strategy for the given ABI type string.

    ``type_str`` is first mapped through ``TYPE_STR_TRANSLATIONS``. A few
    types are handled directly; everything else is parsed and dispatched on
    the parsed type (array, tuple, integer or bytes). ``kwargs`` are passed
    through to the selected strategy builder.

    Raises:
        ValueError: if no strategy exists for the type.
    """
    type_str = TYPE_STR_TRANSLATIONS.get(type_str, type_str)

    # Types that map straight to a builder without parsing.
    simple_builders = {
        "fixed168x10": _decimal_strategy,
        "address": _address_strategy,
        "bool": st.booleans,
        "string": _string_strategy,
    }
    builder = simple_builders.get(type_str)
    if builder is not None:
        return builder(**kwargs)  # type: ignore

    abi_type = parse(type_str)
    if abi_type.is_array:
        return _array_strategy(abi_type, **kwargs)
    if isinstance(abi_type, TupleType):
        return _tuple_strategy(abi_type, **kwargs)  # type: ignore

    base = abi_type.base
    if base in ("int", "uint"):
        return _integer_strategy(type_str, **kwargs)
    if base == "bytes":
        return _bytes_strategy(abi_type, **kwargs)

    raise ValueError(f"No strategy available for type: {type_str}")
示例5
def get_single_example(strategy: st.SearchStrategy[Case]) -> Case:
    """Draw exactly one example from ``strategy`` and return it.

    A throwaway hypothesis test is run with ``max_examples=1``, only the
    ``generate`` phase, no example database, no deadline, quiet verbosity and
    every health check suppressed. The test body records the generated value.
    """
    # Declared before the closure that appends to it, for readability.
    examples: List[Case] = []

    @hypothesis.given(strategy)  # type: ignore
    @hypothesis.settings(  # type: ignore
        database=None,
        max_examples=1,
        deadline=None,
        verbosity=hypothesis.Verbosity.quiet,
        phases=(hypothesis.Phase.generate,),
        # `HealthCheck.all()` is deprecated in Hypothesis; iterating the enum
        # yields the same list of checks without a deprecation warning.
        suppress_health_check=list(hypothesis.HealthCheck),
    )
    def example_generating_inner_function(ex: Case) -> None:
        examples.append(ex)

    example_generating_inner_function()
    return examples[0]
示例6
def _get_case_strategy(
    endpoint: Endpoint,
    extra_static_parameters: Dict[str, Any],
    strategies: Dict[str, st.SearchStrategy],
    hook_dispatcher: Optional[HookDispatcher] = None,
) -> st.SearchStrategy[Case]:
    """Combine per-parameter strategies into a strategy that builds ``Case``.

    Static parameters (the endpoint plus ``extra_static_parameters``) are
    bound to ``Case`` up front; the entries of ``strategies`` are drawn for
    each example. Note that ``strategies`` is modified in place: the ``body``
    entry is dropped for validated GET endpoints, and the dict is handed to
    ``_apply_hooks`` for the global, schema-level and optional extra hooks.

    Raises:
        InvalidSchema: if schema validation is on and a GET endpoint
            defines body parameters.
    """
    static_parameters: Dict[str, Any] = {"endpoint": endpoint}
    static_parameters.update(extra_static_parameters)

    if endpoint.schema.validate_schema and endpoint.method == "GET":
        if endpoint.body is not None:
            raise InvalidSchema("Body parameters are defined for GET request.")
        static_parameters["body"] = None
        strategies.pop("body", None)

    context = HookContext(endpoint)
    _apply_hooks(strategies, GLOBAL_HOOK_DISPATCHER, context)
    _apply_hooks(strategies, endpoint.schema.hooks, context)
    if hook_dispatcher is not None:
        _apply_hooks(strategies, hook_dispatcher, context)

    case_factory = partial(Case, **static_parameters)
    return st.builds(case_factory, **strategies)
示例7
def bits( nbits, signed=False, min_value=None, max_value=None ):
  """Return a strategy that produces ``mk_bits(nbits)`` instances.

  Parameters
  ----------
  nbits : int
      Bit width handed to ``mk_bits``.
  signed : bool
      When true, the default range is [-2**(nbits-1), 2**(nbits-1)-1];
      otherwise it is [0, 2**nbits - 1].
  min_value, max_value : int or None
      Optional inclusive bounds; a bound left as None falls back to the
      default for the chosen signedness.

  Raises
  ------
  ValueError
      If ``signed`` is combined with an explicit ``min_value``/``max_value``.
  """
  BitsN = mk_bits( nbits )

  bounded = min_value is not None or max_value is not None
  if bounded and signed:
    raise ValueError("bits strategy currently doesn't support setting "
                     "signedness and min/max value at the same time")

  if min_value is None:
    min_value = (-(2**(nbits-1))) if signed else 0
  if max_value is None:
    max_value = (2**(nbits-1)-1) if signed else (2**nbits - 1)

  # The booleans shortcut is only valid when the caller gave no bounds;
  # otherwise a 1-bit request such as min_value=1 would still produce 0.
  if nbits == 1 and not bounded:
    strat = st.booleans()
  else:
    strat = st.integers( min_value, max_value )

  @st.composite
  def strategy_bits( draw ):
    return BitsN( draw( strat ) )

  return strategy_bits() # RETURN A STRATEGY INSTEAD OF FUNCTION
#-------------------------------------------------------------------------
# strategies.bitslists
#-------------------------------------------------------------------------
# Return the SearchStrategy for a list of Bits with the support of
# dictionary based min/max value limit
示例8
def _strategy_dispatch( T, limit ):
  """Pick the strategy for type ``T`` under the optional ``limit``.

  ``limit`` may be a ready-made SearchStrategy (returned as-is), None, or,
  for plain Bits types, a ``range`` with step 1 describing the allowed
  values. Lists and bitstruct classes are delegated to ``bitslists`` and
  ``bitstructs`` together with ``limit``.
  """
  # A user-supplied strategy overrides everything else.
  if isinstance( limit, st.SearchStrategy ):
    return limit

  # A list of types.
  if isinstance( T, list ):
    return bitslists( T, limit )

  # A nested bitstruct.
  if is_bitstruct_class( T ):
    return bitstructs( T, limit )

  # Leaf node: a Bits field, handled by the bits strategy directly.
  assert issubclass( T, Bits )
  if limit is None:
    return bits( T.nbits )

  # Bits field restricted to a range of values.
  assert isinstance( limit, range ), f"We only accept range as min/max value specifier, not {type(limit)}"
  assert limit.step == 1, f"We only accept step=1 range, not {limit}."
  assert limit.start < limit.stop, f"We only accept start < stop range, not {limit}"

  # range() excludes its stop value while bits() takes an inclusive maximum.
  lowest, highest = limit.start, limit.stop-1
  return bits( T.nbits, False, lowest, highest )
示例9
def __init__(
    self,
    operation_id: str,
    tag: str = 'default',
    **kwargs: Any,
) -> None:
    """
    :param operation_id: unique identifier for each Swagger operation.
    :param tag: this is how Swagger operations are grouped.
    :param kwargs: stored as ``fuzzed_input``; when no keyword arguments
        are given, ``fuzzed_input`` is ``None`` instead of an empty dict.
    """
    self.tag = tag
    self.operation_id = operation_id

    # An empty kwargs dict is normalised to None.
    self.fuzzed_input = kwargs or None  # type: Optional[Dict[str, Any]]

    # This SearchStrategy should be generated with hypothesis'
    # `fixed_dictionaries`, mapping keys to SearchStrategy.
    self._fuzzed_input_factory = None  # type: Optional[SearchStrategy]
示例10
def _fuzz_array(
    parameter: Dict[str, Any],
    required: bool = False,
) -> SearchStrategy:
    """Build a list strategy for a Swagger ``array`` parameter.

    :param parameter: Swagger parameter definition; ``items`` is mandatory,
        while ``required``, ``minItems`` and ``maxItems`` are optional.
    :param required: fallback used when ``parameter`` has no ``required`` key.
    :returns: a list strategy, or "``None`` or a list" when the parameter
        is not required.
    """
    item_schema = parameter['items']
    required = parameter.get('required', required)

    # Required arrays need at least one element unless `minItems` says otherwise.
    default_min_size = 1 if required else 0

    # TODO: Handle `oneOf`
    list_strategy = st.lists(
        elements=_fuzz_parameter(item_schema, required=required),
        min_size=parameter.get('minItems', default_min_size),
        max_size=parameter.get('maxItems', None),
    )

    if required:
        return list_strategy
    return st.one_of(st.none(), list_strategy)
示例11
def _get_strategy_from_factory(
    expected_type: str,
    name: Optional[str] = None,
) -> Optional[SearchStrategy[Any]]:
    """Wrap a user-defined factory for ``name`` in a hypothesis strategy.

    :param expected_type: type name used to cast the factory's output;
        ``'string'`` and ``'integer'`` are cast, anything else passes through.
    :param name: key looked up in the user-defined mapping.
    :returns: ``None`` when ``name`` is not in the mapping, otherwise a
        strategy that calls the registered factory on each draw.
    """
    if name not in get_user_defined_mapping():
        return None

    def type_cast() -> Any:
        """Use known types to cast output, if applicable."""
        factory = get_user_defined_mapping()[name]
        output = factory()
        if output is None:
            # NOTE: We don't currently support `nullable` values, so we use `None`
            #       as a proxy to exclude the parameter from the final dictionary.
            return None

        casters = {'string': str, 'integer': int}
        caster = casters.get(expected_type)
        return output if caster is None else caster(output)

    return st.builds(type_cast)
示例12
def arrays(self, i: int) -> st.SearchStrategy:
    """
    Hypothesis search strategy for drawing an array y to be passed to f(x, ..., y_i,...).
    By default, y is drawn to have a shape that is broadcast-compatible with x.

    The shape is looked up in ``self.index_to_arr_shapes`` and the element
    bounds in ``self.index_to_bnds``, falling back to ``self.default_bnds``.

    Parameters
    ----------
    i : int
        The argument index-location of y in the signature of f.

    Returns
    -------
    hypothesis.searchstrategy.SearchStrategy"""
    shape = self.index_to_arr_shapes.get(i)
    bounds = self.index_to_bnds.get(i, self.default_bnds)
    float_elements = st.floats(*bounds)
    return hnp.arrays(shape=shape, dtype=float, elements=float_elements)
示例13
def arrays(self, i: int) -> st.SearchStrategy:
    """
    Hypothesis search strategy for drawing an array y to be passed to f(x, ..., y_i,...).
    By default, y is drawn to have a shape that is broadcast-compatible with x.

    Shape, element bounds and the uniqueness flag are looked up per index in
    ``self.index_to_arr_shapes``, ``self.index_to_bnds`` (default
    ``self.default_bnds``) and ``self.index_to_unique`` (default ``False``).

    Parameters
    ----------
    i : int
        The argument index-location of y in the signature of f.

    Returns
    -------
    hypothesis.searchstrategy.SearchStrategy"""
    shape = self.index_to_arr_shapes.get(i)
    bounds = self.index_to_bnds.get(i, self.default_bnds)
    elements = self.elements_strategy(*bounds)
    must_be_unique = self.index_to_unique.get(i, False)
    return hnp.arrays(
        shape=shape,
        dtype=float,
        elements=elements,
        unique=must_be_unique,
    )
示例14
def test_choices(seq: List[int], replace: bool, data: st.SearchStrategy):
    """Check that the `choices` strategy draws only from `seq`, honours the
    requested `size`, and does not repeat elements when `replace` is false
    and `seq` itself has no duplicates."""
    # Oversized draws are only possible with replacement from a non-empty seq.
    upper = len(seq)
    if replace and seq:
        upper += 10

    size = data.draw(st.integers(0, upper), label="size")
    chosen = data.draw(choices(seq, size=size, replace=replace), label="choices")

    assert set(chosen) <= set(seq), (
        "choices contains elements that do not belong to `seq`"
    )
    assert len(chosen) == size, "the number of choices does not match `size`"

    seq_has_no_duplicates = len(set(seq)) == len(seq)
    if not replace and seq_has_no_duplicates:
        deduplicated = sorted(set(chosen))
        assert deduplicated == sorted(chosen), (
            "`choices` with `replace=False` draws elements with replacement"
        )
示例15
def test_basic_index(shape: Tuple[int, ...], data: st.SearchStrategy):
    """Check that `basic_indices` yields indices that are valid for an array
    of the given shape, produce views of it, and respect the requested
    bounds on the dimensionality of the result."""
    ndim = len(shape)
    min_dim = data.draw(st.integers(0, ndim + 2), label="min_dim")
    max_dim = data.draw(st.integers(min_dim, min_dim + ndim), label="max_dim")
    index_strategy = basic_indices(shape=shape, min_dims=min_dim, max_dims=max_dim)
    index = data.draw(index_strategy, label="index")

    x = np.zeros(shape, dtype=int)
    o = x[index]  # raises if invalid index
    note(f"`x[index]`: {o}")

    # The memory-sharing check is skipped for empty and 0-d results.
    if o.size and o.ndim > 0:
        assert np.shares_memory(x, o), (
            "The basic index should produce a view of the original array."
        )
    assert min_dim <= o.ndim <= max_dim, (
        "The dimensionality input constraints were not obeyed"
    )
示例16
def integer_index(size):
    """Generate a valid integer-index for an axis of a given size,
    either a positive or negative value: [-size, size).

    Examples from this strategy shrink towards 0.

    Parameters
    ----------
    size : int
        Size of the axis for which the index is drawn

    Returns
    -------
    hypothesis.searchstrategy.SearchStrategy[int]
    """
    # `st.integers` takes inclusive bounds, hence `size - 1` at the top.
    lowest, highest = -size, size - 1
    return st.integers(lowest, highest)
示例17
def merged_as_strategies(schemas: List[Schema]) -> st.SearchStrategy[JSONType]:
    """Return a strategy for values that are valid against every schema.

    Combinations of the (de-duplicated) schemas are merged, largest
    combination first. Each successful merge contributes a strategy, filtered
    so that generated values validate against all of ``schemas``. Schemas
    already covered by an earlier successful merge are skipped. The result is
    the union of the collected strategies.
    """
    assert schemas, "internal error: must pass at least one schema to merge"
    if len(schemas) == 1:
        return from_schema(schemas[0])

    # Try to merge combinations of strategies.
    strats = []
    combined: Set[str] = set()
    inputs = {encode_canonical_json(s): s for s in schemas}

    # The validators depend only on `schemas`, so they are built once, and
    # only when at least one merge succeeds, instead of on every iteration.
    validators = None

    def valid_for_every_schema(obj):
        return all(v.is_valid(obj) for v in validators)

    for group in itertools.chain.from_iterable(
        itertools.combinations(inputs, n) for n in range(len(inputs), 0, -1)
    ):
        if combined.issuperset(group):
            continue
        merged_schema = merged([inputs[g] for g in group])
        if merged_schema is not None and merged_schema != FALSEY:
            if validators is None:
                validators = [make_validator(schema) for schema in schemas]
            strats.append(from_schema(merged_schema).filter(valid_for_every_schema))
            combined.update(group)
    return st.one_of(strats)
示例18
def _numeric_with_multiplier(
    min_value: Optional[float], max_value: Optional[float], schema: Schema
) -> st.SearchStrategy[float]:
    """Handle numeric schemata containing the multipleOf key.

    The bounds are converted to bounds on the integer multiplier (rounding
    inwards), integers are drawn in that range and scaled by ``multipleOf``,
    and the results are filtered through the schema's validator.
    """
    multiple_of = schema["multipleOf"]
    assert isinstance(multiple_of, (int, float))

    lower, upper = min_value, max_value
    if lower is not None:
        lower = math.ceil(Fraction(lower) / Fraction(multiple_of))
    if upper is not None:
        upper = math.floor(Fraction(upper) / Fraction(multiple_of))

    bounds_cross = lower is not None and upper is not None and lower > upper
    if bounds_cross:
        # You would think that this is impossible, but it can happen if multipleOf
        # is very small and the bounds are very close together.  It would be nicer
        # to deal with this when canonicalising, but suffice to say we can't without
        # diverging from the floating-point behaviour of the upstream validator.
        return st.nothing()

    multiples = st.integers(lower, upper).map(lambda k: k * multiple_of)
    return multiples.filter(make_validator(schema).is_valid)
示例19
def test_strategy(bits):
    """Unsigned and signed integer types of width ``bits`` both yield a strategy."""
    for prefix in ("uint", "int"):
        type_str = f"{prefix}{bits}"
        assert isinstance(strategy(type_str), SearchStrategy)
示例20
def test_strategy():
    """The byte-like type strings each yield a strategy."""
    for type_str in ("byte", "bytes", "bytes32"):
        assert isinstance(strategy(type_str), SearchStrategy)
示例21
def test_strategy(base1, base2):
    """A nested tuple type built from the two base types yields a strategy."""
    nested_tuple_type = f"({base1},({base2},{base1}))"
    tuple_strategy = strategy(nested_tuple_type)
    assert isinstance(tuple_strategy, SearchStrategy)
示例22
def test_strategy(type_str):
    """Fixed, dynamic and mixed nested array forms of a type yield strategies."""
    for array_suffix in ("[2]", "[]", "[][2]", "[2][]"):
        array_type = f"{type_str}{array_suffix}"
        assert isinstance(strategy(array_type), SearchStrategy)
示例23
def test_strategy():
    """The ``bool`` type string yields a strategy."""
    bool_strategy = strategy("bool")
    assert isinstance(bool_strategy, SearchStrategy)
示例24
def test_strategy():
    """The ``string`` type string yields a strategy."""
    string_strategy = strategy("string")
    assert isinstance(string_strategy, SearchStrategy)
示例25
def _integer_strategy(
    type_str: str, min_value: Optional[int] = None, max_value: Optional[int] = None
) -> SearchStrategy:
    """Return an integer strategy for the integer ABI type ``type_str``.

    The requested bounds are passed through ``_check_numeric_bounds``
    (together with ``Wei``) and whatever bounds it returns are used for the
    integer strategy.
    """
    lower, upper = _check_numeric_bounds(type_str, min_value, max_value, Wei)
    return st.integers(min_value=lower, max_value=upper)
示例26
def _decimal_strategy(
    min_value: NumberType = None, max_value: NumberType = None, places: int = 10
) -> SearchStrategy:
    """Return a decimal strategy with ``places`` decimal places.

    The requested bounds are passed through ``_check_numeric_bounds`` using
    the ``"int128"`` type string and ``Fixed``; the bounds it returns are
    used for the decimal strategy.
    """
    lower, upper = _check_numeric_bounds("int128", min_value, max_value, Fixed)
    return st.decimals(min_value=lower, max_value=upper, places=places)
示例27
def _address_strategy(length: Optional[int] = None) -> SearchStrategy:
    """Return a deferred strategy that samples from ``network.accounts``.

    The account list is read only when the deferred callable runs; it is
    truncated to the first ``length`` entries (all of them when ``length``
    is ``None``).
    """

    def _sample_accounts() -> SearchStrategy:
        candidates = list(network.accounts)[:length]
        return st.sampled_from(candidates)

    return _DeferredStrategyRepr(_sample_accounts, "accounts")
示例28
def _string_strategy(min_size: int = 0, max_size: int = 64) -> SearchStrategy:
    """Return a text strategy whose length lies in ``[min_size, max_size]``."""
    size_bounds = {"min_size": min_size, "max_size": max_size}
    return st.text(**size_bounds)
示例29
def _tuple_strategy(abi_type: TupleType) -> SearchStrategy:
    """Return a tuples strategy with one element strategy per component of
    ``abi_type``, in component order."""
    component_types = (component.to_type_str() for component in abi_type.components)
    return st.tuples(*map(strategy, component_types))
示例30
def contract_strategy(contract_name: str) -> SearchStrategy:
    """Return a deferred strategy that samples from the contract container
    named ``contract_name``.

    The lookup happens when the deferred callable runs: the loaded projects
    are searched in order and the first one whose ``dict()`` contains the
    name is used. A ``NameError`` is raised at that point if none does.
    """

    def _resolve_contract() -> SearchStrategy:
        for proj in project.get_loaded_projects():
            if contract_name in proj.dict():
                return st.sampled_from(list(proj[contract_name]))
        raise NameError(f"Contract '{contract_name}' does not exist in any active projects")

    return _DeferredStrategyRepr(_resolve_contract, contract_name)