Merge branch 'master' of https://github.com/apache/incubator-sdap-nexus into SDAP-40
diff --git a/analysis/webservice/algorithms_spark/TimeSeriesSpark.py b/analysis/webservice/algorithms_spark/TimeSeriesSpark.py
index e89a00e..d6466f5 100644
--- a/analysis/webservice/algorithms_spark/TimeSeriesSpark.py
+++ b/analysis/webservice/algorithms_spark/TimeSeriesSpark.py
@@ -178,27 +178,10 @@
for shortName in ds:
the_time = datetime.now()
- daysinrange = self._tile_service.find_days_in_range_asc(bounding_polygon.bounds[1],
- bounding_polygon.bounds[3],
- bounding_polygon.bounds[0],
- bounding_polygon.bounds[2],
- shortName,
- start_seconds_from_epoch,
- end_seconds_from_epoch)
- self.log.info("Finding days in range took %s for dataset %s" % (str(datetime.now() - the_time), shortName))
-
- ndays = len(daysinrange)
- if ndays == 0:
- raise NoDataException(reason="No data found for selected timeframe")
-
- self.log.debug('Found {0} days in range'.format(ndays))
- for i, d in enumerate(daysinrange):
- self.log.debug('{0}, {1}'.format(i, datetime.utcfromtimestamp(d)))
- spark_nparts_needed = min(spark_nparts, ndays)
-
- the_time = datetime.now()
- results, meta = spark_driver(daysinrange, bounding_polygon, shortName,
- spark_nparts_needed=spark_nparts_needed, sc=self._sc)
+ results, meta = spark_driver(start_seconds_from_epoch,
+ end_seconds_from_epoch,
+ bounding_polygon, shortName,
+ spark_nparts_needed=spark_nparts, sc=self._sc)
self.log.info(
"Time series calculation took %s for dataset %s" % (str(datetime.now() - the_time), shortName))
@@ -487,12 +470,16 @@
return sio.getvalue()
-def spark_driver(daysinrange, bounding_polygon, ds, fill=-9999., spark_nparts_needed=1, sc=None):
+def spark_driver(start_time, end_time, bounding_polygon, ds, fill=-9999., spark_nparts_needed=1, sc=None):
+ # Calculate partial time ranges for each Spark partition.
+ time_divs = np.linspace(start_time, end_time, spark_nparts_needed + 1)
+ time_ranges_part = np.array([time_divs[:-1], time_divs[1:]]).T
+
+ # Create an array of tuples containing all of the arguments the Spark
+ # mapper needs.
nexus_tiles_spark = [(bounding_polygon.wkt, ds,
- list(daysinrange_part), fill)
- for daysinrange_part
- in np.array_split(daysinrange,
- spark_nparts_needed)]
+ tuple(time_range_part), fill)
+ for time_range_part in time_ranges_part]
# Launch Spark computations
rdd = sc.parallelize(nexus_tiles_spark, spark_nparts_needed)
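The partitioning scheme introduced above can be shown in isolation with pure NumPy (no SDAP dependencies): np.linspace yields spark_nparts_needed + 1 evenly spaced dividers, and pairing consecutive dividers gives one (start, end) sub-range per partition.

    import numpy as np

    # Toy values standing in for the epoch-second bounds and partition count.
    start_time, end_time, nparts = 0, 100, 4
    time_divs = np.linspace(start_time, end_time, nparts + 1)
    # time_divs -> [  0.  25.  50.  75. 100.]
    time_ranges_part = np.array([time_divs[:-1], time_divs[1:]]).T
    # time_ranges_part -> [[  0.  25.] [ 25.  50.] [ 50.  75.] [ 75. 100.]]

Note that adjacent sub-ranges share an endpoint, so a tile whose timestamp lands exactly on a divider could be fetched by two partitions if get_tiles_bounded_by_polygon treats both bounds as inclusive; that query's bound semantics are not changed by this diff.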
@@ -505,44 +492,47 @@
def calc_average_on_day(tile_in_spark):
import shapely.wkt
- (bounding_wkt, dataset, timestamps, fill) = tile_in_spark
- if len(timestamps) == 0:
- return []
+ (bounding_wkt, dataset, time_range, fill) = tile_in_spark
tile_service = NexusTileService()
ds1_nexus_tiles = \
tile_service.get_tiles_bounded_by_polygon(shapely.wkt.loads(bounding_wkt),
dataset,
- timestamps[0],
- timestamps[-1],
+ time_range[0],
+ time_range[1],
rows=5000)
+ if len(ds1_nexus_tiles) == 0:
+ return []
+ # Create a dictionary mapping each time stamp to a list of tuples.
+ # Each tuple has two elements: the index of a tile that contains the
+ # time stamp, and the index of that time stamp within the tile's own
+ # list of times.
tile_dict = {}
- for timeinseconds in timestamps:
- tile_dict[timeinseconds] = []
-
for i in range(len(ds1_nexus_tiles)):
tile = ds1_nexus_tiles[i]
- tile_dict[tile.times[0]].append(i)
-
+ for j in range(len(tile.times)):
+ t = tile.times[j]
+ if t not in tile_dict:
+ tile_dict[t] = []
+ tile_dict[t].append((i, j))
+
+ # For each time stamp, build an aggregate array with all the data and
+ # its mask, plus an aggregate array with the latitude corresponding to
+ # each data element. Then compute the statistics, weighting each data
+ # element by cos(latitude).
stats_arr = []
- for timeinseconds in timestamps:
+ for timeinseconds in sorted(tile_dict.keys()):
cur_tile_list = tile_dict[timeinseconds]
if len(cur_tile_list) == 0:
continue
tile_data_agg = \
- np.ma.array(data=np.hstack([ds1_nexus_tiles[i].data.data.flatten()
- for i in cur_tile_list
- if (ds1_nexus_tiles[i].times[0] ==
- timeinseconds)]),
- mask=np.hstack([ds1_nexus_tiles[i].data.mask.flatten()
- for i in cur_tile_list
- if (ds1_nexus_tiles[i].times[0] ==
- timeinseconds)]))
+ np.ma.array(data=np.hstack([ds1_nexus_tiles[i].data[j].data.flatten()
+ for i, j in cur_tile_list]),
+ mask=np.hstack([ds1_nexus_tiles[i].data[j].mask.flatten()
+ for i, j in cur_tile_list]))
lats_agg = np.hstack([np.repeat(ds1_nexus_tiles[i].latitudes,
len(ds1_nexus_tiles[i].longitudes))
- for i in cur_tile_list
- if (ds1_nexus_tiles[i].times[0] ==
- timeinseconds)])
+ for i, j in cur_tile_list])
if (len(tile_data_agg) == 0) or tile_data_agg.mask.all():
continue
else:
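The two steps the comments in this hunk describe can be exercised with toy data and no NexusTileService: first build the time stamp -> (tile index, time index) map, then aggregate and weight by cos(latitude). The data values, latitudes, and the use of np.ma.average below are illustrative assumptions, not code from this patch.

    import numpy as np

    # Stand-ins for tile.times across two tiles; time stamp 200 appears in both.
    tile_times = [[100, 200], [200, 300]]
    tile_dict = {}
    for i, times in enumerate(tile_times):
        for j, t in enumerate(times):
            tile_dict.setdefault(t, []).append((i, j))
    # tile_dict -> {100: [(0, 0)], 200: [(0, 1), (1, 0)], 300: [(1, 1)]}

    # Hypothetical flattened data and per-element latitudes for time stamp 200.
    tile_data_agg = np.ma.array(data=[300.1, 301.5, 299.8, 302.0],
                                mask=[False, False, True, False])
    lats_agg = np.array([10.0, 10.0, 45.0, 45.0])

    # Weight each unmasked element by cos(latitude); masked entries are ignored.
    weighted_mean = np.ma.average(tile_data_agg,
                                  weights=np.cos(np.radians(lats_agg)))

setdefault here is just a compact equivalent of the explicit "if t not in tile_dict" pattern used in the patch.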