Python源码示例:pathlib.PurePosixPath()
示例1
def _ftp_put(self, local_path, path, atomic):
normpath = str(PurePosixPath(path))
folder = str(PurePosixPath(path).parent)
self._ftp_mkdirs(folder)
# go back to ftp root folder
self.conn.cwd("/")
# random file name
if atomic:
tmp_path = folder + os.sep + 'luigi-tmp-%09d' % random.randrange(0, 1e10)
else:
tmp_path = normpath
self.conn.storbinary('STOR %s' % tmp_path, open(local_path, 'rb'))
if atomic:
self.conn.rename(tmp_path, normpath)
示例2
def should_domain_substitute(path, relative_path, search_regex, unused_patterns):
"""
Returns True if a path should be domain substituted in the source tree; False otherwise
path is the pathlib.Path to the file from the current working directory.
relative_path is the pathlib.Path to the file from the source tree.
search_regex is a compiled regex object to search for domain names
unused_patterns is a UnusedPatterns object
"""
relative_path_posix = relative_path.as_posix().lower()
for include_pattern in DOMAIN_INCLUDE_PATTERNS:
if PurePosixPath(relative_path_posix).match(include_pattern):
unused_patterns.domain_include_patterns.discard(include_pattern)
for exclude_prefix in DOMAIN_EXCLUDE_PREFIXES:
if relative_path_posix.startswith(exclude_prefix):
unused_patterns.domain_exclude_prefixes.discard(exclude_prefix)
return False
return _check_regex_match(path, search_regex)
return False
示例3
def get_path_from_template(path_template: str, path_type: PathType = PathType.AUTO) -> str:
"""Replace tags in the given path template and return either Windows or Linux formatted path."""
# automatically select path type depending on running OS
if path_type == PathType.AUTO:
if platform.system() == "Windows":
path_type = PathType.WINDOWS
elif platform.system() == "Linux":
path_type = PathType.LINUX
else:
raise RuntimeError("Unknown platform")
path_template = path_template.replace("<USERNAME>", get_user_name())
# return correctly formatted path
if path_type == PathType.WINDOWS:
return str(pathlib.PureWindowsPath(path_template))
elif path_type == PathType.LINUX:
return str(pathlib.PurePosixPath(path_template))
else:
raise RuntimeError("Unknown platform")
示例4
def test_different_flavours_unordered(self):
p = pathlib.PurePosixPath('a')
q = pathlib.PureWindowsPath('a')
with self.assertRaises(TypeError):
p < q
with self.assertRaises(TypeError):
p <= q
with self.assertRaises(TypeError):
p > q
with self.assertRaises(TypeError):
p >= q
#
# Tests for the concrete classes
#
# Make sure any symbolic links in the base test path are resolved
示例5
def test_different_flavours_unordered(self):
p = pathlib.PurePosixPath('a')
q = pathlib.PureWindowsPath('a')
with self.assertRaises(TypeError):
p < q
with self.assertRaises(TypeError):
p <= q
with self.assertRaises(TypeError):
p > q
with self.assertRaises(TypeError):
p >= q
#
# Tests for the concrete classes
#
# Make sure any symbolic links in the base test path are resolved
示例6
def __init__(self,
login=None,
password=None, *,
base_path=pathlib.Path("."),
home_path=pathlib.PurePosixPath("/"),
permissions=None,
maximum_connections=None,
read_speed_limit=None,
write_speed_limit=None,
read_speed_limit_per_connection=None,
write_speed_limit_per_connection=None):
self.login = login
self.password = password
self.base_path = pathlib.Path(base_path)
self.home_path = pathlib.PurePosixPath(home_path)
if not self.home_path.is_absolute():
raise errors.PathIsNotAbsolute(home_path)
self.permissions = permissions or [Permission()]
self.maximum_connections = maximum_connections
self.read_speed_limit = read_speed_limit
self.write_speed_limit = write_speed_limit
self.read_speed_limit_per_connection = read_speed_limit_per_connection
# damn 80 symbols
self.write_speed_limit_per_connection = \
write_speed_limit_per_connection
示例7
def get_permissions(self, path):
"""
Return nearest parent permission for `path`.
:param path: path which permission you want to know
:type path: :py:class:`str` or :py:class:`pathlib.PurePosixPath`
:rtype: :py:class:`aioftp.Permission`
"""
path = pathlib.PurePosixPath(path)
parents = filter(lambda p: p.is_parent(path), self.permissions)
perm = min(
parents,
key=lambda p: len(path.relative_to(p.path).parts),
default=Permission(),
)
return perm
示例8
def parse_list_line(self, b):
"""
Parse LIST response with both Microsoft Windows® parser and
UNIX parser
:param b: response line
:type b: :py:class:`bytes` or :py:class:`str`
:return: (path, info)
:rtype: (:py:class:`pathlib.PurePosixPath`, :py:class:`dict`)
"""
ex = []
parsers = (
self.parse_list_line_custom,
self.parse_list_line_unix,
self.parse_list_line_windows,
)
for parser in parsers:
if parser is None:
continue
try:
return parser(b)
except (ValueError, KeyError, IndexError) as e:
ex.append(e)
raise ValueError("All parsers failed to parse", b, ex)
示例9
def parse_mlsx_line(self, b):
"""
Parsing MLS(T|D) response.
:param b: response line
:type b: :py:class:`bytes` or :py:class:`str`
:return: (path, info)
:rtype: (:py:class:`pathlib.PurePosixPath`, :py:class:`dict`)
"""
if isinstance(b, bytes):
s = b.decode(encoding=self.encoding)
else:
s = b
line = s.rstrip()
facts_found, _, name = line.partition(" ")
entry = {}
for fact in facts_found[:-1].split(";"):
key, _, value = fact.partition("=")
entry[key.lower()] = value
return pathlib.PurePosixPath(name), entry
示例10
def exists(self, path):
"""
:py:func:`asyncio.coroutine`
Check path for existence.
:param path: path to check
:type path: :py:class:`str` or :py:class:`pathlib.PurePosixPath`
:rtype: :py:class:`bool`
"""
try:
await self.stat(path)
return True
except errors.StatusCodeError as e:
if e.received_codes[-1].matches("550"):
return False
raise
示例11
def remove(self, path):
"""
:py:func:`asyncio.coroutine`
High level remove method for removing path recursively (file or
directory).
:param path: path to remove
:type path: :py:class:`str` or :py:class:`pathlib.PurePosixPath`
"""
if await self.exists(path):
info = await self.stat(path)
if info["type"] == "file":
await self.remove_file(path)
elif info["type"] == "dir":
for name, info in (await self.list(path)):
if info["type"] in ("dir", "file"):
await self.remove(name)
await self.remove_directory(path)
示例12
def upload_stream(self, destination, *, offset=0):
"""
Create stream for write data to `destination` file.
:param destination: destination path of file on server side
:type destination: :py:class:`str` or :py:class:`pathlib.PurePosixPath`
:param offset: byte offset for stream start position
:type offset: :py:class:`int`
:rtype: :py:class:`aioftp.DataConnectionThrottleStreamIO`
"""
return self.get_stream(
"STOR " + str(destination),
"1xx",
offset=offset,
)
示例13
def append_stream(self, destination, *, offset=0):
"""
Create stream for append (write) data to `destination` file.
:param destination: destination path of file on server side
:type destination: :py:class:`str` or :py:class:`pathlib.PurePosixPath`
:param offset: byte offset for stream start position
:type offset: :py:class:`int`
:rtype: :py:class:`aioftp.DataConnectionThrottleStreamIO`
"""
return self.get_stream(
"APPE " + str(destination),
"1xx",
offset=offset,
)
示例14
def test_client_list_override_with_custom(pair_factory, Client):
meta = {"type": "file", "works": True}
def parser(b):
import pickle
return pickle.loads(bytes.fromhex(b.decode().rstrip("\r\n")))
async def builder(_, path):
import pickle
return pickle.dumps((path, meta)).hex()
async with pair_factory(Client(parse_list_line_custom=parser)) as pair:
pair.server.commands_mapping["mlst"] = not_implemented
pair.server.commands_mapping["mlsd"] = not_implemented
pair.server.build_list_string = builder
await pair.client.make_directory("bar")
(path, stat), *_ = files = await pair.client.list()
assert len(files) == 1
assert path == pathlib.PurePosixPath("bar")
assert stat == meta
示例15
def test_pipes_base(cleanup_pipe):
import d6tflow.pipes
d6tflow.pipes.init(cfg['d6tpipe_pipe1'],profile=cfg['d6tpipe_profile'])
t1 = Task1()
pipe1 = d6tflow.pipes.get_pipe()
pipedir = pipe1.dirpath
t1filepath = t1.output().path
t1file = str(PurePosixPath(t1filepath.relative_to(pipedir)))
assert d6tflow.run(t1)
assert t1.complete()
with fuckit:
pipe1._pullpush_luigi([t1file], op='remove')
assert pipe1.push_preview()==[t1file]
assert pipe1.push()==[t1file]
assert pipe1.scan_remote(cached=False) == [t1file]
# cleanup
pipe1.delete_files(confirm=False, all_local=True)
assert pipe1.scan_remote(cached=False) == []
示例16
def modurl(qualname):
"""Get the full GitHub URL for some object’s qualname."""
obj, module = get_obj_module(qualname)
github_url = github_url_scvelo
try:
path = PurePosixPath(Path(module.__file__).resolve().relative_to(project_dir))
except ValueError:
# trying to document something from another package
github_url = (
github_url_read_loom
if "read_loom" in qualname
else github_url_read
if "read" in qualname
else github_url_scanpy
)
path = "/".join(module.__file__.split("/")[-2:])
start, end = get_linenos(obj)
fragment = f"#L{start}-L{end}" if start and end else ""
return f"{github_url}/{path}{fragment}"
示例17
def build_attrs(self, *args, **kwargs):
attrs = super().build_attrs(*args, **kwargs)
accept = attrs.get('accept')
response = self.client.generate_presigned_post(
self.bucket_name,
str(pathlib.PurePosixPath(self.upload_folder, '${filename}')),
Conditions=self.get_conditions(accept),
ExpiresIn=self.expires,
)
defaults = {
'data-fields-%s' % key: value
for key, value in response['fields'].items()
}
defaults['data-url'] = response['url']
defaults.update(attrs)
try:
defaults['class'] += ' s3file'
except KeyError:
defaults['class'] = 's3file'
return defaults
示例18
def copy_directory(self, host_path, guest_path):
"""
Copies a directory from host_path (pathlib.Path) to guest_path (pathlib.PurePath).
This is natively supported since LXD 2.2 but we have to support 2.0+
Refs: https://github.com/lxc/lxd/issues/2401
Uses tar to pack/unpack the directory.
"""
guest_tar_path = self._guest_temporary_tar_path
self.run(['mkdir', '-p', str(guest_path)])
with tempfile.NamedTemporaryFile() as f:
logger.debug("Creating tar file from {}".format(host_path))
tar = tarfile.open(f.name, 'w')
tar.add(str(host_path), arcname='.')
tar.close()
self.copy_file(Path(f.name), PurePosixPath(guest_tar_path))
self.run(['tar', '-xf', guest_tar_path, '-C', str(guest_path)])
self.run(['rm', '-f', str(guest_tar_path)])
##################################
# PRIVATE METHODS AND PROPERTIES #
##################################
示例19
def test_can_run_puppet_manifest_mode(self, mock_run, mock_copy_dir):
class DummyGuest(Guest):
name = 'dummy'
host = Host()
guest = DummyGuest(unittest.mock.Mock())
mock_run.return_value = 0
provisioner = PuppetProvisioner('./', host, [guest], {
'manifest_file': 'test_site.pp',
'manifests_path': 'test_manifests'})
provisioner.provision()
assert mock_copy_dir.call_count == 1
assert mock_copy_dir.call_args_list[0][0][0] == Path('test_manifests')
assert mock_copy_dir.call_args_list[0][0][1] == PurePosixPath(
provisioner._guest_manifests_path)
assert mock_run.call_count == 2
assert mock_run.call_args_list[0][0][0] == ['which', 'puppet']
assert mock_run.call_args_list[1][0][0] == [
'sh', '-c',
'puppet apply --detailed-exitcodes --manifestdir {} {}'.format(
PurePosixPath(provisioner._guest_manifests_path),
PurePosixPath(provisioner._guest_manifests_path) / 'test_site.pp')]
示例20
def test_can_set_binary_path(self, mock_run, mock_copy_dir):
class DummyGuest(Guest):
name = 'dummy'
host = Host()
guest = DummyGuest(unittest.mock.Mock())
mock_run.return_value = 0
provisioner = PuppetProvisioner('./', host, [guest], {
'manifest_file': 'test_site.pp',
'manifests_path': 'test_manifests',
'binary_path': '/test/path'})
provisioner.provision()
assert mock_run.call_count == 2
assert mock_run.call_args_list[0][0][0] == ['test', '-x', '/test/path/puppet']
assert mock_run.call_args_list[1][0][0] == [
'sh', '-c',
'/test/path/puppet apply --detailed-exitcodes --manifestdir {} {}'.format(
PurePosixPath(provisioner._guest_manifests_path),
PurePosixPath(provisioner._guest_manifests_path) / 'test_site.pp')]
示例21
def test_can_set_facter(self, mock_run, mock_copy_dir):
class DummyGuest(Guest):
name = 'dummy'
host = Host()
guest = DummyGuest(unittest.mock.Mock())
mock_run.return_value = 0
provisioner = PuppetProvisioner('./', host, [guest], {
'manifest_file': 'site.pp',
'manifests_path': 'test_manifests',
'facter': {'foo': 'bah', 'bar': 'baz baz'}})
provisioner.provision()
assert mock_run.call_count == 2
assert mock_run.call_args_list[1][0][0] == [
'sh', '-c',
"FACTER_bar='baz baz' FACTER_foo=bah "
"puppet apply --detailed-exitcodes --manifestdir {} {}".format(
PurePosixPath(provisioner._guest_manifests_path),
PurePosixPath(provisioner._guest_manifests_path) / 'site.pp')]
示例22
def test_can_set_hiera_config_path(self, mock_run, mock_copy_dir, mock_copy_file):
class DummyGuest(Guest):
name = 'dummy'
host = Host()
guest = DummyGuest(unittest.mock.Mock())
mock_run.return_value = 0
provisioner = PuppetProvisioner('./', host, [guest], {
'manifest_file': 'site.pp',
'manifests_path': 'test_manifests',
'hiera_config_path': 'hiera.yaml'})
provisioner.provision()
assert mock_copy_file.call_count == 1
assert mock_copy_file.call_args_list[0][0][0] == Path('hiera.yaml')
assert mock_copy_file.call_args_list[0][0][1] == PurePosixPath(
provisioner._guest_hiera_file)
assert mock_run.call_count == 2
assert mock_run.call_args_list[1][0][0] == [
'sh', '-c',
"puppet apply --hiera_config={} --detailed-exitcodes --manifestdir {} {}".format(
PurePosixPath(provisioner._guest_hiera_file),
PurePosixPath(provisioner._guest_manifests_path),
PurePosixPath(provisioner._guest_manifests_path) / 'site.pp')]
示例23
def test_can_set_environment_variables(self, mock_run, mock_copy_dir):
class DummyGuest(Guest):
name = 'dummy'
host = Host()
guest = DummyGuest(unittest.mock.Mock())
mock_run.return_value = 0
provisioner = PuppetProvisioner('./', host, [guest], {
'manifest_file': 'site.pp',
'manifests_path': 'test_manifests',
'environment_variables': {'FOO': 'bah', 'BAR': 'baz baz'}})
provisioner.provision()
assert mock_run.call_count == 2
assert mock_run.call_args_list[1][0][0] == [
'sh', '-c',
"BAR='baz baz' FOO=bah "
"puppet apply --detailed-exitcodes --manifestdir {} {}".format(
PurePosixPath(provisioner._guest_manifests_path),
PurePosixPath(provisioner._guest_manifests_path) / 'site.pp')]
示例24
def open_remote_sqlite_database(adb, database):
database = PurePosixPath(database)
with TemporaryDirectory() as temp_dir:
temp_dir = Path(temp_dir)
for suffix in ['', '-journal', '-wal', '-shm']:
remote_file = database.with_name(database.name + suffix)
try:
contents = adb.read_file(remote_file)
except FileNotFoundError as e:
# Throw the original exception if the actual db file cannot be read
if suffix == '':
raise e
else:
(temp_dir / remote_file.name).write_bytes(contents.read())
db_path = str(temp_dir / database.name)
with contextlib.closing(sqlite3.connect(db_path)) as connection:
connection.row_factory = sqlite3.Row
yield connection
示例25
def _parse(application_endpoint: str) -> Tuple[str, str, str]:
url, valid = LuisApplication._try_parse_url(application_endpoint)
if not valid:
raise ValueError(
f"{application_endpoint} is not a valid LUIS application endpoint."
)
segments = PurePosixPath(unquote(url.path)).parts
application_id = segments[-1] if segments else None
qs_parsed_result = parse_qs(url.query)
endpoint_key = qs_parsed_result.get("subscription-key", [None])[0]
parts_for_base_url = url.scheme, url.netloc, "", None, None, None
endpoint = urlunparse(parts_for_base_url)
return (application_id, endpoint_key, endpoint)
示例26
def _pullpush_luigi(self, files, op, cnxn=None):
if cnxn is None:
cnxn = self._connect(op in ['put','remove'])
filessync = []
pbar = ""
for fname in tqdm(files):
pbar = pbar + fname
fnameremote = self.remote_prefix+fname
fnamelocalpath = self.dirpath/fname
fnamelocal = str(PurePosixPath(fnamelocalpath))
if op=='put':
cnxn.put(fnamelocal, fnameremote)
elif op=='get':
fnamelocalpath.parent.mkdir(parents=True, exist_ok=True)
cnxn.get(fnameremote, fnamelocal)
elif op=='remove':
try:
cnxn.remove(fnameremote)
except:
warnings.warn('Unable to delete remote file {}'.format(fnameremote))
elif op=='exists':
fname = cnxn.exists(fnameremote)
else:
raise ValueError('invalid luigi operation')
logging.info('synced files {}'.format(fname))
filessync.append(fname)
self._disconnect(cnxn)
return filessync
示例27
def _sftp_put(self, local_path, path, atomic):
normpath = str(PurePosixPath(path))
directory = str(PurePosixPath(path).parent)
self.conn.makedirs(directory)
if atomic:
tmp_path = os.path.join(directory, 'luigi-tmp-{:09d}'.format(random.randrange(0, 1e10)))
else:
tmp_path = normpath
self.conn.put(local_path, tmp_path)
if atomic:
self.conn.rename(tmp_path, normpath)
示例28
def _sftp_listdir_attr(self, path):
contents = []
def getttr(_path):
o = self.conn.lstat(_path)
o.path = _path
o.relpath = _path[len(path):]
o.filename = PurePosixPath(_path).name
contents.append(o)
# assert False
self.conn.walktree(path,getttr,lambda x: 1+1,lambda: 1+1)
return contents
示例29
def collect_files(template_dir, url_base):
basepath = PurePosixPath(template_dir)
baseurl = PurePosixPath(url_base)
for dirname, _, files in os.walk(template_dir):
rel_dirname = PurePosixPath(dirname).relative_to(template_dir)
for filename in files:
template_path = path.join(dirname, filename)
url = baseurl.joinpath(rel_dirname, filename)
with open(template_path, "r", encoding="utf8") as f:
yield str(url), file_contents(f)
示例30
def hash_to_remote_path(self, sha256:str) -> PurePosixPath:
"""Get the remote path (in posix format)
:param sha256:
:return: Path to the stored file
"""
return PurePosixPath("data", sha256[:2], sha256[2:])