blob: 987f2e6c27e434f6efa538a3bef238b6dbf1a475 [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import unittest
import os
import datetime
from dateutil.relativedelta import relativedelta
import netCDF4
import numpy as np
from ocw.dataset import Dataset
import ocw.utils as utils
class TestDecodeTimes(unittest.TestCase):
def setUp(self):
path = os.path.dirname(os.path.realpath(__file__))
self.test_model = path + \
'/../../ocw-ui/backend/tests/example_data/lat_lon_time.nc'
self.test_model_daily = path + '/lat_lon_time_daily.nc'
self.netcdf = netCDF4.Dataset(
os.path.abspath(self.test_model), mode='r')
self.netcdf_daily = netCDF4.Dataset(
os.path.abspath(self.test_model_daily), mode='r')
def test_proper_return_format(self):
times = utils.decode_time_values(self.netcdf, 'time')
self.assertTrue(all([type(x) is datetime.datetime for x in times]))
def test_valid_time_processing(self):
start_time = datetime.datetime.strptime(
'1989-01-01 00:00:00', '%Y-%m-%d %H:%M:%S')
end_time = datetime.datetime.strptime(
'2008-12-01 00:00:00', '%Y-%m-%d %H:%M:%S')
times = utils.decode_time_values(self.netcdf, 'time')
self.assertEquals(times[0], start_time)
self.assertEquals(times[-1], end_time)
def test_days_time_processing(self):
start_time = datetime.datetime.strptime(
'1951-4-14 00:00:00', '%Y-%m-%d %H:%M:%S')
end_time = datetime.datetime.strptime(
'1951-12-9 00:00:00', '%Y-%m-%d %H:%M:%S')
new_times = utils.decode_time_values(self.netcdf_daily, 'time')
self.assertEqual(new_times[0], start_time)
self.assertEqual(new_times[-1], end_time)
class TestTimeUnitsParse(unittest.TestCase):
def test_valid_parse(self):
units = utils.parse_time_units('minutes since a made up date')
self.assertEquals(units, 'minutes')
def test_invalid_parse(self):
self.assertRaises(
ValueError,
utils.parse_time_units,
'parsecs since a made up date'
)
class TestTimeBaseParse(unittest.TestCase):
def test_valid_time_base(self):
base_time = utils.parse_time_base('days since 1988-06-10 00:00:00')
start_time = datetime.datetime.strptime(
'1988-06-10 00:00:00', '%Y-%m-%d %H:%M:%S')
self.assertEquals(base_time, start_time)
def test_invalid_time_base(self):
self.assertRaises(
ValueError,
utils.parse_time_base,
'days since 1988g06g10g00g00g00'
)
class TestBaseTimeStringParse(unittest.TestCase):
def test_valid_time_base_string_parse(self):
base = utils.parse_base_time_string('days since 1988-06-10 00:00:00')
self.assertEquals(base, '1988-06-10 00:00:00')
def test_invalid_time_base_string_parse(self):
self.assertRaises(
ValueError,
utils.parse_base_time_string,
'this string is not valid'
)
class TestNormalizeLatLonValues(unittest.TestCase):
def setUp(self):
times = np.array([datetime.datetime(2000, x, 1) for x in range(1, 13)])
self.lats = np.arange(-30, 30)
self.lons = np.arange(360)
flat_array = np.arange(len(times) * len(self.lats) * len(self.lons))
self.values = flat_array.reshape(
len(times), len(self.lats), len(self.lons))
self.lats2 = np.array([-30, 0, 30])
self.lons2 = np.array([0, 100, 200, 300])
self.values2 = np.arange(12).reshape(3, 4)
self.lats_unsorted = np.array([-30, 20, -50])
self.lons_unsorted = np.array([-30, 20, -50, 40])
def test_full_lons_shift(self):
lats, lons, values = utils.normalize_lat_lon_values(self.lats,
self.lons,
self.values)
np.testing.assert_array_equal(lons, np.arange(-180, 180))
def test_lats_reversed(self):
lons2 = np.arange(-180, 180)
lats, lons, values = utils.normalize_lat_lon_values(self.lats[::-1],
lons2,
self.values[:,
::-1,
:])
np.testing.assert_array_equal(lats, self.lats)
np.testing.assert_array_equal(values, self.values)
def test_lons_reversed(self):
self.lats = np.arange(-10, 10)
self.lons = np.arange(40)
times = np.array([datetime.datetime(2000, x, 1) for x in range(1, 7)])
flat_array = np.arange(len(times) * len(self.lats) * len(self.lons))
self.variable = flat_array.reshape(len(times),
len(self.lats),
len(self.lons))
lats, lons, values = utils.normalize_lat_lon_values(self.lats,
self.lons[::-1],
self.values[:,
::,
::-1])
np.testing.assert_array_equal(lats, self.lats)
np.testing.assert_array_equal(values, self.values)
np.testing.assert_array_equal(lons, self.lons)
def test_lons_shift_values(self):
expected_vals = np.array([[2, 3, 0, 1],
[6, 7, 4, 5],
[10, 11, 8, 9]])
lats, lons, values = utils.normalize_lat_lon_values(self.lats2,
self.lons2,
self.values2)
np.testing.assert_array_equal(values, expected_vals)
def test_shift_and_reversed(self):
expected_vals = np.array([[10, 11, 8, 9],
[6, 7, 4, 5],
[2, 3, 0, 1]])
lats, lons, values = utils.normalize_lat_lon_values(self.lats2[::-1],
self.lons2,
self.values2)
np.testing.assert_array_equal(values, expected_vals)
def test_lats_not_sorted(self):
self.assertRaises(ValueError,
utils.normalize_lat_lon_values,
self.lats_unsorted,
self.lons2,
self.values2)
def test_lons_not_sorted(self):
self.assertRaises(ValueError,
utils.normalize_lat_lon_values,
self.lats2,
self.lons_unsorted,
self.values2)
def test_lons_greater_than_180(self):
self.lons = np.array([190, 210, 230, 250])
self.lats = np.array([-30, 0, 30])
self.values = np.arange(12).reshape(3, 4)
expected_lons = np.array([-170, -150, -130, -110])
expected_values = np.array([[0, 1, 2, 3],
[4, 5, 6, 7],
[8, 9, 10, 11]])
lats, lons, values = utils.normalize_lat_lon_values(self.lats,
self.lons,
self.values)
np.testing.assert_array_equal(lons, expected_lons)
np.testing.assert_array_equal(expected_values, values)
class TestGetTemporalOverlap(unittest.TestCase):
def setUp(self):
self.lat = np.array([10, 12, 14, 16, 18])
self.lon = np.array([100, 102, 104, 106, 108])
self.time = np.array(
[datetime.datetime(2000, x, 1) for x in range(1, 13)])
flat_array = np.array(range(300))
self.value = flat_array.reshape(12, 5, 5)
self.variable = 'prec'
self.test_dataset = Dataset(self.lat, self.lon, self.time,
self.value, self.variable)
self.dataset_array = [self.test_dataset, self.test_dataset]
def test_same_dataset_temporal_overlap(self):
maximum, minimum = utils.get_temporal_overlap(self.dataset_array)
self.assertEqual(maximum, datetime.datetime(2000, 1, 1))
self.assertEqual(minimum, datetime.datetime(2000, 12, 1))
def test_different_dataset_temporal_overlap(self):
new_times = np.array(
[datetime.datetime(2002, x, 1) for x in range(1, 13)])
another_dataset = Dataset(self.lat, self.lon, new_times,
self.value, self.variable)
self.dataset_array = [self.test_dataset, another_dataset]
maximum, minimum = utils.get_temporal_overlap(self.dataset_array)
self.assertEqual(maximum, datetime.datetime(2002, 1, 1))
self.assertEqual(minimum, datetime.datetime(2000, 12, 1))
class TestReshapeMonthlyToAnnually(unittest.TestCase):
''' Testing function 'reshape_monthly_to_annually' from ocw.utils.py '''
def setUp(self):
self.lat = np.array([10, 12, 14, 16, 18])
self.lon = np.array([100, 102, 104, 106, 108])
self.time = np.array(
[datetime.datetime(2000, 1, 1) + relativedelta(months=x)
for x in range(24)])
flat_array = np.array(range(600))
self.value = flat_array.reshape(24, 5, 5)
self.variable = 'prec'
self.test_dataset = Dataset(self.lat, self.lon, self.time,
self.value, self.variable)
def test_reshape_full_year(self):
new_values = self.value.reshape(2, 12, 5, 5)
np.testing.assert_array_equal(
utils.reshape_monthly_to_annually(self.test_dataset), new_values)
def test_reshape_not_full_year(self):
new_time = np.array(
[datetime.datetime(2000, 1, 1) + relativedelta(months=x)
for x in range(26)])
flat_array = np.array(range(650))
value = flat_array.reshape(26, 5, 5)
bad_dataset = Dataset(self.lat, self.lon,
new_time, value, self.variable)
self.assertRaises(
ValueError, utils.reshape_monthly_to_annually, bad_dataset)
class TestCalcTemporalMean(unittest.TestCase):
def setUp(self):
self.lat = np.array([10, 12, 14])
self.lon = np.array([100, 102, 104])
self.time = np.array(
[datetime.datetime(2000, x, 1) for x in range(1, 7)])
flat_array = np.array(range(54))
self.value = flat_array.reshape(6, 3, 3)
self.variable = 'prec'
self.test_dataset = Dataset(self.lat, self.lon, self.time,
self.value, self.variable)
def test_returned_mean(self):
mean_values = np.array([[22.5, 23.5, 24.5],
[25.5, 26.5, 27.5],
[28.5, 29.5, 30.5]])
result = utils.calc_temporal_mean(self.test_dataset)
np.testing.assert_array_equal(result, mean_values)
class TestCalcAreaWeightedSpatialAverage(unittest.TestCase):
def setUp(self):
self.lat = np.array([10, 12, 14])
self.lon = np.array([100, 102, 104])
self.time = np.array(
[datetime.datetime(2000, x, 1) for x in range(1, 7)])
flat_array = np.array(range(54))
self.value = flat_array.reshape(6, 3, 3)
self.variable = 'prec'
self.test_dataset = Dataset(self.lat, self.lon, self.time,
self.value, self.variable)
def test_spatial_average(self):
avg = np.ma.array([4., 13., 22., 31., 40., 49.])
result = utils.calc_area_weighted_spatial_average(self.test_dataset)
np.testing.assert_array_equal(avg, result)
def test_2_dim_lats_lons(self):
self.lat = np.array([10, 12, 14]).reshape(3, 1)
self.lon = np.array([100, 102, 104]).reshape(3, 1)
flat_array = np.array(range(18))
self.value = flat_array.reshape(6, 3, 1)
self.test_dataset = Dataset(self.lat, self.lon, self.time,
self.value, self.variable)
avg = np.ma.array([1., 4., 7., 10., 13., 16.])
result = utils.calc_area_weighted_spatial_average(self.test_dataset)
np.testing.assert_array_equal(avg, result)
def test_spatial_average_with_area_weight(self):
avg = np.ma.array([3.985158, 12.985158, 21.985158,
30.985158, 39.985158, 48.985158])
result = utils.calc_area_weighted_spatial_average(
self.test_dataset, area_weight=True)
np.testing.assert_array_almost_equal(avg, result)
def test__2_dim_lats_lons_area_weight(self):
self.lat = np.array([10, 12, 14]).reshape(3, 1)
self.lon = np.array([100, 102, 104]).reshape(3, 1)
flat_array = np.array(range(18))
self.value = flat_array.reshape(6, 3, 1)
self.test_dataset = Dataset(self.lat, self.lon, self.time,
self.value, self.variable)
avg = np.ma.array([0.995053, 3.995053, 6.995053, 9.995053, 12.995053,
15.995053])
result = utils.calc_area_weighted_spatial_average(
self.test_dataset, area_weight=True)
np.testing.assert_array_almost_equal(avg, result)
class TestCalcClimatologyYear(unittest.TestCase):
''' Testing function 'calc_climatology_year' from ocw.utils.py '''
def setUp(self):
self.lat = np.array([10, 12, 14, 16, 18])
self.lon = np.array([100, 102, 104, 106, 108])
self.time = np.array(
[datetime.datetime(2000, 1, 1) + relativedelta(months=x)
for x in range(24)])
flat_array = np.array(range(600))
self.value = flat_array.reshape(24, 5, 5)
self.variable = 'prec'
self.test_dataset = Dataset(self.lat, self.lon, self.time,
self.value, self.variable)
def test_annually_mean(self):
annually_mean = np.append(
np.arange(137.5, 162.5, 1), np.arange(437.5, 462.5, 1))
annually_mean.shape = (2, 5, 5)
np.testing.assert_array_equal(
utils.calc_climatology_year(self.test_dataset)[0], annually_mean)
def test_total_mean(self):
total_mean = np.arange(287.5, 312.5, 1)
total_mean.shape = (5, 5)
np.testing.assert_array_equal(
utils.calc_climatology_year(self.test_dataset)[1], total_mean)
def test_invalid_time_shape(self):
flat_array = np.array(range(350))
self.test_dataset.values = flat_array.reshape(14, 5, 5)
with self.assertRaises(ValueError):
utils.calc_climatology_year(self.test_dataset)
class TestCalcClimatologyMonthly(unittest.TestCase):
''' Tests the 'calc_climatology_monthly' method from ocw.utils.py '''
def setUp(self):
self.lats = np.array([10, 20, 30, 40, 50])
self.lons = np.array([20, 30, 40, 50, 60])
start_date = datetime.datetime(2000, 1, 1)
self.times = np.array([start_date + relativedelta(months=x)
for x in range(36)])
self.values = np.array([1] * 300 + [2] * 300 +
[0] * 300).reshape(36, 5, 5)
self.variable = 'testdata'
self.dataset = Dataset(self.lats, self.lons, self.times,
self.values, self.variable)
def test_calc_climatology_monthly(self):
expected_result = np.ones(300).reshape(12, 5, 5)
expected_times = np.array([datetime.datetime(1, 1, 1) +
relativedelta(months=x)
for x in range(12)])
actual_result, actual_times = utils.calc_climatology_monthly(
self.dataset)
np.testing.assert_array_equal(actual_result, expected_result)
np.testing.assert_array_equal(actual_times, expected_times)
def test_invalid_time_shape(self):
flat_array = np.array(range(350))
self.dataset.values = flat_array.reshape(14, 5, 5)
with self.assertRaises(ValueError):
utils.calc_climatology_monthly(self.dataset)
class TestCalcTimeSeries(unittest.TestCase):
''' Tests the 'calc_time_series' method from ocw.utils.py '''
def setUp(self):
self.lats = np.array([10, 20, 30, 40, 50])
self.lons = np.array([20, 30, 40, 50, 60])
start_date = datetime.datetime(2000, 1, 1)
self.times = np.array([start_date + relativedelta(months=x)
for x in range(12)])
self.values = np.ones(300).reshape(12, 5, 5)
self.variable = 'testdata'
self.dataset = Dataset(self.lats, self.lons, self.times,
self.values, self.variable)
def test_calc_time_series(self):
expected_result = np.ones(12)
np.testing.assert_array_equal(
utils.calc_time_series(self.dataset), expected_result)
if __name__ == '__main__':
unittest.main()