SDAP-294: Add optional normalizeDates parameter for time series and Hofmoeller (#114)

diff --git a/analysis/webservice/algorithms_spark/HofMoellerSpark.py b/analysis/webservice/algorithms_spark/HofMoellerSpark.py
index 7c3041a..f65b25b 100644
--- a/analysis/webservice/algorithms_spark/HofMoellerSpark.py
+++ b/analysis/webservice/algorithms_spark/HofMoellerSpark.py
@@ -28,6 +28,7 @@
 
 from webservice.NexusHandler import nexus_handler
 from webservice.algorithms_spark.NexusCalcSparkHandler import NexusCalcSparkHandler
+from webservice.algorithms_spark import utils
 from webservice.webmodel import NexusResults, NoDataException, NexusProcessingException
 
 EPOCH = timezone('UTC').localize(datetime(1970, 1, 1))
@@ -159,7 +160,7 @@
         except:
             try:
                 west, south, east, north = request.get_min_lon(), request.get_min_lat(), \
-                                           request.get_max_lon(), request.get_max_lat()
+                    request.get_max_lon(), request.get_max_lat()
                 bounding_polygon = shapely.geometry.Polygon(
                     [(west, south), (east, south), (east, north), (west, north), (west, south)])
             except:
@@ -192,8 +193,9 @@
 
         start_seconds_from_epoch = long((start_time - EPOCH).total_seconds())
         end_seconds_from_epoch = long((end_time - EPOCH).total_seconds())
+        normalize_dates = request.get_normalize_dates()
 
-        return ds, bounding_polygon, start_seconds_from_epoch, end_seconds_from_epoch
+        return ds, bounding_polygon, start_seconds_from_epoch, end_seconds_from_epoch, normalize_dates
 
     def applyDeseasonToHofMoellerByField(self, results, pivot="lats", field="mean", append=True):
         shape = (len(results), len(results[0][pivot]))
@@ -262,7 +264,7 @@
             'min': t[7]}
 
 
-def spark_driver(sc, latlon, tile_service_factory, nexus_tiles_spark, metrics_callback):
+def spark_driver(sc, latlon, tile_service_factory, nexus_tiles_spark, metrics_callback, normalize_dates):
     # Parallelize list of tile ids
     rdd = sc.parallelize(nexus_tiles_spark, determine_parllelism(len(nexus_tiles_spark)))
     if latlon == 0:
@@ -291,34 +293,38 @@
                                                  hof_tuple_combine(x[1],
                                                                    y[1])))
 
+    # The functions create_combiner, merge_value, and merge_combiner are arguments to RDD.combineByKey()
+    def create_combiner(val):
+        time_in_seconds = val[0]
+        if normalize_dates:
+            time_in_seconds = utils.normalize_date(val[0])
+
+        return {
+            'sequence': val[1],
+            'time': time_in_seconds,
+            'iso_time': datetime.utcfromtimestamp(time_in_seconds).strftime(ISO_8601),
+            avg_var_name_collection: [hof_tuple_to_dict(val, avg_var_name)]
+        }
+
+    def merge_value(x, val):
+        return {
+            'sequence': x['sequence'],
+            'time': x['time'],
+            'iso_time': x['iso_time'],
+            avg_var_name_collection: (x[avg_var_name_collection] + [hof_tuple_to_dict(val, avg_var_name)])
+        }
+
+    def merge_combiner(x, y):
+        return {
+            'sequence': x['sequence'],
+            'time': x['time'],
+            'iso_time': x['iso_time'],
+            avg_var_name_collection: (x[avg_var_name_collection] + y[avg_var_name_collection])
+        }
+
     # Convert the tuples to dictionary entries and combine coordinates
     # with the same time stamp.  Here we have input key = (time)
-    results = results.values(). \
-        combineByKey(lambda val, avg_var_name=avg_var_name,
-                            avg_var_name_collection=avg_var_name_collection: {
-        'sequence': val[1],
-        'time': val[0],
-        'iso_time': datetime.utcfromtimestamp(val[0]).strftime(ISO_8601),
-        avg_var_name_collection: [
-            hof_tuple_to_dict(val, avg_var_name)]},
-                     lambda x, val, avg_var_name=avg_var_name,
-                            avg_var_name_collection=avg_var_name_collection: {
-                         'sequence': x['sequence'],
-                         'time': x['time'],
-                         'iso_time': x['iso_time'],
-                         avg_var_name_collection: (
-                                 x[avg_var_name_collection] +
-                                 [hof_tuple_to_dict(val, avg_var_name)])},
-                     lambda x, y,
-                            avg_var_name_collection=avg_var_name_collection:
-                     {'sequence': x['sequence'],
-                      'time': x['time'],
-                      'iso_time': x['iso_time'],
-                      avg_var_name_collection: (
-                              x[avg_var_name_collection] +
-                              y[avg_var_name_collection])}). \
-        values(). \
-        collect()
+    results = results.values().combineByKey(create_combiner, merge_value, merge_combiner).values().collect()
 
     reduce_duration = (datetime.now() - reduce_start).total_seconds()
     metrics_callback(reduce=reduce_duration)
@@ -339,7 +345,7 @@
         BaseHoffMoellerSparkHandlerImpl.__init__(self, **kwargs)
 
     def calc(self, compute_options, **args):
-        ds, bbox, start_time, end_time = self.parse_arguments(compute_options)
+        ds, bbox, start_time, end_time, normalize_dates = self.parse_arguments(compute_options)
 
         metrics_record = self._create_metrics_record()
         calculation_start = datetime.now()
@@ -360,7 +366,8 @@
                                self._latlon,
                                self._tile_service_factory,
                                nexus_tiles_spark,
-                               metrics_record.record_metrics)
+                               metrics_record.record_metrics,
+                               normalize_dates)
         results = filter(None, results)
         results = sorted(results, key=lambda entry: entry['time'])
         for i in range(len(results)):
@@ -394,7 +401,7 @@
         BaseHoffMoellerSparkHandlerImpl.__init__(self, **kwargs)
 
     def calc(self, compute_options, **args):
-        ds, bbox, start_time, end_time = self.parse_arguments(compute_options)
+        ds, bbox, start_time, end_time, normalize_dates = self.parse_arguments(compute_options)
 
         metrics_record = self._create_metrics_record()
         calculation_start = datetime.now()
@@ -415,7 +422,8 @@
                                self._latlon,
                                self._tile_service_factory,
                                nexus_tiles_spark,
-                               metrics_record.record_metrics)
+                               metrics_record.record_metrics,
+                               normalize_dates)
 
         results = filter(None, results)
         results = sorted(results, key=lambda entry: entry["time"])
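
For context, the combineByKey refactor above only replaces the previous inline lambdas with the three named functions; behavior is unchanged. The shape of that RDD.combineByKey() contract is sketched below with hypothetical data, where a plain list stands in for the per-timestamp dictionaries the real functions build:

    # Minimal sketch of the RDD.combineByKey() pattern used above, with
    # hypothetical data; a plain list stands in for the per-timestamp dict.
    from pyspark import SparkContext

    sc = SparkContext.getOrCreate()

    def create_combiner(val):
        # start the accumulator for a key with its first value
        return [val]

    def merge_value(acc, val):
        # fold in another value for the same key seen in the same partition
        return acc + [val]

    def merge_combiner(a, b):
        # merge accumulators built in different partitions
        return a + b

    pairs = sc.parallelize([('t0', 1), ('t0', 2), ('t1', 3)])
    print(pairs.combineByKey(create_combiner, merge_value, merge_combiner).collect())
    # e.g. [('t0', [1, 2]), ('t1', [3])]
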
diff --git a/analysis/webservice/algorithms_spark/TimeSeriesSpark.py b/analysis/webservice/algorithms_spark/TimeSeriesSpark.py
index 422fdb8..1eeecce 100644
--- a/analysis/webservice/algorithms_spark/TimeSeriesSpark.py
+++ b/analysis/webservice/algorithms_spark/TimeSeriesSpark.py
@@ -13,7 +13,6 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-import calendar
 import itertools
 import logging
 import traceback
@@ -28,12 +27,12 @@
 import shapely.geometry
 import shapely.wkt
 from backports.functools_lru_cache import lru_cache
-from nexustiles.nexustiles import NexusTileService
 from pytz import timezone
 from scipy import stats
 from webservice import Filtering as filtering
 from webservice.NexusHandler import nexus_handler
 from webservice.algorithms_spark.NexusCalcSparkHandler import NexusCalcSparkHandler
+from webservice.algorithms_spark import utils
 from webservice.webmodel import NexusResults, NoDataException, NexusProcessingException
 
 EPOCH = timezone('UTC').localize(datetime(1970, 1, 1))
@@ -156,8 +155,9 @@
         end_seconds_from_epoch = long((end_time - EPOCH).total_seconds())
 
         nparts_requested = request.get_nparts()
+        normalize_dates = request.get_normalize_dates()
 
-        return ds, bounding_polygon, start_seconds_from_epoch, end_seconds_from_epoch, apply_seasonal_cycle_filter, apply_low_pass_filter, nparts_requested
+        return ds, bounding_polygon, start_seconds_from_epoch, end_seconds_from_epoch, apply_seasonal_cycle_filter, apply_low_pass_filter, nparts_requested, normalize_dates
 
     def calc(self, request, **args):
         """
@@ -167,7 +167,7 @@
         :return:
         """
         start_time = datetime.now()
-        ds, bounding_polygon, start_seconds_from_epoch, end_seconds_from_epoch, apply_seasonal_cycle_filter, apply_low_pass_filter, nparts_requested = self.parse_arguments(
+        ds, bounding_polygon, start_seconds_from_epoch, end_seconds_from_epoch, apply_seasonal_cycle_filter, apply_low_pass_filter, nparts_requested, normalize_dates = self.parse_arguments(
             request)
         metrics_record = self._create_metrics_record()
 
@@ -199,6 +199,7 @@
                                          shortName,
                                          self._tile_service_factory,
                                          metrics_record.record_metrics,
+                                         normalize_dates,
                                          spark_nparts=spark_nparts,
                                          sc=self._sc)
 
@@ -221,6 +222,7 @@
                                                shortName_clim,
                                                self._tile_service_factory,
                                                metrics_record.record_metrics,
+                                               normalize_dates=False,
                                                spark_nparts=spark_nparts,
                                                sc=self._sc)
                 clim_indexed_by_month = {datetime.utcfromtimestamp(result['time']).month: result for result in results_clim}
@@ -297,73 +299,6 @@
         return res
 
     @lru_cache()
-    def calculate_monthly_average(self, month=None, bounding_polygon_wkt=None, ds=None):
-
-        min_date, max_date = self.get_min_max_date(ds=ds)
-
-        monthly_averages, monthly_counts = [], []
-        monthly_mins, monthly_maxes = [], []
-        bounding_polygon = shapely.wkt.loads(bounding_polygon_wkt)
-        for year in range(min_date.year, max_date.year + 1):
-            beginning_of_month = datetime(year, month, 1)
-            end_of_month = datetime(year, month, calendar.monthrange(year, month)[1], 23, 59, 59)
-            start = (pytz.UTC.localize(beginning_of_month) - EPOCH).total_seconds()
-            end = (pytz.UTC.localize(end_of_month) - EPOCH).total_seconds()
-            tile_stats = self._get_tile_service().find_tiles_in_polygon(bounding_polygon, ds, start, end,
-                                                                        fl=('id,'
-                                                                            'tile_avg_val_d,tile_count_i,'
-                                                                            'tile_min_val_d,tile_max_val_d,'
-                                                                            'tile_min_lat,tile_max_lat,'
-                                                                            'tile_min_lon,tile_max_lon'),
-                                                                        fetch_data=False)
-            if len(tile_stats) == 0:
-                continue
-
-            # Split list into tiles on the border of the bounding box and tiles completely inside the bounding box.
-            border_tiles, inner_tiles = [], []
-            for tile in tile_stats:
-                inner_tiles.append(tile) if bounding_polygon.contains(shapely.geometry.box(tile.bbox.min_lon,
-                                                                                           tile.bbox.min_lat,
-                                                                                           tile.bbox.max_lon,
-                                                                                           tile.bbox.max_lat)) else border_tiles.append(
-                    tile)
-
-            # We can use the stats of the inner tiles directly
-            tile_means = [tile.tile_stats.mean for tile in inner_tiles]
-            tile_mins = [tile.tile_stats.min for tile in inner_tiles]
-            tile_maxes = [tile.tile_stats.max for tile in inner_tiles]
-            tile_counts = [tile.tile_stats.count for tile in inner_tiles]
-
-            # Border tiles need have the data loaded, masked, and stats recalculated
-            border_tiles = list(self._get_tile_service().fetch_data_for_tiles(*border_tiles))
-            border_tiles = self._get_tile_service().mask_tiles_to_polygon(bounding_polygon, border_tiles)
-            for tile in border_tiles:
-                tile.update_stats()
-                tile_means.append(tile.tile_stats.mean)
-                tile_mins.append(tile.tile_stats.min)
-                tile_maxes.append(tile.tile_stats.max)
-                tile_counts.append(tile.tile_stats.count)
-
-            tile_means = np.array(tile_means)
-            tile_mins = np.array(tile_mins)
-            tile_maxes = np.array(tile_maxes)
-            tile_counts = np.array(tile_counts)
-
-            sum_tile_counts = np.sum(tile_counts) * 1.0
-
-            monthly_averages += [np.average(tile_means, None, tile_counts / sum_tile_counts).item()]
-            monthly_mins += [np.average(tile_mins, None, tile_counts / sum_tile_counts).item()]
-            monthly_maxes += [np.average(tile_maxes, None, tile_counts / sum_tile_counts).item()]
-            monthly_counts += [sum_tile_counts]
-
-        count_sum = np.sum(monthly_counts) * 1.0
-        weights = np.array(monthly_counts) / count_sum
-
-        return np.average(monthly_averages, None, weights).item(), \
-            np.average(monthly_averages, None, weights).item(), \
-            np.average(monthly_averages, None, weights).item()
-
-    @lru_cache()
     def get_min_max_date(self, ds=None):
         min_date = pytz.timezone('UTC').localize(
             datetime.utcfromtimestamp(self._get_tile_service().get_min_time([], ds=ds)))
@@ -512,7 +447,7 @@
         return sio.getvalue()
 
 
-def spark_driver(daysinrange, bounding_polygon, ds, tile_service_factory, metrics_callback, fill=-9999.,
+def spark_driver(daysinrange, bounding_polygon, ds, tile_service_factory, metrics_callback, normalize_dates, fill=-9999.,
                  spark_nparts=1, sc=None):
     nexus_tiles_spark = [(bounding_polygon.wkt, ds,
                           list(daysinrange_part), fill)
@@ -522,14 +457,14 @@
     # Launch Spark computations
     rdd = sc.parallelize(nexus_tiles_spark, spark_nparts)
     metrics_callback(partitions=rdd.getNumPartitions())
-    results = rdd.flatMap(partial(calc_average_on_day, tile_service_factory, metrics_callback)).collect()
+    results = rdd.flatMap(partial(calc_average_on_day, tile_service_factory, metrics_callback, normalize_dates)).collect()
     results = list(itertools.chain.from_iterable(results))
     results = sorted(results, key=lambda entry: entry["time"])
 
     return results, {}
 
 
-def calc_average_on_day(tile_service_factory, metrics_callback, tile_in_spark):
+def calc_average_on_day(tile_service_factory, metrics_callback, normalize_dates, tile_in_spark):
     import shapely.wkt
     from datetime import datetime
     from pytz import timezone
@@ -588,6 +523,9 @@
             data_std = np.ma.std(tile_data_agg)
 
         # Return Stats by day
+        if normalize_dates:
+            timeinseconds = utils.normalize_date(timeinseconds)
+
         stat = {
             'min': data_min,
             'max': data_max,
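
The time-series path passes the new flag the same way: spark_driver() pre-binds it with functools.partial so each flatMap call to calc_average_on_day() receives it ahead of the per-partition tile tuple. A minimal, runnable sketch of that binding follows; the worker body here is a hypothetical stand-in, not the real computation:

    # Sketch of the functools.partial pattern used in spark_driver() above:
    # the leading arguments (including the new normalize_dates flag) are
    # pre-bound, and flatMap supplies only the per-partition payload.
    from functools import partial

    def calc_average_on_day(tile_service_factory, metrics_callback, normalize_dates, tile_in_spark):
        # hypothetical stand-in: the real worker loads tiles and computes daily stats
        return [{'normalize_dates': normalize_dates, 'payload': tile_in_spark}]

    worker = partial(calc_average_on_day, object(), lambda **kwargs: None, True)
    print(worker(('POLYGON (...)', 'my_dataset', [], -9999.0)))
    # [{'normalize_dates': True, 'payload': ('POLYGON (...)', 'my_dataset', [], -9999.0)}]
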
diff --git a/analysis/webservice/algorithms_spark/utils.py b/analysis/webservice/algorithms_spark/utils.py
new file mode 100644
index 0000000..5556b15
--- /dev/null
+++ b/analysis/webservice/algorithms_spark/utils.py
@@ -0,0 +1,9 @@
+from datetime import datetime
+import pytz
+import calendar
+
+
+def normalize_date(time_in_seconds):
+    dt = datetime.utcfromtimestamp(time_in_seconds)
+    normalized_dt = dt.replace(day=1, hour=0, minute=0, second=0, microsecond=0, tzinfo=pytz.utc)
+    return int(calendar.timegm(normalized_dt.timetuple()))
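
The new helper snaps any timestamp to 00:00:00 UTC on the first day of its month. A quick self-contained check (the function body is copied from utils.py above; the example timestamp is arbitrary):

    import calendar
    import pytz
    from datetime import datetime

    def normalize_date(time_in_seconds):
        dt = datetime.utcfromtimestamp(time_in_seconds)
        normalized_dt = dt.replace(day=1, hour=0, minute=0, second=0, microsecond=0, tzinfo=pytz.utc)
        return int(calendar.timegm(normalized_dt.timetuple()))

    # 2021-03-17 08:45:00 UTC -> 2021-03-01 00:00:00 UTC
    print(normalize_date(1615970700))  # 1614556800
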
diff --git a/analysis/webservice/webmodel/NexusRequestObject.py b/analysis/webservice/webmodel/NexusRequestObject.py
index f118484..3779cf9 100644
--- a/analysis/webservice/webmodel/NexusRequestObject.py
+++ b/analysis/webservice/webmodel/NexusRequestObject.py
@@ -224,4 +224,7 @@
         return self.get_argument(RequestParameters.PLOT_TYPE, default=default)
 
     def get_nparts(self):
-        return self.get_int_arg(RequestParameters.NPARTS, 0)
\ No newline at end of file
+        return self.get_int_arg(RequestParameters.NPARTS, 0)
+
+    def get_normalize_dates(self):
+        return self.get_boolean_arg(RequestParameters.NORMALIZE_DATES, False)
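
get_normalize_dates() resolves the normalizeDates query argument as a boolean that defaults to False, so requests that do not send it keep their current behavior. The snippet below is a hypothetical stand-in for get_boolean_arg(), shown only to illustrate that default:

    # Hypothetical stand-in for get_boolean_arg(); illustrates the default only.
    def get_boolean_arg(args, name, default=False):
        raw = args.get(name)
        return default if raw is None else str(raw).lower() in ('true', '1', 'yes')

    print(get_boolean_arg({'normalizeDates': 'true'}, 'normalizeDates'))  # True
    print(get_boolean_arg({}, 'normalizeDates'))                          # False
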
diff --git a/analysis/webservice/webmodel/RequestParameters.py b/analysis/webservice/webmodel/RequestParameters.py
index b043cbe..2fdfa29 100644
--- a/analysis/webservice/webmodel/RequestParameters.py
+++ b/analysis/webservice/webmodel/RequestParameters.py
@@ -20,4 +20,5 @@
     PLOT_SERIES = "plotSeries"
     PLOT_TYPE = "plotType"
     NPARTS = "nparts"
-    METADATA_FILTER = "metadataFilter"
\ No newline at end of file
+    METADATA_FILTER = "metadataFilter"
+    NORMALIZE_DATES = "normalizeDates"
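
Usage note: the only client-visible change is the optional query parameter itself. Assuming the existing handler paths and request parameters are unchanged, opting in is a matter of appending it to a time series or Hofmoeller request, for example:

    ...&startTime=2015-01-01T00:00:00Z&endTime=2015-12-31T23:59:59Z&normalizeDates=true

Omitting the parameter (or passing false) leaves timestamps exactly as before; the climatology comparison in TimeSeriesSpark.py always passes normalize_dates=False, so its month-keyed lookup is unaffected.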