Python源码示例:scipy.io.netcdf.netcdf_file()

示例1
def test_ticket_1720():
    """Round-trip a float variable and its attributes through memory.

    A ten-element ``'f'`` variable with a ``units`` attribute, plus a global
    ``history`` attribute, is written into a ``BytesIO``.  The bytes are then
    re-opened for reading and the values, shape and (bytes-valued) attributes
    are compared with what was written.
    """
    expected = [0, 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9]

    # Write phase: snapshot the serialized bytes right after flushing, while
    # still inside the ``with`` block.
    sink = BytesIO()
    with netcdf_file(sink, 'w') as writer:
        writer.history = 'Created for a test'
        writer.createDimension('float_var', 10)
        written_var = writer.createVariable('float_var', 'f', ('float_var',))
        written_var[:] = expected
        written_var.units = 'metres'
        writer.flush()
        raw = sink.getvalue()

    # Read phase: text attributes come back as bytes.
    with netcdf_file(BytesIO(raw), 'r') as reader:
        assert_equal(reader.history, b'Created for a test')
        read_var = reader.variables['float_var']
        assert_equal(read_var.units, b'metres')
        assert_equal(read_var.shape, (10,))
        assert_allclose(read_var[:], expected)
示例2
def test_ticket_1720():
    """Check that float data and attributes survive a write/read cycle.

    The file is written to an in-memory buffer, its bytes are captured, and
    a second in-memory file built from those bytes is read back.
    """
    values = [0, 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9]
    out_buffer = BytesIO()

    with netcdf_file(out_buffer, 'w') as nc_out:
        nc_out.history = 'Created for a test'
        nc_out.createDimension('float_var', 10)
        var_out = nc_out.createVariable('float_var', 'f', ('float_var',))
        var_out[:] = values
        var_out.units = 'metres'
        nc_out.flush()
        # Capture the bytes after the explicit flush, before leaving the block.
        serialized = out_buffer.getvalue()

    in_buffer = BytesIO(serialized)
    with netcdf_file(in_buffer, 'r') as nc_in:
        # String attributes are returned as bytes on read.
        assert_equal(nc_in.history, b'Created for a test')
        var_in = nc_in.variables['float_var']
        assert_equal(var_in.units, b'metres')
        assert_equal(var_in.shape, (10,))
        assert_allclose(var_in[:], values)
示例3
def test_mmaps_segfault():
    """Exercise mmap-backed reads around file close.

    First part (skipped when ``IS_PYPY`` is true): with every warning turned
    into an error, read ``lat`` and drop the array before the file closes.
    Second part: return an array out of a closed mmap-backed file and use it,
    silencing the expected RuntimeWarning.
    """
    nc_path = pjoin(TEST_DATA_PATH, 'example_1.nc')

    if not IS_PYPY:
        with warnings.catch_warnings():
            warnings.simplefilter("error")
            with netcdf_file(nc_path, mmap=True) as f:
                lat = f.variables['lat'][:]
                # should not raise warnings
                del lat

    def read_lat_then_close():
        # The returned array is handed out of the ``with`` block, i.e. it is
        # still alive when the file is closed.
        with netcdf_file(nc_path, mmap=True) as f:
            return f.variables['lat'][:]

    # should not crash
    with suppress_warnings() as sup:
        sup.filter(RuntimeWarning,
                   "Cannot close a netcdf_file opened with mmap=True, when "
                   "netcdf_variables or arrays referring to its data still exist")
        outliving = read_lat_then_close()
    outliving.sum()
示例4
def test_netcdf_write_as_netcdf3_64bit(tmpdir):
    """Test write_netcdf with output format 64-bit netcdf3.

    A 4x3 ``RasterModelGrid`` with two node fields is written to ``test.nc``
    inside ``tmpdir``; the file is then re-read and each field is compared
    with the flattened variable of the same name.
    """
    # ``netcdf_file`` is imported from ``scipy.io`` directly; the
    # ``scipy.io.netcdf`` submodule is a deprecated namespace in recent SciPy.
    from scipy.io import netcdf_file

    field = RasterModelGrid((4, 3))
    field.add_field("topographic__elevation", np.arange(12.0), at="node")
    field.add_field("uplift_rate", 2.0 * np.arange(12.0), at="node")

    with tmpdir.as_cwd():
        write_netcdf("test.nc", field, format="NETCDF3_64BIT")

        # The context manager closes the file even if an assertion fails.
        with netcdf_file("test.nc", "r") as f:
            for name in ["topographic__elevation", "uplift_rate"]:
                assert name in f.variables
                assert_array_equal(f.variables[name][:].flatten(), field.at_node[name])
示例5
def test_netcdf_write_as_netcdf3_classic(tmpdir):
    """Test write_netcdf with output format classic netcdf3.

    A 4x3 ``RasterModelGrid`` with two node fields is written to ``test.nc``
    inside ``tmpdir``; the file is then re-read and each field is compared
    with the flattened variable of the same name.
    """
    # ``netcdf_file`` is imported from ``scipy.io`` directly; the
    # ``scipy.io.netcdf`` submodule is a deprecated namespace in recent SciPy.
    from scipy.io import netcdf_file

    field = RasterModelGrid((4, 3))
    field.add_field("topographic__elevation", np.arange(12.0), at="node")
    field.add_field("uplift_rate", 2.0 * np.arange(12.0), at="node")

    with tmpdir.as_cwd():
        write_netcdf("test.nc", field, format="NETCDF3_CLASSIC")

        # The context manager closes the file even if an assertion fails.
        with netcdf_file("test.nc", "r") as f:
            for name in ["topographic__elevation", "uplift_rate"]:
                assert name in f.variables
                assert_array_equal(f.variables[name][:].flatten(), field.at_node[name])
示例6
def make_simple(*args, **kwargs):
    """Create a small example netCDF file, yield it, then close it.

    All arguments are forwarded unchanged to ``netcdf_file``.  The file gets
    a ``history`` attribute and a ``time`` dimension/variable of length
    ``N_EG_ELS`` (type ``VARTYPE_EG``) filled with ``0 .. N_EG_ELS - 1``, and
    is flushed before being yielded.

    NOTE(review): callers in this file use ``with make_simple(...) as f``,
    which only works if this generator is wrapped by
    ``contextlib.contextmanager``; no decorator is visible here - confirm it
    is applied where this function is defined.
    """
    f = netcdf_file(*args, **kwargs)
    f.history = 'Created for a test'
    f.createDimension('time', N_EG_ELS)
    time = f.createVariable('time', VARTYPE_EG, ('time',))
    time[:] = np.arange(N_EG_ELS)
    time.units = 'days since 2008-01-01'
    f.flush()
    try:
        yield f
    finally:
        # Close the file even when the consumer raises at the yield point
        # (or the generator is closed early), not only on the normal path.
        f.close()
示例7
def test_read_write_files():
    """Yield round-trip checks for a netCDF file written to a temp directory.

    ``simple.nc`` is written with ``make_simple`` and then read back three
    ways: by file name with default settings, by file name with
    ``mmap=False``, and through an already opened binary file object.  Each
    read yields a check of ``use_mmap`` followed by everything produced by
    ``gen_for_simple``.  The working directory is restored and the temporary
    directory removed on every exit path.
    """
    # test round trip for example file
    cwd = os.getcwd()
    # Created before ``try`` so the cleanup below never sees an unbound name.
    tmpdir = tempfile.mkdtemp()
    try:
        os.chdir(tmpdir)
        with make_simple('simple.nc', 'w') as f:
            pass
        # To read the NetCDF file we just created::
        with netcdf_file('simple.nc') as f:
            # Using mmap is the default
            yield assert_true, f.use_mmap
            for testargs in gen_for_simple(f):
                yield testargs

        # Now without mmap
        with netcdf_file('simple.nc', mmap=False) as f:
            # mmap was explicitly disabled
            yield assert_false, f.use_mmap
            for testargs in gen_for_simple(f):
                yield testargs

        # To read the NetCDF file we just created, as file object, no
        # mmap.  When n * n_bytes(var_type) is not divisible by 4, this
        # raised an error in pupynere 1.0.12 and scipy rev 5893, because
        # calculated vsize was rounding up in units of 4 - see
        # http://www.unidata.ucar.edu/software/netcdf/docs/netcdf.html
        with open('simple.nc', 'rb') as fobj:
            with netcdf_file(fobj) as f:
                # by default, don't use mmap for file-like
                yield assert_false, f.use_mmap
                for testargs in gen_for_simple(f):
                    yield testargs
    finally:
        # Single cleanup path replacing the bare ``except:`` plus duplicated
        # cleanup; it also runs when the generator is closed early.
        os.chdir(cwd)
        shutil.rmtree(tmpdir)
示例8
def test_read_write_sio():
    """Yield round-trip checks for files kept entirely in ``BytesIO`` objects.

    Covers a file written with default arguments, the ``ValueError`` expected
    when mmap is requested for an in-memory stream, and a ``version=2`` file
    read both with and without an explicit ``version=2``.
    """
    written = BytesIO()
    with make_simple(written, 'w') as f1:
        payload = written.getvalue()

    reread = BytesIO(payload)
    with netcdf_file(reread) as f2:
        for check in gen_for_simple(f2):
            yield check

    # Test that error is raised if attempting mmap for sio
    mmap_target = BytesIO(payload)
    yield assert_raises, ValueError, netcdf_file, mmap_target, 'r', True

    # Test 64-bit offset write / read
    written_64 = BytesIO()
    with make_simple(written_64, 'w', version=2) as f_64:
        payload = written_64.getvalue()

    reread_64 = BytesIO(payload)
    with netcdf_file(reread_64) as f_64:
        for check in gen_for_simple(f_64):
            yield check
        yield assert_equal, f_64.version_byte, 2

    # also when version 2 explicitly specified
    reread_64 = BytesIO(payload)
    with netcdf_file(reread_64, version=2) as f_64:
        for check in gen_for_simple(f_64):
            yield check
        yield assert_equal, f_64.version_byte, 2
示例9
def test_read_example_data():
    """Open and close every ``*.nc`` file found under ``TEST_DATA_PATH``.

    Each file is opened twice in read mode: once with only the mode given
    and once with ``mmap=False``.
    """
    # read any example data files
    pattern = pjoin(TEST_DATA_PATH, '*.nc')
    for nc_path in glob(pattern):
        for extra in ({}, {'mmap': False}):
            with netcdf_file(nc_path, 'r', **extra) as f:
                pass
示例10
def test_itemset_no_segfault_on_readonly():
    """Regression test for ticket #1202.

    Assigning a value to a variable taken from a file opened read-only must
    raise ``RuntimeError``.
    """
    nc_path = pjoin(TEST_DATA_PATH, 'example_1.nc')

    # Open the test file in read-only mode; the variable is deliberately
    # used after the ``with`` block has ended.
    with netcdf_file(nc_path, 'r') as nc:
        readonly_time = nc.variables['time']

    # readonly_time.assignValue(42) should raise a RuntimeError--not seg. fault!
    assert_raises(RuntimeError, readonly_time.assignValue, 42)
示例11
def read(self, path, **kwargs):
        """Load a displacement grid and look angles into ``self.container``.

        The displacement is copied from variable ``z`` of the netCDF file
        named by ``self._getDisplacementFile(path)``.  The frame's lower-left
        corner and its ``dN`` / ``dE`` spacing are derived from the minima and
        maxima of the ``lat`` / ``lon`` variables and the displacement shape.
        ``phi`` and ``theta`` are computed from the little-endian float32
        file named by ``self._getLOSFile(path)``.

        :param path: Location handed, as an absolute path, to the two
            ``_get*File`` helpers.
        :param kwargs: Accepted but not used by this method.
        :returns: ``self.container`` after it has been filled in.
        """
        # Imported from ``scipy.io`` directly; the ``scipy.io.netcdf``
        # submodule is a deprecated namespace in recent SciPy.
        from scipy.io import netcdf_file
        path = op.abspath(path)
        c = self.container

        # Close the netCDF file deterministically.  Only a copy of ``z`` and
        # scalar extrema leave this block, so nothing keeps referring to the
        # file's data once it is closed.
        with netcdf_file(self._getDisplacementFile(path),
                         mode='r', version=2) as grd:
            c.displacement = grd.variables['z'][:].copy()
            lat_min = grd.variables['lat'][:].min()
            lat_max = grd.variables['lat'][:].max()
            lon_min = grd.variables['lon'][:].min()
            lon_max = grd.variables['lon'][:].max()

        shape = c.displacement.shape
        # LatLon
        c.frame.spacing = 'degree'
        c.frame.llLat = lat_min
        c.frame.llLon = lon_min

        c.frame.dN = (lat_max - c.frame.llLat) / shape[0]
        c.frame.dE = (lon_max - c.frame.llLon) / shape[1]

        # Theta and Phi
        try:
            los = num.memmap(self._getLOSFile(path), dtype='<f4')
            # Every sixth value, starting at offsets 3, 4 and 5.
            e = los[3::6].copy().reshape(shape)
            n = los[4::6].copy().reshape(shape)
            u = los[5::6].copy().reshape(shape)

            phi = num.arctan(n/e)
            theta = num.arcsin(u)
            # phi[n < 0] += num.pi

            c.phi = phi
            c.theta = theta
        # NOTE(review): nothing in this ``try`` body imports anything, so
        # this handler presumably never fires (a missing LOS file would raise
        # an OSError instead) - confirm which exception was intended.
        except ImportError:
            self._log.warning(self.__doc__)
            self._log.warning('Defaulting theta to pi/2 and phi to 0.')
            c.theta = num.pi/2
            c.phi = 0.
        return c
示例12
def make_simple(*args, **kwargs):
    """Build a tiny example netCDF file, hand it to the caller, close it.

    Positional and keyword arguments go straight to ``netcdf_file``.  Before
    being yielded the file receives a ``history`` attribute and a ``time``
    dimension/variable of ``N_EG_ELS`` elements (type ``VARTYPE_EG``) holding
    ``np.arange(N_EG_ELS)``, and is flushed.

    NOTE(review): the tests in this file call this as
    ``with make_simple(...) as f``; that requires the generator to be wrapped
    by ``contextlib.contextmanager``, which is not visible here - confirm.
    """
    f = netcdf_file(*args, **kwargs)
    f.history = 'Created for a test'
    f.createDimension('time', N_EG_ELS)
    time = f.createVariable('time', VARTYPE_EG, ('time',))
    time[:] = np.arange(N_EG_ELS)
    time.units = 'days since 2008-01-01'
    f.flush()
    try:
        yield f
    finally:
        # Runs on the normal path, when the consumer raises at the yield
        # point, and when the generator is closed early.
        f.close()
示例13
def test_read_write_sio():
    """Round-trip example files through ``BytesIO`` objects.

    Covers a file written with default arguments, the ``ValueError`` expected
    when mmap is requested for an in-memory stream, and a ``version=2`` file
    read both with and without an explicit ``version=2``.
    """
    written = BytesIO()
    with make_simple(written, 'w') as f1:
        payload = written.getvalue()

    with netcdf_file(BytesIO(payload)) as f2:
        check_simple(f2)

    # Test that error is raised if attempting mmap for sio
    mmap_target = BytesIO(payload)
    assert_raises(ValueError, netcdf_file, mmap_target, 'r', True)

    # Test 64-bit offset write / read
    written_64 = BytesIO()
    with make_simple(written_64, 'w', version=2) as f_64:
        payload = written_64.getvalue()

    with netcdf_file(BytesIO(payload)) as f_64:
        check_simple(f_64)
        assert_equal(f_64.version_byte, 2)

    # also when version 2 explicitly specified
    with netcdf_file(BytesIO(payload), version=2) as f_64:
        check_simple(f_64)
        assert_equal(f_64.version_byte, 2)
示例14
def test_encoded_fill_value():
    """Check ``_get_encoded_fill_value`` on a one-byte string variable.

    Expected: ``b'\\x00'`` while no ``_FillValue`` is set, the attribute's
    own value when it is a single byte, and ``b'\\x00'`` again when the
    attribute has the wrong size.
    """
    zero_byte = b'\x00'
    with netcdf_file(BytesIO(), mode='w') as nc:
        nc.createDimension('x', 1)
        char_var = nc.createVariable('var', 'S1', ('x',))

        # No _FillValue attribute yet.
        assert_equal(char_var._get_encoded_fill_value(), zero_byte)

        char_var._FillValue = b'\x01'
        assert_equal(char_var._get_encoded_fill_value(), b'\x01')

        # invalid, wrong size
        char_var._FillValue = b'\x00\x00'
        assert_equal(char_var._get_encoded_fill_value(), zero_byte)
示例15
def test_read_example_data():
    """Smoke-test opening each example ``*.nc`` file in read mode.

    Every file matched under ``TEST_DATA_PATH`` is opened and closed twice:
    first with only the mode given, then with ``mmap=False``.
    """
    # read any example data files
    example_files = glob(pjoin(TEST_DATA_PATH, '*.nc'))
    for example in example_files:
        with netcdf_file(example, 'r') as nc:
            pass
        with netcdf_file(example, 'r', mmap=False) as nc:
            pass
示例16
def test_appending_issue_gh_8625():
    """Re-open an in-memory file in append mode and overwrite a variable.

    The file from ``make_simple`` gets an extra two-element float variable
    ``x`` set to 1; its bytes are then opened with ``mode='a'`` and ``x`` is
    set to 2.
    """
    buf = BytesIO()

    with make_simple(buf, mode='w') as f:
        f.createDimension('x', 2)
        f.createVariable('x', float, ('x',))
        f.variables['x'][...] = 1
        f.flush()
        snapshot = buf.getvalue()

    # Appending to the previously written bytes.
    with netcdf_file(BytesIO(snapshot), mode='a') as f:
        f.variables['x'][...] = 2
示例17
def test_write_invalid_dtype():
    """Creating a variable with an 8-byte integer dtype raises ValueError.

    ``'int64'`` and ``'uint64'`` are always checked; ``'int'`` and ``'uint'``
    are added when their itemsize is 8.
    """
    rejected = ['int64', 'uint64']
    # 64-bit machines: the platform 'int' / 'uint' are 8 bytes wide as well.
    rejected.extend(alias for alias in ('int', 'uint')
                    if np.dtype(alias).itemsize == 8)

    with netcdf_file(BytesIO(), 'w') as nc:
        nc.createDimension('time', N_EG_ELS)
        for bad_dtype in rejected:
            assert_raises(ValueError, nc.createVariable, 'time', bad_dtype,
                          ('time',))
示例18
def test_byte_gatts():
    """Global attributes set as bytes or as str both read back as bytes."""
    # Check that global "string" atts work like they did before py3k
    # unicode and general bytes confusion
    with in_tempdir():
        filename = 'g_byte_atts.nc'
        # Context managers close both handles even when an assertion fails,
        # so no open file is left behind in the temporary directory.
        with netcdf_file(filename, 'w') as f:
            f._attributes['holy'] = b'grail'
            f._attributes['witch'] = 'floats'
        with netcdf_file(filename, 'r') as f:
            assert_equal(f._attributes['holy'], b'grail')
            assert_equal(f._attributes['witch'], b'floats')
示例19
def test_append_recordDimension():
    """Append records to a file whose ``time`` dimension has length ``None``.

    Two records are appended one at a time; after each append the file is
    re-read to check the last record and the record count.  Finally the test
    checks that ``data`` did not end up in ``testData``'s ``_attributes``.
    """
    n_side = 100
    nc_name = 'withRecordDimension.nc'

    with in_tempdir():
        # Create file with record time dimension
        with netcdf_file(nc_name, 'w') as f:
            f.createDimension('time', None)
            f.createVariable('time', 'd', ('time',))
            f.createDimension('x', n_side)
            x_var = f.createVariable('x', 'd', ('x',))
            x_var[:] = np.array(range(n_side))
            f.createDimension('y', n_side)
            y_var = f.createVariable('y', 'd', ('y',))
            y_var[:] = np.array(range(n_side))
            f.createVariable('testData', 'i', ('time', 'x', 'y'))
            f.flush()
            f.close()

        for step in range(2):
            layer = np.ones((n_side, n_side))*step

            # Open the file in append mode and add data
            with netcdf_file(nc_name, 'a') as f:
                f.variables['time'].data = np.append(f.variables["time"].data, step)
                f.variables['testData'][step, :, :] = layer
                f.flush()

            # Read the file and check that append worked
            with netcdf_file(nc_name) as f:
                assert_equal(f.variables['time'][-1], step)
                assert_equal(f.variables['testData'][-1, :, :].copy(), layer)
                assert_equal(f.variables['time'].data.shape[0], step+1)
                assert_equal(f.variables['testData'].data.shape[0], step+1)

        # Read the file and check that 'data' was not saved as user defined
        # attribute of testData variable during append operation
        with netcdf_file(nc_name) as f:
            with assert_raises(KeyError) as ar:
                f.variables['testData']._attributes['data']
            raised = ar.value
            assert_equal(raised.args[0], 'data')
示例20
def test_maskandscale():
    """Read and write a variable with ``maskandscale=True``.

    The reference data are 15 values from 20 to 30 with element 3 replaced by
    100 and everything above 99 masked.  They are compared, rounded to two
    decimals, first with ``Temperature`` from ``example_2.nc`` and then with
    a freshly written file using the same missing_value / scale_factor /
    add_offset attributes.
    """
    raw_temps = np.linspace(20, 30, 15)
    raw_temps[3] = 100
    masked_temps = np.ma.masked_greater(raw_temps, 99)

    example_path = pjoin(TEST_DATA_PATH, 'example_2.nc')
    with netcdf_file(example_path, maskandscale=True) as f:
        temp_var = f.variables['Temperature']
        assert_equal(temp_var.missing_value, 9999)
        assert_equal(temp_var.add_offset, 20)
        assert_equal(temp_var.scale_factor, np.float32(0.01))
        found = temp_var[:].compressed()
        del temp_var  # Remove ref to mmap, so file can be closed.
        expected = np.round(masked_temps.compressed(), 2)
        assert_allclose(found, expected)

    with in_tempdir():
        out_name = 'ms.nc'
        f = netcdf_file(out_name, 'w', maskandscale=True)
        f.createDimension('Temperature', len(masked_temps))
        out_var = f.createVariable('Temperature', 'i', ('Temperature',))
        out_var.missing_value = 9999
        out_var.scale_factor = 0.01
        out_var.add_offset = 20
        out_var[:] = masked_temps
        f.close()

        with netcdf_file(out_name, maskandscale=True) as f:
            temp_var = f.variables['Temperature']
            assert_equal(temp_var.missing_value, 9999)
            assert_equal(temp_var.add_offset, 20)
            assert_equal(temp_var.scale_factor, np.float32(0.01))
            expected = np.round(masked_temps.compressed(), 2)
            found = temp_var[:].compressed()
            del temp_var
            assert_allclose(found, expected)


# ------------------------------------------------------------------------
# Test reading with masked values (_FillValue / missing_value)
# ------------------------------------------------------------------------ 
示例21
def test_read_withValuesNearFillValue():
    """Regression test for ticket #5626.

    Of the three values in ``var1_fillval0`` only the middle one is expected
    to be masked.
    """
    nc_path = pjoin(TEST_DATA_PATH, 'example_3_maskedvals.nc')
    with netcdf_file(nc_path, maskandscale=True) as nc:
        values = nc.variables['var1_fillval0'][:]
        assert_mask_matches(values, [False, True, False])
示例22
def test_read_withNoFillValue():
    """A variable with no fill value reads back unmasked.

    Reading with ``maskandscale=True`` is expected to leave all three
    elements unmasked and equal to ``[1, 2, 3]``.
    """
    nc_path = pjoin(TEST_DATA_PATH, 'example_3_maskedvals.nc')
    with netcdf_file(nc_path, maskandscale=True) as nc:
        values = nc.variables['var2_noFillval'][:]
        assert_mask_matches(values, [False, False, False])
        assert_equal(values, [1,2,3])
示例23
def test_read_withFillValueAndMissingValue():
    """With both ``_FillValue`` and ``missing_value``, ``_FillValue`` wins.

    Only the first element is expected to be masked; the number standing in
    that position of the comparison list is a placeholder.
    """
    IRRELEVANT_VALUE = 9999
    nc_path = pjoin(TEST_DATA_PATH, 'example_3_maskedvals.nc')
    with netcdf_file(nc_path, maskandscale=True) as nc:
        values = nc.variables['var3_fillvalAndMissingValue'][:]
        assert_mask_matches(values, [True, False, False])
        assert_equal(values, [IRRELEVANT_VALUE, 2, 3])
示例24
def test_read_withFillValNaN():
    """Only the middle element of ``var5_fillvalNaN`` is expected masked."""
    nc_path = pjoin(TEST_DATA_PATH, 'example_3_maskedvals.nc')
    with netcdf_file(nc_path, maskandscale=True) as nc:
        values = nc.variables['var5_fillvalNaN'][:]
        assert_mask_matches(values, [False, True, False])
示例25
def test_read_withChar():
    """Only the middle element of ``var6_char`` is expected to be masked."""
    nc_path = pjoin(TEST_DATA_PATH, 'example_3_maskedvals.nc')
    with netcdf_file(nc_path, maskandscale=True) as nc:
        values = nc.variables['var6_char'][:]
        assert_mask_matches(values, [False, True, False])
示例26
def test_read_with2dVar():
    """Check the expected 3x2 mask of the two-dimensional ``var7_2d``."""
    expected_mask = [[True, False], [False, False], [False, True]]
    nc_path = pjoin(TEST_DATA_PATH, 'example_3_maskedvals.nc')
    with netcdf_file(nc_path, maskandscale=True) as nc:
        values = nc.variables['var7_2d'][:]
        assert_mask_matches(values, expected_mask)
示例27
def test_read_withMaskAndScaleFalse():
    """With ``maskandscale=False`` fill values do not produce a mask.

    The variable carries a ``_FillValue`` (or ``missing_value``) attribute,
    yet all three elements are expected unmasked and equal to ``[1, 2, 3]``.
    """
    nc_path = pjoin(TEST_DATA_PATH, 'example_3_maskedvals.nc')
    # mmap=False avoids problems with closing a mmap'ed file while arrays
    # referring to its data still exist.
    with netcdf_file(nc_path, maskandscale=False, mmap=False) as nc:
        values = nc.variables['var3_fillvalAndMissingValue'][:]
        assert_mask_matches(values, [False, False, False])
        assert_equal(values, [1, 2, 3])
示例28
def file2dict(filename):
    """Read every variable of a netCDF file into plain Python values.

    The type of a variable's first element decides the conversion:

    * ``np.float64`` -> ``float`` (one element) or list of ``float``
    * ``np.int32``   -> ``int`` (one element) or list of ``int``
    * anything else, or a variable with no elements -> ``list(data)``

    Args:
        filename: Path of the netCDF file to read.

    Returns:
        dict mapping each variable name to its converted value.

    Raises:
        ValueError: If ``filename`` is not an existing file.
    """
    if not os.path.isfile(filename):
        raise ValueError("ERROR: Could not read %s" % filename)

    ret = {}
    # The context manager closes the file even if a conversion below raises.
    with netcdf_file(filename, 'r', mmap=False) as nc:
        for ikey, variable in nc.variables.items():
            data = variable.data
            if np.size(data) == 0:
                # No first element to probe; ``data[0]`` would raise.
                ret[ikey] = list(data)
                continue

            first = data[0]
            if isinstance(first, np.float64):
                convert = float
            elif isinstance(first, np.int32):
                convert = int
            else:
                ret[ikey] = list(data)
                continue

            if len(data) == 1:
                ret[ikey] = convert(first)
            else:
                ret[ikey] = [convert(x) for x in data]

    return ret
示例29
def netcdf2dict(filename):
    """
    Read a NetCDF file and create a python dictionary with
    numbers or lists for each variable

    Variables of dtype ``'>f8'`` become flat lists of values rounded to 11
    decimals, variables of dtype ``'>i4'`` become flat lists of ``int``;
    other dtypes are kept as read.  Any value of length 1 is replaced by its
    single element.

    Args:
        filename:
            NetCDF filename

    Returns:
        dict mapping variable names to the converted values, or ``None``
        (after printing an error message) if ``filename`` is not a file.
    """
    if not os.path.isfile(filename):
        print('ERROR: No such file: ', filename)
        return None

    float_type = np.dtype('>f8')
    int_type = np.dtype('>i4')

    ret = {}
    # The context manager closes the file even if reading a variable raises.
    with netcdf_file(filename, 'r', mmap=False) as netcdfile:
        for name, variable in netcdfile.variables.items():
            values = variable[:]
            if values.dtype == float_type:
                values = [round(x, 11) for x in values.flatten()]
            elif values.dtype == int_type:
                values = [int(x) for x in values.flatten()]

            # Collapse one-element results to the bare element.
            if len(values) == 1:
                values = values[0]
            ret[name] = values

    return ret
示例30
def createSampleXRange(M,N,filename,bounds=None,dx=None,dy=None):
    """Write a sample M-by-N int32 grid to a netCDF file and return the grid.

    The file receives the variables ``dimension``, ``spacing``, ``z_range``,
    ``x_range`` and ``y_range`` (two elements each, along ``side``) and the
    flattened data as ``z`` (along ``xysize``).

    Args:
        M: Number of rows of the sample grid.
        N: Number of columns of the sample grid.
        filename: Output file, opened with mode ``'w'``.
        bounds: Optional ``(xmin, xmax, ymin, ymax)``.  When omitted, both
            minima are 0.5 and the maxima follow from N, M, dx and dy.
        dx: Spacing in x; 1.0 when ``None``.
        dy: Spacing in y; 1.0 when ``None``.

    Returns:
        The ``(M, N)`` int32 array ``0 .. M*N-1`` that was written.
    """
    dx = 1.0 if dx is None else dx
    dy = 1.0 if dy is None else dy

    if bounds is not None:
        xmin,xmax,ymin,ymax = bounds
    else:
        xmin = 0.5
        xmax = xmin + (N-1)*dx
        ymin = 0.5
        ymax = ymin + (M-1)*dy

    data = np.arange(0,M*N).reshape(M,N).astype(np.int32)

    cdf = netcdf.netcdf_file(filename,'w')
    cdf.createDimension('side',2)
    cdf.createDimension('xysize',M*N)

    dim_var = cdf.createVariable('dimension','i',('side',))
    dim_var[:] = np.array([N,M])

    # NOTE(review): 'spacing' uses type code 'i' although dx/dy default to
    # floats - presumably fractional spacings lose their fraction here;
    # confirm that is intended.
    spacing_var = cdf.createVariable('spacing','i',('side',))
    spacing_var[:] = np.array([dx,dy])

    z_type = INVERSE_NETCDF_TYPES[str(data.dtype)]
    zrange_var = cdf.createVariable('z_range',z_type,('side',))
    zrange_var[:] = np.array([data.min(),data.max()])

    xrange_var = cdf.createVariable('x_range','d',('side',))
    xrange_var[:] = np.array([xmin,xmax])

    yrange_var = cdf.createVariable('y_range','d',('side',))
    yrange_var[:] = np.array([ymin,ymax])

    z_var = cdf.createVariable('z',z_type,('xysize',))
    z_var[:] = data.flatten()

    cdf.close()
    return data