Python源码示例:jsonschema.validate()
示例1
def _test_validate(self, schema, expect_failure, input_files, input):
"""validates input yaml against schema.
:param schema: schema yaml file
:param expect_failure: should the validation pass or fail.
:param input_files: pytest fixture used to access the test input files
:param input: test input yaml doc filename"""
schema_dir = pkg_resources.resource_filename('drydock_provisioner',
'schemas')
schema_filename = os.path.join(schema_dir, schema)
schema_file = open(schema_filename, 'r')
schema = yaml.safe_load(schema_file)
input_file = input_files.join(input)
instance_file = open(str(input_file), 'r')
instance = yaml.safe_load(instance_file)
if expect_failure:
with pytest.raises(ValidationError):
jsonschema.validate(instance['spec'], schema['data'])
else:
jsonschema.validate(instance['spec'], schema['data'])
示例2
def validator_fail_with_warning(
self, record_mode, validator
) -> Iterable[Tuple[SqlValidator, Dict]]:
with vcr.use_cassette(
"tests/cassettes/test_sql_validator/fixture_validator_fail_with_warning.yaml",
match_on=["uri", "method", "raw_body"],
filter_headers=["Authorization"],
record_mode=record_mode,
):
# Move to dev mode to test conditional logic warning
validator.client.update_workspace("eye_exam", "dev")
validator.client.checkout_branch("eye_exam", "pytest")
validator.build_project(selectors=["eye_exam/users__fail_and_warn"])
results = validator.validate(mode="hybrid")
yield validator, results
示例3
def validator_warn(
self, record_mode, validator
) -> Iterable[Tuple[SqlValidator, Dict]]:
with vcr.use_cassette(
"tests/cassettes/test_sql_validator/fixture_validator_pass_with_warning.yaml",
match_on=["uri", "method", "raw_body"],
filter_headers=["Authorization"],
record_mode=record_mode,
):
# Move to dev mode to test conditional logic warning
validator.client.update_workspace("eye_exam", "dev")
validator.client.checkout_branch("eye_exam", "pytest")
validator.build_project(selectors=["eye_exam/users__warn"])
results = validator.validate(mode="hybrid")
yield validator, results
示例4
def test_pydist():
"""Make sure pydist.json exists and validates against our schema."""
# XXX this test may need manual cleanup of older wheels
import jsonschema
def open_json(filename):
return json.loads(open(filename, 'rb').read().decode('utf-8'))
pymeta_schema = open_json(resource_filename('wheel.test',
'pydist-schema.json'))
valid = 0
for dist in ("simple.dist", "complex-dist"):
basedir = pkg_resources.resource_filename('wheel.test', dist)
for (dirname, subdirs, filenames) in os.walk(basedir):
for filename in filenames:
if filename.endswith('.whl'):
whl = ZipFile(os.path.join(dirname, filename))
for entry in whl.infolist():
if entry.filename.endswith('/metadata.json'):
pymeta = json.loads(whl.read(entry).decode('utf-8'))
jsonschema.validate(pymeta, pymeta_schema)
valid += 1
assert valid > 0, "No metadata.json found"
示例5
def load_repos(self, repo_config_file, tag):
try:
stream = file(repo_config_file, 'r')
repo_config = yaml.safe_load(stream)
except (yaml.YAMLError, IOError) as err:
raise InvalidConfigurationError(str(err))
try:
jsonschema.validate(repo_config,
yaml.safe_load(repo_definition_schema))
except jsonschema.exceptions.ValidationError as err:
raise InvalidConfigurationError(err.message)
for repo in repo_config['repos']:
if repo['name'] in self._repos.keys():
raise InvalidConfigurationError('Duplicate repository: {0}'.format(
repo['name']))
repo['tag'] = tag
repo['store'] = ObjectStore(Config().resolve_path(repo['path']),
repo['name'])
self._repos[repo['name']] = repo
示例6
def _read_meta(self, meta_path, name, revision):
if not os.path.isfile(meta_path):
raise ObjectNotFound(name, self._name, revision)
try:
with open(meta_path, 'r') as f:
meta = yaml.safe_load(f)
except (OSError, IOError) as e:
raise PcoccError('Unable to get metadata for {0}: {1}'.format(
name, e))
except yaml.YAMLError as e:
raise PcoccError('Bad metadata for {0}: {1}'.format(
name, e))
try:
jsonschema.validate(meta,
yaml.safe_load(metadata_schema))
except jsonschema.exceptions.ValidationError as e:
raise PcoccError('Bad metadata for {0}: {1}'.format(
name, e))
return meta
示例7
def _validate_repo_config(self):
try:
with open(self._config_path) as f:
repo_config = yaml.safe_load(f)
jsonschema.validate(repo_config,
yaml.safe_load(repo_config_schema))
except (yaml.YAMLError,
IOError,
jsonschema.exceptions.ValidationError) as err:
raise PcoccError(
'Bad repository config file {0} : {1}'.format(self._config_path,
err))
if repo_config['version'] != 1:
raise InvalidConfigurationError(
'unsupported repository {0} version'.format(self.name))
示例8
def password_is_complex(password, min_len=12):
"""
Check that the specified string meets standard password complexity
requirements.
:param str password: The password to validate.
:param int min_len: The mininum length the password should be.
:return: Whether the strings appears to be complex or not.
:rtype: bool
"""
has_upper = False
has_lower = False
has_digit = False
if len(password) < min_len:
return False
for char in password:
if char.isupper():
has_upper = True
if char.islower():
has_lower = True
if char.isdigit():
has_digit = True
if has_upper and has_lower and has_digit:
return True
return False
示例9
def validate_json_schema(data, schema_file_id):
"""
Validate the specified data against the specified schema. The schema file
will be searched for and loaded based on it's id. If the validation fails
a :py:class:`~jsonschema.exceptions.ValidationError` will be raised.
:param data: The data to validate against the schema.
:param schema_file_id: The id of the schema to load.
"""
schema_file_name = schema_file_id + '.json'
file_path = find.data_file(os.path.join('schemas', 'json', schema_file_name))
if file_path is None:
raise FileNotFoundError('the json schema file was not found')
with open(file_path, 'r') as file_h:
schema = json.load(file_h)
jsonschema.validate(data, schema)
示例10
def test_pydist():
"""Make sure pydist.json exists and validates against our schema."""
# XXX this test may need manual cleanup of older wheels
import jsonschema
def open_json(filename):
with open(filename, 'rb') as json_file:
return json.loads(json_file.read().decode('utf-8'))
pymeta_schema = open_json(resource_filename('wheel.test',
'pydist-schema.json'))
valid = 0
for dist in ("simple.dist", "complex-dist"):
basedir = pkg_resources.resource_filename('wheel.test', dist)
for (dirname, subdirs, filenames) in os.walk(basedir):
for filename in filenames:
if filename.endswith('.whl'):
whl = ZipFile(os.path.join(dirname, filename))
for entry in whl.infolist():
if entry.filename.endswith('/metadata.json'):
pymeta = json.loads(whl.read(entry).decode('utf-8'))
jsonschema.validate(pymeta, pymeta_schema)
valid += 1
assert valid > 0, "No metadata.json found"
示例11
def test_pydist():
"""Make sure pydist.json exists and validates against our schema."""
# XXX this test may need manual cleanup of older wheels
import jsonschema
def open_json(filename):
return json.loads(open(filename, 'rb').read().decode('utf-8'))
pymeta_schema = open_json(resource_filename('wheel.test',
'pydist-schema.json'))
valid = 0
for dist in ("simple.dist", "complex-dist"):
basedir = pkg_resources.resource_filename('wheel.test', dist)
for (dirname, subdirs, filenames) in os.walk(basedir):
for filename in filenames:
if filename.endswith('.whl'):
whl = ZipFile(os.path.join(dirname, filename))
for entry in whl.infolist():
if entry.filename.endswith('/metadata.json'):
pymeta = json.loads(whl.read(entry).decode('utf-8'))
jsonschema.validate(pymeta, pymeta_schema)
valid += 1
assert valid > 0, "No metadata.json found"
示例12
def _validate_config(config: Dict[str, Any]):
schema_path = PACKAGE.get_resource_filename( # type: ignore
None, "query_exporter/schemas/config.yaml"
)
with open(schema_path) as fd:
schema = yaml.safe_load(fd)
try:
jsonschema.validate(config, schema)
except jsonschema.ValidationError as e:
path = "/".join(str(item) for item in e.absolute_path)
raise ConfigError(f"Invalid config at {path}: {e.message}")
示例13
def test_scenarios(log, name, scenario):
expect = list(map(lambda item: tuple(item), scenario.pop('report')))
actual = log(validate(**scenario))
assert actual == expect
示例14
def test_scenarios_return_valid_reports(name, scenario, report_schema):
del scenario['report']
report = validate(**scenario)
jsonschema.validate(report, report_schema)
示例15
def validate_tags(self, data):
for key, value in data.items():
if value is None: # use NoneType to unset an item
continue
# split key into a prefix and name
if '/' in key:
prefix, name = key.split('/')
else:
prefix, name = None, key
# validate optional prefix
if prefix:
if len(prefix) > 253:
raise serializers.ValidationError(
"Tag key prefixes must 253 characters or less.")
for part in prefix.split('/'):
if not re.match(TAGVAL_MATCH, part):
raise serializers.ValidationError(
"Tag key prefixes must be DNS subdomains.")
# validate required name
if not re.match(TAGVAL_MATCH, name):
raise serializers.ValidationError(
"Tag keys must be alphanumeric or \"-_.\", and 1-63 characters.")
# validate value if it isn't empty
if value and not re.match(TAGVAL_MATCH, str(value)):
raise serializers.ValidationError(
"Tag values must be alphanumeric or \"-_.\", and 1-63 characters.")
return data
示例16
def validate_healthcheck(self, data):
for procType, healthcheck in data.items():
if healthcheck is None:
continue
for key, value in healthcheck.items():
if value is None:
continue
if key not in ['livenessProbe', 'readinessProbe']:
raise serializers.ValidationError(
"Healthcheck keys must be either livenessProbe or readinessProbe")
try:
jsonschema.validate(value, PROBE_SCHEMA)
except jsonschema.ValidationError as e:
raise serializers.ValidationError(
"could not validate {}: {}".format(value, e.message))
# http://kubernetes.io/docs/api-reference/v1/definitions/#_v1_probe
# liveness only supports successThreshold=1, no other value
# This is not in the schema since readiness supports other values
threshold = jmespath.search('livenessProbe.successThreshold', healthcheck)
if threshold is not None and threshold != 1:
raise serializers.ValidationError(
'livenessProbe successThreshold can only be 1'
)
return data
示例17
def validate_autoscale(self, data):
schema = {
"$schema": "http://json-schema.org/schema#",
"type": "object",
"properties": {
# minimum replicas autoscale will keep resource at based on load
"min": {"type": "integer"},
# maximum replicas autoscale will keep resource at based on load
"max": {"type": "integer"},
# how much CPU load there is to trigger scaling rules
"cpu_percent": {"type": "integer"},
},
"required": ["min", "max", "cpu_percent"],
}
for proc, autoscale in data.items():
if autoscale is None:
continue
try:
jsonschema.validate(autoscale, schema)
except jsonschema.ValidationError as e:
raise serializers.ValidationError(
"could not validate {}: {}".format(autoscale, e.message)
)
return data
示例18
def validate_schema(data, schema):
"""Wraps default implementation but accepting tuples as arrays too.
https://github.com/Julian/jsonschema/issues/148
"""
return raw_validate(data, schema, types={"array": (list, tuple)})
示例19
def post(self, request):
if not can_create_question(self.request):
return self.http_method_not_allowed(request)
try:
body = json.loads(request.body)
except ValueError:
return HttpResponse(status=400)
try:
jsonschema.validate(body, self.request_body_schema)
except jsonschema.ValidationError:
return HttpResponse(status=400)
question_text = body.get('question')
choices = body.get('choices')
question, created = self.get_or_create(question_text, choices)
resource = self.resource()
resource.obj = question
resource.request = request
response = resource.get(request)
if created:
response.status_code = 201
response['Location'] = resource.get_uri()
return response
示例20
def validate(cls, payload):
if cls not in Document.__schemas:
Document.__schemas[cls] = cls.get_schema(ordered=True)
schema = cls.get_schema(ordered=True)
jsonschema.validate(payload, schema)
return payload
示例21
def load(self, definition, schema_file, context):
"""Load cloud connect configuration from a `dict` and validate
it with schema and global settings will be rendered.
:param schema_file: Schema file location used to validate config.
:param definition: A dictionary contains raw configs.
:param context: variables to render template in global setting.
:return: A `Munch` object.
"""
try:
validate(definition, self._get_schema_from_file(schema_file))
except ValidationError:
raise ConfigException(
'Failed to validate interface with schema: {}'.format(
traceback.format_exc()))
try:
global_settings = self._load_global_setting(
definition.get('global_settings'), context
)
requests = [self._load_request(item) for item in definition['requests']]
return munchify({
'meta': munchify(definition['meta']),
'tokens': definition['tokens'],
'global_settings': global_settings,
'requests': requests,
})
except Exception as ex:
error = 'Unable to load configuration: %s' % str(ex)
_logger.exception(error)
raise ConfigException(error)
示例22
def load(self, definition, schema_file, context):
"""Load cloud connect configuration from a `dict` and validate
it with schema and global settings will be rendered.
:param schema_file: Schema file location used to validate config.
:param definition: A dictionary contains raw configs.
:param context: variables to render template in global setting.
:return: A `Munch` object.
"""
try:
validate(definition, self._get_schema_from_file(schema_file))
except ValidationError:
raise ConfigException(
'Failed to validate interface with schema: {}'.format(
traceback.format_exc()))
try:
global_settings = self._load_global_setting(
definition.get('global_settings'), context
)
requests = [self._load_request(item) for item in definition['requests']]
return munchify({
'meta': munchify(definition['meta']),
'tokens': definition['tokens'],
'global_settings': global_settings,
'requests': requests,
})
except Exception as ex:
error = 'Unable to load configuration: %s' % str(ex)
_logger.exception(error)
raise ConfigException(error)
示例23
def validator_pass(
self, record_mode, validator
) -> Iterable[Tuple[DataTestValidator, Dict]]:
with vcr.use_cassette(
"tests/cassettes/test_data_test_validator/fixture_validator_pass.yaml",
match_on=["uri", "method", "raw_body", "query"],
filter_headers=["Authorization"],
record_mode=record_mode,
):
validator.build_project(selectors=["eye_exam/users"])
results = validator.validate()
yield validator, results
示例24
def test_results_should_conform_to_schema(self, schema, validator_pass):
results = validator_pass[1]
jsonschema.validate(results, schema)
示例25
def validator_fail(
self, record_mode, validator
) -> Iterable[Tuple[DataTestValidator, Dict]]:
with vcr.use_cassette(
"tests/cassettes/test_data_test_validator/fixture_validator_fail.yaml",
match_on=["uri", "method", "raw_body", "query"],
filter_headers=["Authorization"],
record_mode=record_mode,
):
validator.build_project(selectors=["eye_exam/users__fail"])
results = validator.validate()
yield validator, results
示例26
def test_results_should_conform_to_schema(self, schema, validator_fail):
results = validator_fail[1]
jsonschema.validate(results, schema)
示例27
def test_no_data_tests_should_raise_error(validator):
with pytest.raises(SpectaclesException):
validator.build_project(exclusions=["*/*"])
validator.validate()
示例28
def test_results_should_conform_to_schema(self, schema, validator_pass):
results = validator_pass[1]
jsonschema.validate(results, schema)
示例29
def validator_fail(
self, record_mode, validator
) -> Iterable[Tuple[ContentValidator, Dict]]:
with vcr.use_cassette(
"tests/cassettes/test_content_validator/fixture_validator_fail.yaml",
match_on=["uri", "method", "raw_body"],
filter_headers=["Authorization"],
record_mode=record_mode,
):
validator.build_project(selectors=["eye_exam/users__fail"])
results = validator.validate()
yield validator, results
示例30
def test_results_should_conform_to_schema(self, schema, validator_fail):
results = validator_fail[1]
jsonschema.validate(results, schema)