Python源码示例:pathlib.PurePosixPath()

示例1
def _ftp_put(self, local_path, path, atomic):
    """Upload a local file to ``path`` over the FTP connection.

    The parent folder of ``path`` is created through ``self._ftp_mkdirs`` and
    the connection's working directory is reset to ``/`` before the transfer.

    :param local_path: path of the local file to send
    :param path: destination path on the server
    :param atomic: when true, upload under a random temporary name next to
        the destination and rename it to the final name afterwards
    """
    remote = PurePosixPath(path)
    normpath = str(remote)
    folder = str(remote.parent)
    self._ftp_mkdirs(folder)

    # go back to ftp root folder
    self.conn.cwd("/")

    if atomic:
        # Remote paths are POSIX-style regardless of the local OS, so join
        # with pathlib instead of os.sep.  randrange() needs integer bounds:
        # float arguments such as 1e10 are rejected by Python 3.12+.
        tmp_name = 'luigi-tmp-%09d' % random.randrange(10 ** 10)
        tmp_path = str(remote.parent / tmp_name)
    else:
        tmp_path = normpath

    # Close the local file even when the transfer fails.
    with open(local_path, 'rb') as local_file:
        self.conn.storbinary('STOR %s' % tmp_path, local_file)

    if atomic:
        self.conn.rename(tmp_path, normpath)
示例2
def should_domain_substitute(path, relative_path, search_regex, unused_patterns):
    """
    Decide whether a file in the source tree should be domain substituted.

    path is the pathlib.Path to the file from the current working directory.
    relative_path is the pathlib.Path to the file from the source tree.
    search_regex is a compiled regex object to search for domain names
    unused_patterns is a UnusedPatterns object; every include pattern or
    exclude prefix that is hit here is discarded from it.

    Returns False when no include pattern matches or when an exclude prefix
    applies; otherwise returns the result of _check_regex_match.
    """
    lowered = relative_path.as_posix().lower()
    lowered_path = PurePosixPath(lowered)
    for pattern in DOMAIN_INCLUDE_PATTERNS:
        if not lowered_path.match(pattern):
            continue
        unused_patterns.domain_include_patterns.discard(pattern)
        # Only the first matching include pattern is considered.
        for prefix in DOMAIN_EXCLUDE_PREFIXES:
            if lowered.startswith(prefix):
                unused_patterns.domain_exclude_prefixes.discard(prefix)
                return False
        return _check_regex_match(path, search_regex)
    return False
示例3
def get_path_from_template(path_template: str, path_type: PathType = PathType.AUTO) -> str:
    """Fill in the ``<USERNAME>`` tag and format the path for Windows or Linux.

    With ``PathType.AUTO`` the flavour is chosen from ``platform.system()``.
    Raises RuntimeError("Unknown platform") when the platform or the requested
    path type is neither Windows nor Linux.
    """
    if path_type == PathType.AUTO:
        # Resolve AUTO from the running OS.
        detected = {
            "Windows": PathType.WINDOWS,
            "Linux": PathType.LINUX,
        }.get(platform.system())
        if detected is None:
            raise RuntimeError("Unknown platform")
        path_type = detected

    filled = path_template.replace("<USERNAME>", get_user_name())

    if path_type == PathType.WINDOWS:
        flavour = pathlib.PureWindowsPath
    elif path_type == PathType.LINUX:
        flavour = pathlib.PurePosixPath
    else:
        raise RuntimeError("Unknown platform")
    return str(flavour(filled))
示例4
def test_different_flavours_unordered(self):
    """Ordering a pure POSIX path against a pure Windows path raises TypeError."""
    posix = pathlib.PurePosixPath('a')
    windows = pathlib.PureWindowsPath('a')
    comparisons = (
        lambda: posix < windows,
        lambda: posix <= windows,
        lambda: posix > windows,
        lambda: posix >= windows,
    )
    for compare in comparisons:
        with self.assertRaises(TypeError):
            compare()


#
# Tests for the concrete classes
#

# Make sure any symbolic links in the base test path are resolved 
示例5
def test_different_flavours_unordered(self):
    """Check that <, <=, > and >= between POSIX and Windows pure paths raise TypeError."""
    posix_path = pathlib.PurePosixPath('a')
    windows_path = pathlib.PureWindowsPath('a')
    checks = [
        lambda: posix_path < windows_path,
        lambda: posix_path <= windows_path,
        lambda: posix_path > windows_path,
        lambda: posix_path >= windows_path,
    ]
    for check in checks:
        with self.assertRaises(TypeError):
            check()


#
# Tests for the concrete classes
#

# Make sure any symbolic links in the base test path are resolved 
示例6
def __init__(self,
                 login=None,
                 password=None, *,
                 base_path=pathlib.Path("."),
                 home_path=pathlib.PurePosixPath("/"),
                 permissions=None,
                 maximum_connections=None,
                 read_speed_limit=None,
                 write_speed_limit=None,
                 read_speed_limit_per_connection=None,
                 write_speed_limit_per_connection=None):
    """Store the account settings on the instance.

    ``base_path`` is converted to ``pathlib.Path`` and ``home_path`` to
    ``pathlib.PurePosixPath``.  ``errors.PathIsNotAbsolute`` is raised when
    ``home_path`` is not absolute.  A falsy ``permissions`` value is replaced
    by a list holding one default ``Permission()``.
    """
    self.login = login
    self.password = password
    self.base_path = pathlib.Path(base_path)
    home = pathlib.PurePosixPath(home_path)
    self.home_path = home
    if not home.is_absolute():
        raise errors.PathIsNotAbsolute(home_path)
    self.permissions = permissions if permissions else [Permission()]
    self.maximum_connections = maximum_connections
    self.read_speed_limit = read_speed_limit
    self.write_speed_limit = write_speed_limit
    self.read_speed_limit_per_connection = read_speed_limit_per_connection
    self.write_speed_limit_per_connection = (
        write_speed_limit_per_connection
    )
示例7
def get_permissions(self, path):
    """
    Return nearest parent permission for `path`.

    Among the entries of ``self.permissions`` whose ``is_parent(path)`` is
    true, the one whose ``path`` leaves the fewest remaining parts in `path`
    is chosen; a default ``Permission()`` is returned when none qualifies.

    :param path: path which permission you want to know
    :type path: :py:class:`str` or :py:class:`pathlib.PurePosixPath`

    :rtype: :py:class:`aioftp.Permission`
    """
    target = pathlib.PurePosixPath(path)
    fallback = Permission()
    candidates = [perm for perm in self.permissions if perm.is_parent(target)]

    def remaining_depth(perm):
        return len(target.relative_to(perm.path).parts)

    return min(candidates, key=remaining_depth, default=fallback)
示例8
def parse_list_line(self, b):
    """
    Parse a LIST response line by trying the custom parser (when set),
    then the UNIX parser, then the Microsoft Windows® parser.

    :param b: response line
    :type b: :py:class:`bytes` or :py:class:`str`

    :return: (path, info)
    :rtype: (:py:class:`pathlib.PurePosixPath`, :py:class:`dict`)

    :raises ValueError: when every parser fails; the line and the collected
        parser errors are attached as exception arguments
    """
    candidates = (
        self.parse_list_line_custom,
        self.parse_list_line_unix,
        self.parse_list_line_windows,
    )
    failures = []
    for parse in candidates:
        if parse is not None:
            try:
                return parse(b)
            except (ValueError, KeyError, IndexError) as error:
                failures.append(error)
    raise ValueError("All parsers failed to parse", b, failures)
示例9
def parse_mlsx_line(self, b):
    """
    Parse one MLS(T|D) response line.

    Bytes input is decoded with ``self.encoding``.  Everything before the
    first space is treated as the facts field, the rest as the entry name.

    :param b: response line
    :type b: :py:class:`bytes` or :py:class:`str`

    :return: (path, info) where info maps lower-cased fact names to values
    :rtype: (:py:class:`pathlib.PurePosixPath`, :py:class:`dict`)
    """
    text = b.decode(encoding=self.encoding) if isinstance(b, bytes) else b
    facts, _, name = text.rstrip().partition(" ")
    # The last character of the facts field (expected to be the closing ";")
    # is dropped before splitting into "key=value" items.
    pairs = (fact.partition("=") for fact in facts[:-1].split(";"))
    entry = {key.lower(): value for key, _, value in pairs}
    return pathlib.PurePosixPath(name), entry
示例10
async def exists(self, path):
    """
    :py:func:`asyncio.coroutine`

    Check path for existence.

    The path is considered missing when ``self.stat`` fails with a status
    error whose last received code matches "550"; any other status error is
    re-raised.

    :param path: path to check
    :type path: :py:class:`str` or :py:class:`pathlib.PurePosixPath`

    :rtype: :py:class:`bool`
    """
    # Declared ``async`` because the body awaits ``self.stat``; ``await``
    # inside a plain ``def`` is a syntax error.
    try:
        await self.stat(path)
    except errors.StatusCodeError as e:
        if e.received_codes[-1].matches("550"):
            return False
        raise
    return True
示例11
async def remove(self, path):
    """
    :py:func:`asyncio.coroutine`

    High level remove method for removing path recursively (file or
    directory).  Nothing happens when the path does not exist.  Inside a
    directory only entries of type "dir" or "file" are removed before the
    directory itself is removed.

    :param path: path to remove
    :type path: :py:class:`str` or :py:class:`pathlib.PurePosixPath`
    """
    # Declared ``async`` because the body awaits; ``await`` inside a plain
    # ``def`` is a syntax error.
    if not await self.exists(path):
        return
    info = await self.stat(path)
    if info["type"] == "file":
        await self.remove_file(path)
    elif info["type"] == "dir":
        # A distinct loop variable keeps the directory's own ``info`` intact.
        for name, child_info in (await self.list(path)):
            if child_info["type"] in ("dir", "file"):
                await self.remove(name)
        await self.remove_directory(path)
示例12
def upload_stream(self, destination, *, offset=0):
    """
    Open a stream that writes data to the `destination` file, by issuing a
    "STOR" command through ``self.get_stream`` and expecting a "1xx" reply.

    :param destination: destination path of file on server side
    :type destination: :py:class:`str` or :py:class:`pathlib.PurePosixPath`

    :param offset: byte offset for stream start position
    :type offset: :py:class:`int`

    :rtype: :py:class:`aioftp.DataConnectionThrottleStreamIO`
    """
    command = " ".join(("STOR", str(destination)))
    return self.get_stream(command, "1xx", offset=offset)
示例13
def append_stream(self, destination, *, offset=0):
    """
    Open a stream that appends (writes) data to the `destination` file, by
    issuing an "APPE" command through ``self.get_stream`` and expecting a
    "1xx" reply.

    :param destination: destination path of file on server side
    :type destination: :py:class:`str` or :py:class:`pathlib.PurePosixPath`

    :param offset: byte offset for stream start position
    :type offset: :py:class:`int`

    :rtype: :py:class:`aioftp.DataConnectionThrottleStreamIO`
    """
    command = " ".join(("APPE", str(destination)))
    return self.get_stream(command, "1xx", offset=offset)
示例14
async def test_client_list_override_with_custom(pair_factory, Client):
    """A client-supplied ``parse_list_line_custom`` parser is used for listings.

    MLST/MLSD are replaced by ``not_implemented`` on the server and the
    server's list-string builder is replaced by one that emits a hex-encoded
    pickle of ``(path, meta)``; the custom client parser decodes it back.
    """
    # Declared ``async`` because the body uses ``async with`` and ``await``,
    # which are syntax errors inside a plain ``def``.
    # Imported once here; both helpers below close over it.
    import pickle

    meta = {"type": "file", "works": True}

    def parser(b):
        # NOTE(review): pickle.loads is acceptable only because the payload
        # presumably comes from ``builder`` below within this same test;
        # never unpickle untrusted data.
        return pickle.loads(bytes.fromhex(b.decode().rstrip("\r\n")))

    async def builder(_, path):
        return pickle.dumps((path, meta)).hex()

    async with pair_factory(Client(parse_list_line_custom=parser)) as pair:
        pair.server.commands_mapping["mlst"] = not_implemented
        pair.server.commands_mapping["mlsd"] = not_implemented
        pair.server.build_list_string = builder
        await pair.client.make_directory("bar")
        (path, stat), *_ = files = await pair.client.list()
        assert len(files) == 1
        assert path == pathlib.PurePosixPath("bar")
        assert stat == meta
示例15
def test_pipes_base(cleanup_pipe):
    """Run Task1, then push its output through the pipe and clean up.

    The task output path is expressed relative to the pipe directory in POSIX
    form and is expected to be the only file previewed, pushed and found on
    the remote; after deleting files the remote scan must be empty.
    """
    import d6tflow.pipes
    d6tflow.pipes.init(cfg['d6tpipe_pipe1'], profile=cfg['d6tpipe_profile'])

    task = Task1()
    pipe = d6tflow.pipes.get_pipe()
    pipe_root = pipe.dirpath
    output_path = task.output().path
    relative_name = str(PurePosixPath(output_path.relative_to(pipe_root)))

    assert d6tflow.run(task)
    assert task.complete()
    # Best-effort removal of any stale remote copy; errors are suppressed.
    with fuckit:
        pipe._pullpush_luigi([relative_name], op='remove')
    assert pipe.push_preview() == [relative_name]
    assert pipe.push() == [relative_name]
    assert pipe.scan_remote(cached=False) == [relative_name]
    # cleanup
    pipe.delete_files(confirm=False, all_local=True)
    assert pipe.scan_remote(cached=False) == []
示例16
def modurl(qualname):
    """Get the full GitHub URL for some object’s qualname.

    The module file is located relative to ``project_dir``; when it lies
    outside, a different base URL is picked from the qualname and the last
    two "/"-separated components of the module file are used as the path.
    A ``#Lstart-Lend`` fragment is appended when both line numbers are truthy.
    """
    obj, module = get_obj_module(qualname)
    github_url = github_url_scvelo
    try:
        module_file = Path(module.__file__).resolve()
        path = PurePosixPath(module_file.relative_to(project_dir))
    except ValueError:
        # trying to document something from another package
        if "read_loom" in qualname:
            github_url = github_url_read_loom
        elif "read" in qualname:
            github_url = github_url_read
        else:
            github_url = github_url_scanpy
        path = "/".join(module.__file__.split("/")[-2:])
    start, end = get_linenos(obj)
    if start and end:
        fragment = f"#L{start}-L{end}"
    else:
        fragment = ""
    return f"{github_url}/{path}{fragment}"
示例17
def build_attrs(self, *args, **kwargs):
    """Extend the parent's attributes with presigned-POST data attributes.

    A presigned POST is requested from ``self.client`` for a key made of
    ``self.upload_folder`` and the literal ``${filename}`` placeholder.  Each
    returned field becomes a ``data-fields-<name>`` attribute and the URL
    becomes ``data-url``.  Attributes from the parent override these, and
    ``s3file`` is appended to (or set as) the ``class`` attribute.
    """
    attrs = super().build_attrs(*args, **kwargs)

    accept = attrs.get('accept')
    upload_key = str(pathlib.PurePosixPath(self.upload_folder, '${filename}'))
    response = self.client.generate_presigned_post(
        self.bucket_name,
        upload_key,
        Conditions=self.get_conditions(accept),
        ExpiresIn=self.expires,
    )

    merged = {}
    for key, value in response['fields'].items():
        merged['data-fields-%s' % key] = value
    merged['data-url'] = response['url']
    merged.update(attrs)

    if 'class' in merged:
        merged['class'] += ' s3file'
    else:
        merged['class'] = 's3file'
    return merged
示例18
def copy_directory(self, host_path, guest_path):
    """
    Copies a directory from host_path (pathlib.Path) to guest_path (pathlib.PurePath).
    This is natively supported since LXD 2.2 but we have to support 2.0+
    Refs: https://github.com/lxc/lxd/issues/2401

    Uses tar to pack/unpack the directory: the host directory is packed into
    a temporary tar file, the file is copied to
    ``self._guest_temporary_tar_path``, unpacked into guest_path with
    ``tar -xf`` and finally removed with ``rm -f``.
    """
    guest_tar_path = self._guest_temporary_tar_path
    self.run(['mkdir', '-p', str(guest_path)])
    with tempfile.NamedTemporaryFile() as f:
        logger.debug("Creating tar file from {}".format(host_path))
        # The context manager closes the archive even if adding the
        # directory raises, so the handle is never leaked.
        with tarfile.open(f.name, 'w') as tar:
            tar.add(str(host_path), arcname='.')
        self.copy_file(Path(f.name), PurePosixPath(guest_tar_path))
    # Stringify the tar path for both commands so they receive the same type.
    self.run(['tar', '-xf', str(guest_tar_path), '-C', str(guest_path)])
    self.run(['rm', '-f', str(guest_tar_path)])

    ##################################
    # PRIVATE METHODS AND PROPERTIES #
    ################################## 
示例19
def test_can_run_puppet_manifest_mode(self, mock_run, mock_copy_dir):
    """Provisioning copies the manifests directory and runs ``puppet apply``."""
    class DummyGuest(Guest):
        name = 'dummy'
    host = Host()
    guest = DummyGuest(unittest.mock.Mock())
    mock_run.return_value = 0
    options = {
        'manifest_file': 'test_site.pp',
        'manifests_path': 'test_manifests',
    }
    provisioner = PuppetProvisioner('./', host, [guest], options)
    provisioner.provision()

    manifests_dir = PurePosixPath(provisioner._guest_manifests_path)

    assert mock_copy_dir.call_count == 1
    copy_args = mock_copy_dir.call_args_list[0][0]
    assert copy_args[0] == Path('test_manifests')
    assert copy_args[1] == manifests_dir

    expected_command = (
        'puppet apply --detailed-exitcodes --manifestdir {} {}'
    ).format(manifests_dir, manifests_dir / 'test_site.pp')
    assert mock_run.call_count == 2
    probe_call, apply_call = mock_run.call_args_list
    assert probe_call[0][0] == ['which', 'puppet']
    assert apply_call[0][0] == ['sh', '-c', expected_command]
示例20
def test_can_set_binary_path(self, mock_run, mock_copy_dir):
    """With ``binary_path`` set, puppet is probed and run from that directory."""
    class DummyGuest(Guest):
        name = 'dummy'
    host = Host()
    guest = DummyGuest(unittest.mock.Mock())
    mock_run.return_value = 0
    options = {
        'manifest_file': 'test_site.pp',
        'manifests_path': 'test_manifests',
        'binary_path': '/test/path',
    }
    provisioner = PuppetProvisioner('./', host, [guest], options)
    provisioner.provision()

    manifests_dir = PurePosixPath(provisioner._guest_manifests_path)
    expected_command = (
        '/test/path/puppet apply --detailed-exitcodes --manifestdir {} {}'
    ).format(manifests_dir, manifests_dir / 'test_site.pp')

    assert mock_run.call_count == 2
    probe_call, apply_call = mock_run.call_args_list
    assert probe_call[0][0] == ['test', '-x', '/test/path/puppet']
    assert apply_call[0][0] == ['sh', '-c', expected_command]
示例21
def test_can_set_facter(self, mock_run, mock_copy_dir):
    """``facter`` entries are passed as FACTER_* variables before ``puppet apply``."""
    class DummyGuest(Guest):
        name = 'dummy'
    host = Host()
    guest = DummyGuest(unittest.mock.Mock())
    mock_run.return_value = 0
    options = {
        'manifest_file': 'site.pp',
        'manifests_path': 'test_manifests',
        'facter': {'foo': 'bah', 'bar': 'baz baz'},
    }
    provisioner = PuppetProvisioner('./', host, [guest], options)
    provisioner.provision()

    manifests_dir = PurePosixPath(provisioner._guest_manifests_path)
    template = (
        "FACTER_bar='baz baz' FACTER_foo=bah "
        "puppet apply --detailed-exitcodes --manifestdir {} {}"
    )
    expected_command = template.format(manifests_dir, manifests_dir / 'site.pp')

    assert mock_run.call_count == 2
    apply_call = mock_run.call_args_list[1]
    assert apply_call[0][0] == ['sh', '-c', expected_command]
示例22
def test_can_set_hiera_config_path(self, mock_run, mock_copy_dir, mock_copy_file):
    """The hiera config file is copied to the guest and passed via ``--hiera_config``."""
    class DummyGuest(Guest):
        name = 'dummy'
    host = Host()
    guest = DummyGuest(unittest.mock.Mock())
    mock_run.return_value = 0
    options = {
        'manifest_file': 'site.pp',
        'manifests_path': 'test_manifests',
        'hiera_config_path': 'hiera.yaml',
    }
    provisioner = PuppetProvisioner('./', host, [guest], options)
    provisioner.provision()

    hiera_file = PurePosixPath(provisioner._guest_hiera_file)
    manifests_dir = PurePosixPath(provisioner._guest_manifests_path)

    assert mock_copy_file.call_count == 1
    copy_args = mock_copy_file.call_args_list[0][0]
    assert copy_args[0] == Path('hiera.yaml')
    assert copy_args[1] == hiera_file

    expected_command = (
        "puppet apply --hiera_config={} --detailed-exitcodes --manifestdir {} {}"
    ).format(hiera_file, manifests_dir, manifests_dir / 'site.pp')
    assert mock_run.call_count == 2
    apply_call = mock_run.call_args_list[1]
    assert apply_call[0][0] == ['sh', '-c', expected_command]
示例23
def test_can_set_environment_variables(self, mock_run, mock_copy_dir):
    """``environment_variables`` entries are prefixed to the ``puppet apply`` command."""
    class DummyGuest(Guest):
        name = 'dummy'
    host = Host()
    guest = DummyGuest(unittest.mock.Mock())
    mock_run.return_value = 0
    options = {
        'manifest_file': 'site.pp',
        'manifests_path': 'test_manifests',
        'environment_variables': {'FOO': 'bah', 'BAR': 'baz baz'},
    }
    provisioner = PuppetProvisioner('./', host, [guest], options)
    provisioner.provision()

    manifests_dir = PurePosixPath(provisioner._guest_manifests_path)
    template = (
        "BAR='baz baz' FOO=bah "
        "puppet apply --detailed-exitcodes --manifestdir {} {}"
    )
    expected_command = template.format(manifests_dir, manifests_dir / 'site.pp')

    assert mock_run.call_count == 2
    apply_call = mock_run.call_args_list[1]
    assert apply_call[0][0] == ['sh', '-c', expected_command]
示例24
def open_remote_sqlite_database(adb, database):
    """Yield a sqlite3 connection to a local copy of a remote database.

    The database file and its ``-journal``, ``-wal`` and ``-shm`` companions
    are read through ``adb.read_file`` into a temporary directory.  A missing
    companion file is skipped; a missing main database file re-raises the
    FileNotFoundError.  Rows are returned as ``sqlite3.Row``; the connection
    is closed and the temporary directory removed when the generator finishes.
    """
    remote_db = PurePosixPath(database)

    with TemporaryDirectory() as temp_name:
        workdir = Path(temp_name)

        for suffix in ('', '-journal', '-wal', '-shm'):
            remote_file = remote_db.with_name(remote_db.name + suffix)
            try:
                payload = adb.read_file(remote_file)
            except FileNotFoundError:
                # Only the actual db file is mandatory.
                if not suffix:
                    raise
                continue
            (workdir / remote_file.name).write_bytes(payload.read())

        local_db = str(workdir / remote_db.name)

        with contextlib.closing(sqlite3.connect(local_db)) as connection:
            connection.row_factory = sqlite3.Row

            yield connection
示例25
def _parse(application_endpoint: str) -> Tuple[str, str, str]:
    """Split a LUIS application endpoint into (application_id, endpoint_key, endpoint).

    The application id is the last segment of the unquoted URL path (None for
    an empty path), the key is the first ``subscription-key`` query value
    (None when absent) and the endpoint is the URL's scheme and host only.

    Raises ValueError when ``LuisApplication._try_parse_url`` reports the
    endpoint as invalid.
    """
    url, valid = LuisApplication._try_parse_url(application_endpoint)
    if not valid:
        raise ValueError(
            f"{application_endpoint} is not a valid LUIS application endpoint."
        )

    path_parts = PurePosixPath(unquote(url.path)).parts
    if path_parts:
        application_id = path_parts[-1]
    else:
        application_id = None

    query = parse_qs(url.query)
    endpoint_key = query.get("subscription-key", [None])[0]

    endpoint = urlunparse((url.scheme, url.netloc, "", None, None, None))
    return (application_id, endpoint_key, endpoint)
示例26
def _pullpush_luigi(self, files, op, cnxn=None):
    """Apply one remote operation to every file name in ``files``.

    :param files: file names relative to ``self.dirpath``; the remote name
        is ``self.remote_prefix`` + file name
    :param op: one of 'put', 'get', 'remove' or 'exists'
    :param cnxn: connection to use; when None one is obtained from
        ``self._connect`` (passing True for 'put' and 'remove')
    :return: list with one entry per file: the file name, or for 'exists'
        the value returned by ``cnxn.exists``
    :raises ValueError: for an unknown ``op`` (raised while processing the
        first file)

    A failing remote removal only emits a warning.  The connection is passed
    to ``self._disconnect`` even when an operation raises.
    """
    if cnxn is None:
        cnxn = self._connect(op in ['put', 'remove'])

    filessync = []
    try:
        for fname in tqdm(files):
            fnameremote = self.remote_prefix + fname
            fnamelocalpath = self.dirpath / fname
            fnamelocal = str(PurePosixPath(fnamelocalpath))
            result = fname
            if op == 'put':
                cnxn.put(fnamelocal, fnameremote)
            elif op == 'get':
                fnamelocalpath.parent.mkdir(parents=True, exist_ok=True)
                cnxn.get(fnameremote, fnamelocal)
            elif op == 'remove':
                try:
                    cnxn.remove(fnameremote)
                except Exception:
                    # Best effort: keep going, but do not swallow
                    # KeyboardInterrupt/SystemExit as a bare except would.
                    warnings.warn('Unable to delete remote file {}'.format(fnameremote))
            elif op == 'exists':
                result = cnxn.exists(fnameremote)
            else:
                raise ValueError('invalid luigi operation')

            logging.info('synced files {}'.format(result))
            filessync.append(result)
    finally:
        # Release the connection on failure as well as on success.
        self._disconnect(cnxn)

    return filessync
示例27
def _sftp_put(self, local_path, path, atomic):
    """Upload a local file to ``path`` over the SFTP connection.

    The parent directory of ``path`` is created with ``self.conn.makedirs``.

    :param local_path: path of the local file to send
    :param path: destination path on the server
    :param atomic: when true, upload under a random temporary name next to
        the destination and rename it to the final name afterwards
    """
    remote = PurePosixPath(path)
    normpath = str(remote)
    directory = str(remote.parent)
    self.conn.makedirs(directory)

    if atomic:
        # Remote paths are POSIX-style regardless of the local OS, so join
        # with pathlib instead of os.path.join.  randrange() needs integer
        # bounds: float arguments such as 1e10 are rejected by Python 3.12+.
        tmp_name = 'luigi-tmp-{:09d}'.format(random.randrange(10 ** 10))
        tmp_path = str(remote.parent / tmp_name)
    else:
        tmp_path = normpath

    self.conn.put(local_path, tmp_path)

    if atomic:
        self.conn.rename(tmp_path, normpath)
示例28
def _sftp_listdir_attr(self, path):
    """Collect ``lstat`` results for every file reported by ``conn.walktree``.

    Each result is annotated with ``path`` (the full path as reported),
    ``relpath`` (the reported path with the first ``len(path)`` characters
    removed) and ``filename`` (the final path component).

    :param path: remote directory handed to ``self.conn.walktree``
    :return: list of the annotated ``lstat`` objects, in callback order
    """
    prefix_length = len(path)
    collected = []

    def record_file(remote_path):
        attrs = self.conn.lstat(remote_path)
        attrs.path = remote_path
        attrs.relpath = remote_path[prefix_length:]
        attrs.filename = PurePosixPath(remote_path).name
        collected.append(attrs)

    # The second and third callbacks are placeholders that do no work.
    # NOTE(review): the last one accepts no argument -- confirm walktree
    # never calls it with a path.
    self.conn.walktree(path, record_file, lambda x: 2, lambda: 2)

    return collected
示例29
def collect_files(template_dir, url_base):
    """Yield ``(url, contents)`` for every file found under ``template_dir``.

    The URL is ``url_base`` joined with the file's location relative to
    ``template_dir``; the contents come from ``file_contents`` applied to the
    file opened as UTF-8 text.
    """
    basepath = PurePosixPath(template_dir)
    baseurl = PurePosixPath(url_base)

    for dirname, _subdirs, filenames in os.walk(template_dir):
        rel_dirname = PurePosixPath(dirname).relative_to(basepath)
        for filename in filenames:
            template_path = path.join(dirname, filename)
            url = str(baseurl.joinpath(rel_dirname, filename))
            with open(template_path, "r", encoding="utf8") as handle:
                yield url, file_contents(handle)
示例30
def hash_to_remote_path(self, sha256: str) -> PurePosixPath:
    """Build the remote path (in posix format) for a hash.

    The path is ``data/<first two characters>/<remaining characters>``.

    :param sha256: hash string used to derive the path
    :return: Path to the stored file
    """
    prefix, remainder = sha256[:2], sha256[2:]
    return PurePosixPath("data").joinpath(prefix, remainder)