Python源码示例:typing.io.TextIO()(注:typing.io 子模块已弃用,自 Python 3.13 起移除,请直接使用 typing.TextIO)
示例1
def patch_file(patch_stream: TextIO, fromcsv_stream: TextIO, tocsv_stream: TextIO,
               strict: bool = True, sep: str = ','):
    """Apply the patch read from patch_stream to the CSV on fromcsv_stream
    and write the patched CSV to tocsv_stream.
    """
    diff = patch.load(patch_stream)
    from_records = records.load(fromcsv_stream, sep=sep)
    to_records = patch.apply(diff, from_records, strict=strict)
    # Column ordering: with patched rows available, lead with the diff's
    # index columns for a nicer layout; otherwise keep the source order.
    if not to_records:
        fieldnames = from_records.fieldnames
    else:
        fieldnames = _nice_fieldnames(to_records[0].keys(), diff['_index'])
    records.save(to_records, fieldnames, tocsv_stream)
示例2
def first_nonblank_line(f: TextIO, max_lines: int = 10) -> str:
    """Return the first non-blank line (up to 80 chars + newline) of *f*.

    Parameters
    ----------
    f: TextIO
        text stream to read from
    max_lines: int
        maximum number of lines to scan for non-blank content

    Returns
    -------
    str
        the first line containing non-whitespace characters

    Raises
    ------
    ValueError
        if no non-blank line is found within max_lines lines or before EOF
    """
    line = ""
    for _ in range(max_lines):
        # cap readline at 81 chars: an 80-character line plus its newline
        line = f.readline(81)
        if line.strip():
            break
    if not line.strip():
        # Check the line content itself rather than the loop counter: the
        # original `_i == max_lines - 1` test raised even when a valid line
        # was found on the last permitted iteration, and `_i` was unbound
        # for max_lines <= 0. Streams like StringIO have no .name, hence
        # the getattr fallback for the error message.
        name = getattr(f, "name", "<stream>")
        raise ValueError(f"could not find first valid header line in {name}")
    return line
示例3
def _getsvind(f: TextIO, ln: str) -> List[str]:
    """Parse the satellite index from an epoch line (plus continuation lines)."""
    if len(ln) < 32:
        raise ValueError(f'satellite index line truncated: {ln}')
    # columns 30-32 hold the visible-satellite count (%i3)
    Nsv = int(ln[29:32])
    # the epoch line itself carries at most 12 SV IDs
    sv = _getSVlist(ln, min(12, Nsv), [])
    # remaining IDs continue on following lines, 12 per line
    remaining = Nsv - 12
    while remaining > 0:
        sv = _getSVlist(f.readline(), min(12, remaining), sv)
        remaining -= 12
    if len(sv) != Nsv:
        raise ValueError('satellite list read incorrectly')
    return sv
示例4
def obstime2(fn: Union[TextIO, Path],
             verbose: bool = False) -> np.ndarray:
    """
    read all times in RINEX2 OBS file

    Parameters
    ----------
    fn : TextIO or Path
        RINEX 2 OBS file path or already-open text stream
    verbose : bool
        accepted for interface compatibility; not used in this function

    Returns
    -------
    np.ndarray
        epoch times, verified unique by check_unique_times()
    """
    times = []
    with opener(fn) as f:
        # Capture header info
        hdr = obsheader2(f)
        for ln in f:
            try:
                time_epoch = _timeobs(ln)
            except ValueError:  # not an epoch line; keep scanning
                continue
            times.append(time_epoch)
            # skip the observation records belonging to this epoch
            _skip(f, ln, hdr['Nl_sv'])
    times = np.asarray(times)
    check_unique_times(times)
    return times
示例5
def opencrx(f: TextIO) -> str:
    """Decompress a Hatanaka (CRINEX) stream to plain RINEX text.

    The whole stream is passed as a string because of a quirk where
    gzip.open() even with 'rt' doesn't decompress until read.
    """
    exe = crxexe()
    if not exe:
        # converter binary not found: try to compile it, then look again
        if build() != 0:
            raise RuntimeError('could not build Hatanka converter. Do you have a C compiler?')
        exe = crxexe()
        if not exe:
            raise RuntimeError('Hatanaka converter is broken or missing.')
    return subprocess.check_output([exe, '-'],
                                   input=f.read(),
                                   universal_newlines=True)
示例6
def navtime3(fn: Union[TextIO, Path]) -> np.ndarray:
    """
    return all times in RINEX file

    Scans a RINEX 3 NAV file and returns the sorted unique epoch times.
    """
    times = []
    with opener(fn) as f:
        navheader3(f)  # skip header
        for line in f:
            try:
                time = _time(line)
            except ValueError:  # not an epoch line; keep scanning
                continue
            times.append(time)
            _skip(f, Nl[line[0]])  # different system types skip different line counts
    return np.unique(times)
示例7
def navheader2(f: TextIO) -> Dict[str, Any]:
    """Parse a RINEX 2 NAV header into a dict.

    For RINEX NAV version 2 only. End users should use rinexheader()
    """
    # accept a path as well as an open stream
    if isinstance(f, (str, Path)):
        with opener(f, header=True) as h:
            return navheader2(h)
    hdr = rinexinfo(f)
    for line in f:
        if 'END OF HEADER' in line:
            break
        # RINEX header layout: value in columns 1-60, label in columns 61+
        hdr[line[60:].strip()] = line[:60]
    return hdr
示例8
def navtime2(fn: Union[TextIO, Path]) -> np.ndarray:
    """Collect the sorted unique epoch times of a RINEX 2 NAV file."""
    times = []
    with opener(fn) as f:
        hdr = navheader2(f)
        # lines to skip per record is fixed by the file's system type
        nskip = Nl[hdr['systems']]
        # readline() returns '' only at EOF, so iterate until then
        for ln in iter(f.readline, ''):
            try:
                times.append(_timenav(ln))
            except ValueError:
                continue
            _skip(f, nskip)
    return np.unique(times)
示例9
def obstime3(fn: Union[TextIO, Path],
             verbose: bool = False) -> np.ndarray:
    """
    return all times in RINEX file

    RINEX 3 OBS epoch lines start with '>'.
    verbose is accepted for interface compatibility; not used here.
    """
    times = []
    with opener(fn) as f:
        for ln in f:
            if ln.startswith('>'):
                times.append(_timeobs(ln))
    times = np.asarray(times)
    check_unique_times(times)
    return times
示例10
def lines(cls, f: TextIO, reverse: bool = False):
    """
    Yield the lines of *f*, optionally from the last line to the first.

    Forward mode simply iterates the stream. Reverse mode walks the file
    backwards in blocks (cls.reversed_blocks) and reassembles lines:
    characters are accumulated in reverse and flipped on yield, so each
    yielded line is in normal order with its trailing newline attached.
    Every '"' toggles the quoting flag, so a newline inside double quotes
    (CSV-style quoted field) does not terminate a line.
    """
    if not reverse:
        for line in f:
            yield line
    else:
        part = ''        # characters of the current line, in reverse order
        quoting = False  # currently inside a double-quoted region?
        for block in cls.reversed_blocks(f):
            for c in reversed(block):
                if c == '"':
                    quoting = not quoting
                elif c == '\n' and part and not quoting:
                    # line boundary: emit accumulated chars in file order
                    yield part[::-1]
                    part = ''
                part += c
        if part:
            # first line of the file (no newline precedes it)
            yield part[::-1]
示例11
def __init__(self, istream: TextIO, sep: str = ',') -> None:
    """Wrap *istream* in a csv.DictReader with *sep* as the delimiter."""
    # raise the built-in cap on field sizes so very wide fields still parse
    csv.field_size_limit(1 << 24)
    self.reader = csv.DictReader(istream, delimiter=sep)
示例12
def save(records: Sequence[Record], fieldnames: List[Column], ostream: TextIO):
    """Write *records* to *ostream* as CSV with a header row of *fieldnames*."""
    writer = csv.DictWriter(ostream, fieldnames)
    writer.writeheader()
    writer.writerows(records)
示例13
def __init__(self, input=None, output: TextIO = sys.stdout):
    """Initialize the ANTLR-generated lexer over *input*, writing messages to *output*."""
    super().__init__(input, output)
    # generated code must match the ANTLR runtime version it was built for
    self.checkVersion("4.7.1")
    self._interp = LexerATNSimulator(self, self.atn, self.decisionsToDFA, PredictionContextCache())
    self._actions = None
    self._predicates = None
示例14
def __init__(self, input: TokenStream, output: TextIO = sys.stdout):
    """Create a parser reading tokens from *input*; messages go to *output*."""
    super().__init__()
    # The input stream.
    self._input = None
    self._output = output
    # The error handling strategy for the parser. The default value is a new
    # instance of {@link DefaultErrorStrategy}.
    self._errHandler = DefaultErrorStrategy()
    self._precedenceStack = list()
    self._precedenceStack.append(0)
    # The {@link ParserRuleContext} object for the currently executing rule.
    # self is always non-null during the parsing process.
    self._ctx = None
    # Specifies whether or not the parser should construct a parse tree during
    # the parsing process. The default value is {@code true}.
    self.buildParseTrees = True
    # When {@link #setTrace}{@code (true)} is called, a reference to the
    # {@link TraceListener} is stored here so it can be easily removed in a
    # later call to {@link #setTrace}{@code (false)}. The listener itself is
    # implemented as a parser listener so self field is not directly used by
    # other parser methods.
    self._tracer = None
    # The list of {@link ParseTreeListener} listeners registered to receive
    # events during the parse.
    self._parseListeners = None
    # The number of syntax errors reported during parsing. self value is
    # incremented each time {@link #notifyErrorListeners} is called.
    self._syntaxErrors = 0
    self.setInputStream(input)
    # reset the parser's state#
示例15
def __init__(self, input=None, output: TextIO = sys.stdout):
    """Initialize the ANTLR-generated lexer over *input*, writing messages to *output*."""
    super().__init__(input, output)
    # generated code must match the ANTLR runtime version it was built for
    self.checkVersion("4.8")
    self._interp = LexerATNSimulator(self, self.atn, self.decisionsToDFA, PredictionContextCache())
    self._actions = None
    self._predicates = None
示例16
def __init__(self, input: TokenStream, output: TextIO = sys.stdout):
    """Initialize the ANTLR-generated parser over *input*, writing messages to *output*."""
    super().__init__(input, output)
    # generated code must match the ANTLR runtime version it was built for
    self.checkVersion("4.8")
    self._interp = ParserATNSimulator(self, self.atn, self.decisionsToDFA, self.sharedContextCache)
    self._predicates = None
示例17
def open_text(package: Package,
              resource: Resource,
              encoding: str = 'utf-8',
              errors: str = 'strict') -> TextIO:
    """Open *resource* inside *package* for text reading, decoding with
    *encoding* and *errors*."""
    binary = open_binary(package, resource)
    return TextIOWrapper(binary, encoding=encoding, errors=errors)
示例18
def open_text(package: Package,
              resource: Resource,
              encoding: str = 'utf-8',
              errors: str = 'strict') -> TextIO:
    """Return a file-like object opened for text reading of the resource."""
    resource = _normalize_path(resource)
    package = _get_package(package)
    # prefer the loader's ResourceReader API when the package provides one
    reader = _get_resource_reader(package)
    if reader is not None:
        return TextIOWrapper(reader.open_resource(resource), encoding, errors)
    _check_location(package)
    # fall back to opening the resource relative to the package directory
    absolute_package_path = os.path.abspath(package.__spec__.origin)
    package_path = os.path.dirname(absolute_package_path)
    full_path = os.path.join(package_path, resource)
    try:
        return open(full_path, mode='r', encoding=encoding, errors=errors)
    except OSError:
        # Just assume the loader is a resource loader; all the relevant
        # importlib.machinery loaders are and an AttributeError for
        # get_data() will make it clear what is needed from the loader.
        loader = cast(ResourceLoader, package.__spec__.loader)
        data = None
        if hasattr(package.__spec__.loader, 'get_data'):
            with suppress(OSError):
                data = loader.get_data(full_path)
        if data is None:
            package_name = package.__spec__.name
            message = '{!r} resource not found in {!r}'.format(
                resource, package_name)
            raise FileNotFoundError(message)
        else:
            # loader handed back raw bytes; wrap them for text decoding
            return TextIOWrapper(BytesIO(data), encoding, errors)
示例19
def rinexheader(fn: Union[TextIO, str, Path]) -> Dict[str, Any]:
    """
    retrieve RINEX 2/3 or CRINEX 1/3 header as unparsed dict()

    Parameters
    ----------
    fn : TextIO, str or Path
        RINEX file path, NetCDF file path, or already-open text stream

    Raises
    ------
    TypeError
        for stream types other than StringIO / TextIOWrapper
    ValueError
        for unrecognized RINEX versions or file types
    """
    if isinstance(fn, (str, Path)):
        fn = Path(fn).expanduser()
    if isinstance(fn, Path) and fn.suffix == '.nc':
        # NetCDF file: metadata comes from rinexinfo directly
        return rinexinfo(fn)
    elif isinstance(fn, Path):
        # open the file and recurse with the resulting stream
        with opener(fn, header=True) as f:
            return rinexheader(f)
    elif isinstance(fn, io.StringIO):
        fn.seek(0)  # rewind the in-memory buffer before reading
    elif isinstance(fn, io.TextIOWrapper):
        pass  # already an open text stream; read from current position
    else:
        raise TypeError(f'unknown RINEX filetype {type(fn)}')
    info = rinexinfo(fn)
    # dispatch on RINEX version, then on observation vs. navigation type
    if int(info['version']) in (1, 2):
        if info['rinextype'] == 'obs':
            hdr = obsheader2(fn)
        elif info['rinextype'] == 'nav':
            hdr = navheader2(fn)
        else:
            raise ValueError(f'Unknown rinex type {info} in {fn}')
    elif int(info['version']) == 3:
        if info['rinextype'] == 'obs':
            hdr = obsheader3(fn)
        elif info['rinextype'] == 'nav':
            hdr = navheader3(fn)
        else:
            raise ValueError(f'Unknown rinex type {info} in {fn}')
    else:
        raise ValueError(f'unknown RINEX {info} {fn}')
    return hdr
示例20
def _skip(f: TextIO, ln: str,
          Nl_sv: int,
          sv: Sequence[str] = None):
    """
    skip ahead to next time step

    Parameters
    ----------
    f : TextIO
        open RINEX OBS stream, positioned just after the epoch line
    ln : str
        the epoch line itself (used to recover the SV list when needed)
    Nl_sv : int
        number of observation lines per satellite
    sv : Sequence[str], optional
        satellite list for this epoch; parsed via _getsvind() when omitted
    """
    if sv is None:
        sv = _getsvind(f, ln)
    # f.seek(len(sv)*Nl_sv*80, 1) # not for io.TextIOWrapper ?
    for _ in range(len(sv)*Nl_sv):
        f.readline()
示例21
def _skip_header(f: TextIO):
    """Advance *f* just past the line containing 'END OF HEADER'."""
    line = f.readline()
    # readline() returns '' only at EOF, so this also stops on EOF
    while line and "END OF HEADER" not in line:
        line = f.readline()
示例22
def _skip(f: TextIO, Nl: int):
    """Consume up to *Nl* lines from *f*, stopping early at EOF."""
    for _ in range(Nl):
        if next(f, None) is None:
            break
示例23
def navheader3(f: TextIO) -> Dict[str, Any]:
    """
    Parse a RINEX 3 NAV header into a dict keyed by header label.

    IONOSPHERIC CORR entries are parsed into per-system coefficient lists;
    all other header lines are stored as their raw 60-character value field.
    """
    # accept a path as well as an open stream
    if isinstance(f, (str, Path)):
        with opener(f, header=True) as h:
            return navheader3(h)
    hdr = rinexinfo(f)
    for ln in f:
        if 'END OF HEADER' in ln:
            break
        # header layout: value in columns 1-60, label in columns 61+
        kind, content = ln[60:].strip(), ln[:60]
        if kind == "IONOSPHERIC CORR":
            if kind not in hdr:
                hdr[kind] = {}
            # system identifier, e.g. 'GAL' or 'GPSA'
            coeff_kind = content[:4].strip()
            # Galileo carries 3 coefficients, the others 4
            N = 3 if coeff_kind == 'GAL' else 4
            # RINEX 3.04 table A5 page A19
            coeff = [rinex_string_to_float(content[5 + i*12:5 + (i+1)*12]) for i in range(N)]
            hdr[kind][coeff_kind] = coeff
        else:
            hdr[kind] = content
    return hdr
示例24
def __init__(self, input: TokenStream, output: TextIO = sys.stdout):
    """Create a parser reading tokens from *input*; messages go to *output*."""
    super().__init__()
    # The input stream.
    self._input = None
    self._output = output
    # The error handling strategy for the parser. The default value is a new
    # instance of {@link DefaultErrorStrategy}.
    self._errHandler = DefaultErrorStrategy()
    self._precedenceStack = list()
    self._precedenceStack.append(0)
    # The {@link ParserRuleContext} object for the currently executing rule.
    # self is always non-null during the parsing process.
    self._ctx = None
    # Specifies whether or not the parser should construct a parse tree during
    # the parsing process. The default value is {@code true}.
    self.buildParseTrees = True
    # When {@link #setTrace}{@code (true)} is called, a reference to the
    # {@link TraceListener} is stored here so it can be easily removed in a
    # later call to {@link #setTrace}{@code (false)}. The listener itself is
    # implemented as a parser listener so self field is not directly used by
    # other parser methods.
    self._tracer = None
    # The list of {@link ParseTreeListener} listeners registered to receive
    # events during the parse.
    self._parseListeners = None
    # The number of syntax errors reported during parsing. self value is
    # incremented each time {@link #notifyErrorListeners} is called.
    self._syntaxErrors = 0
    self.setInputStream(input)
    # reset the parser's state#
示例25
def __init__(self, input=None, output: TextIO = sys.stdout):
    """Initialize the ANTLR-generated lexer over *input*, writing messages to *output*."""
    super().__init__(input, output)
    # generated code must match the ANTLR runtime version it was built for
    self.checkVersion("4.8")
    self._interp = LexerATNSimulator(self, self.atn, self.decisionsToDFA, PredictionContextCache())
    self._actions = None
    self._predicates = None
示例26
def __init__(self, input: TokenStream, output: TextIO = sys.stdout):
    """Initialize the ANTLR-generated parser over *input*, writing messages to *output*."""
    super().__init__(input, output)
    # generated code must match the ANTLR runtime version it was built for
    self.checkVersion("4.8")
    self._interp = ParserATNSimulator(self, self.atn, self.decisionsToDFA, self.sharedContextCache)
    self._predicates = None
示例27
def test_textio(self):
    """A TextIO annotation should expose (str,) as its type parameters."""
    def stuff(a: TextIO) -> str:
        return a.readline()
    a = stuff.__annotations__['a']
    # NOTE(review): __parameters__ == (str,) matches an old typing
    # implementation; modern typing does not expose TextIO this way — confirm
    # the targeted Python version before relying on this test.
    assert a.__parameters__ == (str,)
示例28
def test_io_submodule(self):
    """typing.io should re-export IO, TextIO and BinaryIO from typing."""
    # NOTE(review): typing.io was deprecated and removed in Python 3.13;
    # this test only applies to older interpreters where it still exists.
    from typing.io import IO, TextIO, BinaryIO, __all__, __name__
    assert IO is typing.IO
    assert TextIO is typing.TextIO
    assert BinaryIO is typing.BinaryIO
    assert set(__all__) == set(['IO', 'TextIO', 'BinaryIO'])
    assert __name__ == 'typing.io'
示例29
def open_text(package: Package,
              resource: Resource,
              encoding: str = 'utf-8',
              errors: str = 'strict') -> TextIO:
    """Return a file-like object opened for text reading of the resource."""
    resource = _normalize_path(resource)
    package = _get_package(package)
    # use the loader's ResourceReader API when one is available
    reader = _get_resource_reader(package)
    if reader is not None:
        return TextIOWrapper(reader.open_resource(resource), encoding, errors)
    _check_location(package)
    # otherwise open the resource relative to the package's directory
    absolute_package_path = os.path.abspath(package.__spec__.origin)
    package_path = os.path.dirname(absolute_package_path)
    full_path = os.path.join(package_path, resource)
    try:
        return open(full_path, mode='r', encoding=encoding, errors=errors)
    except OSError:
        # Just assume the loader is a resource loader; all the relevant
        # importlib.machinery loaders are and an AttributeError for
        # get_data() will make it clear what is needed from the loader.
        loader = cast(ResourceLoader, package.__spec__.loader)
        data = None
        if hasattr(package.__spec__.loader, 'get_data'):
            with suppress(OSError):
                data = loader.get_data(full_path)
        if data is None:
            package_name = package.__spec__.name
            message = '{!r} resource not found in {!r}'.format(
                resource, package_name)
            raise FileNotFoundError(message)
        else:
            # loader handed back raw bytes; wrap them for text decoding
            return TextIOWrapper(BytesIO(data), encoding, errors)
示例30
def reversed_blocks(f: TextIO, blocksize=4096):
    """Yield the contents of *f* in blocks, last block first.

    The stream is walked back to front in chunks of at most *blocksize*
    characters; within each yielded block the characters keep file order.
    """
    f.seek(0, os.SEEK_END)
    remaining = f.tell()
    while remaining > 0:
        step = blocksize if blocksize < remaining else remaining
        remaining -= step
        f.seek(remaining, os.SEEK_SET)
        yield f.read(step)