Python源码示例:scipy.io.netcdf.netcdf_file()
示例1
def test_ticket_1720():
io = BytesIO()
items = [0,0.1,0.2,0.3,0.4,0.5,0.6,0.7,0.8,0.9]
with netcdf_file(io, 'w') as f:
f.history = 'Created for a test'
f.createDimension('float_var', 10)
float_var = f.createVariable('float_var', 'f', ('float_var',))
float_var[:] = items
float_var.units = 'metres'
f.flush()
contents = io.getvalue()
io = BytesIO(contents)
with netcdf_file(io, 'r') as f:
assert_equal(f.history, b'Created for a test')
float_var = f.variables['float_var']
assert_equal(float_var.units, b'metres')
assert_equal(float_var.shape, (10,))
assert_allclose(float_var[:], items)
示例2
def test_ticket_1720():
io = BytesIO()
items = [0,0.1,0.2,0.3,0.4,0.5,0.6,0.7,0.8,0.9]
with netcdf_file(io, 'w') as f:
f.history = 'Created for a test'
f.createDimension('float_var', 10)
float_var = f.createVariable('float_var', 'f', ('float_var',))
float_var[:] = items
float_var.units = 'metres'
f.flush()
contents = io.getvalue()
io = BytesIO(contents)
with netcdf_file(io, 'r') as f:
assert_equal(f.history, b'Created for a test')
float_var = f.variables['float_var']
assert_equal(float_var.units, b'metres')
assert_equal(float_var.shape, (10,))
assert_allclose(float_var[:], items)
示例3
def test_mmaps_segfault():
filename = pjoin(TEST_DATA_PATH, 'example_1.nc')
if not IS_PYPY:
with warnings.catch_warnings():
warnings.simplefilter("error")
with netcdf_file(filename, mmap=True) as f:
x = f.variables['lat'][:]
# should not raise warnings
del x
def doit():
with netcdf_file(filename, mmap=True) as f:
return f.variables['lat'][:]
# should not crash
with suppress_warnings() as sup:
sup.filter(RuntimeWarning,
"Cannot close a netcdf_file opened with mmap=True, when netcdf_variables or arrays referring to its data still exist")
x = doit()
x.sum()
示例4
def test_netcdf_write_as_netcdf3_64bit(tmpdir):
"""Test write_netcdf with output format 64-bit netcdf3."""
from scipy.io import netcdf
field = RasterModelGrid((4, 3))
field.add_field("topographic__elevation", np.arange(12.0), at="node")
field.add_field("uplift_rate", 2.0 * np.arange(12.0), at="node")
with tmpdir.as_cwd():
write_netcdf("test.nc", field, format="NETCDF3_64BIT")
f = netcdf.netcdf_file("test.nc", "r")
for name in ["topographic__elevation", "uplift_rate"]:
assert name in f.variables
assert_array_equal(f.variables[name][:].flatten(), field.at_node[name])
f.close()
示例5
def test_netcdf_write_as_netcdf3_classic(tmpdir):
"""Test write_netcdf with output format classic netcdf3."""
from scipy.io import netcdf
field = RasterModelGrid((4, 3))
field.add_field("topographic__elevation", np.arange(12.0), at="node")
field.add_field("uplift_rate", 2.0 * np.arange(12.0), at="node")
with tmpdir.as_cwd():
write_netcdf("test.nc", field, format="NETCDF3_CLASSIC")
f = netcdf.netcdf_file("test.nc", "r")
for name in ["topographic__elevation", "uplift_rate"]:
assert name in f.variables
assert_array_equal(f.variables[name][:].flatten(), field.at_node[name])
f.close()
示例6
def make_simple(*args, **kwargs):
f = netcdf_file(*args, **kwargs)
f.history = 'Created for a test'
f.createDimension('time', N_EG_ELS)
time = f.createVariable('time', VARTYPE_EG, ('time',))
time[:] = np.arange(N_EG_ELS)
time.units = 'days since 2008-01-01'
f.flush()
yield f
f.close()
示例7
def test_read_write_files():
# test round trip for example file
cwd = os.getcwd()
try:
tmpdir = tempfile.mkdtemp()
os.chdir(tmpdir)
with make_simple('simple.nc', 'w') as f:
pass
# To read the NetCDF file we just created::
with netcdf_file('simple.nc') as f:
# Using mmap is the default
yield assert_true, f.use_mmap
for testargs in gen_for_simple(f):
yield testargs
# Now without mmap
with netcdf_file('simple.nc', mmap=False) as f:
# Using mmap is the default
yield assert_false, f.use_mmap
for testargs in gen_for_simple(f):
yield testargs
# To read the NetCDF file we just created, as file object, no
# mmap. When n * n_bytes(var_type) is not divisible by 4, this
# raised an error in pupynere 1.0.12 and scipy rev 5893, because
# calculated vsize was rounding up in units of 4 - see
# http://www.unidata.ucar.edu/software/netcdf/docs/netcdf.html
fobj = open('simple.nc', 'rb')
with netcdf_file(fobj) as f:
# by default, don't use mmap for file-like
yield assert_false, f.use_mmap
for testargs in gen_for_simple(f):
yield testargs
except:
os.chdir(cwd)
shutil.rmtree(tmpdir)
raise
os.chdir(cwd)
shutil.rmtree(tmpdir)
示例8
def test_read_write_sio():
eg_sio1 = BytesIO()
with make_simple(eg_sio1, 'w') as f1:
str_val = eg_sio1.getvalue()
eg_sio2 = BytesIO(str_val)
with netcdf_file(eg_sio2) as f2:
for testargs in gen_for_simple(f2):
yield testargs
# Test that error is raised if attempting mmap for sio
eg_sio3 = BytesIO(str_val)
yield assert_raises, ValueError, netcdf_file, eg_sio3, 'r', True
# Test 64-bit offset write / read
eg_sio_64 = BytesIO()
with make_simple(eg_sio_64, 'w', version=2) as f_64:
str_val = eg_sio_64.getvalue()
eg_sio_64 = BytesIO(str_val)
with netcdf_file(eg_sio_64) as f_64:
for testargs in gen_for_simple(f_64):
yield testargs
yield assert_equal, f_64.version_byte, 2
# also when version 2 explicitly specified
eg_sio_64 = BytesIO(str_val)
with netcdf_file(eg_sio_64, version=2) as f_64:
for testargs in gen_for_simple(f_64):
yield testargs
yield assert_equal, f_64.version_byte, 2
示例9
def test_read_example_data():
# read any example data files
for fname in glob(pjoin(TEST_DATA_PATH, '*.nc')):
with netcdf_file(fname, 'r') as f:
pass
with netcdf_file(fname, 'r', mmap=False) as f:
pass
示例10
def test_itemset_no_segfault_on_readonly():
# Regression test for ticket #1202.
# Open the test file in read-only mode.
filename = pjoin(TEST_DATA_PATH, 'example_1.nc')
with netcdf_file(filename, 'r') as f:
time_var = f.variables['time']
# time_var.assignValue(42) should raise a RuntimeError--not seg. fault!
assert_raises(RuntimeError, time_var.assignValue, 42)
示例11
def read(self, path, **kwargs):
from scipy.io import netcdf
path = op.abspath(path)
c = self.container
grd = netcdf.netcdf_file(self._getDisplacementFile(path),
mode='r', version=2)
displ = grd.variables['z'][:].copy()
c.displacement = displ
shape = c.displacement.shape
# LatLon
c.frame.spacing = 'degree'
c.frame.llLat = grd.variables['lat'][:].min()
c.frame.llLon = grd.variables['lon'][:].min()
c.frame.dN = (grd.variables['lat'][:].max() -
c.frame.llLat) / shape[0]
c.frame.dE = (grd.variables['lon'][:].max() -
c.frame.llLon) / shape[1]
# Theta and Phi
try:
los = num.memmap(self._getLOSFile(path), dtype='<f4')
e = los[3::6].copy().reshape(shape)
n = los[4::6].copy().reshape(shape)
u = los[5::6].copy().reshape(shape)
phi = num.arctan(n/e)
theta = num.arcsin(u)
# phi[n < 0] += num.pi
c.phi = phi
c.theta = theta
except ImportError:
self._log.warning(self.__doc__)
self._log.warning('Defaulting theta to pi/2 and phi to 0.')
c.theta = num.pi/2
c.phi = 0.
return c
示例12
def make_simple(*args, **kwargs):
f = netcdf_file(*args, **kwargs)
f.history = 'Created for a test'
f.createDimension('time', N_EG_ELS)
time = f.createVariable('time', VARTYPE_EG, ('time',))
time[:] = np.arange(N_EG_ELS)
time.units = 'days since 2008-01-01'
f.flush()
yield f
f.close()
示例13
def test_read_write_sio():
eg_sio1 = BytesIO()
with make_simple(eg_sio1, 'w') as f1:
str_val = eg_sio1.getvalue()
eg_sio2 = BytesIO(str_val)
with netcdf_file(eg_sio2) as f2:
check_simple(f2)
# Test that error is raised if attempting mmap for sio
eg_sio3 = BytesIO(str_val)
assert_raises(ValueError, netcdf_file, eg_sio3, 'r', True)
# Test 64-bit offset write / read
eg_sio_64 = BytesIO()
with make_simple(eg_sio_64, 'w', version=2) as f_64:
str_val = eg_sio_64.getvalue()
eg_sio_64 = BytesIO(str_val)
with netcdf_file(eg_sio_64) as f_64:
check_simple(f_64)
assert_equal(f_64.version_byte, 2)
# also when version 2 explicitly specified
eg_sio_64 = BytesIO(str_val)
with netcdf_file(eg_sio_64, version=2) as f_64:
check_simple(f_64)
assert_equal(f_64.version_byte, 2)
示例14
def test_encoded_fill_value():
with netcdf_file(BytesIO(), mode='w') as f:
f.createDimension('x', 1)
var = f.createVariable('var', 'S1', ('x',))
assert_equal(var._get_encoded_fill_value(), b'\x00')
var._FillValue = b'\x01'
assert_equal(var._get_encoded_fill_value(), b'\x01')
var._FillValue = b'\x00\x00' # invalid, wrong size
assert_equal(var._get_encoded_fill_value(), b'\x00')
示例15
def test_read_example_data():
# read any example data files
for fname in glob(pjoin(TEST_DATA_PATH, '*.nc')):
with netcdf_file(fname, 'r') as f:
pass
with netcdf_file(fname, 'r', mmap=False) as f:
pass
示例16
def test_appending_issue_gh_8625():
stream = BytesIO()
with make_simple(stream, mode='w') as f:
f.createDimension('x', 2)
f.createVariable('x', float, ('x',))
f.variables['x'][...] = 1
f.flush()
contents = stream.getvalue()
stream = BytesIO(contents)
with netcdf_file(stream, mode='a') as f:
f.variables['x'][...] = 2
示例17
def test_write_invalid_dtype():
dtypes = ['int64', 'uint64']
if np.dtype('int').itemsize == 8: # 64-bit machines
dtypes.append('int')
if np.dtype('uint').itemsize == 8: # 64-bit machines
dtypes.append('uint')
with netcdf_file(BytesIO(), 'w') as f:
f.createDimension('time', N_EG_ELS)
for dt in dtypes:
assert_raises(ValueError, f.createVariable, 'time', dt, ('time',))
示例18
def test_byte_gatts():
# Check that global "string" atts work like they did before py3k
# unicode and general bytes confusion
with in_tempdir():
filename = 'g_byte_atts.nc'
f = netcdf_file(filename, 'w')
f._attributes['holy'] = b'grail'
f._attributes['witch'] = 'floats'
f.close()
f = netcdf_file(filename, 'r')
assert_equal(f._attributes['holy'], b'grail')
assert_equal(f._attributes['witch'], b'floats')
f.close()
示例19
def test_append_recordDimension():
dataSize = 100
with in_tempdir():
# Create file with record time dimension
with netcdf_file('withRecordDimension.nc', 'w') as f:
f.createDimension('time', None)
f.createVariable('time', 'd', ('time',))
f.createDimension('x', dataSize)
x = f.createVariable('x', 'd', ('x',))
x[:] = np.array(range(dataSize))
f.createDimension('y', dataSize)
y = f.createVariable('y', 'd', ('y',))
y[:] = np.array(range(dataSize))
f.createVariable('testData', 'i', ('time', 'x', 'y'))
f.flush()
f.close()
for i in range(2):
# Open the file in append mode and add data
with netcdf_file('withRecordDimension.nc', 'a') as f:
f.variables['time'].data = np.append(f.variables["time"].data, i)
f.variables['testData'][i, :, :] = np.ones((dataSize, dataSize))*i
f.flush()
# Read the file and check that append worked
with netcdf_file('withRecordDimension.nc') as f:
assert_equal(f.variables['time'][-1], i)
assert_equal(f.variables['testData'][-1, :, :].copy(), np.ones((dataSize, dataSize))*i)
assert_equal(f.variables['time'].data.shape[0], i+1)
assert_equal(f.variables['testData'].data.shape[0], i+1)
# Read the file and check that 'data' was not saved as user defined
# attribute of testData variable during append operation
with netcdf_file('withRecordDimension.nc') as f:
with assert_raises(KeyError) as ar:
f.variables['testData']._attributes['data']
ex = ar.value
assert_equal(ex.args[0], 'data')
示例20
def test_maskandscale():
t = np.linspace(20, 30, 15)
t[3] = 100
tm = np.ma.masked_greater(t, 99)
fname = pjoin(TEST_DATA_PATH, 'example_2.nc')
with netcdf_file(fname, maskandscale=True) as f:
Temp = f.variables['Temperature']
assert_equal(Temp.missing_value, 9999)
assert_equal(Temp.add_offset, 20)
assert_equal(Temp.scale_factor, np.float32(0.01))
found = Temp[:].compressed()
del Temp # Remove ref to mmap, so file can be closed.
expected = np.round(tm.compressed(), 2)
assert_allclose(found, expected)
with in_tempdir():
newfname = 'ms.nc'
f = netcdf_file(newfname, 'w', maskandscale=True)
f.createDimension('Temperature', len(tm))
temp = f.createVariable('Temperature', 'i', ('Temperature',))
temp.missing_value = 9999
temp.scale_factor = 0.01
temp.add_offset = 20
temp[:] = tm
f.close()
with netcdf_file(newfname, maskandscale=True) as f:
Temp = f.variables['Temperature']
assert_equal(Temp.missing_value, 9999)
assert_equal(Temp.add_offset, 20)
assert_equal(Temp.scale_factor, np.float32(0.01))
expected = np.round(tm.compressed(), 2)
found = Temp[:].compressed()
del Temp
assert_allclose(found, expected)
# ------------------------------------------------------------------------
# Test reading with masked values (_FillValue / missing_value)
# ------------------------------------------------------------------------
示例21
def test_read_withValuesNearFillValue():
# Regression test for ticket #5626
fname = pjoin(TEST_DATA_PATH, 'example_3_maskedvals.nc')
with netcdf_file(fname, maskandscale=True) as f:
vardata = f.variables['var1_fillval0'][:]
assert_mask_matches(vardata, [False, True, False])
示例22
def test_read_withNoFillValue():
# For a variable with no fill value, reading data with maskandscale=True
# should return unmasked data
fname = pjoin(TEST_DATA_PATH, 'example_3_maskedvals.nc')
with netcdf_file(fname, maskandscale=True) as f:
vardata = f.variables['var2_noFillval'][:]
assert_mask_matches(vardata, [False, False, False])
assert_equal(vardata, [1,2,3])
示例23
def test_read_withFillValueAndMissingValue():
# For a variable with both _FillValue and missing_value, the _FillValue
# should be used
IRRELEVANT_VALUE = 9999
fname = pjoin(TEST_DATA_PATH, 'example_3_maskedvals.nc')
with netcdf_file(fname, maskandscale=True) as f:
vardata = f.variables['var3_fillvalAndMissingValue'][:]
assert_mask_matches(vardata, [True, False, False])
assert_equal(vardata, [IRRELEVANT_VALUE, 2, 3])
示例24
def test_read_withFillValNaN():
fname = pjoin(TEST_DATA_PATH, 'example_3_maskedvals.nc')
with netcdf_file(fname, maskandscale=True) as f:
vardata = f.variables['var5_fillvalNaN'][:]
assert_mask_matches(vardata, [False, True, False])
示例25
def test_read_withChar():
fname = pjoin(TEST_DATA_PATH, 'example_3_maskedvals.nc')
with netcdf_file(fname, maskandscale=True) as f:
vardata = f.variables['var6_char'][:]
assert_mask_matches(vardata, [False, True, False])
示例26
def test_read_with2dVar():
fname = pjoin(TEST_DATA_PATH, 'example_3_maskedvals.nc')
with netcdf_file(fname, maskandscale=True) as f:
vardata = f.variables['var7_2d'][:]
assert_mask_matches(vardata, [[True, False], [False, False], [False, True]])
示例27
def test_read_withMaskAndScaleFalse():
# If a variable has a _FillValue (or missing_value) attribute, but is read
# with maskandscale set to False, the result should be unmasked
fname = pjoin(TEST_DATA_PATH, 'example_3_maskedvals.nc')
# Open file with mmap=False to avoid problems with closing a mmap'ed file
# when arrays referring to its data still exist:
with netcdf_file(fname, maskandscale=False, mmap=False) as f:
vardata = f.variables['var3_fillvalAndMissingValue'][:]
assert_mask_matches(vardata, [False, False, False])
assert_equal(vardata, [1, 2, 3])
示例28
def file2dict(filename):
if not os.path.isfile(filename):
raise ValueError("ERROR: Could not read %s" % filename)
nc = netcdf_file(filename, 'r', mmap=False)
ret = {}
for ikey in nc.variables.keys():
data = nc.variables[ikey].data
if type(data[0]) == np.float64:
if len(data) == 1:
data = float(data[0])
else:
data = [float(x) for x in data]
elif type(data[0]) == np.int32:
if len(data) == 1:
data = int(data[0])
else:
data = [int(x) for x in data]
else:
data = list(data)
ret[ikey] = data
del data
nc.close()
return ret
示例29
def netcdf2dict(filename):
"""
Read a NetCDF file and create a python dictionary with
numbers or lists for each variable
Args:
filename:
NetCDF filename
"""
if not os.path.isfile(filename):
print('ERROR: No such file: ', filename)
return None
ret = {}
netcdfile = netcdf_file(filename, 'r', mmap=False)
for ii in netcdfile.variables.keys():
ret[ii] = netcdfile.variables[ii][:]
netcdfile.close()
for i in ret:
if ret[i].dtype == np.dtype('>f8'):
ret[i] = [round(x, 11) for x in ret[i].flatten()]
elif ret[i].dtype == np.dtype('>i4'):
ret[i] = [int(x) for x in ret[i].flatten()]
for i in ret:
if len(ret[i]) == 1:
ret[i] = ret[i][0]
return ret
示例30
def createSampleXRange(M,N,filename,bounds=None,dx=None,dy=None):
if dx is None:
dx = 1.0
if dy is None:
dy = 1.0
if bounds is None:
xmin = 0.5
xmax = xmin + (N-1)*dx
ymin = 0.5
ymax = ymin + (M-1)*dy
else:
xmin,xmax,ymin,ymax = bounds
data = np.arange(0,M*N).reshape(M,N).astype(np.int32)
cdf = netcdf.netcdf_file(filename,'w')
cdf.createDimension('side',2)
cdf.createDimension('xysize',M*N)
dim = cdf.createVariable('dimension','i',('side',))
dim[:] = np.array([N,M])
spacing = cdf.createVariable('spacing','i',('side',))
spacing[:] = np.array([dx,dy])
zrange = cdf.createVariable('z_range',INVERSE_NETCDF_TYPES[str(data.dtype)],('side',))
zrange[:] = np.array([data.min(),data.max()])
x_range = cdf.createVariable('x_range','d',('side',))
x_range[:] = np.array([xmin,xmax])
y_range = cdf.createVariable('y_range','d',('side',))
y_range[:] = np.array([ymin,ymax])
z = cdf.createVariable('z',INVERSE_NETCDF_TYPES[str(data.dtype)],('xysize',))
z[:] = data.flatten()
cdf.close()
return data