Python源码示例:jsonschema.RefResolver()

示例1
def testJsonSchema(schema_file, test_file):
    """Validate the JSON document in ``test_file`` against ``schema_file``.

    ``$ref`` entries of the schema are resolved relative to the directory
    that contains this module.

    Args:
        schema_file: Path handed to ``loadSchema`` to obtain the schema.
        test_file: Path handed to ``loadJson`` to obtain the document.

    Returns:
        0 when the document validates, 1 when validation fails (the error
        and the offending document are printed first).
    """
    schema = loadSchema(schema_file)
    data = loadJson(test_file)

    # print() with a single argument behaves the same on Python 2 and 3,
    # unlike the print statement, which is a syntax error on Python 3.
    print('Loaded test validation JSON value from %s:' % test_file)

    # Named base_dir (not `dir`) so the builtin is not shadowed.
    base_dir = os.path.dirname(os.path.realpath(__file__))
    resolver = jsonschema.RefResolver(
        referrer=schema, base_uri='file://' + base_dir + '/')

    try:
        jsonschema.validate(data, schema, resolver=resolver)
    except jsonschema.exceptions.ValidationError as e:
        print(e)
        print('FAILED VALIDATION for %s' % test_file)
        pprint(data)
        return 1

    print('Validated.')
    return 0
示例2
def get_validator(filename, base_uri=''):
    """Load a JSON schema from ``filename`` and return a Draft4Validator.

    The schema is checked against the Draft 4 meta-schema first.

    Args:
        filename: Path of the JSON schema file, read via
            ``get_json_from_file``.
        base_uri: Optional base URI for resolving relative references
            (especially useful for local resolution with a URI of the
            form ``file://{some_path}/``). When empty, no custom resolver
            is installed.

    Returns:
        A Draft4Validator for the loaded schema.

    Raises:
        SchemaError: If the loaded document is not a valid schema.
    """
    schema = get_json_from_file(filename)

    # Class-method call; a SchemaError simply propagates to the caller.
    Draft4Validator.check_schema(schema)
    print("Schema %s is valid JSON" % filename)

    resolver = None
    if base_uri:
        # The referrer must be the schema document itself, not its file
        # name: RefResolver registers it under base_uri and looks
        # same-document references (e.g. "#/definitions/x") up in it.
        resolver = RefResolver(base_uri=base_uri, referrer=schema)
    return Draft4Validator(schema=schema, resolver=resolver)
示例3
def is_valid_response(action, resp={}):
    """Report whether ``resp`` matches the OpenAPI schema for ``action``.

    The schema is taken from ``clair-v4.openapi.json`` located next to this
    module; the component used is selected by ``action.name``.

    Returns:
        True when ``resp`` validates, False when validation raises (the
        failure is logged with its traceback).
    """
    assert action.name in actions.keys()

    spec_path = os.path.join(
        os.path.dirname(os.path.abspath(__file__)), "clair-v4.openapi.json"
    )
    with open(spec_path) as spec_file:
        spec = json.load(spec_file)

    # Maps an action name to the component schema describing its response.
    component_by_action = {
        "IndexState": "State",
        "Index": "IndexReport",
        "GetIndexReport": "IndexReport",
        "GetVulnerabilityReport": "VulnerabilityReport",
    }
    ref_resolver = RefResolver(base_uri="", referrer=spec)
    target_schema = spec["components"]["schemas"][component_by_action[action.name]]

    try:
        validate(resp, target_schema, resolver=ref_resolver)
    except Exception:
        logger.exception("Security scanner response failed OpenAPI validation")
        return False
    return True
示例4
def _get_schema(file_type):
    '''Load the schema for one kind of BSE JSON file, plus its resolver.

       file_type names the kind of BSE JSON file to be validated and can be
       'component', 'element', 'table', 'metadata', or 'references'. The
       schema is read from "<file_type>-schema.json" inside the default
       schema directory.

       Returns a (schema, resolver) pair; the resolver follows links
       relative to the default schema directory.
    '''

    schema_path = os.path.join(
        _default_schema_dir, "{0}-schema.json".format(file_type))
    loaded_schema = fileio.read_schema(schema_path)

    # Links inside the schema are resolved against the schema directory
    link_resolver = jsonschema.RefResolver(
        base_uri='file://{}/'.format(_default_schema_dir),
        referrer=loaded_schema,
    )
    return loaded_schema, link_resolver
示例5
def validate(spec, schema_name, version=None):
    """Validate ``spec`` against the named packaged schema.

    Args:
        spec: The document to validate.
        schema_name: Name passed to ``load_schema`` to obtain the schema.
        version: Optional schema version, forwarded to ``load_schema``.

    Returns:
        Whatever ``Draft6Validator.validate`` returns for a valid ``spec``.

    Raises:
        InvalidSpecification: Wraps the jsonschema ``ValidationError`` when
            ``spec`` does not conform to the schema.
    """
    schema = load_schema(schema_name, version=version)
    try:
        resolver = jsonschema.RefResolver(
            base_uri='file://{0:s}'.format(
                pkg_resources.resource_filename(__name__, 'schemas/')
            ),
            # The referrer is the referring document itself; passing the
            # schema name (a string) would register that string under
            # base_uri instead of the schema.
            referrer=schema,
            store=SCHEMA_CACHE,
        )
        validator = jsonschema.Draft6Validator(
            schema, resolver=resolver, format_checker=None
        )
        return validator.validate(spec)
    except jsonschema.ValidationError as err:
        raise InvalidSpecification(err)
示例6
def setup_class(cls):
    """Set the test up.

    Loads the agent configuration schema and builds a Draft 4 validator for
    it, copies the 'dummy_aea' project into a fresh temporary directory,
    changes into it and runs the CLI command ``list protocols`` with
    ``aea.cli.list.format_items`` patched to return a sample output.
    """
    # Context manager so the schema file handle is closed right after reading.
    with open(AGENT_CONFIGURATION_SCHEMA) as schema_file:
        cls.schema = json.load(schema_file)
    cls.resolver = jsonschema.RefResolver(
        "file://{}/".format(Path(CONFIGURATION_SCHEMA_DIR).absolute()), cls.schema
    )
    cls.validator = Draft4Validator(cls.schema, resolver=cls.resolver)
    # Remember the original working directory before changing into the copy.
    cls.cwd = os.getcwd()
    cls.t = tempfile.mkdtemp()
    # copy the 'dummy_aea' directory in the parent of the agent folder.
    shutil.copytree(Path(CUR_PATH, "data", "dummy_aea"), Path(cls.t, "dummy_aea"))
    cls.runner = CliRunner()
    os.chdir(Path(cls.t, "dummy_aea"))

    with mock.patch(
        "aea.cli.list.format_items", return_value=FORMAT_ITEMS_SAMPLE_OUTPUT
    ):
        cls.result = cls.runner.invoke(
            cli, [*CLI_LOG_OPTION, "list", "protocols"], standalone_mode=False
        )
示例7
def setup_class(cls):
    """Set the test up.

    Loads the agent configuration schema and builds a Draft 4 validator for
    it, copies the 'dummy_aea' project into a fresh temporary directory,
    changes into it and runs the CLI command ``list skills`` with
    ``aea.cli.list.format_items`` patched to return a sample output.
    """
    # Context manager so the schema file handle is closed right after reading.
    with open(AGENT_CONFIGURATION_SCHEMA) as schema_file:
        cls.schema = json.load(schema_file)
    cls.resolver = jsonschema.RefResolver(
        "file://{}/".format(Path(CONFIGURATION_SCHEMA_DIR).absolute()), cls.schema
    )
    cls.validator = Draft4Validator(cls.schema, resolver=cls.resolver)
    # Remember the original working directory before changing into the copy.
    cls.cwd = os.getcwd()
    cls.t = tempfile.mkdtemp()
    # copy the 'dummy_aea' directory in the parent of the agent folder.
    shutil.copytree(Path(CUR_PATH, "data", "dummy_aea"), Path(cls.t, "dummy_aea"))
    cls.runner = CliRunner()
    os.chdir(Path(cls.t, "dummy_aea"))

    with mock.patch(
        "aea.cli.list.format_items", return_value=FORMAT_ITEMS_SAMPLE_OUTPUT
    ):
        cls.result = cls.runner.invoke(
            cli, [*CLI_LOG_OPTION, "list", "skills"], standalone_mode=False
        )
示例8
def setup_class(cls):
    """Set the test up.

    Loads the agent configuration schema and builds a Draft 4 validator for
    it, then, inside a fresh temporary directory, runs the CLI ``init``
    command followed by ``create`` for the agent "myagent" and loads the
    resulting agent configuration.
    """
    # Context manager so the schema file handle is closed right after reading.
    with open(AGENT_CONFIGURATION_SCHEMA) as schema_file:
        cls.schema = json.load(schema_file)
    cls.resolver = jsonschema.RefResolver(
        make_jsonschema_base_uri(Path(CONFIGURATION_SCHEMA_DIR).absolute()),
        cls.schema,
    )
    cls.validator = Draft4Validator(cls.schema, resolver=cls.resolver)

    cls.runner = CliRunner()
    cls.agent_name = "myagent"
    # Remember the original working directory before changing into the temp dir.
    cls.cwd = os.getcwd()
    cls.t = tempfile.mkdtemp()
    os.chdir(cls.t)
    result = cls.runner.invoke(
        cli, [*CLI_LOG_OPTION, "init", "--local", "--author", AUTHOR]
    )
    # The init step must succeed before an agent can be created.
    assert result.exit_code == 0

    cls.result = cls.runner.invoke(
        cli,
        [*CLI_LOG_OPTION, "create", "--local", cls.agent_name],
        standalone_mode=False,
    )
    cls.agent_config = cls._load_config_file(cls.agent_name)
示例9
def setup_class(cls):
    """Set the test up.

    Loads the agent configuration schema and builds a Draft 4 validator for
    it, copies the 'dummy_aea' project into a fresh temporary directory,
    changes into it and runs the CLI command ``freeze``.
    """
    # Context manager so the schema file handle is closed right after reading.
    with open(AGENT_CONFIGURATION_SCHEMA) as schema_file:
        cls.schema = json.load(schema_file)
    cls.resolver = jsonschema.RefResolver(
        make_jsonschema_base_uri(Path(CONFIGURATION_SCHEMA_DIR)), cls.schema
    )
    cls.validator = Draft4Validator(cls.schema, resolver=cls.resolver)

    # Remember the original working directory before changing into the copy.
    cls.cwd = os.getcwd()
    cls.t = tempfile.mkdtemp()
    # copy the 'dummy_aea' directory in the parent of the agent folder.
    shutil.copytree(Path(CUR_PATH, "data", "dummy_aea"), Path(cls.t, "dummy_aea"))
    cls.runner = CliRunner()
    os.chdir(Path(cls.t, "dummy_aea"))
    cls.result = cls.runner.invoke(
        cli, [*CLI_LOG_OPTION, "freeze"], standalone_mode=False
    )
示例10
def __init__(self, versions, path, skip_unknown=False, min_date='2016.09.01', future_hours=24):
    """
    Validator of KKT (cash register) documents against JSON schemas.

    :param versions: supported protocol versions, e.g. ['1.0', '1.05'].
    :param path: directory that contains one sub-directory of schemas per
        version; e.g. the schemas for protocol 1.0 must live in <path>/1.0/
    :param skip_unknown: skip validation when the version number is not one
        of the supported ones
    :param min_date: date string in ``%Y.%m.%d`` format, parsed and stored
        as ``self.min_date``; a falsy value stores ``None``
    :param future_hours: stored unchanged as ``self.future_hours``
    """
    self._validators = {}
    self._skip_unknown = skip_unknown
    schema_dir = os.path.abspath(os.path.expanduser(path))

    self.min_date = datetime.datetime.strptime(min_date, '%Y.%m.%d') if min_date else None
    self.future_hours = future_hours

    for version in versions:
        full_path = os.path.join(schema_dir, version, 'document.schema.json')
        with open(full_path, encoding='utf-8') as fh:
            schema = json.load(fh)
        # make sure the schema itself is valid before building a validator
        Draft4Validator.check_schema(schema)
        # Pass the loaded schema as the referrer: the resolver registers the
        # referrer under the base URI, so with None any reference back into
        # this same document would be looked up in None and fail.
        resolver = jsonschema.RefResolver('file://' + full_path, schema)
        self._validators[version] = Draft4Validator(schema=schema, resolver=resolver)
示例11
def _create_validator(schema: Dict) -> Draft4Validator:
    """build a validator for a loaded json schema, with $ref links resolvable

    Parameters
    ----------
    schema : Dict
        loaded json schema

    Returns
    -------
    Draft4Validator :
        validator for the supplied schema whose resolver uses the package root
        directory as base URI

    """

    # Note: experiment_5.0.0.json is only used as a known file to locate the
    # schema directory. It does *not* need to be bumped with each version since
    # only its directory is used.
    known_schema_file = resource_filename(
        package_name, "spacetx_format/schema/experiment_5.0.0.json")

    # two levels up from the file: schema/ -> package root
    root_uri = Path(known_schema_file).parent.parent.as_uri()
    return Draft4Validator(schema, resolver=RefResolver(f"{root_uri}/", schema))
示例12
def validate_JSON(data,combined_schema):
    """Validate ``data`` against ``combined_schema``.

    The top-level properties of the schema are scanned for "$ref" entries;
    the file part of the last one found names the references file, and its
    directory becomes the base URI for resolving references.

    Returns True when ``data`` is valid and False (after printing the error
    message) when it is not. If a references file is named but does not
    exist, a message is printed and the process exits via ``sys.exit()``.
    When no property carries a "$ref", validation runs without a custom
    resolver instead of failing on an unbound variable.
    """
    references_file = None
    for prop_schema in combined_schema['properties'].values():
        if "$ref" in prop_schema:
            # Only the part before '#' (the file) matters here; partition
            # also copes with references that carry no fragment at all.
            references_file = prop_schema['$ref'].partition('#')[0]

    validate_kwargs = {'format_checker': FormatChecker()}
    if references_file is not None:
        if not os.path.isfile(references_file):
            print("References file does not exists")
            sys.exit()
        schema_dir = os.path.dirname(os.path.realpath(references_file))
        validate_kwargs['resolver'] = RefResolver(
            "file://{}/".format(schema_dir), None)

    try:
        validate(data, combined_schema, **validate_kwargs)
    except exceptions.ValidationError as error:
        print(error.message)
        return False
    print("JSON is valid with the schema")
    return True

#Merging schemas and putting key translations 
示例13
def make_validator(schema, base_uri=None):
    """Build a Draft7Validator whose http(s) references are served locally.

    Args:
        schema: The schema to validate against.
        base_uri: Base URI for reference resolution; when falsy, the id of
            ``schema`` (``Draft7Validator.ID_OF``) is used instead.

    Returns:
        A Draft7Validator with a resolver that answers every ``http`` and
        ``https`` reference with the bundled ``meta-schema.json``.
    """
    if not base_uri:
        base_uri = Draft7Validator.ID_OF(schema)

    def get_from_local(uri):  # pylint: disable=unused-argument
        # Whatever URI is requested, the bundled meta-schema is returned.
        meta_schema = Path(os.path.dirname(os.path.realpath(__file__))).joinpath(
            "data/schema/meta-schema.json"
        )
        # Context manager so the file handle is closed after each load.
        with meta_schema.open() as handle:
            return json.load(handle)

    resolver = RefResolver(
        base_uri=base_uri,
        referrer=schema,
        handlers={"http": get_from_local, "https": get_from_local},
    )
    return Draft7Validator(schema, resolver=resolver)
示例14
def test_non_default_resolver_validator(markdown_examples):
    """Build classes with an explicit resolver/store and a Draft 3 validator."""
    draft3 = load_schema("draft3")
    draft4 = load_schema("draft4")

    # Store holding both meta-schemas, keyed by their "id".
    meta_store = URIDict()
    for meta_schema in (draft3, draft4):
        meta_store[meta_schema["id"]] = meta_schema

    # 'Other' schema should be valid with draft3
    other_builder = pjo.ObjectBuilder(
        markdown_examples["Other"],
        resolver=RefResolver(draft3["id"], draft3, meta_store),
        validatorClass=Draft3Validator,
        resolved=markdown_examples,
    )
    namespace = other_builder.build_classes()
    instance = namespace.Other(MyAddress="where I live")
    assert instance.MyAddress == "where I live"
示例15
def resolve_schema_references(schema, refs=None):
    '''Replace json-schema $refs in ``schema`` with the dicts they point to.

    The actual walk is delegated to ``_resolve_schema_references``, which
    receives the schema together with a resolver built over ``refs``.

    The input schema is modified and also returned.

    Arguments:
        schema:
            the schema dict
        refs:
            a dict of <string, dict> acting as a store of referenced
            schemata; a falsy value is treated as an empty store

    Returns:
        schema
    '''
    store = refs if refs else {}
    resolver = RefResolver("", schema, store=store)
    return _resolve_schema_references(schema, resolver)
示例16
def validate(self, data=None):
    """Validate ``data`` against the JSON schema of ``self.component``.

    Args:
        data: Configuration to validate; a falsy value selects
            ``self.config_dict``.

    Returns:
        True when the data is valid, or when the schema file cannot be read
        (a message is printed and validation is skipped); False when
        validation fails (the error message is printed).
    """
    if not data:
        data = self.config_dict
    schema_dir = "/usr/share/contrailctl/schema/"
    schema_path = "{}/{}.json".format(schema_dir, self.component)
    try:
        # Context manager so the file handle is closed after reading.
        with open(schema_path, 'r') as schema_file:
            schema = schema_file.read()
    except IOError:
        print("Schema file is missing - {}".format(schema_path))
        return True

    # Built only once a schema is available; references are resolved
    # relative to the schema directory.
    resolver = RefResolver("file://{}/".format(schema_dir), None)
    try:
        validate(data, json.loads(schema), format_checker=FormatChecker(), resolver=resolver)
        return True
    except exceptions.ValidationError as error:
        print(error.message)
        return False
示例17
def in_scopes(resolver: jsonschema.RefResolver, scopes: List[str]) -> Generator[None, None, None]:
    """Enter every given scope on the resolver, then yield once.

    Scopes are entered in the order given and left again when the generator
    is finished or closed. Schema resolving in `get_response_schema` may add
    one more scope change, so at most two scopes are expected in total; one
    or two scope changes are handled identically here.
    """
    scope_stack = ExitStack()
    with scope_stack:
        # Lazily create each scope context right before it is entered.
        pending = (resolver.in_scope(uri) for uri in scopes)
        for scope_context in pending:
            scope_stack.enter_context(scope_context)
        yield
示例18
def process_definitions(self, schema: Dict[str, Any], endpoint: Endpoint, resolver: RefResolver) -> None:
    """Add relevant security parameters to data generation.

    Only security definitions that are listed among the endpoint's security
    requirements are processed.
    """
    available = self.get_security_definitions(schema, resolver)
    required = get_security_requirements(schema, endpoint)
    for scheme_name, scheme in available.items():
        if scheme_name not in required:
            continue
        if scheme["type"] == "apiKey":
            self.process_api_key_security_definition(scheme, endpoint)
        # The HTTP handler is invoked for every required scheme, apiKey included.
        self.process_http_security_definition(scheme, endpoint)
示例19
def get_security_definitions(self, schema: Dict[str, Any], resolver: RefResolver) -> Dict[str, Any]:
    """Return the top-level ``securityDefinitions`` mapping of ``schema``.

    An empty dict is returned when the key is absent; ``resolver`` is not used.
    """
    definitions = schema.get("securityDefinitions", {})
    return definitions
示例20
def get_security_definitions(self, schema: Dict[str, Any], resolver: RefResolver) -> Dict[str, Any]:
    """Return the security schemes found under ``components`` (Open API 3).

    When the ``securitySchemes`` object holds a ``$ref``, the reference is
    resolved with ``resolver`` and the resolved document is returned;
    otherwise the object is returned as is (empty dict when absent).
    """
    security_schemes = schema.get("components", {}).get("securitySchemes", {})
    if "$ref" not in security_schemes:
        return security_schemes
    # resolve() yields a pair whose second item is the resolved document.
    resolution = resolver.resolve(security_schemes["$ref"])
    return resolution[1]
示例21
def validate_json(self, stac_content, stac_schema):
        """
        Validate STAC content against a STAC schema.

        If the first validation attempt raises RefResolutionError, the
        geojson spec is fetched and validation is retried once with a
        resolver stored on ``self.geojson_resolver``.

        :param stac_content: input STAC file content
        :param stac_schema: schema of STAC (item, catalog, collection); for
            schemas whose "title" contains "item" it is modified in place
        :return: tuple ``(True, None)`` on success, or ``(False, message)``
            describing the failure
        """

        try:
            # Only schemas whose title mentions "item" get the GeoJSON rewrite.
            if "title" in stac_schema and "item" in stac_schema["title"].lower():
                logger.debug("Changing GeoJson definition to reference local file")
                # rewrite relative reference to use local geojson file
                stac_schema["definitions"]["core"]["allOf"][0]["oneOf"][0]["$ref"] = (
                    "file://" + self.dirpath + "/geojson.json#definitions/feature"
                )
            # NOTE(review): this call goes through the `logging` module directly
            # while the rest of the method uses `logger` — confirm intended.
            logging.info("Validating STAC")
            validate(stac_content, stac_schema)
            return True, None
        except RefResolutionError as error:
            # See https://github.com/Julian/jsonschema/issues/362
            # See https://github.com/Julian/jsonschema/issues/313
            # See https://github.com/Julian/jsonschema/issues/98
            try:
                # Retry once: fetch the geojson spec and validate again with an
                # explicit resolver based at the local geojson.json file.
                self.fetch_spec("geojson")
                self.geojson_resolver = RefResolver(
                    base_uri=f"file://{self.dirpath}/geojson.json", referrer="geojson.json"
                )
                validate(stac_content, stac_schema, resolver=self.geojson_resolver)
                return True, None
            except Exception as error:
                # Any failure during the retry (including a validation error)
                # is logged and reported through the exception's args.
                logger.exception("A reference resolution error")
                return False, f"{error.args}"
        except ValidationError as error:
            # The content does not conform: report the message and the path
            # of the offending element.
            logger.warning("STAC Validation Error")
            return False, f"{error.message} of {list(error.path)}"
        except Exception as error:
            # Anything else is logged with its traceback and returned as text.
            logger.exception("STAC error")
            return False, f"{error}"
示例22
def do_create_(jsfile, success):
    """Create a test method that validates ``jsfile`` against its schema.

    The schema is chosen by the document's '@type' entry and loaded from
    ``schema_folder``. The returned function expects validation to pass
    when ``success`` is true and to fail when it is false.
    """
    def do_expected(self):
        with open(jsfile) as source:
            document = json.load(source)
        try:
            assert '@type' in document
            schema_path = os.path.join(schema_folder, document['@type'] + ".json")
            with open(schema_path) as schema_source:
                schema = json.load(schema_source)
            ref_resolver = RefResolver('file://' + schema_folder + '/', schema)
            Draft4Validator(schema, resolver=ref_resolver).validate(document)
        except (AssertionError, ValidationError, KeyError):
            # A failure is fine only when it was the expected outcome.
            if not success:
                return
            raise
        # Validation passed, which must have been the expected outcome.
        assert success
    return do_expected
示例23
def expand_with_schema(name, attrs):
    """Attach a JSON schema, its validator and its defaults to ``attrs``.

    The schema is selected either by name (``attrs['schema']``, mapped to
    "<name>.json") or by file (``attrs['schema_file']``, which is removed
    from ``attrs``). Without either key, ``attrs`` is returned untouched.

    On success ``attrs`` gains '_schema_file', 'schema' (the loaded schema),
    '_validator' (a Draft4Validator), '@type' (set to ``name`` unless
    already present) and the defaults reported by ``BaseMeta.get_defaults``.

    Returns:
        The same ``attrs`` mapping, updated in place.
    """
    if 'schema' in attrs:  # Schema specified by name
        schema_file = '{}.json'.format(attrs['schema'])
    elif 'schema_file' in attrs:
        schema_file = attrs.pop('schema_file')
    else:
        return attrs

    # Bare file names are looked up in the 'schemas' directory next to this
    # module; anything containing a path separator is used as given. The
    # check must inspect the variable: testing the literal 'schema_file'
    # is always true.
    if '/' not in schema_file:
        thisdir = os.path.dirname(os.path.realpath(__file__))
        schema_file = os.path.join(thisdir,
                                   'schemas',
                                   schema_file)

    schema_path = 'file://' + schema_file

    with open(schema_file) as f:
        schema = json.load(f)

    resolver = jsonschema.RefResolver(schema_path, schema)
    if '@type' not in attrs:
        attrs['@type'] = name
    attrs['_schema_file'] = schema_file
    attrs['schema'] = schema
    attrs['_validator'] = jsonschema.Draft4Validator(schema, resolver=resolver)

    schema_defaults = BaseMeta.get_defaults(attrs['schema'])
    attrs.update(schema_defaults)

    return attrs
示例24
def get_schema_validator(self, schema_name):
    """
    Return a Draft4Validator for the named schema.

    The loaded schema and its resolver are cached in ``self.schemas`` on
    first use; a new validator is built from them on every call.

    Had to remove the id property from map.json or it uses URLs for validation
    See various issues at https://github.com/Julian/jsonschema/pull/306
    """
    if schema_name in self.schemas:
        jsn_schema, resolver = self.schemas[schema_name]
    else:
        schema_file = self.get_schema_file(schema_name)
        with open(schema_file) as handle:
            try:
                jsn_schema = json.load(handle)
            except ValueError as ex:
                log.error("Could not load %s", schema_file)
                raise ex

        root_schema_path = self.get_schema_path(self.get_schemas_folder())
        resolver = jsonschema.RefResolver(root_schema_path, None)
        # cache the schema for future use
        self.schemas[schema_name] = (jsn_schema, resolver)

    return jsonschema.Draft4Validator(schema=jsn_schema, resolver=resolver)
示例25
def validate_against_config_schema(config_file):
    """Check ``config_file.config`` against its JSON schema.

    Every validation error found is passed on to ``handle_errors`` together
    with ``process_config_schema_errors`` and the config file name.
    """
    schema = load_jsonschema(config_file)
    custom_formats = FormatChecker(["ports", "expose", "subnet_ip_address"])
    ref_resolver = RefResolver(get_resolver_path(), schema)
    validator = Draft4Validator(
        schema, resolver=ref_resolver, format_checker=custom_formats)
    found_errors = validator.iter_errors(config_file.config)
    handle_errors(found_errors, process_config_schema_errors, config_file.filename)
示例26
def test_openapi_spec_validity(flask_app_client):
    """The served swagger.json must pass ``validator20.validate_spec``."""
    response = flask_app_client.get('/api/v1/swagger.json')
    spec = json.loads(response.data.decode('utf-8'))
    # The check expects validate_spec to hand back a RefResolver.
    validation_result = validator20.validate_spec(spec)
    assert isinstance(validation_result, RefResolver)
示例27
def validate_v0(json_to_validate, schema_filename):
    """Validate ``json_to_validate`` against a schema from ``schemas/v0``.

    ``schema_filename`` is looked up in the ``schemas/v0`` directory next to
    this module, which is also the base for resolving references. Errors
    raised by ``jsonschema.validate`` propagate to the caller.
    """
    v0_dir = os.path.join(os.path.dirname(__file__), 'schemas/v0')
    ref_resolver = jsonschema.RefResolver('file://' + v0_dir + '/', None)
    with open(os.path.join(v0_dir, schema_filename)) as handle:
        loaded_schema = json.load(handle)
        checker = jsonschema.FormatChecker()
        jsonschema.validate(
            json_to_validate,
            loaded_schema,
            format_checker=checker,
            resolver=ref_resolver,
        )
示例28
def assertContainsRequiredFields(schema_filename, response):
    """Assert that ``response`` conforms to the named JSON schema.

    Args:
        schema_filename: File name of the schema inside the ``schema``
            directory located two levels above this source file.
        response: The object to validate.

    Raises:
        ValidationError: When ``response`` does not match the schema.
    """
    schema_dir = os.path.abspath(os.path.join(os.path.dirname(
        inspect.getfile(inspect.currentframe())), '..', '..', 'schema'))
    schema_filename = os.path.join(schema_dir, schema_filename)
    # Context manager so the schema file handle is closed after reading.
    with open(schema_filename) as schema_file:
        schema = json.load(schema_file)
    Draft4Validator.check_schema(schema)
    # The file URI prefix differs between Windows and other systems.
    os_base_uri = 'file:///' if os.name == 'nt' else 'file://'
    resolver = RefResolver(referrer=schema, base_uri=os_base_uri + schema_dir + '/')
    # Raises ValidationError when incorrect response
    validate(response, schema, resolver=resolver)
示例29
def load_validator(schema_path, schema):
    """Build a validator for ``schema`` that shares the module schema store.

    Args:
        schema_path: The filename of the JSON schema.
        schema: A Python object representation of the same schema.

    Returns:
        A STIXValidator using the Draft 7 format checker.

    """
    global SCHEMA_STORE

    # The file URI prefix depends on the OS
    file_prefix = 'file:///' if os.name == 'nt' else 'file:'
    base_uri = file_prefix + schema_path.replace("\\", "/")
    resolver = RefResolver(base_uri, schema, store=SCHEMA_STORE)

    # Register the schema under its own '$id' when it declares one
    schema_id = schema.get('$id', '')
    if schema_id:
        resolver.store[schema_id] = schema

    # RefResolver creates a new store internally; keep it so the same
    # mappings are reused on every call
    SCHEMA_STORE = resolver.store
    return STIXValidator(schema, resolver=resolver, format_checker=draft7_format_checker)
示例30
def load_schema(schema_path):
    """Read the api specification and prepare it for validation.

    The specification's ``models`` entry (empty when absent) is used as the
    store of the reference resolver.

    :returns: a mapping from :class:`RequestMatcher` to :class:`ValidatorMap`
        for every operation in the api specification.
    :rtype: dict
    """
    with open(schema_path, 'r') as schema_file:
        spec = simplejson.load(schema_file)
    models = spec.get('models', {})
    model_resolver = RefResolver('', '', models)
    return build_request_to_validator_map(spec, model_resolver)