blob: 9cb60cedff943db9bce61aba40e12dfc7553790e [file] [log] [blame]
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import unittest
import datetime
import os
from ocw import dataset_processor as dp
from ocw import dataset as ds
from ocw.data_source import local
import numpy as np
import numpy.ma as ma
import logging
logging.basicConfig(level=logging.CRITICAL)
class TestEnsemble(unittest.TestCase):
    """Tests for ``dp.ensemble``."""

    def test_unequal_dataset_shapes(self):
        """Combining the monthly and daily fixtures must raise ValueError."""
        ten_year_dataset = ten_year_monthly_dataset()
        two_year_dataset = two_year_daily_dataset()
        with self.assertRaises(ValueError):
            dp.ensemble([ten_year_dataset, two_year_dataset])

    def test_ensemble_logic(self):
        """The ensemble of constant cubes 1..5 must equal the constant cube 3.

        3 is the arithmetic mean of the five input values.
        """
        datasets = [build_ten_cube_dataset(value) for value in range(1, 6)]
        three = datasets[2]  # the cube filled with the value 3
        ensemble = dp.ensemble(datasets)
        np.testing.assert_array_equal(ensemble.values.flatten(),
                                      three.values.flatten())

    def test_ensemble_name(self):
        """The dataset returned by ``dp.ensemble`` is named "Dataset Ensemble"."""
        expected_name = "Dataset Ensemble"
        datasets = [build_ten_cube_dataset(1), build_ten_cube_dataset(2)]
        ensemble = dp.ensemble(datasets)
        # assertEqual replaces the deprecated assertEquals alias, which was
        # removed from unittest in Python 3.12.
        self.assertEqual(ensemble.name, expected_name)
class TestTemporalRebin(unittest.TestCase):
    """Tests for ``dp.temporal_rebin`` against monthly and daily fixtures."""

    def setUp(self):
        self.ten_year_monthly_dataset = ten_year_monthly_dataset()
        self.ten_year_annual_times = np.array(
            [datetime.datetime(year, 1, 1) for year in range(2000, 2010)])
        self.two_years_daily_dataset = two_year_daily_dataset()

    def test_monthly_to_annual_rebin(self):
        """A 365-day rebin of monthly data yields one Jan-1 time per year."""
        annual_dataset = dp.temporal_rebin(self.ten_year_monthly_dataset,
                                           datetime.timedelta(days=365))
        np.testing.assert_array_equal(annual_dataset.times,
                                      self.ten_year_annual_times)

    def test_monthly_to_full_rebin(self):
        """A 3650-day rebin collapses the dataset to one time, 2004-12-16."""
        full_dataset = dp.temporal_rebin(self.ten_year_monthly_dataset,
                                         datetime.timedelta(days=3650))
        full_times = [datetime.datetime(2004, 12, 16)]
        # Compare element-wise like the other tests here; a plain
        # assertEqual would depend on the truth value of an array comparison.
        np.testing.assert_array_equal(full_dataset.times, full_times)

    def test_daily_to_monthly_rebin(self):
        """This test takes a really long time to run. TODO: Figure out where the performance drag is"""
        monthly_dataset = dp.temporal_rebin(self.two_years_daily_dataset,
                                            datetime.timedelta(days=31))
        # Expected bins: the first day of every month present in the input.
        bins = np.array(sorted({
            datetime.datetime(time_reading.year, time_reading.month, 1)
            for time_reading in self.two_years_daily_dataset.times}))
        np.testing.assert_array_equal(monthly_dataset.times, bins)

    def test_daily_to_annual_rebin(self):
        """A 366-day rebin of daily data yields one Jan-1 time per year."""
        annual_dataset = dp.temporal_rebin(self.two_years_daily_dataset,
                                           datetime.timedelta(days=366))
        # Expected bins: January 1st of every year present in the input.
        bins = np.array(sorted({
            datetime.datetime(time_reading.year, 1, 1)
            for time_reading in self.two_years_daily_dataset.times}))
        np.testing.assert_array_equal(annual_dataset.times, bins)

    def test_non_rebin(self):
        """This will take a monthly dataset and ask for a monthly rebin of 28 days. The resulting
        dataset should have the same time values"""
        monthly_dataset = dp.temporal_rebin(self.ten_year_monthly_dataset,
                                            datetime.timedelta(days=28))
        good_times = self.ten_year_monthly_dataset.times
        np.testing.assert_array_equal(monthly_dataset.times, good_times)

    def test_variable_propagation(self):
        """The rebinned dataset keeps the input's ``name`` and ``variable``."""
        annual_dataset = dp.temporal_rebin(self.ten_year_monthly_dataset,
                                           datetime.timedelta(days=365))
        # assertEqual replaces the deprecated assertEquals alias, which was
        # removed from unittest in Python 3.12.
        self.assertEqual(annual_dataset.name,
                         self.ten_year_monthly_dataset.name)
        self.assertEqual(annual_dataset.variable,
                         self.ten_year_monthly_dataset.variable)
class TestRcmesSpatialRegrid(unittest.TestCase):
    """Shape check for the private ``dp._rcmes_spatial_regrid`` helper."""

    def test_return_array_shape(self):
        """The regridded array must have the shape of the target lat/lon grids."""
        source_values = ma.array(np.ones([90,180]))

        # Source grid: 2-degree spacing, wrapped as masked arrays.
        source_lat_axis = ma.array(range(-89, 90, 2))
        source_lon_axis = ma.array(range(-179, 180, 2))
        source_lons, source_lats = np.meshgrid(source_lon_axis, source_lat_axis)
        source_lats = ma.array(source_lats)
        source_lons = ma.array(source_lons)

        # Target grid: 4-degree spacing, also converted to masked arrays.
        target_lat_axis = np.array(range(-89, 90, 4))
        target_lon_axis = np.array(range(-179, 180, 4))
        target_lons, target_lats = np.meshgrid(target_lon_axis, target_lat_axis)
        target_lats = ma.array(target_lats)
        target_lons = ma.array(target_lons)

        regridded = dp._rcmes_spatial_regrid(
            source_values, source_lats, source_lons, target_lats, target_lons)

        for expected_shape in (target_lats.shape, target_lons.shape):
            self.assertEqual(regridded.shape, expected_shape)
class TestSpatialRegrid(unittest.TestCase):
    """Tests for ``dp.spatial_regrid`` onto a 4-degree lat/lon grid."""

    def setUp(self):
        self.input_dataset = ten_year_monthly_dataset()
        self.new_lats = np.array(range(-89, 90, 4))
        self.new_lons = np.array(range(-179, 180, 4))
        self.regridded_dataset = dp.spatial_regrid(
            self.input_dataset, self.new_lats, self.new_lons)

    def test_returned_lats(self):
        """The regridded dataset's lats equal the requested lats."""
        np.testing.assert_array_equal(self.regridded_dataset.lats,
                                      self.new_lats)

    def test_returned_lons(self):
        """The regridded dataset's lons equal the requested lons."""
        np.testing.assert_array_equal(self.regridded_dataset.lons,
                                      self.new_lons)

    def test_shape_of_values(self):
        """Values keep the input time length and take the new grid's size."""
        regridded_data_shape = self.regridded_dataset.values.shape
        expected_data_shape = (len(self.input_dataset.times),
                               len(self.new_lats),
                               len(self.new_lons))
        self.assertSequenceEqual(regridded_data_shape, expected_data_shape)

    def test_variable_propagation(self):
        """The regridded dataset keeps the input's ``name`` and ``variable``."""
        # assertEqual replaces the deprecated assertEquals alias, which was
        # removed from unittest in Python 3.12.
        self.assertEqual(self.input_dataset.name,
                         self.regridded_dataset.name)
        self.assertEqual(self.input_dataset.variable,
                         self.regridded_dataset.variable)
class TestNormalizeDatasetDatetimes(unittest.TestCase):
    """Tests for ``dp.normalize_dataset_datetimes``."""

    def setUp(self):
        # Monthly fixture stamped on the 15th; daily fixture stamped at 02:00.
        self.monthly_dataset = ten_year_monthly_15th_dataset()
        self.daily_dataset = two_year_daily_2hr_dataset()

    def test_daily(self):
        """'daily' normalization must reset every timestamp's hour to 0."""
        # Use the 02:00 daily fixture. The monthly fixture already has
        # hour == 0, so asserting on it could never fail.
        new_ds = dp.normalize_dataset_datetimes(self.daily_dataset, 'daily')

        # Check that all the times have been shifted to the start of the day
        self.assertTrue(all(x.hour == 0 for x in new_ds.times))

    # NOTE: the method name keeps its historical misspelling ("montly") so
    # that any name-based test selection continues to work.
    def test_montly(self):
        """'monthly' normalization must move every timestamp to day 1."""
        new_ds = dp.normalize_dataset_datetimes(self.monthly_dataset, 'monthly')

        # Check that all the days have been shifted to the first of the month
        self.assertTrue(all(x.day == 1 for x in new_ds.times))
class TestSubset(unittest.TestCase):
    """Tests for ``dp.subset`` and ``dp._get_subregion_slice_indices``."""

    def setUp(self):
        self.target_dataset = ten_year_monthly_dataset()
        self.name = 'foo'

        period_start = datetime.datetime(2001, 1, 1)
        period_end = datetime.datetime(2004, 1, 1)

        # Bounds that land exactly on the fixture's grid points.
        self.subregion = ds.Bounds(-81, 81, -161, 161,
                                   period_start, period_end)

        # Spatial limits that fall between grid points.
        self.non_exact_spatial_subregion = ds.Bounds(
            -80.25, 80.5, -160.25, 160.5,
            period_start, period_end)

        # Same spatial limits, with mid-month start and end dates.
        self.non_exact_temporal_subregion = ds.Bounds(
            -80.25, 80.5, -160.25, 160.5,
            datetime.datetime(2001, 1, 15),
            datetime.datetime(2004, 2, 15))

    def test_subset(self):
        """Check the axis lengths, latitude values and value shape of a subset."""
        result = dp.subset(self.subregion, self.target_dataset)
        expected_lats = list(np.array(range(-81, 82, 2)))

        self.assertEqual(result.lats.shape[0], 82)
        self.assertSequenceEqual(expected_lats, list(result.lats))
        self.assertEqual(result.lons.shape[0], 162)
        self.assertEqual(result.times.shape[0], 37)
        self.assertEqual(result.values.shape, (37, 82, 162))

    def test_subset_name(self):
        """Without an explicit name the subset is named 'foo', like the fixture."""
        result = dp.subset(self.subregion, self.target_dataset)
        self.assertEqual(result.name, self.name)

    def test_subset_name_propagation(self):
        """A name passed as the third argument becomes the subset's name."""
        requested_name = 'foo_subset_name'
        result = dp.subset(self.subregion, self.target_dataset, requested_name)
        self.assertEqual(result.name, requested_name)

    def test_subset_using_non_exact_spatial_bounds(self):
        """Off-grid spatial bounds must produce the expected slice indices."""
        expected = dict(lat_start=5, lat_end=84,
                        lon_start=10, lon_end=169,
                        time_start=12, time_end=48)
        actual = dp._get_subregion_slice_indices(
            self.non_exact_spatial_subregion, self.target_dataset)
        self.assertDictEqual(actual, expected)

    def test_subset_using_non_exact_temporal_bounds(self):
        """Mid-month temporal bounds must produce the expected slice indices."""
        expected = dict(lat_start=5, lat_end=84,
                        lon_start=10, lon_end=169,
                        time_start=13, time_end=49)
        actual = dp._get_subregion_slice_indices(
            self.non_exact_temporal_subregion, self.target_dataset)
        self.assertDictEqual(actual, expected)
class TestSafeSubset(unittest.TestCase):
    """Tests for ``dp.safe_subset`` with bounds that exceed the dataset."""

    def setUp(self):
        lats = np.array(range(-60, 61, 1))
        lons = np.array(range(-170, 171, 1))
        times = np.array([datetime.datetime(year, month, 1)
                          for year in range(2000, 2010)
                          for month in range(1, 13)])
        values = np.ones([len(times), len(lats), len(lons)])
        self.target_dataset = ds.Dataset(lats,
                                         lons,
                                         times,
                                         values,
                                         variable="test variable name",
                                         units='test variable units',
                                         name='foo')

        # Spatial limits wider than the dataset's lat/lon axes.
        self.spatial_out_of_bounds = ds.Bounds(
            -165, 165,
            -180, 180,
            datetime.datetime(2001, 1, 1),
            datetime.datetime(2004, 1, 1)
        )

        # Time limits wider than the dataset's 2000-2009 time axis.
        self.temporal_out_of_bounds = ds.Bounds(
            -40, 40,
            -160.25, 160.5,
            datetime.datetime(1999, 1, 15),
            datetime.datetime(2222, 2, 15)
        )

        # Both spatial and time limits wider than the dataset.
        self.everything_out_of_bounds = ds.Bounds(
            -165, 165,
            -180, 180,
            datetime.datetime(1999, 1, 15),
            datetime.datetime(2222, 2, 15)
        )

    # assertEqual is used throughout in place of the deprecated assertEquals
    # alias, which was removed from unittest in Python 3.12. Results are bound
    # to ``subset`` rather than ``ds`` to avoid shadowing the ``ds`` module.

    def test_partial_spatial_overlap(self):
        '''Ensure that safe_subset can handle out of bounds spatial values'''
        subset = dp.safe_subset(self.spatial_out_of_bounds, self.target_dataset)
        spatial_bounds = subset.spatial_boundaries()
        self.assertEqual(spatial_bounds[0], -60)
        self.assertEqual(spatial_bounds[1], 60)
        self.assertEqual(spatial_bounds[2], -170)
        self.assertEqual(spatial_bounds[3], 170)

    def test_partial_temporal_overlap(self):
        '''Ensure that safe_subset can handle out of bounds temporal values'''
        subset = dp.safe_subset(self.temporal_out_of_bounds, self.target_dataset)
        temporal_bounds = subset.time_range()
        start = datetime.datetime(2000, 1, 1)
        end = datetime.datetime(2009, 12, 1)
        self.assertEqual(temporal_bounds[0], start)
        self.assertEqual(temporal_bounds[1], end)

    def test_entire_bounds_overlap(self):
        '''Ensure that safe_subset can handle bounds that are out of range on every axis'''
        subset = dp.safe_subset(self.everything_out_of_bounds, self.target_dataset)
        spatial_bounds = subset.spatial_boundaries()
        temporal_bounds = subset.time_range()
        start = datetime.datetime(2000, 1, 1)
        end = datetime.datetime(2009, 12, 1)
        self.assertEqual(spatial_bounds[0], -60)
        self.assertEqual(spatial_bounds[1], 60)
        self.assertEqual(spatial_bounds[2], -170)
        self.assertEqual(spatial_bounds[3], 170)
        self.assertEqual(temporal_bounds[0], start)
        self.assertEqual(temporal_bounds[1], end)
class TestFailingSubset(unittest.TestCase):
    """``dp.subset`` must raise ValueError when a bound is moved out of range."""

    def setUp(self):
        self.target_dataset = ten_year_monthly_dataset()
        # Replace the fixture's axes with ones ending at 87 / 177.
        self.target_dataset.lats = np.array(range(-89, 88, 2))
        self.target_dataset.lons = np.array(range(-179, 178, 2))

        self.subregion = ds.Bounds(
            -81, 81,
            -161, 161,
            datetime.datetime(2001, 1, 1),
            datetime.datetime(2004, 1, 1)
        )

    def _expect_subset_value_error(self):
        # Shared assertion: subsetting with the current bounds must fail.
        with self.assertRaises(ValueError):
            dp.subset(self.subregion, self.target_dataset)

    def test_out_of_dataset_bounds_lat_min(self):
        self.subregion.lat_min = -90
        self._expect_subset_value_error()

    def test_out_of_dataset_bounds_lat_max(self):
        self.subregion.lat_max = 90
        self._expect_subset_value_error()

    def test_out_of_dataset_bounds_lon_min(self):
        self.subregion.lon_min = -180
        self._expect_subset_value_error()

    def test_out_of_dataset_bounds_lon_max(self):
        self.subregion.lon_max = 180
        self._expect_subset_value_error()

    def test_out_of_dataset_bounds_start(self):
        self.subregion.start = datetime.datetime(1999, 1, 1)
        self._expect_subset_value_error()

    def test_out_of_dataset_bounds_end(self):
        self.subregion.end = datetime.datetime(2011, 1, 1)
        self._expect_subset_value_error()
class TestNetCDFWrite(unittest.TestCase):
    """Round-trip tests for ``dp.write_netcdf``."""

    def setUp(self):
        self.ds = ten_year_monthly_dataset()
        self.file_name = 'test.nc'

    def tearDown(self):
        # Remove the output file if the test created one.
        written = os.path.isfile(self.file_name)
        if written:
            os.remove(self.file_name)

    def test_file_write(self):
        """Writing a dataset must create the target file."""
        dp.write_netcdf(self.ds, self.file_name)
        file_exists = os.path.isfile(self.file_name)
        self.assertTrue(file_exists)

    def test_that_file_contents_are_valid(self):
        """A dataset written and loaded back must match the original."""
        dp.write_netcdf(self.ds, self.file_name)
        reloaded = local.load_file(self.file_name, self.ds.variable)

        self.assertEqual(self.ds.variable, reloaded.variable)
        # Compare each array-valued attribute in turn.
        for attribute in ('lats', 'lons', 'times', 'values'):
            np.testing.assert_array_equal(getattr(self.ds, attribute),
                                          getattr(reloaded, attribute))
def ten_year_monthly_dataset():
    """Build an all-ones dataset named 'foo' with monthly times for 2000-2009.

    Times fall on the 1st of each month; lats and lons use 2-degree steps.
    """
    lat_axis = np.array(range(-89, 90, 2))
    lon_axis = np.array(range(-179, 180, 2))

    # Ten years of monthly data, year-major order.
    month_starts = []
    for year in range(2000, 2010):
        for month in range(1, 13):
            month_starts.append(datetime.datetime(year, month, 1))
    time_axis = np.array(month_starts)

    cube = np.ones([len(time_axis), len(lat_axis), len(lon_axis)])
    return ds.Dataset(lat_axis,
                      lon_axis,
                      time_axis,
                      cube,
                      variable="test variable name",
                      units='test variable units',
                      name='foo')
def ten_year_monthly_15th_dataset():
    """Build an all-ones dataset with monthly times on the 15th, 2000-2009.

    Lats and lons use 2-degree steps; no dataset name is passed.
    """
    lat_axis = np.array(range(-89, 90, 2))
    lon_axis = np.array(range(-179, 180, 2))

    # Ten years of monthly data, each stamped on the 15th, year-major order.
    mid_months = []
    for year in range(2000, 2010):
        for month in range(1, 13):
            mid_months.append(datetime.datetime(year, month, 15))
    time_axis = np.array(mid_months)

    cube = np.ones([len(time_axis), len(lat_axis), len(lon_axis)])
    return ds.Dataset(lat_axis,
                      lon_axis,
                      time_axis,
                      cube,
                      variable="test variable name",
                      units='test variable units')
def two_year_daily_dataset():
    """Build an all-ones dataset with 730 daily times starting 2001-01-01.

    Lats and lons use 2-degree steps.
    """
    lat_axis = np.array(range(-89, 90, 2))
    lon_axis = np.array(range(-179, 180, 2))

    first_day = datetime.datetime(2001, 1, 1)
    time_axis = np.array([first_day + datetime.timedelta(days=offset)
                          for offset in range(730)])

    cube = np.ones([len(time_axis), len(lat_axis), len(lon_axis)])
    return ds.Dataset(lat_axis,
                      lon_axis,
                      time_axis,
                      cube,
                      variable='random data',
                      units='test variable units')
def two_year_daily_2hr_dataset():
    """Build an all-ones dataset with 730 daily times, each at 02:00.

    Times start at 2001-01-01 02:00; lats and lons use 2-degree steps.
    """
    lat_axis = np.array(range(-89, 90, 2))
    lon_axis = np.array(range(-179, 180, 2))

    first_day = datetime.datetime(2001, 1, 1)
    # Every timestamp is offset by two hours from midnight.
    time_axis = np.array([first_day + datetime.timedelta(days=offset, hours=2)
                          for offset in range(730)])

    cube = np.ones([len(time_axis), len(lat_axis), len(lon_axis)])
    return ds.Dataset(lat_axis,
                      lon_axis,
                      time_axis,
                      cube,
                      variable='random data',
                      units='test variable units')
def build_ten_cube_dataset(value):
    """Build a dataset whose values are all equal to ``value``.

    The time axis holds January 1st of each year from 2000 to 2009; lats use
    18-degree steps and lons 36-degree steps.
    """
    lat_axis = np.array(range(-89, 90, 18))
    lon_axis = np.array(range(-179, 180, 36))
    time_axis = np.array([datetime.datetime(year, 1, 1)
                          for year in range(2000, 2010)])

    # Start from ones and scale so every cell holds ``value``.
    shape = [len(time_axis), len(lat_axis), len(lon_axis)]
    cube = np.ones(shape) * value

    return ds.Dataset(lat_axis, lon_axis, time_axis, cube)
# Run the test suite when this module is executed directly as a script.
if "__main__" == __name__:
    unittest.main()