Python源码示例:jsonschema.RefResolver()
示例1
def testJsonSchema(schema_file, test_file):
    """Validate the JSON document in ``test_file`` against ``schema_file``.

    Relative ``$ref`` targets are resolved against the directory that
    contains this module.

    Returns 0 when the document validates and 1 when validation fails; on
    failure the error, a failure banner and the offending data are printed.
    """
    schema = loadSchema(schema_file)
    data = loadJson(test_file)
    # Single-argument print(...) behaves identically on Python 2 and Python 3,
    # whereas the print statement is a syntax error on Python 3.
    print('Loaded test validation JSON value from %s:' % test_file)
    # Named ``module_dir`` rather than ``dir`` to avoid shadowing the builtin.
    module_dir = os.path.dirname(os.path.realpath(__file__))
    resolver = jsonschema.RefResolver(
        referrer=schema, base_uri='file://' + module_dir + '/')
    try:
        jsonschema.validate(data, schema, resolver=resolver)
    except jsonschema.exceptions.ValidationError as e:
        print(e)
        print('FAILED VALIDATION for %s' % test_file)
        pprint(data)
        return 1
    print('Validated.')
    return 0
示例2
def get_validator(filename, base_uri=''):
    """Load a schema from a JSON file and return a ``Draft4Validator`` for it.

    The loaded schema is first checked for validity as a Draft 4 schema.

    Optionally specify a base URI for relative path resolution of JSON
    pointers (this is especially useful for local resolution via a
    ``base_uri`` of the form ``file://{some_path}/``). When ``base_uri`` is
    empty no custom resolver is installed.

    Raises:
        SchemaError: if the loaded document is not a valid schema.
    """
    schema = get_json_from_file(filename)
    # check_schema is a classmethod; a SchemaError simply propagates to the
    # caller, so no try/except wrapper is needed.
    Draft4Validator.check_schema(schema)
    print("Schema %s is valid JSON" % filename)
    resolver = None
    if base_uri:
        # The referrer must be the referring schema document itself, not the
        # name of the file it was read from; otherwise references that point
        # back into this document are resolved against a plain string.
        resolver = RefResolver(base_uri=base_uri, referrer=schema)
    return Draft4Validator(schema=schema, resolver=resolver)
示例3
# Sentinel that distinguishes "no response supplied" from an explicit value
# such as None, so explicit arguments are always validated as given.
_NO_RESPONSE = object()


def is_valid_response(action, resp=_NO_RESPONSE):
    """Check a security scanner response against the bundled OpenAPI document.

    The schema used depends on ``action.name``; the OpenAPI document is read
    from ``clair-v4.openapi.json`` next to this module on every call.

    :param action: object whose ``name`` must be a key of ``actions``.
    :param resp: decoded response to validate; an empty dict is validated
        when the argument is omitted.
    :return: True when ``resp`` validates, False on any exception raised
        while validating (the exception is logged).
    :raises AssertionError: if ``action.name`` is not a known action.
    """
    if resp is _NO_RESPONSE:
        # A fresh dict per call instead of a shared mutable default argument.
        resp = {}
    assert action.name in actions.keys()
    schema_for = {
        "IndexState": "State",
        "Index": "IndexReport",
        "GetIndexReport": "IndexReport",
        "GetVulnerabilityReport": "VulnerabilityReport",
    }
    filename = os.path.join(os.path.dirname(os.path.abspath(__file__)), "clair-v4.openapi.json")
    with open(filename) as openapi_file:
        openapi = json.load(openapi_file)
    # The whole document is the referrer so that references into other parts
    # of the OpenAPI document can be resolved.
    resolver = RefResolver(base_uri="", referrer=openapi)
    schema = openapi["components"]["schemas"][schema_for[action.name]]
    try:
        validate(resp, schema, resolver=resolver)
        return True
    except Exception:
        logger.exception("Security scanner response failed OpenAPI validation")
        return False
示例4
def _get_schema(file_type):
    '''Load the schema for one kind of BSE JSON file, plus a matching resolver

    ``file_type`` names the kind of BSE JSON file to be validated and can be
    'component', 'element', 'table', 'metadata', or 'references'. The schema
    is read from "<file_type>-schema.json" in the default schema directory.

    Returns the schema and a reference resolver whose base URI is that same
    directory.
    '''

    schema = fileio.read_schema(
        os.path.join(_default_schema_dir, "{}-schema.json".format(file_type)))

    # Links between schema files are resolved relative to the schema directory
    links_resolver = jsonschema.RefResolver(
        referrer=schema,
        base_uri='file://{}/'.format(_default_schema_dir))

    return schema, links_resolver
示例5
def validate(spec, schema_name, version=None):
    """Validate ``spec`` against the named schema using Draft 6 rules.

    The schema is obtained from ``load_schema(schema_name, version=version)``.
    References are resolved relative to this package's ``schemas/`` resource
    directory, with ``SCHEMA_CACHE`` supplied as the resolver's store.

    Returns whatever ``Draft6Validator.validate`` returns.

    Raises:
        InvalidSpecification: wrapping the ``jsonschema.ValidationError``
            raised for an invalid ``spec``.
    """
    schema = load_schema(schema_name, version=version)
    try:
        schemas_root = pkg_resources.resource_filename(__name__, 'schemas/')
        # NOTE(review): the referrer passed here is the schema *name*, not the
        # loaded schema document - confirm this is intended.
        ref_resolver = jsonschema.RefResolver(
            base_uri='file://{0:s}'.format(schemas_root),
            referrer=schema_name,
            store=SCHEMA_CACHE,
        )
        checker = jsonschema.Draft6Validator(
            schema, resolver=ref_resolver, format_checker=None
        )
        return checker.validate(spec)
    except jsonschema.ValidationError as err:
        raise InvalidSpecification(err)
示例6
def setup_class(cls):
    """Set the test up.

    Loads the agent configuration schema and builds a Draft 4 validator for
    it, copies the ``dummy_aea`` fixture into a fresh temporary directory,
    changes into that copy and invokes the CLI ``list protocols`` command
    while ``aea.cli.list.format_items`` is patched to return
    ``FORMAT_ITEMS_SAMPLE_OUTPUT``. The previous working directory is kept in
    ``cls.cwd``.
    """
    # Context manager so the schema file handle is closed deterministically.
    with open(AGENT_CONFIGURATION_SCHEMA) as schema_file:
        cls.schema = json.load(schema_file)
    cls.resolver = jsonschema.RefResolver(
        "file://{}/".format(Path(CONFIGURATION_SCHEMA_DIR).absolute()), cls.schema
    )
    cls.validator = Draft4Validator(cls.schema, resolver=cls.resolver)
    cls.cwd = os.getcwd()
    cls.t = tempfile.mkdtemp()
    # copy the 'dummy_aea' directory in the parent of the agent folder.
    shutil.copytree(Path(CUR_PATH, "data", "dummy_aea"), Path(cls.t, "dummy_aea"))
    cls.runner = CliRunner()
    os.chdir(Path(cls.t, "dummy_aea"))
    with mock.patch(
        "aea.cli.list.format_items", return_value=FORMAT_ITEMS_SAMPLE_OUTPUT
    ):
        cls.result = cls.runner.invoke(
            cli, [*CLI_LOG_OPTION, "list", "protocols"], standalone_mode=False
        )
示例7
def setup_class(cls):
    """Set the test up.

    Loads the agent configuration schema and builds a Draft 4 validator for
    it, copies the ``dummy_aea`` fixture into a fresh temporary directory,
    changes into that copy and invokes the CLI ``list skills`` command while
    ``aea.cli.list.format_items`` is patched to return
    ``FORMAT_ITEMS_SAMPLE_OUTPUT``. The previous working directory is kept in
    ``cls.cwd``.
    """
    # Context manager so the schema file handle is closed deterministically.
    with open(AGENT_CONFIGURATION_SCHEMA) as schema_file:
        cls.schema = json.load(schema_file)
    cls.resolver = jsonschema.RefResolver(
        "file://{}/".format(Path(CONFIGURATION_SCHEMA_DIR).absolute()), cls.schema
    )
    cls.validator = Draft4Validator(cls.schema, resolver=cls.resolver)
    cls.cwd = os.getcwd()
    cls.t = tempfile.mkdtemp()
    # copy the 'dummy_aea' directory in the parent of the agent folder.
    shutil.copytree(Path(CUR_PATH, "data", "dummy_aea"), Path(cls.t, "dummy_aea"))
    cls.runner = CliRunner()
    os.chdir(Path(cls.t, "dummy_aea"))
    with mock.patch(
        "aea.cli.list.format_items", return_value=FORMAT_ITEMS_SAMPLE_OUTPUT
    ):
        cls.result = cls.runner.invoke(
            cli, [*CLI_LOG_OPTION, "list", "skills"], standalone_mode=False
        )
示例8
def setup_class(cls):
    """Set the test up.

    Loads the agent configuration schema and builds a Draft 4 validator for
    it, changes into a fresh temporary directory, runs the CLI ``init``
    command (asserting it exits with code 0), then runs ``create`` for
    ``cls.agent_name`` and loads the resulting agent configuration via
    ``cls._load_config_file``. The previous working directory is kept in
    ``cls.cwd``.
    """
    # Context manager so the schema file handle is closed deterministically.
    with open(AGENT_CONFIGURATION_SCHEMA) as schema_file:
        cls.schema = json.load(schema_file)
    cls.resolver = jsonschema.RefResolver(
        make_jsonschema_base_uri(Path(CONFIGURATION_SCHEMA_DIR).absolute()),
        cls.schema,
    )
    cls.validator = Draft4Validator(cls.schema, resolver=cls.resolver)
    cls.runner = CliRunner()
    cls.agent_name = "myagent"
    cls.cwd = os.getcwd()
    cls.t = tempfile.mkdtemp()
    os.chdir(cls.t)
    result = cls.runner.invoke(
        cli, [*CLI_LOG_OPTION, "init", "--local", "--author", AUTHOR]
    )
    assert result.exit_code == 0
    cls.result = cls.runner.invoke(
        cli,
        [*CLI_LOG_OPTION, "create", "--local", cls.agent_name],
        standalone_mode=False,
    )
    cls.agent_config = cls._load_config_file(cls.agent_name)
示例9
def setup_class(cls):
    """Set the test up.

    Loads the agent configuration schema and builds a Draft 4 validator for
    it, copies the ``dummy_aea`` fixture into a fresh temporary directory,
    changes into that copy and invokes the CLI ``freeze`` command. The
    previous working directory is kept in ``cls.cwd``.
    """
    # Context manager so the schema file handle is closed deterministically.
    with open(AGENT_CONFIGURATION_SCHEMA) as schema_file:
        cls.schema = json.load(schema_file)
    cls.resolver = jsonschema.RefResolver(
        make_jsonschema_base_uri(Path(CONFIGURATION_SCHEMA_DIR)), cls.schema
    )
    cls.validator = Draft4Validator(cls.schema, resolver=cls.resolver)
    cls.cwd = os.getcwd()
    cls.t = tempfile.mkdtemp()
    # copy the 'dummy_aea' directory in the parent of the agent folder.
    shutil.copytree(Path(CUR_PATH, "data", "dummy_aea"), Path(cls.t, "dummy_aea"))
    cls.runner = CliRunner()
    os.chdir(Path(cls.t, "dummy_aea"))
    cls.result = cls.runner.invoke(
        cli, [*CLI_LOG_OPTION, "freeze"], standalone_mode=False
    )
示例10
def __init__(self, versions, path, skip_unknown=False, min_date='2016.09.01', future_hours=24):
    """
    Class for validating documents from a cash register (KKT) against a JSON schema.
    :param versions: supported protocol versions, for example ['1.0', '1.05'].
    :param path: path to the directory that contains all the schema directories, split by version,
    for example, the schemas for protocol 1.0 must be located in <path>/1.0/
    :param skip_unknown: if the version number differs from the supported ones, skip validation
    :param min_date: string in '%Y.%m.%d' format, parsed and stored as ``self.min_date``;
    a falsy value stores None
    :param future_hours: stored unchanged as ``self.future_hours``
    """
    self._validators = {}
    self._skip_unknown = skip_unknown
    schema_dir = os.path.abspath(os.path.expanduser(path))
    if min_date:
        self.min_date = datetime.datetime.strptime(min_date, '%Y.%m.%d')
    else:
        self.min_date = None
    self.future_hours = future_hours

    # One validator per supported version, each rooted at that version's
    # document.schema.json file.
    for version in versions:
        full_path = os.path.join(schema_dir, version, 'document.schema.json')
        with open(full_path, encoding='utf-8') as fh:
            schema = json.load(fh)
        version_validator = Draft4Validator(
            schema=schema,
            resolver=jsonschema.RefResolver('file://' + full_path, None),
        )
        version_validator.check_schema(schema)  # make sure the schema itself is valid
        self._validators[version] = version_validator
示例11
def _create_validator(schema: Dict) -> Draft4Validator:
    """build a validator for a loaded json schema that can follow its $ref links

    Parameters
    ----------
    schema : Dict
        loaded json schema

    Returns
    -------
    Draft4Validator :
        json-schema validator specific to the supplied schema, with a resolver whose base
        URI is the package root directory
    """
    # Note: experiment_5.0.0.json is used purely as a known file to locate the
    # directory tree. It does *not* need to be upgraded with each version bump
    # since only the directory names are used.
    known_schema_file = Path(resource_filename(
        package_name, "spacetx_format/schema/experiment_5.0.0.json"))
    schema_dir = known_schema_file.parent
    package_root = schema_dir.parent
    return Draft4Validator(
        schema,
        resolver=RefResolver(f"{package_root.as_uri()}/", schema),
    )
示例12
def validate_JSON(data, combined_schema):
    """Validate ``data`` against ``combined_schema``.

    Every top-level property of the schema that carries a ``$ref`` is
    inspected: the file part of the reference (the text before ``#``) must
    exist on disk, otherwise a message is printed and the process exits. The
    directory of the last such file becomes the base URI used to resolve
    references. When no property has a ``$ref`` no custom resolver is passed.

    Returns True when the data validates; on a validation error the error
    message is printed and False is returned.
    """
    # Default to no custom resolver so schemas without any "$ref" property do
    # not hit an unbound local variable.
    resolver = None
    for prop in combined_schema['properties'].values():
        if "$ref" not in prop:
            continue
        # partition() also copes with references that have no '#fragment'.
        references_file = prop['$ref'].partition('#')[0]
        if not os.path.isfile(references_file):
            print("References file does not exists")
            sys.exit()
        schema_dir = os.path.dirname(os.path.realpath(references_file))
        resolver = RefResolver("file://{}/".format(schema_dir), None)
    try:
        validate(data, combined_schema, format_checker=FormatChecker(), resolver=resolver)
        print("JSON is valid with the schema")
        return True
    except exceptions.ValidationError as error:
        print(error.message)
        return False
#Merging schemas and putting key translations
示例13
def make_validator(schema, base_uri=None):
    """Build a ``Draft7Validator`` for ``schema`` that never fetches over HTTP.

    When ``base_uri`` is falsy, the schema's own id (``Draft7Validator.ID_OF``)
    is used as the base URI. Any ``http``/``https`` reference is answered with
    the meta-schema file stored under ``data/schema/meta-schema.json`` next to
    this module.
    """
    if not base_uri:
        base_uri = Draft7Validator.ID_OF(schema)

    def get_from_local(uri):  # pylint: disable=unused-argument
        """Return the bundled meta-schema regardless of the requested URI."""
        meta_schema = Path(os.path.dirname(os.path.realpath(__file__))).joinpath(
            "data/schema/meta-schema.json"
        )
        # Close the file once parsed instead of leaking the handle.
        with meta_schema.open() as handle:
            return json.load(handle)

    resolver = RefResolver(
        base_uri=base_uri,
        referrer=schema,
        handlers={"http": get_from_local, "https": get_from_local},
    )
    return Draft7Validator(schema, resolver=resolver)
示例14
def test_non_default_resolver_validator(markdown_examples):
    """Build classes with a custom resolver/store and the Draft 3 validator."""
    ms = URIDict()
    draft3 = load_schema("draft3")
    draft4 = load_schema("draft4")
    # Register both meta-schemas in the store under their own ids.
    for meta_schema in (draft3, draft4):
        ms[meta_schema["id"]] = meta_schema
    resolver_with_store = RefResolver(draft3["id"], draft3, ms)

    # 'Other' schema should be valid with draft3
    builder = pjo.ObjectBuilder(
        markdown_examples["Other"],
        resolver=resolver_with_store,
        validatorClass=Draft3Validator,
        resolved=markdown_examples,
    )
    other_cls = builder.build_classes().Other
    instance = other_cls(MyAddress="where I live")
    assert instance.MyAddress == "where I live"
示例15
def resolve_schema_references(schema, refs=None):
    '''Replace json-schema $refs in ``schema`` with the dicts they point to.

    The work is delegated to ``_resolve_schema_references``, which is handed
    the schema together with a resolver built from ``refs``. Per the original
    contract, the schema dict is walked recursively, every $ref found in a
    'properties' structure is converted to a resolved dict, and the input
    schema is modified as well as returned.

    Arguments:
        schema:
            the schema dict
        refs:
            a dict of <string, dict> which forms a store of referenced
            schemata; a falsy value means an empty store

    Returns:
        schema
    '''
    store = refs if refs else {}
    resolver = RefResolver("", schema, store=store)
    return _resolve_schema_references(schema, resolver)
示例16
def validate(self, data=None):
    """Validate ``data`` against this component's JSON schema.

    When ``data`` is falsy, ``self.config_dict`` is validated instead. The
    schema is read from ``/usr/share/contrailctl/schema/<component>.json``.

    Returns True when the data validates, and also when the schema file
    cannot be read (a message is printed and validation is skipped). Returns
    False, after printing the error message, on a validation error.
    """
    if not data:
        data = self.config_dict
    schema_dir = "/usr/share/contrailctl/schema/"
    schema_path = "{}/{}.json".format(schema_dir, self.component)
    resolver = RefResolver("file://{}/".format(schema_dir), None)
    try:
        # Context manager so the schema file handle is always closed.
        with open(schema_path, 'r') as schema_file:
            schema = schema_file.read()
    except IOError:
        print("Schema file is missing - {}".format(schema_path))
        return True
    try:
        validate(data, json.loads(schema), format_checker=FormatChecker(), resolver=resolver)
        return True
    except exceptions.ValidationError as error:
        print(error.message)
        return False
示例17
def in_scopes(resolver: jsonschema.RefResolver, scopes: List[str]) -> Generator[None, None, None]:
    """Enter every given scope on the resolver, in order, then yield once.

    All scopes are entered through a single ``ExitStack``, so they are left
    again when the generator is resumed or closed after the ``yield``.

    There could be an additional scope change during schema resolving in `get_response_schema`, so in total there
    could be a stack of two scopes maximum. Entering them through one stack handles both cases (1 or 2 scope changes)
    in the same way.
    """
    with ExitStack() as active_scopes:
        enter = active_scopes.enter_context
        for scope_uri in scopes:
            enter(resolver.in_scope(scope_uri))
        yield
示例18
def process_definitions(self, schema: Dict[str, Any], endpoint: Endpoint, resolver: RefResolver) -> None:
    """Add relevant security parameters to data generation.

    Only security definitions whose name appears in the endpoint's security
    requirements are processed. Definitions of type ``apiKey`` go through the
    API-key handler; every processed definition is also passed to the HTTP
    handler.
    """
    definitions = self.get_security_definitions(schema, resolver)
    requirements = get_security_requirements(schema, endpoint)
    for name, definition in definitions.items():
        if name not in requirements:
            # Not required by this endpoint - nothing to add.
            continue
        is_api_key = definition["type"] == "apiKey"
        if is_api_key:
            self.process_api_key_security_definition(definition, endpoint)
        self.process_http_security_definition(definition, endpoint)
示例19
def get_security_definitions(self, schema: Dict[str, Any], resolver: RefResolver) -> Dict[str, Any]:
    """Return the ``securityDefinitions`` mapping of ``schema``, or an empty dict when it is absent.

    ``resolver`` is accepted but not used here.
    """
    no_definitions: Dict[str, Any] = {}
    return schema.get("securityDefinitions", no_definitions)
示例20
def get_security_definitions(self, schema: Dict[str, Any], resolver: RefResolver) -> Dict[str, Any]:
    """In Open API 3 security definitions are located in ``components`` and may have references inside.

    When ``components.securitySchemes`` contains a ``$ref`` key, the second
    element of ``resolver.resolve(...)`` for that reference is returned;
    otherwise the ``securitySchemes`` mapping itself (empty when absent).
    """
    security_schemes = schema.get("components", {}).get("securitySchemes", {})
    if "$ref" not in security_schemes:
        return security_schemes
    _, resolved = resolver.resolve(security_schemes["$ref"])[:2]
    return resolved
示例21
def validate_json(self, stac_content, stac_schema):
    """
    Validate STAC content against a STAC schema.

    If the first attempt fails because a reference cannot be resolved, the
    geojson spec is fetched and validation is retried with a resolver based
    on the local ``geojson.json`` file.

    :param stac_content: input STAC file content
    :param stac_schema: schema of STAC (item, catalog, collection); for item
        schemas its GeoJSON ``$ref`` is rewritten in place
    :return: ``(True, None)`` on success, otherwise ``(False, message)``
    """
    try:
        targets_item = "title" in stac_schema and "item" in stac_schema["title"].lower()
        if targets_item:
            logger.debug("Changing GeoJson definition to reference local file")
            # rewrite relative reference to use local geojson file
            local_ref = "file://" + self.dirpath + "/geojson.json#definitions/feature"
            stac_schema["definitions"]["core"]["allOf"][0]["oneOf"][0]["$ref"] = local_ref
        logging.info("Validating STAC")
        validate(stac_content, stac_schema)
        return True, None
    except RefResolutionError:
        # See https://github.com/Julian/jsonschema/issues/362
        # See https://github.com/Julian/jsonschema/issues/313
        # See https://github.com/Julian/jsonschema/issues/98
        try:
            self.fetch_spec("geojson")
            # NOTE(review): the referrer here is a file name string rather
            # than a loaded schema document - confirm this is intended.
            self.geojson_resolver = RefResolver(
                base_uri=f"file://{self.dirpath}/geojson.json", referrer="geojson.json"
            )
            validate(stac_content, stac_schema, resolver=self.geojson_resolver)
            return True, None
        except Exception as fallback_error:
            # Any failure of the retry (not only resolution errors) lands here.
            logger.exception("A reference resolution error")
            return False, f"{fallback_error.args}"
    except ValidationError as invalid:
        logger.warning("STAC Validation Error")
        return False, f"{invalid.message} of {list(invalid.path)}"
    except Exception as unexpected:
        logger.exception("STAC error")
        return False, f"{unexpected}"
示例22
def do_create_(jsfile, success):
    """Create a test method that validates ``jsfile`` against its schema.

    The generated method loads the JSON file, picks the schema named by the
    document's ``@type`` from ``schema_folder`` and validates the document
    with a Draft 4 validator. When ``success`` is true the document must
    validate; when it is false the validation must fail.
    """
    def do_expected(self):
        with open(jsfile) as source:
            js = json.load(source)
        try:
            assert '@type' in js
            schema_path = os.path.join(schema_folder, js['@type'] + ".json")
            with open(schema_path) as file_object:
                schema = json.load(file_object)
            base_uri = 'file://' + schema_folder + '/'
            checker = Draft4Validator(
                schema, resolver=RefResolver(base_uri, schema))
            checker.validate(js)
        except (AssertionError, ValidationError, KeyError):
            # A failure is an error only when success was expected.
            if success:
                raise
            return
        # Reaching this point means validation passed.
        assert success
    return do_expected
示例23
def expand_with_schema(name, attrs):
    """Attach a JSON schema, its validator and its defaults to ``attrs``.

    The schema is selected by ``attrs['schema']`` (a schema name, ``.json`` is
    appended) or by ``attrs['schema_file']`` (a file name, removed from
    ``attrs``). When neither key is present ``attrs`` is returned untouched.

    A schema file given without any ``/`` is looked up in the ``schemas``
    directory next to this module; a value containing ``/`` is used as given.

    ``attrs`` is updated in place with ``@type`` (defaults to ``name``),
    ``_schema_file``, ``schema``, ``_validator`` and the defaults returned by
    ``BaseMeta.get_defaults``, and is also returned.
    """
    if 'schema' in attrs:  # Schema specified by name
        schema_file = '{}.json'.format(attrs['schema'])
    elif 'schema_file' in attrs:
        schema_file = attrs.pop('schema_file')
    else:
        return attrs
    # Test the variable, not the literal string 'schema_file' (which never
    # contains a slash and made this branch unconditional).
    if '/' not in schema_file:
        thisdir = os.path.dirname(os.path.realpath(__file__))
        schema_file = os.path.join(thisdir,
                                   'schemas',
                                   schema_file)
    schema_path = 'file://' + schema_file
    with open(schema_file) as f:
        schema = json.load(f)
    resolver = jsonschema.RefResolver(schema_path, schema)
    if '@type' not in attrs:
        attrs['@type'] = name
    attrs['_schema_file'] = schema_file
    attrs['schema'] = schema
    attrs['_validator'] = jsonschema.Draft4Validator(schema, resolver=resolver)
    schema_defaults = BaseMeta.get_defaults(attrs['schema'])
    attrs.update(schema_defaults)
    return attrs
示例24
def get_schema_validator(self, schema_name):
    """
    Return a Draft 4 validator for the named schema.

    The loaded schema and its resolver are cached in ``self.schemas`` on
    first use; a new validator object is built on every call.

    Had to remove the id property from map.json or it uses URLs for validation
    See various issues at https://github.com/Julian/jsonschema/pull/306
    """
    if schema_name in self.schemas:
        jsn_schema, resolver = self.schemas[schema_name]
    else:
        schema_file = self.get_schema_file(schema_name)
        with open(schema_file) as f:
            try:
                jsn_schema = json.load(f)
            except ValueError as ex:
                log.error("Could not load %s", schema_file)
                raise ex
        root_schema_path = self.get_schema_path(self.get_schemas_folder())
        resolver = jsonschema.RefResolver(root_schema_path, None)
        # cache the schema for future use
        self.schemas[schema_name] = (jsn_schema, resolver)
    # NOTE: the schema itself is not checked for validity here (the
    # check_schema call is disabled in the original).
    return jsonschema.Draft4Validator(schema=jsn_schema, resolver=resolver)
示例25
def validate_against_config_schema(config_file):
    """Validate ``config_file.config`` against the schema loaded for it.

    Every error found by the Draft 4 validator is handed to ``handle_errors``
    together with ``process_config_schema_errors`` and the file name.
    """
    schema = load_jsonschema(config_file)
    custom_formats = FormatChecker(["ports", "expose", "subnet_ip_address"])
    ref_resolver = RefResolver(get_resolver_path(), schema)
    validator = Draft4Validator(
        schema, resolver=ref_resolver, format_checker=custom_formats)
    errors = validator.iter_errors(config_file.config)
    handle_errors(errors, process_config_schema_errors, config_file.filename)
示例26
def test_openapi_spec_validity(flask_app_client):
    """The served swagger.json must pass ``validator20.validate_spec``."""
    response = flask_app_client.get('/api/v1/swagger.json')
    spec = json.loads(response.data.decode('utf-8'))
    # The test expects validate_spec to hand back a RefResolver.
    validation_result = validator20.validate_spec(spec)
    assert isinstance(validation_result, RefResolver)
示例27
def validate_v0(json_to_validate, schema_filename):
    """Validate ``json_to_validate`` against a schema file from ``schemas/v0``.

    References are resolved relative to the ``schemas/v0`` directory next to
    this module, and format checking is enabled. Any exception raised by
    ``jsonschema.validate`` propagates to the caller.
    """
    schema_dir = os.path.join(os.path.dirname(__file__), 'schemas/v0')
    base_uri = 'file://' + schema_dir + '/'
    resolver = jsonschema.RefResolver(base_uri, None)
    schema_path = os.path.join(schema_dir, schema_filename)
    with open(schema_path) as schema:
        loaded_schema = json.load(schema)
        jsonschema.validate(
            json_to_validate,
            loaded_schema,
            format_checker=jsonschema.FormatChecker(),
            resolver=resolver,
        )
示例28
def assertContainsRequiredFields(schema_filename, response):
    """Assert that ``response`` validates against the named schema file.

    The schema is loaded from the ``schema`` directory two levels above this
    file, checked to be a valid Draft 4 schema, and used to validate
    ``response`` with references resolved relative to that directory.

    Raises ``ValidationError`` when the response does not match the schema.
    """
    schema_dir = os.path.abspath(os.path.join(os.path.dirname(
        inspect.getfile(inspect.currentframe())), '..', '..', 'schema'))
    schema_filename = os.path.join(schema_dir, schema_filename)
    # Context manager so the schema file handle is closed after parsing.
    with open(schema_filename) as schema_file:
        schema = json.load(schema_file)
    Draft4Validator.check_schema(schema)
    # The file URI prefix differs on Windows ('nt').
    if os.name == 'nt':
        os_base_uri = 'file:///'
    else:
        os_base_uri = 'file://'
    resolver = RefResolver(referrer=schema, base_uri=os_base_uri + schema_dir + '/')
    # Raises ValidationError when incorrect response
    validate(response, schema, resolver=resolver)
示例29
def load_validator(schema_path, schema):
    """Create a JSON schema validator for the given schema.

    The resolver is seeded with the module-level ``SCHEMA_STORE``; afterwards
    ``SCHEMA_STORE`` is rebound to the resolver's store so later calls reuse
    the same mappings.

    Args:
        schema_path: The filename of the JSON schema.
        schema: A Python object representation of the same schema.

    Returns:
        A ``STIXValidator`` for ``schema`` using the Draft 7 format checker.
    """
    global SCHEMA_STORE

    # Get correct prefix based on OS
    file_prefix = 'file:///' if os.name == 'nt' else 'file:'
    base_uri = file_prefix + schema_path.replace("\\", "/")
    resolver = RefResolver(base_uri, schema, store=SCHEMA_STORE)

    # Register the schema under its own '$id' when it declares one.
    schema_id = schema.get('$id', '')
    if schema_id:
        resolver.store[schema_id] = schema

    # RefResolver creates a new store internally; persist it so we can use the same mappings every time
    SCHEMA_STORE = resolver.store
    return STIXValidator(schema, resolver=resolver, format_checker=draft7_format_checker)
示例30
def load_schema(schema_path):
    """Prepare the api specification for request and response validation.

    The specification is read from ``schema_path``; its ``models`` section
    (empty when absent) is used as the store of the reference resolver.

    :returns: a mapping from :class:`RequestMatcher` to :class:`ValidatorMap`
        for every operation in the api specification.
    :rtype: dict
    """
    with open(schema_path, 'r') as schema_file:
        schema = simplejson.load(schema_file)
    models = schema.get('models', {})
    model_resolver = RefResolver('', '', models)
    return build_request_to_validator_map(schema, model_resolver)