# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import math
import uuid
from datetime import datetime
import numpy as np
import utm
from nexustiles.model.nexusmodel import get_approximate_value_for_lat_lon
from scipy import spatial
from . import BaseDomsHandler
from . import ResultsStorage
from . import datafetch
from . import fetchedgeimpl
from . import geo
from . import workerthread
from webservice.NexusHandler import nexus_handler
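# Matchup handler registered with the NEXUS webservice via the @nexus_handler
# decorator; it is exposed at the /domsmatchup endpoint declared below.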
@nexus_handler
class CombinedDomsMatchupQueryHandler(BaseDomsHandler.BaseDomsQueryCalcHandler):
name = "Experimental Combined DOMS In-Situ Matchup"
path = "/domsmatchup"
description = ""
params = {}
singleton = True
def __init__(self):
BaseDomsHandler.BaseDomsQueryCalcHandler.__init__(self)
def fetchData(self, endpoints, startTime, endTime, bbox, depth_min, depth_max, platforms):
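        # Fan out one worker thread per endpoint, then merge the returned
        # records and fold each endpoint's bounds into a single constrainer.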
boundsConstrainer = geo.BoundsConstrainer(asString=bbox)
threads = []
for endpoint in endpoints:
thread = workerthread.WorkerThread(datafetch.fetchData,
params=(endpoint, startTime, endTime, bbox, depth_min, depth_max))
threads.append(thread)
workerthread.wait(threads, startFirst=True, poll=0.01)
data2 = []
for thread in threads:
data, bounds = thread.results
data2 += data
boundsConstrainer.testOtherConstrainer(bounds)
return data2, boundsConstrainer
def __parseDatetime(self, dtString):
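        # Convert an ISO-8601 UTC timestamp string to milliseconds since the epoch.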
dt = datetime.strptime(dtString, "%Y-%m-%dT%H:%M:%SZ")
epoch = datetime.utcfromtimestamp(0)
time = (dt - epoch).total_seconds() * 1000.0
return time
def calc(self, computeOptions, **args):
primary = computeOptions.get_argument("primary", None)
matchup = computeOptions.get_argument("matchup", None)
startTime = computeOptions.get_argument("s", None)
endTime = computeOptions.get_argument("e", None)
bbox = computeOptions.get_argument("b", None)
timeTolerance = computeOptions.get_float_arg("tt")
depth_min = computeOptions.get_float_arg("depthMin", default=None)
depth_max = computeOptions.get_float_arg("depthMax", default=None)
radiusTolerance = computeOptions.get_float_arg("rt")
platforms = computeOptions.get_argument("platforms", None)
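        # Illustrative request shape (all values are hypothetical placeholders):
        #   /domsmatchup?primary=<dataset>&matchup=<ds1>,<ds2>
        #       &s=2017-01-01T00:00:00Z&e=2017-01-31T00:00:00Z
        #       &b=<bbox>&tt=<seconds>&rt=<meters>&platforms=<ids>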
if primary is None or len(primary) == 0:
raise Exception("No primary dataset specified")
if matchup is None or len(matchup) == 0:
raise Exception("No matchup datasets specified")
start = self._now()
primarySpec = self.getDataSourceByName(primary)
if primarySpec is None:
raise Exception("Specified primary dataset not found using identifier '%s'" % primary)
primaryData, bounds = self.fetchData([primarySpec], startTime, endTime, bbox, depth_min, depth_max, platforms)
primaryContext = MatchupContext(primaryData)
matchupIds = matchup.split(",")
for matchupId in matchupIds:
matchupSpec = self.getDataSourceByName(matchupId)
if matchupSpec is not None: # Then it's in the in-situ configuration
proc = InsituDatasetProcessor(primaryContext, matchupSpec, startTime, endTime, bbox, depth_min,
depth_max,
platforms, timeTolerance, radiusTolerance)
proc.start()
else: # We assume it to be a Nexus tiled dataset
                # NOTE: Nexus tiled matchup is single-threaded at the moment.
daysinrange = self._get_tile_service().find_days_in_range_asc(bounds.south, bounds.north, bounds.west,
bounds.east, matchupId,
self.__parseDatetime(startTime) / 1000,
self.__parseDatetime(endTime) / 1000)
tilesByDay = {}
for dayTimestamp in daysinrange:
ds1_nexus_tiles = self._get_tile_service().get_tiles_bounded_by_box_at_time(bounds.south, bounds.north,
bounds.west, bounds.east,
matchupId, dayTimestamp)
# print "***", type(ds1_nexus_tiles)
# print ds1_nexus_tiles[0].__dict__
tilesByDay[dayTimestamp] = ds1_nexus_tiles
primaryContext.processGridded(tilesByDay, matchupId, radiusTolerance, timeTolerance)
matches, numMatches = primaryContext.getFinal(len(matchupIds))
end = self._now()
args = {
"primary": primary,
"matchup": matchupIds,
"startTime": startTime,
"endTime": endTime,
"bbox": bbox,
"timeTolerance": timeTolerance,
"depthMin": depth_min,
"depthMax": depth_max,
"radiusTolerance": radiusTolerance,
"platforms": platforms
}
details = {
"timeToComplete": (end - start),
"numInSituRecords": primaryContext.insituCount,
"numInSituMatched": primaryContext.insituMatches,
"numGriddedChecked": primaryContext.griddedCount,
"numGriddedMatched": primaryContext.griddedMatched
}
with ResultsStorage.ResultsStorage() as resultsStorage:
execution_id = resultsStorage.insertResults(results=matches, params=args, stats=details, startTime=start,
completeTime=end, userEmail="")
return BaseDomsHandler.DomsQueryResults(results=matches, args=args, details=details, bounds=None, count=None,
computeOptions=None, executionId=execution_id)
class MatchupContextMap:
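    # Currently a no-op stub; context tracking is not implemented.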
def __init__(self):
pass
def add(self, context):
pass
def delete(self, context):
pass
class MatchupContext:
def __init__(self, primaryData):
self.id = str(uuid.uuid4())
self.griddedCount = 0
self.griddedMatched = 0
self.insituCount = len(primaryData)
self.insituMatches = 0
self.primary = primaryData
for r in self.primary:
r["matches"] = []
self.data = []
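        # Project each primary point to UTM so the KD-tree query radius is in
        # meters. Note that the UTM zone number/letter are discarded, so
        # distances computed across zone boundaries are only approximate.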
for s in primaryData:
u = utm.from_latlon(s["y"], s["x"])
v = (u[0], u[1], 0.0)
self.data.append(v)
if len(self.data) > 0:
self.tree = spatial.KDTree(self.data)
else:
self.tree = None
def getFinal(self, minMatchesToInclude):
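        # Keep only primary records that accumulated at least the requested
        # number of matches, and total the matches across them.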
matched = []
ttlMatches = 0
for m in self.primary:
if len(m["matches"]) >= minMatchesToInclude:
matched.append(m)
ttlMatches += len(m["matches"])
return matched, ttlMatches
def processGridded(self, tilesByDay, source, xyTolerance, timeTolerance):
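        # For each primary record, collect satellite nodes within the spatial
        # tolerance. timeTolerance is accepted but unused here; temporal
        # narrowing happens inside __getSatNodeForLatLonAndTime.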
for r in self.primary:
foundSatNodes = self.__getSatNodeForLatLonAndTime(tilesByDay, source, r["y"], r["x"], r["time"],
xyTolerance)
self.griddedCount += 1
self.griddedMatched += len(foundSatNodes)
r["matches"].extend(foundSatNodes)
def processInSitu(self, records, xyTolerance, timeTolerance):
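        # Query the KD-tree of primary points for each in-situ record and keep
        # pairs within the time tolerance; timeTolerance is in seconds, while
        # record times appear to be millisecond timestamps, hence the * 1000.0.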
if self.tree is not None:
for s in records:
self.insituCount += 1
u = utm.from_latlon(s["y"], s["x"])
coords = np.array([u[0], u[1], 0])
ball = self.tree.query_ball_point(coords, xyTolerance)
self.insituMatches += len(ball)
for i in ball:
match = self.primary[i]
if abs(match["time"] - s["time"]) <= (timeTolerance * 1000.0):
match["matches"].append(s)
def __getValueForLatLon(self, chunks, lat, lon, arrayName="data"):
value = get_approximate_value_for_lat_lon(chunks, lat, lon, arrayName)
return value
    def __checkNumber(self, value):
        # Normalize a value for output: NaN becomes None; any other non-None
        # value is coerced to a plain Python float.
        if isinstance(value, float) and math.isnan(value):
            value = None
        elif value is not None:
            value = float(value)
        return value
def __buildSwathIndexes(self, chunk):
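        # Build a per-chunk KD-tree (in UTM meters) over every unmasked,
        # non-NaN swath cell, recording each cell's (lat, lon) and array index.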
latlons = []
utms = []
indexes = []
        for i, _lat in enumerate(chunk.latitudes):
            if isinstance(_lat, np.ma.core.MaskedConstant):
                continue
            for j, _lon in enumerate(chunk.longitudes):
                if isinstance(_lon, np.ma.core.MaskedConstant):
                    continue
                value = self.__getChunkValueAtIndex(chunk, (i, j))
                if isinstance(value, float) and math.isnan(value):
                    continue
u = utm.from_latlon(_lat, _lon)
v = (u[0], u[1], 0.0)
latlons.append((_lat, _lon))
utms.append(v)
indexes.append((i, j))
tree = None
if len(latlons) > 0:
tree = spatial.KDTree(utms)
chunk.swathIndexing = {
"tree": tree,
"latlons": latlons,
"indexes": indexes
}
def __getChunkIndexesForLatLon(self, chunk, lat, lon, xyTolerance):
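        # Lazily build the swath index on first use, then return the array
        # indexes and (lat, lon) pairs within xyTolerance meters of the point.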
foundIndexes = []
foundLatLons = []
if "swathIndexing" not in chunk.__dict__:
self.__buildSwathIndexes(chunk)
tree = chunk.swathIndexing["tree"]
if tree is not None:
indexes = chunk.swathIndexing["indexes"]
latlons = chunk.swathIndexing["latlons"]
u = utm.from_latlon(lat, lon)
coords = np.array([u[0], u[1], 0])
ball = tree.query_ball_point(coords, xyTolerance)
for i in ball:
foundIndexes.append(indexes[i])
foundLatLons.append(latlons[i])
return foundIndexes, foundLatLons
def __getChunkValueAtIndex(self, chunk, index, arrayName=None):
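        # Read a single cell from the chunk's primary data array or from a
        # named metadata array; masked or non-scalar values come back as NaN.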
if arrayName is None or arrayName == "data":
data_val = chunk.data[0][index[0]][index[1]]
else:
data_val = chunk.meta_data[arrayName][0][index[0]][index[1]]
        return data_val.item() if (data_val is not np.ma.masked) and data_val.size == 1 else float('nan')
def __getSatNodeForLatLonAndTime(self, chunksByDay, source, lat, lon, searchTime, xyTolerance):
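        # Walk the per-day chunk lists, keeping nodes from days that beat the
        # best time difference seen so far (a nearest-in-time heuristic rather
        # than a fixed tolerance).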
        timeDiff = 86400 * 365 * 1000  # initial window: one year, in milliseconds
foundNodes = []
for ts in chunksByDay:
chunks = chunksByDay[ts]
if abs((ts * 1000) - searchTime) < timeDiff:
for chunk in chunks:
indexes, latlons = self.__getChunkIndexesForLatLon(chunk, lat, lon, xyTolerance)
                    for index, latlon in zip(indexes, latlons):
sst = None
sss = None
windSpeed = None
windDirection = None
windU = None
windV = None
value = self.__getChunkValueAtIndex(chunk, index)
                        if isinstance(value, float) and math.isnan(value):
                            continue
if "GHRSST" in source:
sst = value
elif "ASCATB" in source:
windU = value
elif "SSS" in source: # SMAP
sss = value
if len(chunks) > 0 and "wind_dir" in chunks[0].meta_data:
windDirection = self.__checkNumber(self.__getChunkValueAtIndex(chunk, index, "wind_dir"))
if len(chunks) > 0 and "wind_v" in chunks[0].meta_data:
windV = self.__checkNumber(self.__getChunkValueAtIndex(chunk, index, "wind_v"))
if len(chunks) > 0 and "wind_speed" in chunks[0].meta_data:
windSpeed = self.__checkNumber(self.__getChunkValueAtIndex(chunk, index, "wind_speed"))
foundNode = {
"sea_water_temperature": sst,
"sea_water_salinity": sss,
"wind_speed": windSpeed,
"wind_direction": windDirection,
"wind_u": windU,
"wind_v": windV,
"time": ts,
"x": self.__checkNumber(latlon[1]),
"y": self.__checkNumber(latlon[0]),
"depth": 0,
"sea_water_temperature_depth": 0,
"source": source,
"id": "%s:%s:%s" % (ts, lat, lon)
}
foundNodes.append(foundNode)
                timeDiff = abs((ts * 1000) - searchTime)  # tighten to the best day seen so far (ms)
return foundNodes
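    # Legacy variant of __getSatNodeForLatLonAndTime that samples a single
    # approximate value per (lat, lon) instead of using the swath index; it is
    # not referenced elsewhere in this module.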
def __getSatNodeForLatLonAndTime__(self, chunksByDay, source, lat, lon, searchTime):
        timeDiff = 86400 * 365 * 1000  # initial window: one year, in milliseconds
foundNodes = []
for ts in chunksByDay:
chunks = chunksByDay[ts]
if abs((ts * 1000) - searchTime) < timeDiff:
value = self.__getValueForLatLon(chunks, lat, lon, arrayName="data")
value = self.__checkNumber(value)
                # Route the single extracted value to the appropriate variable
                # based on the source dataset name.
sst = None
sss = None
windSpeed = None
windDirection = None
windU = None
windV = None
if "GHRSST" in source:
sst = value
if "ASCATB" in source:
windU = value
if len(chunks) > 0 and "wind_dir" in chunks[0].meta_data:
windDirection = self.__checkNumber(self.__getValueForLatLon(chunks, lat, lon, arrayName="wind_dir"))
if len(chunks) > 0 and "wind_v" in chunks[0].meta_data:
windV = self.__checkNumber(self.__getValueForLatLon(chunks, lat, lon, arrayName="wind_v"))
if len(chunks) > 0 and "wind_speed" in chunks[0].meta_data:
windSpeed = self.__checkNumber(self.__getValueForLatLon(chunks, lat, lon, arrayName="wind_speed"))
foundNode = {
"sea_water_temperature": sst,
"sea_water_salinity": sss,
"wind_speed": windSpeed,
"wind_direction": windDirection,
"wind_uv": {
"u": windU,
"v": windV
},
"time": ts,
"x": lon,
"y": lat,
"depth": 0,
"sea_water_temperature_depth": 0,
"source": source,
"id": "%s:%s:%s" % (ts, lat, lon)
}
                isValidNode = True
                if "ASCATB" in source and windSpeed is None:
                    isValidNode = False
if isValidNode:
foundNodes.append(foundNode)
                timeDiff = abs((ts * 1000) - searchTime)
return foundNodes
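# Streams pages of in-situ records via fetchedgeimpl and hands each page to
# the primary MatchupContext for matching.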
class InsituDatasetProcessor:
def __init__(self, primary, datasource, startTime, endTime, bbox, depth_min, depth_max, platforms, timeTolerance,
radiusTolerance):
self.primary = primary
self.datasource = datasource
self.startTime = startTime
self.endTime = endTime
self.bbox = bbox
self.depth_min = depth_min
self.depth_max = depth_max
self.platforms = platforms
self.timeTolerance = timeTolerance
self.radiusTolerance = radiusTolerance
def start(self):
def callback(pageData):
self.primary.processInSitu(pageData, self.radiusTolerance, self.timeTolerance)
fetchedgeimpl.fetch(self.datasource, self.startTime, self.endTime, self.bbox, self.depth_min, self.depth_max,
self.platforms, pageCallback=callback)
class InsituPageProcessor:
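    # Placeholder: appears to be a stub for future per-page processing.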
def __init__(self):
pass