| # Licensed to the Apache Software Foundation (ASF) under one or more |
| # contributor license agreements. See the NOTICE file distributed with |
| # this work for additional information regarding copyright ownership. |
| # The ASF licenses this file to You under the Apache License, Version 2.0 |
| # (the "License"); you may not use this file except in compliance with |
| # the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| |
| |
| import sys |
| import traceback |
| from datetime import datetime, timedelta |
| from multiprocessing.dummy import Pool, Manager |
| |
| import numpy as np |
| import pytz |
| from nexustiles.nexustiles import NexusTileService, NexusTileServiceException |
| from shapely.geometry import box |
| |
| from webservice.NexusHandler import nexus_handler |
| from webservice.algorithms.NexusCalcHandler import NexusCalcHandler |
| from webservice.webmodel import NexusResults, NexusProcessingException |
| |
| SENTINEL = 'STOP' |
| |
| |
| @nexus_handler |
| class DailyDifferenceAverageImpl(NexusCalcHandler): |
| name = "Daily Difference Average" |
| path = "/dailydifferenceaverage" |
| description = "Subtracts data in box in Dataset 1 from Dataset 2, then averages the difference per day." |
| params = { |
| "ds1": { |
| "name": "Dataset 1", |
| "type": "string", |
| "description": "The first Dataset shortname to use in calculation" |
| }, |
| "ds2": { |
| "name": "Dataset 2", |
| "type": "string", |
| "description": "The second Dataset shortname to use in calculation" |
| }, |
| "minLon": { |
| "name": "Minimum Longitude", |
| "type": "float", |
| "description": "Minimum (Western) bounding box Longitude" |
| }, |
| "minLat": { |
| "name": "Minimum Latitude", |
| "type": "float", |
| "description": "Minimum (Southern) bounding box Latitude" |
| }, |
| "maxLon": { |
| "name": "Maximum Longitude", |
| "type": "float", |
| "description": "Maximum (Eastern) bounding box Longitude" |
| }, |
| "maxLat": { |
| "name": "Maximum Latitude", |
| "type": "float", |
| "description": "Maximum (Northern) bounding box Latitude" |
| }, |
| "startTime": { |
| "name": "Start Time", |
| "type": "long integer", |
| "description": "Starting time in milliseconds since midnight Jan. 1st, 1970 UTC" |
| }, |
| "endTime": { |
| "name": "End Time", |
| "type": "long integer", |
| "description": "Ending time in milliseconds since midnight Jan. 1st, 1970 UTC" |
| } |
| } |
| singleton = True |
| |
| def calc(self, request, **args): |
| min_lat, max_lat, min_lon, max_lon = request.get_min_lat(), request.get_max_lat(), request.get_min_lon(), request.get_max_lon() |
| dataset1 = request.get_argument("ds1", None) |
| dataset2 = request.get_argument("ds2", None) |
| start_time = request.get_start_time() |
| end_time = request.get_end_time() |
| |
| simple = request.get_argument("simple", None) is not None |
| |
| averagebyday = self.get_daily_difference_average_for_box(min_lat, max_lat, min_lon, max_lon, dataset1, dataset2, |
| start_time, end_time) |
| |
| averagebyday = sorted(averagebyday, key=lambda dayavg: dayavg[0]) |
| |
| if simple: |
| |
| import matplotlib.pyplot as plt |
| from matplotlib.dates import date2num |
| |
| times = [date2num(self.date_from_ms(dayavg[0])) for dayavg in averagebyday] |
| means = [dayavg[1] for dayavg in averagebyday] |
| plt.plot_date(times, means, ls='solid') |
| |
| plt.xlabel('Date') |
| plt.xticks(rotation=70) |
| plt.ylabel('Difference from 5-Day mean (\u00B0C)') |
| plt.title('Sea Surface Temperature (SST) Anomalies') |
| plt.grid(True) |
| plt.tight_layout() |
| plt.savefig("test.png") |
| |
| return averagebyday, None, None |
| else: |
| |
| result = NexusResults( |
| results=[[{'time': dayms, 'mean': avg, 'ds': 0}] for dayms, avg in averagebyday], |
| stats={}, |
| meta=self.get_meta()) |
| |
| result.extendMeta(min_lat, max_lat, min_lon, max_lon, "", start_time, end_time) |
| result.meta()['label'] = 'Difference from 5-Day mean (\u00B0C)' |
| |
| return result |
| |
| def date_from_ms(self, dayms): |
| base_datetime = datetime(1970, 1, 1) |
| delta = timedelta(0, 0, 0, dayms) |
| return base_datetime + delta |
| |
| def get_meta(self): |
| meta = { |
| "title": "Sea Surface Temperature (SST) Anomalies", |
| "description": "SST anomolies are departures from the 5-day pixel mean", |
| "units": '\u00B0C', |
| } |
| return meta |
| |
| def get_daily_difference_average_for_box(self, min_lat, max_lat, min_lon, max_lon, dataset1, dataset2, |
| start_time, |
| end_time): |
| |
| daysinrange = self._get_tile_service().find_days_in_range_asc(min_lat, max_lat, min_lon, max_lon, dataset1, |
| start_time, end_time) |
| |
| maxprocesses = int(self.algorithm_config.get("multiprocessing", "maxprocesses")) |
| |
| if maxprocesses == 1: |
| calculator = DailyDifferenceAverageCalculator() |
| averagebyday = [] |
| for dayinseconds in daysinrange: |
| result = calculator.calc_average_diff_on_day(min_lat, max_lat, min_lon, max_lon, dataset1, dataset2, |
| dayinseconds) |
| averagebyday.append((result[0], result[1])) |
| else: |
| # Create a task to calc average difference for each day |
| manager = Manager() |
| work_queue = manager.Queue() |
| done_queue = manager.Queue() |
| for dayinseconds in daysinrange: |
| work_queue.put( |
| ('calc_average_diff_on_day', min_lat, max_lat, min_lon, max_lon, dataset1, dataset2, dayinseconds)) |
| [work_queue.put(SENTINEL) for _ in range(0, maxprocesses)] |
| |
| # Start new processes to handle the work |
| pool = Pool(maxprocesses) |
| [pool.apply_async(pool_worker, (work_queue, done_queue)) for _ in range(0, maxprocesses)] |
| pool.close() |
| |
| # Collect the results as [(day (in ms), average difference for that day)] |
| averagebyday = [] |
| for i in range(0, len(daysinrange)): |
| result = done_queue.get() |
| if result[0] == 'error': |
| print(result[1], file=sys.stderr) |
| raise NexusProcessingException(reason="Error calculating average by day.") |
| rdata = result |
| averagebyday.append((rdata[0], rdata[1])) |
| |
| pool.terminate() |
| manager.shutdown() |
| |
| return averagebyday |
| |
| |
| class DailyDifferenceAverageCalculator(object): |
| def __init__(self): |
| self.__tile_service = NexusTileService() |
| |
| def calc_average_diff_on_day(self, min_lat, max_lat, min_lon, max_lon, dataset1, dataset2, timeinseconds): |
| |
| day_of_year = datetime.fromtimestamp(timeinseconds, pytz.utc).timetuple().tm_yday |
| |
| ds1_nexus_tiles = self.__tile_service.find_all_tiles_in_box_at_time(min_lat, max_lat, min_lon, max_lon, |
| dataset1, |
| timeinseconds) |
| |
| # Initialize list of differences |
| differences = [] |
| # For each ds1tile |
| for ds1_tile in ds1_nexus_tiles: |
| # Get tile for ds2 using bbox from ds1_tile and day ms |
| try: |
| ds2_tile = self.__tile_service.find_tile_by_polygon_and_most_recent_day_of_year( |
| box(ds1_tile.bbox.min_lon, ds1_tile.bbox.min_lat, ds1_tile.bbox.max_lon, ds1_tile.bbox.max_lat), |
| dataset2, day_of_year)[0] |
| # Subtract ds2 tile from ds1 tile |
| diff = np.subtract(ds1_tile.data, ds2_tile.data) |
| except NexusTileServiceException: |
| # This happens when there is data in ds1tile but all NaNs in ds2tile because the |
| # Solr query being used filters out results where stats_count = 0. |
| # Technically, this should never happen if ds2 is a climatology generated in part from ds1 |
| # and it is probably a data error |
| |
| # For now, just treat ds2 as an array of all masked data (which essentially discards the ds1 data) |
| ds2_tile = np.ma.masked_all(ds1_tile.data.shape) |
| diff = np.subtract(ds1_tile.data, ds2_tile) |
| |
| # Put results in list of differences |
| differences.append(np.ma.array(diff).ravel()) |
| |
| # Average List of differences |
| diffaverage = np.ma.mean(differences).item() |
| # Return Average by day |
| return int(timeinseconds), diffaverage |
| |
| |
| def pool_worker(work_queue, done_queue): |
| try: |
| calculator = DailyDifferenceAverageCalculator() |
| |
| for work in iter(work_queue.get, SENTINEL): |
| scifunction = work[0] |
| args = work[1:] |
| result = calculator.__getattribute__(scifunction)(*args) |
| done_queue.put(result) |
| except Exception as e: |
| e_str = traceback.format_exc(e) |
| done_queue.put(('error', e_str)) |