blob: 79fde67784ad422037226398e57da2ab4b550d4b [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import io
import csv
import json
import logging
from datetime import datetime
import requests
from . import BaseDomsHandler
from webservice.NexusHandler import nexus_handler
from webservice.algorithms.doms import config as edge_endpoints
from webservice.webmodel import NexusProcessingException, NoDataException
ISO_8601 = '%Y-%m-%dT%H:%M:%S%z'
@nexus_handler
class DomsResultsRetrievalHandler(BaseDomsHandler.BaseDomsQueryCalcHandler):
    """Subset a DOMS in situ data source over a space/time/depth domain.

    Validates the HTTP request arguments, queries the configured EDGE
    endpoint for the requested source, and wraps the matching observations
    in an ``InSituSubsetResult`` for JSON/CSV rendering.
    """
    name = "DOMS In Situ Subsetter"
    path = "/domsinsitusubset"
    description = "Subset a DOMS in situ source given the search domain."
    params = [
        {
            "name": "source",
            "type": "comma-delimited string",
            "description": "The in situ Dataset to be sub-setted",
            "required": "true",
            "sample": "spurs"
        },
        {
            "name": "parameter",
            "type": "string",
            "description": "The parameter of interest. One of 'sst', 'sss', 'wind'",
            "required": "false",
            "default": "All",
            "sample": "sss"
        },
        {
            "name": "startTime",
            "type": "string",
            "description": "Starting time in format YYYY-MM-DDTHH:mm:ssZ or seconds since EPOCH",
            "required": "true",
            "sample": "2013-10-21T00:00:00Z"
        },
        {
            "name": "endTime",
            "type": "string",
            "description": "Ending time in format YYYY-MM-DDTHH:mm:ssZ or seconds since EPOCH",
            "required": "true",
            "sample": "2013-10-31T23:59:59Z"
        },
        {
            "name": "b",
            "type": "comma-delimited float",
            "description": "Minimum (Western) Longitude, Minimum (Southern) Latitude, "
                           "Maximum (Eastern) Longitude, Maximum (Northern) Latitude",
            "required": "true",
            "sample": "-30,15,-45,30"
        },
        {
            "name": "depthMin",
            "type": "float",
            "description": "Minimum depth of measurements. Must be less than depthMax",
            "required": "false",
            "default": "No limit",
            "sample": "0"
        },
        {
            "name": "depthMax",
            "type": "float",
            "description": "Maximum depth of measurements. Must be greater than depthMin",
            "required": "false",
            "default": "No limit",
            "sample": "5"
        },
        {
            "name": "platforms",
            "type": "comma-delimited integer",
            "description": "Platforms to include for subset consideration",
            "required": "false",
            "default": "All",
            "sample": "1,2,3,4,5,6,7,8,9"
        },
        {
            "name": "output",
            "type": "string",
            "description": "Output type. Only 'CSV' or 'JSON' is currently supported",
            "required": "false",
            "default": "JSON",
            "sample": "CSV"
        }
    ]
    singleton = True

    def __init__(self):
        BaseDomsHandler.BaseDomsQueryCalcHandler.__init__(self)
        self.log = logging.getLogger(__name__)

    def parse_arguments(self, request):
        """Validate the request and extract the subset search domain.

        Returns a tuple ``(source_name, parameter_s, start_time, end_time,
        bounding_polygon, depth_min, depth_max, platforms)`` where the times
        are formatted as ``YYYY-MM-DDTHH:mm:ssZ`` strings for the EDGE query.

        Raises:
            NexusProcessingException: (HTTP 400) for any missing or invalid
                argument.
        """
        self.log.debug("Parsing arguments")

        source_name = request.get_argument('source', None)
        if source_name is None or source_name.strip() == '':
            raise NexusProcessingException(reason="'source' argument is required", code=400)

        parameter_s = request.get_argument('parameter', None)
        if parameter_s not in ['sst', 'sss', 'wind', None]:
            raise NexusProcessingException(
                reason="Parameter %s not supported. Must be one of 'sst', 'sss', 'wind'." % parameter_s, code=400)

        # Was a bare 'except:'; narrow to Exception so SystemExit/KeyboardInterrupt
        # are not converted into HTTP 400 responses.
        try:
            start_dt = request.get_start_datetime()
        except Exception:
            raise NexusProcessingException(
                reason="'startTime' argument is required. Can be int value seconds from epoch or string format YYYY-MM-DDTHH:mm:ssZ",
                code=400)
        try:
            end_dt = request.get_end_datetime()
        except Exception:
            raise NexusProcessingException(
                reason="'endTime' argument is required. Can be int value seconds from epoch or string format YYYY-MM-DDTHH:mm:ssZ",
                code=400)

        # Compare the datetime objects rather than strftime'd strings: the
        # formatted strings carry a literal 'Z' and drop any tz offset, so a
        # lexicographic comparison could be wrong for tz-aware datetimes.
        if start_dt > end_dt:
            raise NexusProcessingException(
                reason="The starting time must be before the ending time. Received startTime: %s, endTime: %s" % (
                    start_dt.strftime(ISO_8601), end_dt.strftime(ISO_8601)),
                code=400)

        start_time = start_dt.strftime("%Y-%m-%dT%H:%M:%SZ")
        end_time = end_dt.strftime("%Y-%m-%dT%H:%M:%SZ")

        try:
            bounding_polygon = request.get_bounding_polygon()
        except Exception:
            raise NexusProcessingException(
                reason="'b' argument is required. Must be comma-delimited float formatted as Minimum (Western) Longitude, Minimum (Southern) Latitude, Maximum (Eastern) Longitude, Maximum (Northern) Latitude",
                code=400)

        depth_min = request.get_decimal_arg('depthMin', default=None)
        depth_max = request.get_decimal_arg('depthMax', default=None)
        if depth_min is not None and depth_max is not None and depth_min >= depth_max:
            raise NexusProcessingException(
                reason="Depth Min should be less than Depth Max", code=400)

        platforms = request.get_argument('platforms', None)
        if platforms is not None:
            # Validate only; the raw comma-delimited string is passed downstream.
            try:
                [int(p) for p in platforms.split(',')]
            except ValueError:
                raise NexusProcessingException(reason="platforms must be a comma-delimited list of integers", code=400)

        return source_name, parameter_s, start_time, end_time, bounding_polygon, depth_min, depth_max, platforms

    def calc(self, request, **args):
        """Execute the subset query against EDGE and return the results.

        Raises:
            NoDataException: when the search domain matches no observations.
        """
        source_name, parameter_s, start_time, end_time, bounding_polygon, \
        depth_min, depth_max, platforms = self.parse_arguments(request)

        with requests.session() as edge_session:
            edge_results = query_edge(source_name, parameter_s, start_time, end_time,
                                      ','.join([str(bound) for bound in bounding_polygon.bounds]),
                                      platforms, depth_min, depth_max, edge_session)['results']

        if len(edge_results) == 0:
            raise NoDataException

        return InSituSubsetResult(results=edge_results)
class InSituSubsetResult(object):
    """Serializable container for in situ subset results.

    ``results`` is expected to be a list of flat dicts, one per observation,
    as returned by the EDGE service.
    """

    def __init__(self, results):
        # List of per-observation dicts.
        self.results = results

    def toJson(self):
        """Render the results as a pretty-printed JSON array."""
        return json.dumps(self.results, indent=4)

    def toCSV(self):
        """Render the results as CSV text.

        Columns are the sorted union of keys across all rows. (Previously
        only the first row's keys were used, so a later row carrying an
        extra key raised ValueError from DictWriter, and an empty result
        list raised StopIteration.) Rows missing a key get an empty cell;
        output is unchanged when every row shares the same keys.
        """
        fieldnames = sorted({key for row in self.results for key in row})
        with io.StringIO() as csv_mem_file:
            writer = csv.DictWriter(csv_mem_file, fieldnames=fieldnames)
            writer.writeheader()
            writer.writerows(self.results)
            return csv_mem_file.getvalue()
def query_edge(dataset, variable, startTime, endTime, bbox, platform, depth_min, depth_max, session, itemsPerPage=1000,
               startIndex=0, stats=True):
    """Query the EDGE endpoint for *dataset* and return the decoded JSON
    response, with every paginated page's ``results`` merged into the first.

    ``startTime``/``endTime`` may be epoch seconds or pre-formatted ISO
    strings; ``platform`` may be a comma-delimited string or a list.
    Raises ``requests.HTTPError`` (via ``raise_for_status``) on any failed
    page request.
    """
    log = logging.getLogger('webservice.algorithms.doms.insitusubset.query_edge')

    def _as_time_string(value):
        # Epoch seconds are formatted to ISO-8601 UTC; a TypeError means we
        # were handed an already-formatted string, which passes through.
        try:
            return datetime.utcfromtimestamp(value).strftime('%Y-%m-%dT%H:%M:%SZ')
        except TypeError:
            return value

    startTime = _as_time_string(startTime)
    endTime = _as_time_string(endTime)

    try:
        platform = platform.split(',')
    except AttributeError:
        # Already a list; leave untouched.
        pass

    params = {
        "startTime": startTime,
        "endTime": endTime,
        "bbox": bbox,
        "minDepth": depth_min,
        "maxDepth": depth_max,
        "itemsPerPage": itemsPerPage,
        "startIndex": startIndex,
        "stats": str(stats).lower(),
    }
    # Omit optional filters entirely when unset so EDGE applies no filter.
    if variable:
        params['variable'] = variable
    if platform:
        params['platform'] = platform

    first_page = session.get(edge_endpoints.getEndpointByName(dataset)['url'], params=params)
    first_page.raise_for_status()
    merged_response = json.loads(first_page.text)

    # Walk the 'next' links, folding each page's results into the first response.
    next_page_url = merged_response.get('next', None)
    while next_page_url is not None:
        log.debug("requesting %s" % next_page_url)
        page_request = session.get(next_page_url)
        page_request.raise_for_status()
        page_response = json.loads(page_request.text)
        merged_response['results'].extend(page_response['results'])
        next_page_url = page_response.get('next', None)

    return merged_response