Merge branch 'master' of https://github.com/apache/incubator-sdap-nexus into SDAP-40
diff --git a/analysis/webservice/algorithms_spark/TimeSeriesSpark.py b/analysis/webservice/algorithms_spark/TimeSeriesSpark.py
index e89a00e..d6466f5 100644
--- a/analysis/webservice/algorithms_spark/TimeSeriesSpark.py
+++ b/analysis/webservice/algorithms_spark/TimeSeriesSpark.py
@@ -178,27 +178,10 @@
         for shortName in ds:
 
             the_time = datetime.now()
-            daysinrange = self._tile_service.find_days_in_range_asc(bounding_polygon.bounds[1],
-                                                                    bounding_polygon.bounds[3],
-                                                                    bounding_polygon.bounds[0],
-                                                                    bounding_polygon.bounds[2],
-                                                                    shortName,
-                                                                    start_seconds_from_epoch,
-                                                                    end_seconds_from_epoch)
-            self.log.info("Finding days in range took %s for dataset %s" % (str(datetime.now() - the_time), shortName))
-
-            ndays = len(daysinrange)
-            if ndays == 0:
-                raise NoDataException(reason="No data found for selected timeframe")
-
-            self.log.debug('Found {0} days in range'.format(ndays))
-            for i, d in enumerate(daysinrange):
-                self.log.debug('{0}, {1}'.format(i, datetime.utcfromtimestamp(d)))
-            spark_nparts_needed = min(spark_nparts, ndays)
-
-            the_time = datetime.now()
-            results, meta = spark_driver(daysinrange, bounding_polygon, shortName,
-                                         spark_nparts_needed=spark_nparts_needed, sc=self._sc)
+            results, meta = spark_driver(start_seconds_from_epoch,
+                                         end_seconds_from_epoch, 
+                                         bounding_polygon, shortName,
+                                         spark_nparts_needed=spark_nparts, sc=self._sc)
             self.log.info(
                 "Time series calculation took %s for dataset %s" % (str(datetime.now() - the_time), shortName))
 
@@ -487,12 +470,16 @@
         return sio.getvalue()
 
 
-def spark_driver(daysinrange, bounding_polygon, ds, fill=-9999., spark_nparts_needed=1, sc=None):
+def spark_driver(start_time, end_time, bounding_polygon, ds, fill=-9999., spark_nparts_needed=1, sc=None):
+    # Calculate partial time ranges for each Spark partition.
+    time_divs = np.linspace(start_time, end_time, spark_nparts_needed+1)
+    time_ranges_part = np.array([time_divs[:-1], time_divs[1:]]).T
+
+    # Create array of tuples containing all of the arguments the Spark 
+    # mapper needs.
     nexus_tiles_spark = [(bounding_polygon.wkt, ds,
-                          list(daysinrange_part), fill)
-                         for daysinrange_part
-                         in np.array_split(daysinrange,
-                                           spark_nparts_needed)]
+                          tuple(time_range_part), fill)
+                         for time_range_part in time_ranges_part]
 
     # Launch Spark computations
     rdd = sc.parallelize(nexus_tiles_spark, spark_nparts_needed)
@@ -505,44 +492,47 @@
 
 def calc_average_on_day(tile_in_spark):
     import shapely.wkt
-    (bounding_wkt, dataset, timestamps, fill) = tile_in_spark
-    if len(timestamps) == 0:
-        return []
+    (bounding_wkt, dataset, time_range, fill) = tile_in_spark
     tile_service = NexusTileService()
     ds1_nexus_tiles = \
         tile_service.get_tiles_bounded_by_polygon(shapely.wkt.loads(bounding_wkt),
                                                   dataset,
-                                                  timestamps[0],
-                                                  timestamps[-1],
+                                                  time_range[0],
+                                                  time_range[1],
                                                   rows=5000)
+    if len(ds1_nexus_tiles) == 0:
+        return []
 
+    # Create a dictionary mapping each time stamp to a list of tuples.
+    # Each tuple has 2 elements, the index of a tile that contains the 
+    # time stamp, and the index of the time stamp among all the time stamps
+    # contained in that tile.
     tile_dict = {}
-    for timeinseconds in timestamps:
-        tile_dict[timeinseconds] = []
-
     for i in range(len(ds1_nexus_tiles)):
         tile = ds1_nexus_tiles[i]
-        tile_dict[tile.times[0]].append(i)
-
+        for j in range(len(tile.times)):
+            t = tile.times[j]
+            if t not in tile_dict:
+                tile_dict[t] = []
+            tile_dict[t].append((i,j))
+                
+    # Create an aggregate array with all the data and associated mask for 
+    # each time stamp and an aggregate array with the latitude corresponding
+    # to each data element.  Then compute the statistics, weighting each
+    # data element by cos(latitude).
     stats_arr = []
-    for timeinseconds in timestamps:
+    for timeinseconds in sorted(tile_dict.keys()):
         cur_tile_list = tile_dict[timeinseconds]
         if len(cur_tile_list) == 0:
             continue
         tile_data_agg = \
-            np.ma.array(data=np.hstack([ds1_nexus_tiles[i].data.data.flatten()
-                                        for i in cur_tile_list
-                                        if (ds1_nexus_tiles[i].times[0] ==
-                                            timeinseconds)]),
-                        mask=np.hstack([ds1_nexus_tiles[i].data.mask.flatten()
-                                        for i in cur_tile_list
-                                        if (ds1_nexus_tiles[i].times[0] ==
-                                            timeinseconds)]))
+            np.ma.array(data=np.hstack([ds1_nexus_tiles[i].data[j].data.flatten()
+                                        for i,j in cur_tile_list]),
+                        mask=np.hstack([ds1_nexus_tiles[i].data[j].mask.flatten()
+                                        for i,j in cur_tile_list]))
         lats_agg = np.hstack([np.repeat(ds1_nexus_tiles[i].latitudes,
                                         len(ds1_nexus_tiles[i].longitudes))
-                              for i in cur_tile_list
-                              if (ds1_nexus_tiles[i].times[0] ==
-                                  timeinseconds)])
+                              for i,j in cur_tile_list])
         if (len(tile_data_agg) == 0) or tile_data_agg.mask.all():
             continue
         else: