# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import logging
from datetime import datetime

import numpy as np
import pytz
from nexustiles.nexustiles import NexusTileService
from shapely import wkt
from shapely.geometry import Polygon

from webservice.NexusHandler import nexus_handler
from webservice.algorithms_spark.NexusCalcSparkHandler import NexusCalcSparkHandler
from webservice.webmodel import NexusResults, NexusProcessingException

SENTINEL = 'STOP'
EPOCH = pytz.timezone('UTC').localize(datetime(1970, 1, 1))


def iso_time_to_epoch(str_time):
    return (datetime.strptime(str_time, "%Y-%m-%dT%H:%M:%SZ").replace(
        tzinfo=pytz.UTC) - EPOCH).total_seconds()
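
# Illustrative usage (not executed here):
#   iso_time_to_epoch("1970-01-02T00:00:00Z") -> 86400.0 (seconds since the Unix epoch, as a float)
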
@nexus_handler
class DailyDifferenceAverageNexusImplSpark(NexusCalcSparkHandler):
name = "Daily Difference Average Spark"
path = "/dailydifferenceaverage_spark"
    description = "Subtracts Dataset 2 (the climatology) from Dataset 1 within the given bounding box, then averages the differences per day."
params = {
"dataset": {
"name": "Dataset",
"type": "string",
"description": "The Dataset shortname to use in calculation"
},
"climatology": {
"name": "Climatology",
"type": "string",
"description": "The Climatology shortname to use in calculation"
},
"b": {
"name": "Bounding box",
"type": "comma-delimited float",
"description": "Minimum (Western) Longitude, Minimum (Southern) Latitude, Maximum (Eastern) Longitude, Maximum (Northern) Latitude"
},
"startTime": {
"name": "Start Time",
"type": "string",
"description": "Starting time in format YYYY-MM-DDTHH:mm:ssZ"
},
"endTime": {
"name": "End Time",
"type": "string",
"description": "Ending time in format YYYY-MM-DDTHH:mm:ssZ"
}
}
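
    # Example request (hypothetical shortnames and bounds, for illustration only):
    #   /dailydifferenceaverage_spark?dataset=MY_SST_DATASET&climatology=MY_SST_CLIMATOLOGY
    #       &b=-150.0,40.0,-120.0,55.0&startTime=2017-01-01T00:00:00Z&endTime=2017-01-31T00:00:00Z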
singleton = True
def __init__(self, **kwargs):
NexusCalcSparkHandler.__init__(self, **kwargs)
self.log = logging.getLogger(__name__)
def parse_arguments(self, request):
# Parse input arguments
self.log.debug("Parsing arguments")
try:
bounding_polygon = request.get_bounding_polygon()
        except Exception:
try:
minLat = request.get_min_lat()
maxLat = request.get_max_lat()
minLon = request.get_min_lon()
maxLon = request.get_max_lon()
bounding_polygon = Polygon([(minLon, minLat), # (west, south)
(maxLon, minLat), # (east, south)
(maxLon, maxLat), # (east, north)
(minLon, maxLat), # (west, north)
(minLon, minLat)]) # (west, south)
            except Exception:
raise NexusProcessingException(
reason="'b' argument or 'minLon', 'minLat', 'maxLon', and 'maxLat' arguments are required. If 'b' is used, it must be comma-delimited float formatted as Minimum (Western) Longitude, Minimum (Southern) Latitude, Maximum (Eastern) Longitude, Maximum (Northern) Latitude",
code=400)
dataset = request.get_argument('dataset', None)
if dataset is None:
dataset = request.get_argument('ds1', None)
if dataset is None:
raise NexusProcessingException(reason="'dataset' or 'ds1' argument is required", code=400)
climatology = request.get_argument('climatology', None)
if climatology is None:
climatology = request.get_argument('ds2', None)
if climatology is None:
raise NexusProcessingException(reason="'climatology' or 'ds2' argument is required", code=400)
try:
start_time = request.get_start_datetime()
        except Exception:
raise NexusProcessingException(
reason="'startTime' argument is required. Can be int value seconds from epoch or string format YYYY-MM-DDTHH:mm:ssZ",
code=400)
try:
end_time = request.get_end_datetime()
        except Exception:
raise NexusProcessingException(
reason="'endTime' argument is required. Can be int value seconds from epoch or string format YYYY-MM-DDTHH:mm:ssZ",
code=400)
        start_seconds_from_epoch = int((start_time - EPOCH).total_seconds())
        end_seconds_from_epoch = int((end_time - EPOCH).total_seconds())
plot = request.get_boolean_arg("plot", default=False)
return bounding_polygon, dataset, climatology, start_time, start_seconds_from_epoch, end_time, end_seconds_from_epoch, plot
def calc(self, request, **args):
bounding_polygon, dataset, climatology, start_time, start_seconds_from_epoch, end_time, end_seconds_from_epoch, plot = self.parse_arguments(
request)
self.log.debug("Querying for tiles in search domain")
# Get tile ids in box
tile_ids = [tile.tile_id for tile in
self._get_tile_service().find_tiles_in_polygon(bounding_polygon, dataset,
start_seconds_from_epoch, end_seconds_from_epoch,
fetch_data=False, fl='id',
sort=['tile_min_time_dt asc', 'tile_min_lon asc',
'tile_min_lat asc'], rows=5000)]
        # Run the Spark driver to compute the per-day average differences
try:
spark_result = spark_anomalies_driver(self._tile_service_factory,
tile_ids,
wkt.dumps(bounding_polygon),
dataset,
climatology,
sc=self._sc)
except Exception as e:
self.log.exception(e)
raise NexusProcessingException(reason="An unknown error occurred while computing average differences",
code=500)
average_and_std_by_day = spark_result
min_lon, min_lat, max_lon, max_lat = bounding_polygon.bounds
result = DDAResult(
results=[[{'time': dayms, 'mean': avg_std[0], 'std': avg_std[1], 'ds': 0}] for dayms, avg_std in
average_and_std_by_day],
stats={}, meta=self.get_meta(dataset),
computeOptions=None, minLat=min_lat,
maxLat=max_lat, minLon=min_lon,
maxLon=max_lon, ds=dataset, startTime=start_seconds_from_epoch,
endTime=end_seconds_from_epoch)
result.meta()['climatology'] = climatology
return result
def get_meta(self, dataset):
        # TODO: this is fragile; it guesses units and labels from substrings of the dataset shortname
if 'sst' in dataset.lower() or 'mur' in dataset.lower():
meta = {
"title": "Sea Surface Temperature (SST) Anomalies",
"description": "SST anomalies are departures from the 5-day pixel mean",
"units": u'\u00B0C',
"label": u'Difference from 5-Day mean (\u00B0C)'
}
elif 'chl' in dataset.lower():
meta = {
"title": "Chlorophyll Concentration Anomalies",
"description": "Chlorophyll Concentration anomalies are departures from the 5-day pixel mean",
"units": u'mg m^-3',
"label": u'Difference from 5-Day mean (mg m^-3)'
}
elif 'sla' in dataset.lower():
meta = {
"title": "Sea Level Anomaly Estimate (SLA) Anomalies",
"description": "SLA anomalies are departures from the 5-day pixel mean",
"units": u'm',
"label": u'Difference from 5-Day mean (m)'
}
elif 'sss' in dataset.lower():
meta = {
"title": "Sea Surface Salinity (SSS) Anomalies",
"description": "SSS anomalies are departures from the 5-day pixel mean",
"units": u'psu',
"label": u'Difference from 5-Day mean (psu)'
}
elif 'ccmp' in dataset.lower():
meta = {
"title": "Wind Speed Anomalies",
"description": "Wind Speed anomalies are departures from the 1-day pixel mean",
"units": u'm/s',
"label": u'Difference from 1-Day mean (m/s)'
}
elif 'trmm' in dataset.lower():
meta = {
"title": "Precipitation Anomalies",
"description": "Precipitation anomalies are departures from the 5-day pixel mean",
"units": u'mm',
"label": u'Difference from 5-Day mean (mm)'
}
else:
meta = {
"title": "Anomalies",
"description": "Anomalies are departures from the 5-day pixel mean",
"units": u'',
"label": u'Difference from 5-Day mean'
}
return meta


class DDAResult(NexusResults):
    def toImage(self):
        from io import BytesIO

        import matplotlib.pyplot as plt
        from matplotlib.dates import date2num
times = [date2num(datetime.fromtimestamp(dayavglistdict[0]['time'], pytz.utc).date()) for dayavglistdict in
self.results()]
means = [dayavglistdict[0]['mean'] for dayavglistdict in self.results()]
plt.plot_date(times, means, '|g-')
plt.xlabel('Date')
plt.xticks(rotation=70)
plt.ylabel(u'Difference from 5-Day mean (\u00B0C)')
plt.title('Sea Surface Temperature (SST) Anomalies')
plt.grid(True)
plt.tight_layout()
        sio = BytesIO()
plt.savefig(sio, format='png')
return sio.getvalue()


from threading import Lock

from shapely.geometry import box

DRIVER_LOCK = Lock()


class NoClimatologyTile(Exception):
    pass


class NoDatasetTile(Exception):
    pass


def determine_parallelism(num_tiles):
    """
    Try to stay at a maximum of 1500 tiles per partition, but don't go over 128 partitions.
    Also, don't go below the default of 8.
    """
    num_partitions = max(min(num_tiles // 1500, 128), 8)
    return num_partitions
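
# Illustrative values for the formula above (not executed here): 1,000 tiles -> 8
# partitions (floored to 0, then raised to the minimum of 8); 48,000 tiles -> 32;
# 300,000 tiles -> 128 (capped at the maximum).
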
def spark_anomalies_driver(tile_service_factory, tile_ids, bounding_wkt, dataset, climatology, sc=None):
from functools import partial
with DRIVER_LOCK:
bounding_wkt_b = sc.broadcast(bounding_wkt)
dataset_b = sc.broadcast(dataset)
climatology_b = sc.broadcast(climatology)
# Parallelize list of tile ids
        rdd = sc.parallelize(tile_ids, determine_parallelism(len(tile_ids)))
def add_tuple_elements(tuple1, tuple2):
cumulative_sum = tuple1[0] + tuple2[0]
cumulative_count = tuple1[1] + tuple2[1]
avg_1 = tuple1[0] / tuple1[1]
avg_2 = tuple2[0] / tuple2[1]
cumulative_var = parallel_variance(avg_1, tuple1[1], tuple1[2], avg_2, tuple2[1], tuple2[2])
return cumulative_sum, cumulative_count, cumulative_var
def parallel_variance(avg_a, count_a, var_a, avg_b, count_b, var_b):
# Thanks Wikipedia https://en.wikipedia.org/wiki/Algorithms_for_calculating_variance#Parallel_algorithm
delta = avg_b - avg_a
m_a = var_a * (count_a - 1)
m_b = var_b * (count_b - 1)
M2 = m_a + m_b + delta ** 2 * count_a * count_b / (count_a + count_b)
return M2 / (count_a + count_b - 1)
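
    # Worked example (illustrative, not executed): combining samples [1, 2]
    # (mean 1.5, count 2, sample variance 0.5) with [3, 4] (mean 3.5, count 2,
    # sample variance 0.5) gives delta = 2.0 and M2 = 0.5 + 0.5 + 4.0 * 2 * 2 / 4
    # = 5.0, so the combined sample variance is 5.0 / 3 ~= 1.667, matching
    # np.var([1, 2, 3, 4], ddof=1).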
def compute_avg_and_std(sum_cnt_var_tuple):
return sum_cnt_var_tuple[0] / sum_cnt_var_tuple[1], np.sqrt(sum_cnt_var_tuple[2])
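
    # Data flow of the pipeline below: each partition of tile ids maps to
    # (epoch_day_seconds, (sum, count, variance)) tuples, reduceByKey merges the
    # per-day tuples with add_tuple_elements, mapValues turns each merged tuple
    # into (mean, std), and sortByKey orders the collected results by day.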
result = rdd \
.mapPartitions(partial(calculate_diff, tile_service_factory, bounding_wkt=bounding_wkt_b, dataset=dataset_b,
climatology=climatology_b)) \
.reduceByKey(add_tuple_elements) \
.mapValues(compute_avg_and_std) \
.sortByKey() \
.collect()
return result
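
# The driver returns a chronologically sorted list of
# (epoch_day_seconds, (mean_difference, standard_deviation)) pairs, for example
# (hypothetical values) [(1483228800, (0.12, 0.34)), (1483315200, (0.09, 0.31))].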


def calculate_diff(tile_service_factory, tile_ids, bounding_wkt, dataset, climatology):
    from itertools import chain

    # Construct a list of generators that each yield (day, (sum, count, variance)) tuples
    diff_generators = []

    tile_ids = list(tile_ids)
    if len(tile_ids) == 0:
        return []

    tile_service = tile_service_factory()
for tile_id in tile_ids:
# Get the dataset tile
try:
dataset_tile = get_dataset_tile(tile_service, wkt.loads(bounding_wkt.value), tile_id)
except NoDatasetTile:
# This should only happen if all measurements in a tile become masked after applying the bounding polygon
continue
tile_day_of_year = dataset_tile.min_time.timetuple().tm_yday
# Get the climatology tile
try:
climatology_tile = get_climatology_tile(tile_service, wkt.loads(bounding_wkt.value),
box(dataset_tile.bbox.min_lon,
dataset_tile.bbox.min_lat,
dataset_tile.bbox.max_lon,
dataset_tile.bbox.max_lat),
climatology.value,
tile_day_of_year)
except NoClimatologyTile:
continue
diff_generators.append(generate_diff(dataset_tile, climatology_tile))
return chain(*diff_generators)


def get_dataset_tile(tile_service, search_bounding_shape, tile_id):
the_time = datetime.now()
try:
# Load the dataset tile
dataset_tile = tile_service.find_tile_by_id(tile_id)[0]
# Mask it to the search domain
dataset_tile = tile_service.mask_tiles_to_polygon(search_bounding_shape, [dataset_tile])[0]
except IndexError:
raise NoDatasetTile()
    print("%s Time to load dataset tile %s" % (str(datetime.now() - the_time), dataset_tile.tile_id))
return dataset_tile


def get_climatology_tile(tile_service, search_bounding_shape, tile_bounding_shape, climatology_dataset,
                         tile_day_of_year):
the_time = datetime.now()
try:
# Load the tile
climatology_tile = tile_service.find_tile_by_polygon_and_most_recent_day_of_year(
tile_bounding_shape,
climatology_dataset,
tile_day_of_year)[0]
except IndexError:
raise NoClimatologyTile()
    # If the tile is completely contained in the search area there is no need to
    # mask it; otherwise, mask the data to the search domain.
    if not search_bounding_shape.contains(tile_bounding_shape):
        try:
            climatology_tile = tile_service.mask_tiles_to_polygon(search_bounding_shape, [climatology_tile])[0]
        except IndexError:
            raise NoClimatologyTile()
    print("%s Time to load climatology tile %s" % (str(datetime.now() - the_time), climatology_tile.tile_id))
return climatology_tile


def generate_diff(data_tile, climatology_tile):
the_time = datetime.now()
    # Pixel-wise difference between the dataset tile and its climatology, plus the
    # per-tile summary statistics that the driver's reduceByKey later merges.
    diff = np.subtract(data_tile.data, climatology_tile.data)
    diff_sum = np.nansum(diff)
    diff_var = np.nanvar(diff)
    diff_ct = np.ma.count(diff)
date_in_seconds = int((datetime.combine(data_tile.min_time.date(), datetime.min.time()).replace(
tzinfo=pytz.UTC) - EPOCH).total_seconds())
    print("%s Time to generate diff between %s and %s" % (
        str(datetime.now() - the_time), data_tile.tile_id, climatology_tile.tile_id))
yield (date_in_seconds, (diff_sum, diff_ct, diff_var))
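
# Each yielded (epoch_day_seconds, (sum, count, variance)) record keys the tile's
# statistics by the UTC day it falls on; spark_anomalies_driver's reduceByKey then
# merges all records for the same day, and mapValues converts the merged totals
# into the final (mean, std) pair.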