# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import json
from datetime import datetime
from functools import partial

import numpy as np
from webservice.NexusHandler import nexus_handler, DEFAULT_PARAMETERS_SPEC
from webservice.algorithms_spark.NexusCalcSparkHandler import NexusCalcSparkHandler
from webservice.webmodel import NexusProcessingException, NexusResults, NoDataException


@nexus_handler
class CorrMapNexusSparkHandlerImpl(NexusCalcSparkHandler):
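    """
    Spark handler that computes a per-pixel map of Pearson correlation
    coefficients between two gridded datasets over an arbitrary
    geographical area and time range. The two datasets are supplied via
    the ds request parameter (ds=Dataset_1,Dataset_2).
    """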
name = "Correlation Map Spark"
path = "/corrMapSpark"
description = "Computes a correlation map between two datasets given an arbitrary geographical area and time range"
    params = DEFAULT_PARAMETERS_SPEC

    @staticmethod
def _map(tile_service_factory, tile_in):
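        """
        Spark map function: accumulate, for one spatial tile, the
        per-pixel running sums needed for the Pearson correlation.

        tile_in is a (tile_bounds, start_time, end_time, ds) tuple; the
        return value pairs the tile's lat/lon bounds with the six
        partial-sum arrays (sum_x, sum_y, sum_xx, sum_yy, sum_xy, n).
        """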
# Unpack input
tile_bounds, start_time, end_time, ds = tile_in
(min_lat, max_lat, min_lon, max_lon,
min_y, max_y, min_x, max_x) = tile_bounds
# Create arrays to hold intermediate results during
# correlation coefficient calculation.
tile_inbounds_shape = (max_y - min_y + 1, max_x - min_x + 1)
sumx_tile = np.zeros(tile_inbounds_shape, dtype=np.float64)
sumy_tile = np.zeros(tile_inbounds_shape, dtype=np.float64)
sumxx_tile = np.zeros(tile_inbounds_shape, dtype=np.float64)
sumyy_tile = np.zeros(tile_inbounds_shape, dtype=np.float64)
sumxy_tile = np.zeros(tile_inbounds_shape, dtype=np.float64)
n_tile = np.zeros(tile_inbounds_shape, dtype=np.uint32)
        # Solr can only serve a limited number of days of data per
        # query, so the time range is walked in fixed-size chunks.
        days_at_a_time = 90
        t_incr = 86400 * days_at_a_time
        # Instantiate a tile service on the worker from the factory.
        tile_service = tile_service_factory()
# Compute the intermediate summations needed for the Pearson
# Correlation Coefficient. We use a one-pass online algorithm
# so that not all of the data needs to be kept in memory all at once.
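        # For each pixel the coefficient is later evaluated as
        #   r = (sum_xy - sum_x*sum_y/n) /
        #       sqrt((sum_xx - sum_x**2/n) * (sum_yy - sum_y**2/n))
        # so only these six running sums need to be carried across time
        # chunks.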
t_start = start_time
while t_start <= end_time:
t_end = min(t_start + t_incr, end_time)
ds1tiles = tile_service.get_tiles_bounded_by_box(min_lat,
max_lat,
min_lon,
max_lon,
ds[0],
t_start,
t_end)
ds2tiles = tile_service.get_tiles_bounded_by_box(min_lat,
max_lat,
min_lon,
max_lon,
ds[1],
t_start,
t_end)
len1 = len(ds1tiles)
len2 = len(ds2tiles)
i1 = 0
i2 = 0
time1 = 0
time2 = 0
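            # Merge-join the two tile streams, which arrive sorted by
            # time: advance whichever stream is behind so that sums are
            # only accumulated for timestamps present in both datasets.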
while i1 < len1 and i2 < len2:
tile1 = ds1tiles[i1]
tile2 = ds2tiles[i2]
assert tile1.times[0] >= time1, 'DS1 time out of order!'
assert tile2.times[0] >= time2, 'DS2 time out of order!'
time1 = tile1.times[0]
time2 = tile2.times[0]
if time1 < time2:
i1 += 1
continue
elif time2 < time1:
i2 += 1
continue
assert (time1 == time2), \
"Mismatched tile times %d and %d" % (time1, time2)
                t1_data = np.nan_to_num(tile1.data.data)
                t2_data = np.nan_to_num(tile2.data.data)
                joint_mask = ((~tile1.data.mask).astype(np.uint8) *
                              (~tile2.data.mask).astype(np.uint8))
                # Slice each array down to the in-bounds region once and
                # reuse the views for all six accumulations.
                t1_in = t1_data[0, min_y:max_y + 1, min_x:max_x + 1]
                t2_in = t2_data[0, min_y:max_y + 1, min_x:max_x + 1]
                mask_in = joint_mask[0, min_y:max_y + 1, min_x:max_x + 1]
                sumx_tile += t1_in * mask_in
                sumy_tile += t2_in * mask_in
                sumxx_tile += t1_in * t1_in * mask_in
                sumyy_tile += t2_in * t2_in * mask_in
                sumxy_tile += t1_in * t2_in * mask_in
                n_tile += mask_in
                i1 += 1
                i2 += 1
t_start = t_end + 1
        return ((min_lat, max_lat, min_lon, max_lon),
                (sumx_tile, sumy_tile, sumxx_tile, sumyy_tile,
                 sumxy_tile, n_tile))

    def calc(self, computeOptions, **args):
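        """
        Entry point for the /corrMapSpark endpoint: validates the
        request, partitions the region and time range into Spark work
        units, runs the map/reduce, writes the global map to a NetCDF
        file, and returns the result as CorrelationResults.
        """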
self._setQueryParams(computeOptions.get_dataset(),
(float(computeOptions.get_min_lat()),
float(computeOptions.get_max_lat()),
float(computeOptions.get_min_lon()),
float(computeOptions.get_max_lon())),
computeOptions.get_start_time(),
computeOptions.get_end_time())
nparts_requested = computeOptions.get_nparts()
self.log.debug('ds = {0}'.format(self._ds))
        if len(self._ds) != 2:
            raise NexusProcessingException(
                reason="Requires two datasets for comparison. Specify request parameter ds=Dataset_1,Dataset_2",
                code=400)
        if any('CLIM' in ds_name for ds_name in self._ds):
            raise NexusProcessingException(reason="Cannot compute correlation on a climatology", code=400)
nexus_tiles = self._find_global_tile_set()
if len(nexus_tiles) == 0:
raise NoDataException(reason="No data found for selected timeframe")
self.log.debug('Found {0} tiles'.format(len(nexus_tiles)))
self.log.debug('Using Native resolution: lat_res={0}, lon_res={1}'.format(self._latRes, self._lonRes))
self.log.debug('nlats={0}, nlons={1}'.format(self._nlats, self._nlons))
daysinrange = self._get_tile_service().find_days_in_range_asc(self._minLat,
self._maxLat,
self._minLon,
self._maxLon,
self._ds[0],
self._startTime,
self._endTime)
ndays = len(daysinrange)
if ndays == 0:
raise NoDataException(reason="No data found for selected timeframe")
self.log.debug('Found {0} days in range'.format(ndays))
for i, d in enumerate(daysinrange):
self.log.debug('{0}, {1}'.format(i, datetime.utcfromtimestamp(d)))
# Create array of tuples to pass to Spark map function
nexus_tiles_spark = [[self._find_tile_bounds(t),
self._startTime, self._endTime,
self._ds] for t in nexus_tiles]
# Remove empty tiles (should have bounds set to None)
bad_tile_inds = np.where([t[0] is None for t in nexus_tiles_spark])[0]
for i in np.flipud(bad_tile_inds):
del nexus_tiles_spark[i]
# Expand Spark map tuple array by duplicating each entry N times,
# where N is the number of ways we want the time dimension carved up.
# Set the time boundaries for each of the Spark map tuples so that
# every Nth element in the array gets the same time bounds.
max_time_parts = 72
num_time_parts = min(max_time_parts, ndays)
spark_part_time_ranges = np.tile(
np.array([a[[0, -1]] for a in np.array_split(np.array(daysinrange), num_time_parts)]),
(len(nexus_tiles_spark), 1))
nexus_tiles_spark = np.repeat(nexus_tiles_spark, num_time_parts, axis=0)
nexus_tiles_spark[:, 1:3] = spark_part_time_ranges
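        # e.g. with 2 tiles and num_time_parts=3, np.repeat yields rows
        # [T0, T0, T0, T1, T1, T1] and np.tile pairs them with time
        # ranges [R0, R1, R2, R0, R1, R2], so each tile is matched with
        # every time slice exactly once.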
# Launch Spark computations
spark_nparts = self._spark_nparts(nparts_requested)
self.log.info('Using {} partitions'.format(spark_nparts))
rdd = self._sc.parallelize(nexus_tiles_spark, spark_nparts)
sum_tiles_part = rdd.map(partial(self._map, self._tile_service_factory))
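        # Reduce: merge the six partial-sum arrays element-wise across
        # all time slices that share the same tile-bounds key.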
# print "sum_tiles_part = ",sum_tiles_part.collect()
sum_tiles = \
sum_tiles_part.combineByKey(lambda val: val,
lambda x, val: (x[0] + val[0],
x[1] + val[1],
x[2] + val[2],
x[3] + val[3],
x[4] + val[4],
x[5] + val[5]),
lambda x, y: (x[0] + y[0],
x[1] + y[1],
x[2] + y[2],
x[3] + y[3],
x[4] + y[4],
x[5] + y[5]))
# Convert the N (pixel-wise count) array for each tile to be a
# NumPy masked array. That is the last array in the tuple of
# intermediate summation arrays. Set mask to True if count is 0.
        def _mask_counts(item):
            bounds, (sum_x, sum_y, sum_xx, sum_yy, sum_xy, n) = item
            return (bounds, (sum_x, sum_y, sum_xx, sum_yy, sum_xy,
                             np.ma.array(n, mask=~(n.astype(bool)))))

        sum_tiles = sum_tiles.map(_mask_counts)
# For each pixel in each tile compute an array of Pearson
# correlation coefficients. The map function is called once
# per tile. The result of this map operation is a list of 3-tuples of
# (bounds, r, n) for each tile (r=Pearson correlation coefficient
# and n=number of input values that went into each pixel with
# any masked values not included).
        def _corr(item):
            bounds, (sum_x, sum_y, sum_xx, sum_yy, sum_xy, n) = item
            r = np.ma.array((sum_xy - sum_x * sum_y / n) /
                            np.sqrt((sum_xx - sum_x * sum_x / n) *
                                    (sum_yy - sum_y * sum_y / n)),
                            mask=~(n.astype(bool)))
            return bounds, r, n

        corr_tiles = sum_tiles.map(_corr).collect()
r = np.zeros((self._nlats, self._nlons), dtype=np.float64, order='C')
n = np.zeros((self._nlats, self._nlons), dtype=np.uint32, order='C')
# The tiles below are NOT Nexus objects. They are tuples
# with the following for each correlation map subset:
# (1) lat-lon bounding box, (2) array of correlation r values,
# and (3) array of count n values.
for tile in corr_tiles:
((tile_min_lat, tile_max_lat, tile_min_lon, tile_max_lon),
tile_data, tile_cnt) = tile
y0 = self._lat2ind(tile_min_lat)
y1 = self._lat2ind(tile_max_lat)
x0 = self._lon2ind(tile_min_lon)
x1 = self._lon2ind(tile_max_lon)
self.log.debug(
'writing tile lat {0}-{1}, lon {2}-{3}, map y {4}-{5}, map x {6}-{7}'.format(tile_min_lat, tile_max_lat,
tile_min_lon, tile_max_lon,
y0, y1, x0, x1))
r[y0:y1 + 1, x0:x1 + 1] = tile_data
n[y0:y1 + 1, x0:x1 + 1] = tile_cnt
# Store global map in a NetCDF file.
self._create_nc_file(r, 'corrmap.nc', 'r')
# Create dict for JSON response
results = [[{'r': r[y, x], 'cnt': int(n[y, x]),
'lat': self._ind2lat(y), 'lon': self._ind2lon(x)}
for x in range(r.shape[1])] for y in range(r.shape[0])]
        return CorrelationResults(results)


class CorrelationResults(NexusResults):
def __init__(self, results):
NexusResults.__init__(self)
        self.results = results

    def toJson(self):
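        """Serialize the correlation map as a JSON document with stats,
        meta, and per-pixel data sections."""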
json_d = {
"stats": {},
"meta": [None, None],
"data": self.results
}
return json.dumps(json_d, indent=4)
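# Example request (illustrative; ds is the documented parameter, other
# spatial/temporal parameter names follow DEFAULT_PARAMETERS_SPEC):
#   GET /corrMapSpark?ds=Dataset_1,Dataset_2&...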