Python源码示例:typing.io.TextIO()

示例1
def patch_file(patch_stream: TextIO, fromcsv_stream: TextIO, tocsv_stream: TextIO,
               strict: bool = True, sep: str = ','):
    """
    Read a patch and a source CSV, apply the patch, and write the patched
    records to the target stream.

    The patch is loaded from ``patch_stream`` and the source records from
    ``fromcsv_stream`` (loaded with ``sep``). ``strict`` is forwarded
    unchanged to ``patch.apply``.
    """
    diff = patch.load(patch_stream)
    source = records.load(fromcsv_stream, sep=sep)
    patched = patch.apply(diff, source, strict=strict)

    # Column order: when there is at least one patched record, build it from
    # that record's keys and the patch's index columns; with no records at
    # all, reuse the column order of the source.
    fieldnames = (
        _nice_fieldnames(patched[0].keys(), diff['_index'])
        if patched
        else source.fieldnames
    )

    records.save(patched, fieldnames, tocsv_stream)
示例2
def first_nonblank_line(f: TextIO, max_lines: int = 10) -> str:
    """ return first non-blank 80 character line in file

    Up to ``max_lines`` reads are attempted, each limited to 81 characters,
    and the first result containing something other than whitespace is
    returned.

    Parameters
    ----------

    f: TextIO
        stream to read from, starting at its current position
    max_lines: int
        maximum number of lines to examine before giving up

    Returns
    -------
    str
        the first non-blank line read (at most 81 characters)

    Raises
    ------
    ValueError
        if no non-blank line is found within ``max_lines`` reads, or the
        stream ends before one is found
    """
    for _ in range(max_lines):
        line = f.readline(81)
        if not line:
            # an empty string means end of stream: nothing more to find
            break
        if line.strip():
            # returning here means a hit on the very last allowed read is
            # accepted rather than mistaken for "ran out of attempts"
            return line

    # not every text stream has a .name (io.StringIO does not), so fall back
    # to the stream object itself in the message
    raise ValueError(f"could not find first valid header line in {getattr(f, 'name', f)}")
示例3
def _getsvind(f: TextIO, ln: str) -> List[str]:
    """
    Collect the satellite identifiers announced by the line ``ln``.

    The announced count is parsed from ``ln[29:32]``. ``_getSVlist`` is
    called on ``ln`` for up to 12 entries, then once per further line read
    from ``f`` for each additional group of up to 12.

    Raises ValueError when ``ln`` is shorter than 32 characters, or when the
    number of identifiers collected differs from the announced count.
    """
    if len(ln) < 32:
        raise ValueError(f'satellite index line truncated:  {ln}')

    Nsv = int(ln[29:32])  # announced number of satellites

    # first group comes from the line we already have
    sv = _getSVlist(ln, min(12, Nsv), [])

    # each remaining group of (up to) 12 sits on its own continuation line
    for remaining in range(Nsv - 12, 0, -12):
        sv = _getSVlist(f.readline(), min(12, remaining), sv)

    if len(sv) != Nsv:
        raise ValueError('satellite list read incorrectly')

    return sv
示例4
def obstime2(fn: Union[TextIO, Path],
             verbose: bool = False) -> np.ndarray:
    """
    Collect every epoch time found in a RINEX2 OBS file.

    ``fn`` is handed to ``opener``. The header is read with ``obsheader2``
    and each following line is tried with ``_timeobs``; lines for which
    ``_timeobs`` raises ValueError are ignored. After each accepted line,
    ``_skip`` is called with the header's ``Nl_sv`` entry. The collected
    times are converted to an array, passed to ``check_unique_times`` and
    returned.

    ``verbose`` is accepted but not used in this function.
    """
    epochs = []
    with opener(fn) as f:
        hdr = obsheader2(f)

        for line in f:
            try:
                epoch = _timeobs(line)
            except ValueError:
                # this line does not parse as an epoch
                continue
            else:
                epochs.append(epoch)
                _skip(f, line, hdr['Nl_sv'])

    result = np.asarray(epochs)
    check_unique_times(result)
    return result
示例5
def opencrx(f: TextIO) -> str:
    """
    Pipe the full contents of ``f`` through the converter executable and
    return its output as text.

    The executable path comes from ``crxexe()``. If none is available,
    ``build()`` is run once and the lookup is repeated.

    Raises RuntimeError when the build reports a non-zero status, or when
    no executable is found even after a successful build.
    """
    converter = crxexe()

    if not converter:
        # nothing available yet: try to build it, then look again
        status = build()
        if status != 0:
            raise RuntimeError('could not build Hatanka converter. Do you have a C compiler?')
        converter = crxexe()

    if not converter:
        raise RuntimeError('Hatanaka converter is broken or missing.')

    # '-' is passed as the only argument and the data is supplied on stdin
    command = [converter, '-']
    return subprocess.check_output(command,
                                   input=f.read(),
                                   universal_newlines=True)
示例6
def navtime3(fn: Union[TextIO, Path]) -> np.ndarray:
    """
    Return the unique times found in a RINEX file.

    The header is consumed with ``navheader3``. Each following line is tried
    with ``_time``; lines for which it raises ValueError are ignored. After
    an accepted line, ``_skip`` is called with the ``Nl`` entry selected by
    the line's first character.
    """
    epochs = []

    with opener(fn) as f:
        navheader3(f)  # result unused; called to move past the header

        for line in f:
            try:
                epoch = _time(line)
            except ValueError:
                continue
            else:
                epochs.append(epoch)
                # the number of lines to skip is looked up per first character
                _skip(f, Nl[line[0]])

    return np.unique(epochs)
示例7
def navheader2(f: TextIO) -> Dict[str, Any]:
    """
    Read a RINEX NAV version 2 header. End users should use rinexheader()

    When ``f`` is a str or Path it is opened with ``opener(..., header=True)``
    and the function calls itself on the opened stream. Otherwise the result
    of ``rinexinfo(f)`` is extended with one entry per header line: the key
    is the stripped text from column 60 onward, the value is the first 60
    characters. Reading stops at the first line containing 'END OF HEADER'.
    """
    if isinstance(f, (str, Path)):
        with opener(f, header=True) as h:
            return navheader2(h)

    hdr = rinexinfo(f)

    for ln in f:
        if 'END OF HEADER' in ln:
            return hdr
        # label is in columns 60+, payload in the first 60 columns
        hdr[ln[60:].strip()] = ln[:60]

    return hdr
示例8
def navtime2(fn: Union[TextIO, Path]) -> np.ndarray:
    """
    Return the unique times found in a RINEX 2 NAV file.

    The header is read with ``navheader2``. Each following line is tried
    with ``_timenav``; lines for which it raises ValueError are ignored.
    After an accepted line, ``_skip`` is called with the ``Nl`` entry
    selected by the header's ``systems`` value.
    """
    epochs = []
    with opener(fn) as f:
        hdr = navheader2(f)

        # readline() returns '' only at end of stream, which ends the loop
        for ln in iter(f.readline, ''):
            try:
                epoch = _timenav(ln)
            except ValueError:
                continue

            epochs.append(epoch)
            _skip(f, Nl[hdr['systems']])

    return np.unique(epochs)
示例9
def obstime3(fn: Union[TextIO, Path],
             verbose: bool = False) -> np.ndarray:
    """
    Return the times of all lines starting with '>' in a RINEX file.

    Every such line is converted with ``_timeobs``. The results are turned
    into an array, passed to ``check_unique_times`` and returned.

    ``verbose`` is accepted but not used in this function.
    """
    with opener(fn) as f:
        epochs = [_timeobs(ln) for ln in f if ln.startswith('>')]

    result = np.asarray(epochs)
    check_unique_times(result)
    return result
示例10
def lines(cls, f: TextIO, reverse: bool = False):
    """
    Yield the lines of ``f``, optionally from the end towards the start.

    Forward mode simply iterates ``f``. Reverse mode walks the blocks
    produced by ``cls.reversed_blocks(f)`` character by character, last
    character first, and splits on newlines that are not enclosed in
    double quotes, so a quoted field containing a newline stays in one
    piece. Each yielded line is in normal reading order.
    """
    if not reverse:
        for line in f:
            yield line
        return

    pending = []        # characters of the line being assembled, last first
    in_quotes = False
    for block in cls.reversed_blocks(f):
        for ch in reversed(block):
            if ch == '"':
                in_quotes = not in_quotes
            elif ch == '\n' and pending and not in_quotes:
                # this newline terminates the *previous* line, so emit what
                # we have and let the newline start the next buffer
                yield ''.join(reversed(pending))
                pending = []
            pending.append(ch)

    if pending:
        yield ''.join(reversed(pending))
示例11
def __init__(self, istream: TextIO, sep: str = ',') -> None:
    """Wrap ``istream`` in a ``csv.DictReader`` that splits on ``sep``."""
    # Raise the csv module's field size limit to 2**24. This is a
    # module-wide setting, not one local to this reader.
    csv.field_size_limit(1 << 24)

    reader = csv.DictReader(istream, delimiter=sep)
    self.reader = reader
示例12
def save(records: Sequence[Record], fieldnames: List[Column], ostream: TextIO):
    """
    Write ``records`` to ``ostream`` as CSV.

    A header row built from ``fieldnames`` is written first, followed by one
    row per record in the order given.
    """
    out = csv.DictWriter(ostream, fieldnames)
    out.writeheader()
    out.writerows(records)
示例13
def __init__(self, input=None, output:TextIO = sys.stdout):
    """
    Initialise the lexer: run the base-class constructor, check the runtime
    version against "4.7.1", and create the ATN simulator.
    """
    super().__init__(input, output)
    self.checkVersion("4.7.1")
    # The simulator is given a new PredictionContextCache of its own.
    self._interp = LexerATNSimulator(
        self,
        self.atn,
        self.decisionsToDFA,
        PredictionContextCache(),
    )
    # Both start out unset.
    self._actions = self._predicates = None
示例14
def __init__(self, input:TokenStream, output:TextIO = sys.stdout):
    """
    Put the parser into its initial state and attach the token stream.

    ``output`` is stored on ``self._output``. ``input`` is passed to
    ``setInputStream`` after all other fields have been initialised.
    """
    super().__init__()
    # Token source; assigned by setInputStream() at the end.
    self._input = None
    self._output = output
    # Error handling strategy; starts as a new DefaultErrorStrategy.
    self._errHandler = DefaultErrorStrategy()
    # Precedence stack, seeded with a single 0.
    self._precedenceStack = [0]
    # ParserRuleContext of the rule currently executing.
    self._ctx = None
    # Whether a parse tree is built while parsing; on by default.
    self.buildParseTrees = True
    # Trace listener installed by setTrace(True), kept so that
    # setTrace(False) can remove it later.
    self._tracer = None
    # ParseTreeListener objects registered to receive parse events.
    self._parseListeners = None
    # Number of syntax errors reported so far; incremented on each call to
    # notifyErrorListeners.
    self._syntaxErrors = 0
    self.setInputStream(input)

    # reset the parser's state# 
示例15
def __init__(self, input=None, output:TextIO = sys.stdout):
    """
    Initialise the lexer: run the base-class constructor, check the runtime
    version against "4.8", and create the ATN simulator.
    """
    super().__init__(input, output)
    self.checkVersion("4.8")
    # The simulator is given a new PredictionContextCache of its own.
    self._interp = LexerATNSimulator(
        self,
        self.atn,
        self.decisionsToDFA,
        PredictionContextCache(),
    )
    # Both start out unset.
    self._actions = self._predicates = None
示例16
def __init__(self, input:TokenStream, output:TextIO = sys.stdout):
    """
    Initialise the parser: run the base-class constructor, check the runtime
    version against "4.8", and create the ATN simulator.
    """
    super().__init__(input, output)
    self.checkVersion("4.8")
    # The simulator uses the context cache stored on sharedContextCache.
    simulator = ParserATNSimulator(
        self,
        self.atn,
        self.decisionsToDFA,
        self.sharedContextCache,
    )
    self._interp = simulator
    self._predicates = None
示例17
def open_text(package: Package,
              resource: Resource,
              encoding: str = 'utf-8',
              errors: str = 'strict') -> TextIO:
    """Open the resource via ``open_binary`` and return it wrapped for text reading."""
    wrapper_options = {'encoding': encoding, 'errors': errors}
    return TextIOWrapper(open_binary(package, resource), **wrapper_options)
示例18
def open_text(package: Package,
              resource: Resource,
              encoding: str = 'utf-8',
              errors: str = 'strict') -> TextIO:
    """
    Return a file-like object opened for text reading of the resource.

    Lookup order:

    1. a resource reader for the package, if ``_get_resource_reader``
       provides one;
    2. a regular file next to the package's ``__spec__.origin``;
    3. the loader's ``get_data`` method, if it has one.

    Raises FileNotFoundError when none of these yields the resource.
    """
    resource = _normalize_path(resource)
    package = _get_package(package)

    reader = _get_resource_reader(package)
    if reader is not None:
        return TextIOWrapper(reader.open_resource(resource), encoding, errors)

    _check_location(package)
    spec = package.__spec__
    package_dir = os.path.dirname(os.path.abspath(spec.origin))
    full_path = os.path.join(package_dir, resource)
    try:
        return open(full_path, mode='r', encoding=encoding, errors=errors)
    except OSError:
        # The file could not be opened directly, so ask the loader for the
        # raw bytes. The loader is treated as a ResourceLoader for typing
        # purposes only; get_data() is used just when it is present.
        loader = cast(ResourceLoader, spec.loader)
        data = None
        if hasattr(spec.loader, 'get_data'):
            with suppress(OSError):
                data = loader.get_data(full_path)

        if data is not None:
            return TextIOWrapper(BytesIO(data), encoding, errors)

        message = '{!r} resource not found in {!r}'.format(
            resource, spec.name)
        raise FileNotFoundError(message)
示例19
def rinexheader(fn: Union[TextIO, str, Path]) -> Dict[str, Any]:
    """
    Retrieve a RINEX 2/3 or CRINEX 1/3 header as an unparsed dict().

    * str / Path input is expanded with ``expanduser``. A ``.nc`` suffix is
      answered directly by ``rinexinfo``; any other path is opened with
      ``opener(..., header=True)`` and the function calls itself on the
      opened stream.
    * ``io.StringIO`` input is rewound to position 0 first.
    * ``io.TextIOWrapper`` input is used as it is.

    The header reader is then chosen from the ``version`` and ``rinextype``
    entries reported by ``rinexinfo``.

    Raises TypeError for any other input type, and ValueError for an
    unrecognised version or rinex type.
    """
    if isinstance(fn, (str, Path)):
        fn = Path(fn).expanduser()

    if isinstance(fn, Path):
        if fn.suffix == '.nc':
            return rinexinfo(fn)
        with opener(fn, header=True) as f:
            return rinexheader(f)

    if isinstance(fn, io.StringIO):
        fn.seek(0)
    elif not isinstance(fn, io.TextIOWrapper):
        raise TypeError(f'unknown RINEX filetype {type(fn)}')

    info = rinexinfo(fn)

    # pick the pair of readers matching the major version
    version = int(info['version'])
    if version in (1, 2):
        read_obs, read_nav = obsheader2, navheader2
    elif version == 3:
        read_obs, read_nav = obsheader3, navheader3
    else:
        raise ValueError(f'unknown RINEX {info}  {fn}')

    kind = info['rinextype']
    if kind == 'obs':
        return read_obs(fn)
    if kind == 'nav':
        return read_nav(fn)
    raise ValueError(f'Unknown rinex type {info} in {fn}')
示例20
def _skip(f: TextIO, ln: str,
          Nl_sv: int,
          sv: Sequence[str] = None):
    """
    Read and discard ``len(sv) * Nl_sv`` lines from ``f``.

    When ``sv`` is not supplied it is obtained from ``_getsvind(f, ln)``,
    which may itself read from ``f``.
    """
    if sv is None:
        sv = _getsvind(f, ln)

    # Lines are read one at a time rather than seeking by byte count; the
    # original author was unsure a relative seek works for io.TextIOWrapper.
    remaining = Nl_sv * len(sv)
    while remaining > 0:
        f.readline()
        remaining -= 1
示例21
def _skip_header(f: TextIO):
    """
    Consume lines from ``f`` up to and including the first one that
    contains "END OF HEADER", or to the end of the stream if there is none.
    """
    for ln in f:
        if "END OF HEADER" not in ln:
            continue
        return
示例22
def _skip(f: TextIO, Nl: int):
    """Advance ``f`` by up to ``Nl`` lines, stopping early at end of stream."""
    source = iter(f)
    for _ in range(Nl):
        # a line is never None, so None can only mean the stream ran out
        if next(source, None) is None:
            break
示例23
def navheader3(f: TextIO) -> Dict[str, Any]:
    """
    Read a header into a dict, starting from the result of ``rinexinfo(f)``.

    When ``f`` is a str or Path it is opened with ``opener(..., header=True)``
    and the function calls itself on the opened stream. For every line
    before the one containing 'END OF HEADER', the label is the stripped
    text from column 60 onward and the content is the first 60 characters.

    "IONOSPHERIC CORR" lines are gathered into a nested dict keyed by the
    stripped first four characters of the content. Each value is a list of
    floats parsed from consecutive 12-character fields starting at column 5:
    3 fields for 'GAL', 4 otherwise. All other labels map directly to their
    content string.
    """
    if isinstance(f, (str, Path)):
        with opener(f, header=True) as h:
            return navheader3(h)

    hdr = rinexinfo(f)

    for ln in f:
        if 'END OF HEADER' in ln:
            break

        kind = ln[60:].strip()
        content = ln[:60]

        if kind != "IONOSPHERIC CORR":
            hdr[kind] = content
            continue

        if kind not in hdr:
            hdr[kind] = {}

        coeff_kind = content[:4].strip()
        # RINEX 3.04 table A5 page A19
        nfields = 3 if coeff_kind == 'GAL' else 4
        starts = [5 + 12 * i for i in range(nfields)]
        hdr[kind][coeff_kind] = [
            rinex_string_to_float(content[start:start + 12]) for start in starts
        ]

    return hdr
示例24
def __init__(self, input:TokenStream, output:TextIO = sys.stdout):
    """
    Put the parser into its initial state and attach the token stream.

    ``output`` is stored on ``self._output``. ``input`` is passed to
    ``setInputStream`` after all other fields have been initialised.
    """
    super().__init__()
    # Token source; assigned by setInputStream() at the end.
    self._input = None
    self._output = output
    # Error handling strategy; starts as a new DefaultErrorStrategy.
    self._errHandler = DefaultErrorStrategy()
    # Precedence stack, seeded with a single 0.
    self._precedenceStack = [0]
    # ParserRuleContext of the rule currently executing.
    self._ctx = None
    # Whether a parse tree is built while parsing; on by default.
    self.buildParseTrees = True
    # Trace listener installed by setTrace(True), kept so that
    # setTrace(False) can remove it later.
    self._tracer = None
    # ParseTreeListener objects registered to receive parse events.
    self._parseListeners = None
    # Number of syntax errors reported so far; incremented on each call to
    # notifyErrorListeners.
    self._syntaxErrors = 0
    self.setInputStream(input)

    # reset the parser's state# 
示例25
def __init__(self, input=None, output:TextIO = sys.stdout):
    """
    Initialise the lexer: run the base-class constructor, check the runtime
    version against "4.8", and create the ATN simulator.
    """
    super().__init__(input, output)
    self.checkVersion("4.8")
    # The simulator is given a new PredictionContextCache of its own.
    self._interp = LexerATNSimulator(
        self,
        self.atn,
        self.decisionsToDFA,
        PredictionContextCache(),
    )
    # Both start out unset.
    self._actions = self._predicates = None
示例26
def __init__(self, input:TokenStream, output:TextIO = sys.stdout):
    """
    Initialise the parser: run the base-class constructor, check the runtime
    version against "4.8", and create the ATN simulator.
    """
    super().__init__(input, output)
    self.checkVersion("4.8")
    # The simulator uses the context cache stored on sharedContextCache.
    simulator = ParserATNSimulator(
        self,
        self.atn,
        self.decisionsToDFA,
        self.sharedContextCache,
    )
    self._interp = simulator
    self._predicates = None
示例27
def test_textio(self):
        """Check ``__parameters__`` on a ``TextIO`` parameter annotation."""

        # Helper whose only purpose is to carry a TextIO annotation on ``a``.
        def stuff(a: TextIO) -> str:
            return a.readline()

        # Read the annotation object back from the function's metadata.
        a = stuff.__annotations__['a']
        # NOTE(review): the expected tuple presumably depends on the typing
        # version under test — confirm before relying on it.
        assert a.__parameters__ == (str,) 
示例28
def test_io_submodule(self):
        """Check that ``typing.io`` re-exports the IO types found on ``typing``."""
        # Imported inside the test, so a failing import fails only this test.
        from typing.io import IO, TextIO, BinaryIO, __all__, __name__
        # Each name must be the very same object as the attribute on typing.
        assert IO is typing.IO
        assert TextIO is typing.TextIO
        assert BinaryIO is typing.BinaryIO
        # The submodule exports exactly these three names, in any order.
        assert set(__all__) == set(['IO', 'TextIO', 'BinaryIO'])
        assert __name__ == 'typing.io' 
示例29
def open_text(package: Package,
              resource: Resource,
              encoding: str = 'utf-8',
              errors: str = 'strict') -> TextIO:
    """
    Return a file-like object opened for text reading of the resource.

    Lookup order:

    1. a resource reader for the package, if ``_get_resource_reader``
       provides one;
    2. a regular file next to the package's ``__spec__.origin``;
    3. the loader's ``get_data`` method, if it has one.

    Raises FileNotFoundError when none of these yields the resource.
    """
    resource = _normalize_path(resource)
    package = _get_package(package)

    reader = _get_resource_reader(package)
    if reader is not None:
        return TextIOWrapper(reader.open_resource(resource), encoding, errors)

    _check_location(package)
    spec = package.__spec__
    package_dir = os.path.dirname(os.path.abspath(spec.origin))
    full_path = os.path.join(package_dir, resource)
    try:
        return open(full_path, mode='r', encoding=encoding, errors=errors)
    except OSError:
        # The file could not be opened directly, so ask the loader for the
        # raw bytes. The loader is treated as a ResourceLoader for typing
        # purposes only; get_data() is used just when it is present.
        loader = cast(ResourceLoader, spec.loader)
        data = None
        if hasattr(spec.loader, 'get_data'):
            with suppress(OSError):
                data = loader.get_data(full_path)

        if data is not None:
            return TextIOWrapper(BytesIO(data), encoding, errors)

        message = '{!r} resource not found in {!r}'.format(
            resource, spec.name)
        raise FileNotFoundError(message)
示例30
def reversed_blocks(f: TextIO, blocksize=4096):
    """
    Yield the contents of ``f`` in chunks, last chunk first.

    The stream is positioned at its end and then walked backwards in steps
    of ``blocksize``. Every chunk has ``blocksize`` units except possibly
    the final one yielded, which covers whatever is left at the start of
    the stream. Text inside each chunk is in normal reading order.
    """
    f.seek(0, os.SEEK_END)
    end = f.tell()
    while end > 0:
        # never step back past the beginning of the stream
        start = max(end - blocksize, 0)
        f.seek(start, os.SEEK_SET)
        yield f.read(end - start)
        end = start