Python源码示例:pathlib.PurePath()
示例1
def validate_url(wrapped, instance, args, kwargs):
    """Reject calls whose ``url`` argument is not a relative path.

    The value of ``url`` is looked up positionally in ``args`` first and,
    when it is not there, by keyword in ``kwargs``.

    Raises:
        TypeError: if ``url`` is neither a ``str`` nor an ``os.PathLike``.
        ValueError: if ``url`` denotes an absolute path.

    Returns:
        Whatever ``wrapped(*args, **kwargs)`` returns.
    """
    # ``args`` does not contain ``self``, so the declared position is
    # shifted down by one.
    position = getfullargspec(wrapped).args.index("url") - 1
    try:
        candidate = args[position]
    except IndexError:
        candidate = kwargs.get("url")

    if isinstance(candidate, (str, PathLike)):
        if PurePath(candidate).is_absolute():
            raise ValueError("Argument 'url' must be a relative path")
        return wrapped(*args, **kwargs)
    raise TypeError("Argument 'url' must be a string or path-like object")
示例2
def validate_urls(wrapped, instance, args, kwargs):
    """Enforce that the argument named "urls" is a list of relative paths.

    The value of ``urls`` is looked up positionally in ``args`` first and,
    when it is not there, by keyword in ``kwargs``.

    Raises:
        TypeError: if ``urls`` is not a list, or any element is neither a
            ``str`` nor an ``os.PathLike``.
        ValueError: if any element denotes an absolute path.

    Returns:
        Whatever ``wrapped(*args, **kwargs)`` returns.
    """
    try:
        # Use -1 since self is not included in the args.
        urls = args[getfullargspec(wrapped).args.index("urls") - 1]
    except IndexError:
        urls = kwargs.get("urls")

    # Check that urls really is a list of strings / path-like objects.
    if not isinstance(urls, list) or not all(
        isinstance(url, (str, PathLike)) for url in urls
    ):
        raise TypeError("Argument urls must be a list of strings or path-like objects")

    # Check that all urls are relative.
    if any(PurePath(url).is_absolute() for url in urls):
        raise ValueError("Paths must be relative.")

    # Keyword arguments must be forwarded as keywords (``**``); a single
    # star would pass the keyword *names* as extra positional arguments.
    return wrapped(*args, **kwargs)
示例3
def load_conda_build_config(platform=None, trim_skip=True):
    """
    Build a conda-build ``Config`` that uses two exclusive config files.

    The first is ``conda_build_config.yaml`` in the directory two levels
    above the ``bioconda-utils`` executable found on PATH; the second is
    ``bioconda_utils-conda_build_config.yaml`` next to this module. Every
    exclusive and variant config file is asserted to exist.

    :param platform: when truthy, stored on ``config.platform``.
    :param trim_skip: stored on ``config.trim_skip``.
    :return: the populated config object.
    """
    config = api.Config(no_download_source=True, set_build_id=False)

    # The environment root is the grandparent of the executable's path.
    executable = shutil.which("bioconda-utils")
    env_root = PurePath(executable).parents[1]

    forge_pinnings = os.path.join(env_root, "conda_build_config.yaml")
    bundled_pinnings = os.path.join(
        os.path.dirname(__file__), 'bioconda_utils-conda_build_config.yaml')
    config.exclusive_config_files = [forge_pinnings, bundled_pinnings]

    variant_files = config.variant_config_files or []
    for cfg in chain(config.exclusive_config_files, variant_files):
        assert os.path.exists(cfg), ('error: {0} does not exist'.format(cfg))

    if platform:
        config.platform = platform
    config.trim_skip = trim_skip
    return config
示例4
def load_exp_matrix(fname: str, transpose: bool = False,
                    return_sparse: bool = False,
                    attribute_name_cell_id: str = ATTRIBUTE_NAME_CELL_IDENTIFIER,
                    attribute_name_gene: str = ATTRIBUTE_NAME_GENE) -> pd.DataFrame:
    """
    Read an expression matrix from a file chosen by its suffixes.

    A file with a ``.loom`` suffix goes to the loom loader; any other
    accepted file is read as delimited text with the first row as header
    and the first column as index.

    :param fname: The name of the file that contains the expression matrix.
    :param transpose: When true, the delimited-text frame is transposed
        before it is returned. Not used on the loom path.
    :param return_sparse: Forwarded to the loom loader.
    :param attribute_name_cell_id: Forwarded to the loom loader.
    :param attribute_name_gene: Forwarded to the loom loader.
    :return: The loaded matrix.
    :raises ValueError: If the suffixes are not valid for the 'grn' step.
    """
    suffixes = PurePath(fname).suffixes
    if not is_valid_suffix(suffixes, 'grn'):
        raise ValueError("Unknown file format \"{}\".".format(fname))

    if '.loom' in suffixes:
        return load_exp_matrix_as_loom(fname, return_sparse, attribute_name_cell_id, attribute_name_gene)

    frame = pd.read_csv(fname, sep=suffixes_to_separator(suffixes), header=0, index_col=0)
    if transpose:
        return frame.T
    return frame
示例5
def save_matrix(df: pd.DataFrame, fname: str, transpose: bool = False) -> None:
    """
    Write a matrix to a file whose format is chosen by its suffixes.

    A file with a ``.loom`` suffix goes to the loom writer; any other
    accepted file is written as delimited text.

    :param df: The dataframe to store.
    :param fname: The name of the file to be written.
    :param transpose: When true, the transposed frame is written on the
        delimited-text path. Not used on the loom path.
    :raises ValueError: If the suffixes are not valid for the 'aucell' step.
    """
    suffixes = PurePath(fname).suffixes
    if not is_valid_suffix(suffixes, 'aucell'):
        raise ValueError("Unknown file format \"{}\".".format(fname))

    if '.loom' in suffixes:
        return save_df_as_loom(df, fname)

    frame = df.T if transpose else df
    frame.to_csv(fname, sep=suffixes_to_separator(suffixes))
示例6
def load_modules(fname: str) -> Sequence[Type[GeneSignature]]:
    """
    Load gene signatures from a YAML, pickled ``.dat`` or ``.gmt`` file.

    :param fname: The name of the file to read; its suffixes pick the loader.
    :return: Whatever the selected loader returns.
    :raises ValueError: If no loader matches the file's suffixes.
    """
    # Loading from YAML is extremely slow. Possible speed-ups are switching
    # to JSON or using a CLoader:
    # https://stackoverflow.com/questions/27743711/can-i-speedup-yaml
    # The alternative that was chosen in the end is binary pickling.
    suffixes = PurePath(fname).suffixes

    if is_valid_suffix(suffixes, 'ctx_yaml'):
        return load_from_yaml(fname)

    if '.dat' in suffixes:
        # NOTE(review): unpickling runs arbitrary code; only load trusted files.
        with openfile(fname, 'rb') as handle:
            return pickle.load(handle)

    if '.gmt' in suffixes:
        separator = guess_separator(fname)
        return GeneSignature.from_gmt(fname,
                                      field_separator=separator,
                                      gene_separator=separator)

    raise ValueError("Unknown file format for \"{}\".".format(fname))
示例7
def _format_value(v):
if isinstance(v, bool):
return 'true' if v else 'false'
if isinstance(v, int) or isinstance(v, long):
return unicode(v)
if isinstance(v, float):
if math.isnan(v) or math.isinf(v):
raise ValueError("{0} is not a valid TOML value".format(v))
else:
return repr(v)
elif isinstance(v, unicode) or isinstance(v, bytes):
return _escape_string(v)
elif isinstance(v, datetime.datetime):
return format_rfc3339(v)
elif isinstance(v, list):
return '[{0}]'.format(', '.join(_format_value(obj) for obj in v))
elif isinstance(v, dict):
return '{{{0}}}'.format(', '.join('{} = {}'.format(_escape_id(k), _format_value(obj)) for k, obj in v.items()))
elif isinstance(v, _path_types):
return _escape_string(str(v))
else:
raise RuntimeError(v)
示例8
def _format_value(v):
if isinstance(v, bool):
return 'true' if v else 'false'
if isinstance(v, int) or isinstance(v, long):
return unicode(v)
if isinstance(v, float):
if math.isnan(v) or math.isinf(v):
raise ValueError("{0} is not a valid TOML value".format(v))
else:
return repr(v)
elif isinstance(v, unicode) or isinstance(v, bytes):
return _escape_string(v)
elif isinstance(v, datetime.datetime):
return format_rfc3339(v)
elif isinstance(v, list):
return '[{0}]'.format(', '.join(_format_value(obj) for obj in v))
elif isinstance(v, dict):
return '{{{0}}}'.format(', '.join('{} = {}'.format(_escape_id(k), _format_value(obj)) for k, obj in v.items()))
elif isinstance(v, _path_types):
return _escape_string(str(v))
else:
raise RuntimeError(v)
示例9
async def remount_workspace_new_timestamp(
    self, workspace_id: EntryID, original_timestamp: Pendulum, target_timestamp: Pendulum
) -> PurePath:
    """
    Mount the workspace at target_timestamp, and unmount the workspace at the
    original timestamp if it is not None.

    If both timestamps are equal and a mount task is already registered for
    (workspace_id, target_timestamp), that task's value is returned without
    mounting anything.

    This must be a coroutine function: the body awaits the workspace load,
    the mount helper and the cancellation of the previous mount task.

    :raises MountpointNotMounted: if original_timestamp is not None and no
        task is registered for (workspace_id, original_timestamp).
    :return: the ``value`` of the task that mounts the target timestamp.
    """
    # TODO : use different workspaces for temp mount
    if original_timestamp == target_timestamp:
        try:
            return self._mountpoint_tasks[(workspace_id, target_timestamp)].value
        except KeyError:
            # Not mounted yet: fall through and mount it.
            pass

    new_workspace = await self._load_workspace_timestamped(workspace_id, target_timestamp)
    runner_task = await self._mount_workspace_helper(new_workspace, target_timestamp)

    if original_timestamp is not None:
        # NOTE(review): when both timestamps are equal and the lookup above
        # missed, this key may refer to the mount created just above -- confirm.
        if (workspace_id, original_timestamp) not in self._mountpoint_tasks:
            raise MountpointNotMounted(f"Workspace `{workspace_id}` not mounted.")
        await self._mountpoint_tasks[(workspace_id, original_timestamp)].cancel_and_join()

    return runner_task.value
示例10
def backup(folder,level="full"):
    """Create a gzip-compressed tar archive of user data files in `folder`.

    `level` "minimal" selects only the "minimal" file patterns; any other
    value selects the "minimal" and "full" patterns together. The archive is
    named ``maloja_backup_<UTC timestamp>.tar.gz`` and members are stored
    under their path relative to ``datadir()``.
    """
    if level == "minimal":
        patterns = user_files["minimal"]
    else:
        patterns = user_files["minimal"] + user_files["full"]

    matched = []
    for pattern in patterns:
        matched.extend(glob.glob(datadir(pattern)))

    log("Creating backup of " + str(len(matched)) + " files...")

    stamp = datetime.utcnow().strftime("%Y_%m_%d_%H_%M_%S")
    archive_path = os.path.join(folder, "maloja_backup_" + stamp + ".tar.gz")
    assert not os.path.exists(archive_path)

    # Mode "x:gz" creates the archive exclusively.
    with tarfile.open(name=archive_path,mode="x:gz") as tar:
        for source in matched:
            member_name = PurePath(source).relative_to(datadir())
            tar.add(source,arcname=member_name)

    log("Backup created!")
示例11
def bread(fd):
    # type: (Union[bytes, str, pathlib.Path, pathlib.PurePath, TextIO, BinaryIO]) -> bytes
    """Return bdecoded data read from a filename, path object or file-like.

    A bytes/str value or a pathlib path is opened in binary mode and read;
    anything else must provide ``.read()``, otherwise the attribute access
    raises.
    """
    if isinstance(fd, (bytes, str)):
        filename = fd
    elif pathlib is not None and isinstance(fd, (pathlib.Path, pathlib.PurePath)):
        filename = str(fd)
    else:
        # Treat it as an already-open file-like object.
        return bdecode(fd.read())

    with open(filename, 'rb') as handle:
        return bdecode(handle.read())
示例12
def bwrite(data, fd):
    # type: (Union[Tuple, List, OrderedDict, Dict, bool, int, str, bytes], Union[bytes, str, pathlib.Path, pathlib.PurePath, TextIO, BinaryIO]) -> None
    """Write data in bencoded form to a filename, path object or file-like.

    A bytes/str value or a pathlib path is opened in binary write mode;
    anything else must provide ``.write()``, otherwise the attribute access
    raises.
    """
    if isinstance(fd, (bytes, str)):
        filename = fd
    elif pathlib is not None and isinstance(fd, (pathlib.Path, pathlib.PurePath)):
        filename = str(fd)
    else:
        # Treat it as an already-open file-like object.
        fd.write(bencode(data))
        return

    # Encoding happens after the file is opened, as in the direct form.
    with open(filename, 'wb') as handle:
        handle.write(bencode(data))
示例13
def copy_files_to_zip(self, dst_file_name, src_dir, is_logs):
    """Add result or log files from `src_dir` to a zip in the working dir.

    The zip is appended to when it already exists and created otherwise.
    Each file is stored under a folder named after the test config file
    (dots replaced by underscores), keeping only its base name.

    With ``is_logs is True`` the ``*.txt``, ``*.log`` and ``crashdumps/*``
    entries of that folder inside `src_dir` are collected; otherwise the
    ``actual.*`` files directly in `src_dir` are collected.
    """
    dst = os.path.join(os.getcwd(), dst_file_name)
    mode = 'a' if os.path.isfile(dst) else 'w'
    folder_name = self.test_config.config_file.replace('.', '_')

    if is_logs is True:
        log_dir = os.path.join(src_dir, folder_name)
        sources = [
            match
            for pattern in ('*.txt', '*.log', 'crashdumps/*')
            for match in glob.glob(os.path.join(log_dir, pattern))
        ]
    else:
        sources = glob.glob(os.path.join(src_dir, 'actual.*'))

    with zipfile.ZipFile(dst, mode, zipfile.ZIP_DEFLATED) as archive:
        for source in sources:
            base_name = pathlib.PurePath(source).name
            archive.write(source, os.path.join(folder_name, base_name))
示例14
def _get_staging_location(self):
if self._gcs_staging_checked:
return self._gcs_staging
# Configure the GCS staging bucket
if self._gcs_staging is None:
try:
gcs_bucket = _get_project_id()
except:
raise ValueError('Cannot get the Google Cloud project ID, please specify the gcs_staging argument.')
self._gcs_staging = 'gs://' + gcs_bucket + '/' + GCS_STAGING_BLOB_DEFAULT_PREFIX
else:
from pathlib import PurePath
path = PurePath(self._gcs_staging).parts
if len(path) < 2 or not path[0].startswith('gs'):
raise ValueError('Error: {} should be a GCS path.'.format(self._gcs_staging))
gcs_bucket = path[1]
from ._gcs_helper import GCSHelper
GCSHelper.create_gcs_bucket_if_not_exist(gcs_bucket)
self._gcs_staging_checked = True
return self._gcs_staging
示例15
def fspath(path):
    """
    Return the file-system string representation of `path`.

    Text and bytes values are returned unchanged. Other objects are
    converted through their type's ``__fspath__``; when the type has none,
    ``pathlib.PurePath`` instances are converted to text.

    Derived from PEP 519 and adapted for Python versions before 3.6.

    Raises TypeError when `path` is none of the supported kinds.
    """
    text_type = py.builtin.text
    if isinstance(path, (text_type, py.builtin.bytes)):
        return path

    # Look the method up on the type, mirroring how other magic methods
    # are resolved.
    cls = type(path)
    try:
        return cls.__fspath__(path)
    except AttributeError:
        # An AttributeError coming from inside an existing __fspath__ is a
        # genuine error and must propagate.
        if hasattr(cls, '__fspath__'):
            raise
        try:
            import pathlib
        except import_errors:
            pass
        else:
            if isinstance(path, pathlib.PurePath):
                return text_type(path)
        raise TypeError("expected str, bytes or os.PathLike object, not "
                        + cls.__name__)
示例16
def default():
    """Return the default parameter set, built with ``iopar.from_dict``.

    The cache directory defaults to ``Downloads/FlatIron`` under the
    current user's home directory.
    """
    cache_dir = str(PurePath(Path.home(), "Downloads", "FlatIron"))
    par = dict(
        ALYX_LOGIN="test_user",
        ALYX_PWD="TapetesBloc18",
        ALYX_URL="https://test.alyx.internationalbrainlab.org",
        CACHE_DIR=cache_dir,
        FTP_DATA_SERVER="ftp://ibl.flatironinstitute.org",
        FTP_DATA_SERVER_LOGIN="iblftp",
        FTP_DATA_SERVER_PWD=None,
        HTTP_DATA_SERVER="http://ibl.flatironinstitute.org",
        HTTP_DATA_SERVER_LOGIN="iblmember",
        HTTP_DATA_SERVER_PWD=None,
        GLOBUS_CLIENT_ID=None,
    )
    return iopar.from_dict(par)
示例17
def setup():
    """Interactively collect the parameters and write the parameter file.

    Every non-password parameter is prompted with ``input``; an empty
    answer keeps the current value. The three passwords are prompted with
    ``getpass`` in the same way. An empty cache directory falls back to
    ``Downloads/FlatIron`` under the home directory, and the cache directory
    is created (including missing parents) before the parameters are
    written. The parameter file location is printed at the end.
    """
    par_current = iopar.read(_PAR_ID_STR)
    par_default = default()
    if par_current is None:
        par_current = par_default

    par = iopar.as_dict(par_default)
    for k in par.keys():
        cpar = _get_current_par(k, par_current)
        # Passwords are asked separately below, without echo.
        if "PWD" not in k:
            par[k] = input("Param " + k + ", current value is [" + str(cpar) + "]:") or cpar

    cpar = _get_current_par("ALYX_PWD", par_current)
    prompt = "Enter the Alyx password for " + par["ALYX_LOGIN"] + '(leave empty to keep current):'
    par["ALYX_PWD"] = getpass(prompt) or cpar

    cpar = _get_current_par("HTTP_DATA_SERVER_PWD", par_current)
    prompt = "Enter the FlatIron HTTP password for " + par["HTTP_DATA_SERVER_LOGIN"] +\
             '(leave empty to keep current): '
    par["HTTP_DATA_SERVER_PWD"] = getpass(prompt) or cpar

    cpar = _get_current_par("FTP_DATA_SERVER_PWD", par_current)
    prompt = "Enter the FlatIron FTP password for " + par["FTP_DATA_SERVER_LOGIN"] +\
             '(leave empty to keep current): '
    par["FTP_DATA_SERVER_PWD"] = getpass(prompt) or cpar

    # Default to the home dir if an empty (or None) dir somehow made it
    # here; a truthiness test avoids calling len() on None.
    if not par['CACHE_DIR']:
        par['CACHE_DIR'] = str(PurePath(Path.home(), "Downloads", "FlatIron"))

    par = iopar.from_dict(par)

    # Create the directory if needed. makedirs also creates missing parent
    # directories (e.g. a home without "Downloads"), where mkdir would fail.
    if par.CACHE_DIR and not os.path.isdir(par.CACHE_DIR):
        os.makedirs(par.CACHE_DIR, exist_ok=True)
    iopar.write(_PAR_ID_STR, par)
    print('ONE Parameter file location: ' + iopar.getfile(_PAR_ID_STR))
示例18
def _get_cache_dir(self, cache_dir):
if not cache_dir:
cache_dir = self._par.CACHE_DIR
# if empty in parameter file, do not allow and set default
if not cache_dir:
cache_dir = str(PurePath(Path.home(), "Downloads", "FlatIron"))
return cache_dir
示例19
def getfile(str_params):
    """
    Build the full path of the parameter file for the current platform.

    On 'win32' and 'cygwin' the file lives in the folder named by the
    APPDATA environment variable; elsewhere it lives in the home directory.
    In both cases the file name is `str_params` prefixed with a dot.

    :param str_params: string that identifies the parameter file
    :return: string of the full path
    """
    on_windows = sys.platform == 'win32' or sys.platform == 'cygwin'
    if on_windows:
        folder = os.environ['APPDATA']
    else:
        folder = pathlib.Path.home()
    return str(pathlib.PurePath(folder, '.' + str_params))
示例20
def count_frames_and_secs(path):
    """
    Return ``(nframes, nsecs)`` for the given video file.

    ffmpeg is run with stream copy to a null output and the last progress
    line starting with ``frame=`` is parsed. This can be quite slow for
    large files.

    Disclaimer: with older ffmpeg versions (2.x) the result has been seen to
    differ from actually reading the frames, so the values are not
    guaranteed to be exact.

    Raises TypeError for a path that is not a str or pathlib path, and
    RuntimeError when ffmpeg fails or no progress line is found.
    """
    # https://stackoverflow.com/questions/2017843/fetch-frame-count-with-ffmpeg

    def _value_after(text, key):
        # Token between the "=" that follows `key` and the next space,
        # or None when `key` does not occur in `text`.
        start = text.find(key)
        if start < 0:
            return None
        return text[start:].split("=", 1)[-1].lstrip().split(" ", 1)[0].strip()

    if isinstance(path, pathlib.PurePath):
        path = str(path)
    if not isinstance(path, str):
        raise TypeError("Video path must be a string or pathlib.Path.")

    command = [_get_exe(), "-i", path, "-map", "0:v:0", "-c", "copy", "-f", "null", "-"]
    try:
        output = subprocess.check_output(command, stderr=subprocess.STDOUT, **_popen_kwargs())
    except subprocess.CalledProcessError as err:
        output = err.output.decode(errors="ignore")
        raise RuntimeError("FFMEG call failed with {}:\n{}".format(err.returncode, output))

    # Unlike streaming subprocess usage, ffmpeg will not hang here. Worst
    # case Python stops/crashes and ffmpeg keeps running until it is done.
    for raw_line in reversed(output.splitlines()):
        if not raw_line.startswith(b"frame="):
            continue
        text = raw_line.decode(errors="ignore")
        nframes = nsecs = None
        frame_token = _value_after(text, "frame=")
        if frame_token is not None:
            nframes = int(frame_token)
        time_token = _value_after(text, "time=")
        if time_token is not None:
            nsecs = cvsecs(*time_token.split(":"))
        return nframes, nsecs

    raise RuntimeError("Could not get number of frames")  # pragma: no cover
示例21
def _str2path(self, *args):
for attr in args:
if (getattr(self, attr) is not None and
not isinstance(getattr(self, attr), pathlib.PurePath)):
setattr(self, attr, pathlib.Path(getattr(self, attr)))
示例22
def get_path(
    self,
    prefix: Optional[Union[str, PurePath]] = None,
    filename: Optional[Union[str, PurePath]] = None,
) -> str:
    """Return the path of this storage location as a string.

    The path is ``prefix / self.url``, followed by ``filename`` when one is
    given. A falsy ``prefix`` is replaced by the connector's base path.
    """
    root = PurePath(prefix or self.connector.base_path)
    location = root / self.url
    return os.fspath(location / filename if filename else location)
示例23
def base_path(self) -> PurePath:
    """Get a base path for this connector.

    Only the signature and this docstring make up the body, so as written
    the call returns ``None``.

    NOTE(review): presumably an abstract accessor (possibly a property)
    that concrete connectors override -- confirm against the enclosing
    class, which is not visible here.
    """
示例24
def fixture_path():
    """Return a factory mapping a fixture name to its path as a string.

    Paths point into the ``fixtures`` directory next to this file.
    """
    def factory(path: str) -> str:
        here = PurePath(__file__).parent
        return str(here / 'fixtures' / path)

    return factory
示例25
def fspath(path):
    """
    Return the file-system string representation of `path`.

    Text and bytes values come back unchanged. Any other object is converted
    via the ``__fspath__`` of its type; if the type lacks it, a
    ``pathlib.PurePath`` is converted to text instead.

    Adapted from PEP 519 so that it also works on Python < 3.6.

    Raises TypeError for every other kind of object.
    """
    if isinstance(path, (py.builtin.text, py.builtin.bytes)):
        return path

    # Resolve __fspath__ on the type, the same way other magic methods are
    # resolved.
    kind = type(path)
    try:
        return kind.__fspath__(path)
    except AttributeError:
        if hasattr(kind, '__fspath__'):
            # The method exists, so the error came from inside it.
            raise
        try:
            import pathlib
        except ImportError:
            pass
        else:
            if isinstance(path, pathlib.PurePath):
                return py.builtin.text(path)
        raise TypeError("expected str, bytes or os.PathLike object, not "
                        + kind.__name__)
示例26
def fspath(path):
    """
    Convert `path` to its string (or bytes) file-system representation.

    str/bytes input is handed back untouched. Otherwise the type-level
    ``__fspath__`` is used; when the type has no such method, instances of
    ``pathlib.PurePath`` are turned into text.

    Based on the PEP 519 reference code, changed to run on Python < 3.6.

    Raises TypeError if `path` cannot be converted.
    """
    accepted_as_is = (py.builtin.text, py.builtin.bytes)
    if isinstance(path, accepted_as_is):
        return path

    # Use the type rather than the instance, matching the method resolution
    # of other magic methods.
    path_cls = type(path)
    try:
        return path_cls.__fspath__(path)
    except AttributeError:
        # Re-raise when __fspath__ exists: the failure happened inside it.
        if hasattr(path_cls, '__fspath__'):
            raise
        try:
            import pathlib
        except ImportError:
            pass
        else:
            if isinstance(path, pathlib.PurePath):
                return py.builtin.text(path)
        raise TypeError("expected str, bytes or os.PathLike object, not "
                        + path_cls.__name__)
示例27
def deduplicate_directories(directory_devices):
    '''
    Given a map from directory to the identifier of the device holding it,
    return the directories as a sorted tuple with child directories of other
    listed directories removed. For instance, for '/foo' and '/foo/bar' on
    the same device, return just: ('/foo',)

    A child is kept when it lives on a different device than its listed
    parent, since both may then need to be passed to Borg (e.g. when the
    location.one_file_system option is true).

    The idea is that Borg spiders the contents of a parent directory anyway,
    and seeing the same file twice can cause duplicate reads or even hangs,
    e.g. when a database hook streams dumps to Borg through a named pipe.
    '''
    kept = []

    for directory in sorted(directory_devices):
        ancestors = pathlib.PurePath(directory).parents
        device = directory_devices[directory]

        # A directory is redundant when any listed directory is one of its
        # ancestors (at any depth) and sits on the same device.
        covered = any(
            pathlib.PurePath(other) in ancestors and directory_devices[other] == device
            for other in directory_devices
        )
        if not covered:
            kept.append(directory)

    return tuple(kept)
示例28
def commit_new_version(version: str):
    """
    Stage the files that hold the version number and commit them.

    The commit subject comes from the configured ``commit_subject``
    template; when ``commit_message`` is configured it is appended as an
    extended message after a blank line.

    :param version: Version number substituted into the message templates.
    :return: The result of ``repo.git.commit``.
    """
    from .history import load_version_patterns

    message = config.get("commit_subject").format(version=version)

    # Add an extended message if one is configured
    extended = config.get("commit_message")
    if extended:
        message = message + "\n\n" + extended.format(version=version)

    author = config.get("commit_author", "semantic-release <semantic-release>",)

    for pattern in load_version_patterns():
        # git expects the path relative to the repository working directory.
        absolute = PurePath(os.getcwd(), pattern.path)
        repo.git.add(str(absolute.relative_to(repo.working_dir)))

    return repo.git.commit(m=message, author=author)
示例29
def run_pyinstaller(extra_args=None, debug=False):
    """Run PyInstaller for the configured app and move its output.

    The command line is built from SETTINGS (app name, hidden imports, main
    module), `extra_args`, the bundled hooks directory and a generated
    runtime hook. After a successful run the output directory is renamed to
    the freeze directory unless both are the same path.

    :param extra_args: additional PyInstaller arguments, or None for none.
    :param debug: use log level DEBUG and pass ``--debug all``.
    """
    extras = [] if extra_args is None else extra_args
    app_name = SETTINGS['app_name']

    # Log level WARN would be nicer when not debugging, and works fine for
    # PyInstaller 3.3. With 3.4 it gives confusing "hidden import not found"
    # warnings, so ERROR is used instead.
    log_level = 'DEBUG' if debug else 'ERROR'

    command = [
        'pyinstaller',
        '--name', app_name,
        '--noupx',
        '--log-level', log_level,
        '--noconfirm'
    ]
    for module_name in SETTINGS['hidden_imports']:
        command += ['--hidden-import', module_name]
    command += extras
    command += [
        '--distpath', path('target'),
        '--specpath', path('target/PyInstaller'),
        '--workpath', path('target/PyInstaller')
    ]
    command += ['--additional-hooks-dir', join(dirname(__file__), 'hooks')]
    if debug:
        command += ['--debug', 'all']
        if is_mac():
            # Force generation of an .app bundle. Otherwise, PyInstaller
            # skips it when --debug is given.
            command.append('-w')
    runtime_hook = _generate_runtime_hook()
    command += ['--runtime-hook', runtime_hook]
    command.append(path(SETTINGS['main_module']))

    run(command, check=True)

    bundle_suffix = '.app' if is_mac() else ''
    output_dir = path('target/' + app_name + bundle_suffix)
    freeze_dir = path('${freeze_dir}')
    # In most cases, rename(src, dst) silently "works" when src == dst. But
    # on some Windows drives it raises a FileExistsError, so skip the rename
    # when both are the same path.
    if PurePath(output_dir) != PurePath(freeze_dir):
        rename(output_dir, freeze_dir)
示例30
def save_enriched_motifs(df, fname:str) -> None:
    """
    Save enriched motifs in the format chosen by the suffixes of `fname`.

    Files accepted for the 'ctx' step are written as delimited text straight
    from the dataframe. For every other file the dataframe is first
    converted to regulons, which are then written as JSON (regulon name to
    list of target genes), pickle (``.dat``), GMT or YAML.

    :param df: The dataframe holding the enriched motifs.
    :param fname: The name of the file to be written.
    :raises ValueError: If no writer matches the file's suffixes.
    """
    suffixes = PurePath(fname).suffixes

    if is_valid_suffix(suffixes, 'ctx'):
        df.to_csv(fname, sep=suffixes_to_separator(suffixes))
        return

    # The conversion happens before the format is checked.
    regulons = df2regulons(df)

    if '.json' in suffixes:
        targets_by_name = {r.name: list(r.gene2weight.keys()) for r in regulons}
        with openfile(fname, 'w') as handle:
            handle.write(json.dumps(targets_by_name))
    elif '.dat' in suffixes:
        with openfile(fname, 'wb') as handle:
            pickle.dump(regulons, handle)
    elif '.gmt' in suffixes:
        GeneSignature.to_gmt(fname, regulons)
    elif is_valid_suffix(suffixes, 'ctx_yaml'):
        save_to_yaml(regulons, fname)
    else:
        raise ValueError("Unknown file format \"{}\".".format(fname))