SDAP-294: Add optional normalizeDates parameter for time series and HofMoeller (#114)
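Adds a new optional boolean query parameter, normalizeDates (default false), to the Spark time-series and Hof-Moeller algorithms. When the flag is set, every timestamp in the results is snapped to 00:00:00 UTC on the first day of its month (e.g. so monthly datasets whose granules carry different day-of-month stamps line up on a common time axis). The climatology pass in the time-series handler always runs with normalization disabled.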
diff --git a/analysis/webservice/algorithms_spark/HofMoellerSpark.py b/analysis/webservice/algorithms_spark/HofMoellerSpark.py
index 7c3041a..f65b25b 100644
--- a/analysis/webservice/algorithms_spark/HofMoellerSpark.py
+++ b/analysis/webservice/algorithms_spark/HofMoellerSpark.py
@@ -28,6 +28,7 @@
from webservice.NexusHandler import nexus_handler
from webservice.algorithms_spark.NexusCalcSparkHandler import NexusCalcSparkHandler
+from webservice.algorithms_spark import utils
from webservice.webmodel import NexusResults, NoDataException, NexusProcessingException
EPOCH = timezone('UTC').localize(datetime(1970, 1, 1))
@@ -159,7 +160,7 @@
except:
try:
west, south, east, north = request.get_min_lon(), request.get_min_lat(), \
- request.get_max_lon(), request.get_max_lat()
+ request.get_max_lon(), request.get_max_lat()
bounding_polygon = shapely.geometry.Polygon(
[(west, south), (east, south), (east, north), (west, north), (west, south)])
except:
@@ -192,8 +193,9 @@
start_seconds_from_epoch = long((start_time - EPOCH).total_seconds())
end_seconds_from_epoch = long((end_time - EPOCH).total_seconds())
+ normalize_dates = request.get_normalize_dates()
- return ds, bounding_polygon, start_seconds_from_epoch, end_seconds_from_epoch
+ return ds, bounding_polygon, start_seconds_from_epoch, end_seconds_from_epoch, normalize_dates
def applyDeseasonToHofMoellerByField(self, results, pivot="lats", field="mean", append=True):
shape = (len(results), len(results[0][pivot]))
@@ -262,7 +264,7 @@
'min': t[7]}
-def spark_driver(sc, latlon, tile_service_factory, nexus_tiles_spark, metrics_callback):
+def spark_driver(sc, latlon, tile_service_factory, nexus_tiles_spark, metrics_callback, normalize_dates):
# Parallelize list of tile ids
rdd = sc.parallelize(nexus_tiles_spark, determine_parllelism(len(nexus_tiles_spark)))
if latlon == 0:
@@ -291,34 +293,38 @@
hof_tuple_combine(x[1],
y[1])))
+ # The functions create_combiner, merge_value, and merge_combiner are arguments to RDD.combineByKey()
+ def create_combiner(val):
+ time_in_seconds = val[0]
+ if normalize_dates:
+ time_in_seconds = utils.normalize_date(val[0])
+
+ return {
+ 'sequence': val[1],
+ 'time': time_in_seconds,
+ 'iso_time': datetime.utcfromtimestamp(time_in_seconds).strftime(ISO_8601),
+ avg_var_name_collection: [hof_tuple_to_dict(val, avg_var_name)]
+ }
+
+ def merge_value(x, val):
+ return {
+ 'sequence': x['sequence'],
+ 'time': x['time'],
+ 'iso_time': x['iso_time'],
+ avg_var_name_collection: (x[avg_var_name_collection] + [hof_tuple_to_dict(val, avg_var_name)])
+ }
+
+ def merge_combiner(x, y):
+ return {
+ 'sequence': x['sequence'],
+ 'time': x['time'],
+ 'iso_time': x['iso_time'],
+ avg_var_name_collection: (x[avg_var_name_collection] + y[avg_var_name_collection])
+ }
+
# Convert the tuples to dictionary entries and combine coordinates
# with the same time stamp. Here we have input key = (time)
- results = results.values(). \
- combineByKey(lambda val, avg_var_name=avg_var_name,
- avg_var_name_collection=avg_var_name_collection: {
- 'sequence': val[1],
- 'time': val[0],
- 'iso_time': datetime.utcfromtimestamp(val[0]).strftime(ISO_8601),
- avg_var_name_collection: [
- hof_tuple_to_dict(val, avg_var_name)]},
- lambda x, val, avg_var_name=avg_var_name,
- avg_var_name_collection=avg_var_name_collection: {
- 'sequence': x['sequence'],
- 'time': x['time'],
- 'iso_time': x['iso_time'],
- avg_var_name_collection: (
- x[avg_var_name_collection] +
- [hof_tuple_to_dict(val, avg_var_name)])},
- lambda x, y,
- avg_var_name_collection=avg_var_name_collection:
- {'sequence': x['sequence'],
- 'time': x['time'],
- 'iso_time': x['iso_time'],
- avg_var_name_collection: (
- x[avg_var_name_collection] +
- y[avg_var_name_collection])}). \
- values(). \
- collect()
+ results = results.values().combineByKey(create_combiner, merge_value, merge_combiner).values().collect()
reduce_duration = (datetime.now() - reduce_start).total_seconds()
metrics_callback(reduce=reduce_duration)
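As the comment in the hunk above notes, create_combiner, merge_value, and merge_combiner are the three callbacks to RDD.combineByKey(). A minimal pure-Python sketch of that contract, for orientation only (no Spark required): create_combiner seeds the per-key accumulator from the first value seen, merge_value folds each subsequent value into it, and merge_combiner joins accumulators built on separate partitions.

    def combine_by_key(pairs, create_combiner, merge_value, merge_combiner):
        # pairs: iterable of (key, value) tuples; returns {key: combined}
        combined = {}
        for key, val in pairs:
            if key in combined:
                combined[key] = merge_value(combined[key], val)
            else:
                combined[key] = create_combiner(val)
        return combined

This single-partition emulation never calls merge_combiner; in the real RDD, Spark invokes it when merging the per-partition accumulators for a key.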
@@ -339,7 +345,7 @@
BaseHoffMoellerSparkHandlerImpl.__init__(self, **kwargs)
def calc(self, compute_options, **args):
- ds, bbox, start_time, end_time = self.parse_arguments(compute_options)
+ ds, bbox, start_time, end_time, normalize_dates = self.parse_arguments(compute_options)
metrics_record = self._create_metrics_record()
calculation_start = datetime.now()
@@ -360,7 +366,8 @@
self._latlon,
self._tile_service_factory,
nexus_tiles_spark,
- metrics_record.record_metrics)
+ metrics_record.record_metrics,
+ normalize_dates)
results = filter(None, results)
results = sorted(results, key=lambda entry: entry['time'])
for i in range(len(results)):
@@ -394,7 +401,7 @@
BaseHoffMoellerSparkHandlerImpl.__init__(self, **kwargs)
def calc(self, compute_options, **args):
- ds, bbox, start_time, end_time = self.parse_arguments(compute_options)
+ ds, bbox, start_time, end_time, normalize_dates = self.parse_arguments(compute_options)
metrics_record = self._create_metrics_record()
calculation_start = datetime.now()
@@ -415,7 +422,8 @@
self._latlon,
self._tile_service_factory,
nexus_tiles_spark,
- metrics_record.record_metrics)
+ metrics_record.record_metrics,
+ normalize_dates)
results = filter(None, results)
results = sorted(results, key=lambda entry: entry["time"])
diff --git a/analysis/webservice/algorithms_spark/TimeSeriesSpark.py b/analysis/webservice/algorithms_spark/TimeSeriesSpark.py
index 422fdb8..1eeecce 100644
--- a/analysis/webservice/algorithms_spark/TimeSeriesSpark.py
+++ b/analysis/webservice/algorithms_spark/TimeSeriesSpark.py
@@ -13,7 +13,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-import calendar
import itertools
import logging
import traceback
@@ -28,12 +27,12 @@
import shapely.geometry
import shapely.wkt
from backports.functools_lru_cache import lru_cache
-from nexustiles.nexustiles import NexusTileService
from pytz import timezone
from scipy import stats
from webservice import Filtering as filtering
from webservice.NexusHandler import nexus_handler
from webservice.algorithms_spark.NexusCalcSparkHandler import NexusCalcSparkHandler
+from webservice.algorithms_spark import utils
from webservice.webmodel import NexusResults, NoDataException, NexusProcessingException
EPOCH = timezone('UTC').localize(datetime(1970, 1, 1))
@@ -156,8 +155,9 @@
end_seconds_from_epoch = long((end_time - EPOCH).total_seconds())
nparts_requested = request.get_nparts()
+ normalize_dates = request.get_normalize_dates()
- return ds, bounding_polygon, start_seconds_from_epoch, end_seconds_from_epoch, apply_seasonal_cycle_filter, apply_low_pass_filter, nparts_requested
+ return ds, bounding_polygon, start_seconds_from_epoch, end_seconds_from_epoch, apply_seasonal_cycle_filter, apply_low_pass_filter, nparts_requested, normalize_dates
def calc(self, request, **args):
"""
@@ -167,7 +167,7 @@
:return:
"""
start_time = datetime.now()
- ds, bounding_polygon, start_seconds_from_epoch, end_seconds_from_epoch, apply_seasonal_cycle_filter, apply_low_pass_filter, nparts_requested = self.parse_arguments(
+ ds, bounding_polygon, start_seconds_from_epoch, end_seconds_from_epoch, apply_seasonal_cycle_filter, apply_low_pass_filter, nparts_requested, normalize_dates = self.parse_arguments(
request)
metrics_record = self._create_metrics_record()
@@ -199,6 +199,7 @@
shortName,
self._tile_service_factory,
metrics_record.record_metrics,
+ normalize_dates,
spark_nparts=spark_nparts,
sc=self._sc)
@@ -221,6 +222,7 @@
shortName_clim,
self._tile_service_factory,
metrics_record.record_metrics,
+ normalize_dates=False,
spark_nparts=spark_nparts,
sc=self._sc)
clim_indexed_by_month = {datetime.utcfromtimestamp(result['time']).month: result for result in results_clim}
@@ -297,73 +299,6 @@
return res
@lru_cache()
- def calculate_monthly_average(self, month=None, bounding_polygon_wkt=None, ds=None):
-
- min_date, max_date = self.get_min_max_date(ds=ds)
-
- monthly_averages, monthly_counts = [], []
- monthly_mins, monthly_maxes = [], []
- bounding_polygon = shapely.wkt.loads(bounding_polygon_wkt)
- for year in range(min_date.year, max_date.year + 1):
- beginning_of_month = datetime(year, month, 1)
- end_of_month = datetime(year, month, calendar.monthrange(year, month)[1], 23, 59, 59)
- start = (pytz.UTC.localize(beginning_of_month) - EPOCH).total_seconds()
- end = (pytz.UTC.localize(end_of_month) - EPOCH).total_seconds()
- tile_stats = self._get_tile_service().find_tiles_in_polygon(bounding_polygon, ds, start, end,
- fl=('id,'
- 'tile_avg_val_d,tile_count_i,'
- 'tile_min_val_d,tile_max_val_d,'
- 'tile_min_lat,tile_max_lat,'
- 'tile_min_lon,tile_max_lon'),
- fetch_data=False)
- if len(tile_stats) == 0:
- continue
-
- # Split list into tiles on the border of the bounding box and tiles completely inside the bounding box.
- border_tiles, inner_tiles = [], []
- for tile in tile_stats:
- inner_tiles.append(tile) if bounding_polygon.contains(shapely.geometry.box(tile.bbox.min_lon,
- tile.bbox.min_lat,
- tile.bbox.max_lon,
- tile.bbox.max_lat)) else border_tiles.append(
- tile)
-
- # We can use the stats of the inner tiles directly
- tile_means = [tile.tile_stats.mean for tile in inner_tiles]
- tile_mins = [tile.tile_stats.min for tile in inner_tiles]
- tile_maxes = [tile.tile_stats.max for tile in inner_tiles]
- tile_counts = [tile.tile_stats.count for tile in inner_tiles]
-
- # Border tiles need have the data loaded, masked, and stats recalculated
- border_tiles = list(self._get_tile_service().fetch_data_for_tiles(*border_tiles))
- border_tiles = self._get_tile_service().mask_tiles_to_polygon(bounding_polygon, border_tiles)
- for tile in border_tiles:
- tile.update_stats()
- tile_means.append(tile.tile_stats.mean)
- tile_mins.append(tile.tile_stats.min)
- tile_maxes.append(tile.tile_stats.max)
- tile_counts.append(tile.tile_stats.count)
-
- tile_means = np.array(tile_means)
- tile_mins = np.array(tile_mins)
- tile_maxes = np.array(tile_maxes)
- tile_counts = np.array(tile_counts)
-
- sum_tile_counts = np.sum(tile_counts) * 1.0
-
- monthly_averages += [np.average(tile_means, None, tile_counts / sum_tile_counts).item()]
- monthly_mins += [np.average(tile_mins, None, tile_counts / sum_tile_counts).item()]
- monthly_maxes += [np.average(tile_maxes, None, tile_counts / sum_tile_counts).item()]
- monthly_counts += [sum_tile_counts]
-
- count_sum = np.sum(monthly_counts) * 1.0
- weights = np.array(monthly_counts) / count_sum
-
- return np.average(monthly_averages, None, weights).item(), \
- np.average(monthly_averages, None, weights).item(), \
- np.average(monthly_averages, None, weights).item()
-
- @lru_cache()
def get_min_max_date(self, ds=None):
min_date = pytz.timezone('UTC').localize(
datetime.utcfromtimestamp(self._get_tile_service().get_min_time([], ds=ds)))
@@ -512,7 +447,7 @@
return sio.getvalue()
-def spark_driver(daysinrange, bounding_polygon, ds, tile_service_factory, metrics_callback, fill=-9999.,
+def spark_driver(daysinrange, bounding_polygon, ds, tile_service_factory, metrics_callback, normalize_dates, fill=-9999.,
spark_nparts=1, sc=None):
nexus_tiles_spark = [(bounding_polygon.wkt, ds,
list(daysinrange_part), fill)
@@ -522,14 +457,14 @@
# Launch Spark computations
rdd = sc.parallelize(nexus_tiles_spark, spark_nparts)
metrics_callback(partitions=rdd.getNumPartitions())
- results = rdd.flatMap(partial(calc_average_on_day, tile_service_factory, metrics_callback)).collect()
+ results = rdd.flatMap(partial(calc_average_on_day, tile_service_factory, metrics_callback, normalize_dates)).collect()
results = list(itertools.chain.from_iterable(results))
results = sorted(results, key=lambda entry: entry["time"])
return results, {}
-def calc_average_on_day(tile_service_factory, metrics_callback, tile_in_spark):
+def calc_average_on_day(tile_service_factory, metrics_callback, normalize_dates, tile_in_spark):
import shapely.wkt
from datetime import datetime
from pytz import timezone
@@ -588,6 +523,9 @@
data_std = np.ma.std(tile_data_agg)
# Return Stats by day
+ if normalize_dates:
+ timeinseconds = utils.normalize_date(timeinseconds)
+
stat = {
'min': data_min,
'max': data_max,
diff --git a/analysis/webservice/algorithms_spark/utils.py b/analysis/webservice/algorithms_spark/utils.py
new file mode 100644
index 0000000..5556b15
--- /dev/null
+++ b/analysis/webservice/algorithms_spark/utils.py
@@ -0,0 +1,9 @@
+from datetime import datetime
+import pytz
+import calendar
+
+
+def normalize_date(time_in_seconds):
+ dt = datetime.utcfromtimestamp(time_in_seconds)
+ normalized_dt = dt.replace(day=1, hour=0, minute=0, second=0, microsecond=0, tzinfo=pytz.utc)
+ return int(calendar.timegm(normalized_dt.timetuple()))
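A standalone sanity check of the intended behavior, using only the function above: any timestamp within a month normalizes to 00:00:00 UTC on the first day of that month.

    import calendar
    from datetime import datetime

    # 2015-06-17T09:30:00Z -> 2015-06-01T00:00:00Z
    ts = calendar.timegm(datetime(2015, 6, 17, 9, 30).timetuple())
    assert datetime.utcfromtimestamp(normalize_date(ts)) == datetime(2015, 6, 1)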
diff --git a/analysis/webservice/webmodel/NexusRequestObject.py b/analysis/webservice/webmodel/NexusRequestObject.py
index f118484..3779cf9 100644
--- a/analysis/webservice/webmodel/NexusRequestObject.py
+++ b/analysis/webservice/webmodel/NexusRequestObject.py
@@ -224,4 +224,7 @@
return self.get_argument(RequestParameters.PLOT_TYPE, default=default)
def get_nparts(self):
- return self.get_int_arg(RequestParameters.NPARTS, 0)
\ No newline at end of file
+ return self.get_int_arg(RequestParameters.NPARTS, 0)
+
+ def get_normalize_dates(self):
+ return self.get_boolean_arg(RequestParameters.NORMALIZE_DATES, False)
diff --git a/analysis/webservice/webmodel/RequestParameters.py b/analysis/webservice/webmodel/RequestParameters.py
index b043cbe..2fdfa29 100644
--- a/analysis/webservice/webmodel/RequestParameters.py
+++ b/analysis/webservice/webmodel/RequestParameters.py
@@ -20,4 +20,5 @@
PLOT_SERIES = "plotSeries"
PLOT_TYPE = "plotType"
NPARTS = "nparts"
- METADATA_FILTER = "metadataFilter"
\ No newline at end of file
+ METADATA_FILTER = "metadataFilter"
+ NORMALIZE_DATES = "normalizeDates"
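Usage note: a client opts in by appending the flag to an otherwise unchanged request; get_normalize_dates() defaults to False, so existing clients are unaffected. A hypothetical handler-side sketch:

    # query string carries ...&normalizeDates=true
    normalize_dates = request.get_normalize_dates()  # True when the flag is set, else False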