blob: 8c18ae99c53a8e8b3e6127440ffdb4a12746bd7f [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import json
import math
from itertools import groupby
import numpy as np
from nexustiles.model.nexusmodel import get_approximate_value_for_lat_lon
from scipy.stats import linregress
from shapely.geometry import box
from webservice.NexusHandler import nexus_handler, DEFAULT_PARAMETERS_SPEC
from webservice.algorithms.NexusCalcHandler import NexusCalcHandler
from webservice.webmodel import NexusProcessingException, NexusResults
@nexus_handler
class LongitudeLatitudeMapCalcHandlerImpl(NexusCalcHandler):
name = "Correlation Map"
path = "/correlationMap"
description = "Computes a correlation map between two datasets given an arbitrary geographical area and time range"
params = DEFAULT_PARAMETERS_SPEC
params["res"] = {
"name": "Resolution",
"type": "float",
"description": "The resolution of the resulting correlation map"
}
singleton = True
def calc(self, computeOptions, **args):
minLat = computeOptions.get_min_lat()
maxLat = computeOptions.get_max_lat()
minLon = computeOptions.get_min_lon()
maxLon = computeOptions.get_max_lon()
ds = computeOptions.get_dataset()
startTime = computeOptions.get_start_time()
endTime = computeOptions.get_end_time()
resolution = computeOptions.get_decimal_arg("res", default=1.0)
if not len(ds) == 2:
raise Exception("Requires two datasets for comparison. Specify request parameter ds=Dataset_1,Dataset_2")
ds1tiles = self._get_tile_service().find_tiles_in_polygon(box(minLon, minLat, maxLon, maxLat), ds[0], startTime,
endTime)
ds2tiles = self._get_tile_service().find_tiles_in_polygon(box(minLon, minLat, maxLon, maxLat), ds[1], startTime,
endTime)
matches = self._match_tiles(ds1tiles, ds2tiles)
if len(matches) == 0:
raise NexusProcessingException(reason="Could not find any data temporally co-located")
results = [[{
'cnt': 0,
'slope': 0,
'intercept': 0,
'r': 0,
'p': 0,
'stderr': 0,
'lat': float(lat),
'lon': float(lon)
} for lon in np.arange(minLon, maxLon, resolution)] for lat in
np.arange(minLat, maxLat, resolution)]
for stats in results:
for stat in stats:
values_x = []
values_y = []
for tile_matches in matches:
tile_1_list = tile_matches[0]
value_1 = get_approximate_value_for_lat_lon(tile_1_list, stat["lat"], stat["lon"])
tile_2_list = tile_matches[1]
value_2 = get_approximate_value_for_lat_lon(tile_2_list, stat["lat"], stat["lon"])
if not (math.isnan(value_1) or math.isnan(value_2)):
values_x.append(value_1)
values_y.append(value_2)
if len(values_x) > 2 and len(values_y) > 2:
stats = linregress(values_x, values_y)
stat["slope"] = stats[0] if not math.isnan(stats[0]) and not math.isinf(stats[0]) else str(stats[0])
stat["intercept"] = stats[1] if not math.isnan(stats[1]) and not math.isinf(stats[1]) else str(
stats[1])
stat["r"] = stats[2] if not math.isnan(stats[2]) and not math.isinf(stats[2]) else str(stats[2])
stat["p"] = stats[3] if not math.isnan(stats[3]) and not math.isinf(stats[3]) else str(stats[3])
stat["stderr"] = stats[4] if not math.isnan(stats[4]) and not math.isinf(stats[4]) else str(
stats[4])
stat["cnt"] = len(values_x)
return CorrelationResults(results)
def _match_tiles(self, tiles_1, tiles_2):
date_map = {}
tiles_1 = sorted(tiles_1, key=lambda tile: tile.min_time)
for tile_start_time, tiles in groupby(tiles_1, lambda tile: tile.min_time):
if not tile_start_time in date_map:
date_map[tile_start_time] = []
date_map[tile_start_time].append(list(tiles))
tiles_2 = sorted(tiles_2, key=lambda tile: tile.min_time)
for tile_start_time, tiles in groupby(tiles_2, lambda tile: tile.min_time):
if not tile_start_time in date_map:
date_map[tile_start_time] = []
date_map[tile_start_time].append(list(tiles))
matches = [tiles for a_time, tiles in date_map.items() if len(tiles) == 2]
return matches
class CorrelationResults(NexusResults):
def __init__(self, results):
NexusResults.__init__(self)
self.results = results
def toJson(self):
json_d = {
"stats": {},
"meta": [None, None],
"data": self.results
}
return json.dumps(json_d, indent=4)