| # Licensed to the Apache Software Foundation (ASF) under one or more |
| # contributor license agreements. See the NOTICE file distributed with |
| # this work for additional information regarding copyright ownership. |
| # The ASF licenses this file to You under the Apache License, Version 2.0 |
| # (the "License"); you may not use this file except in compliance with |
| # the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| |
| import csv |
| import io |
| import json |
| import os |
| import re |
| import warnings |
| from datetime import datetime |
| from pathlib import Path |
| from tempfile import NamedTemporaryFile as Temp |
| from time import sleep |
| from urllib.parse import urljoin, urlparse, urlunparse |
| from zipfile import ZipFile |
| |
| import pandas as pd |
| import pytest |
| import requests |
| from bs4 import BeautifulSoup |
| from dateutil.parser import parse |
| from geopy.distance import geodesic |
| from pytz import timezone, UTC |
| from shapely import wkt |
| from shapely.geometry import Polygon, Point |
| |
| import cdms_reader |
| |
| |
| ######################### |
| # |
| # export TEST_HOST=http://localhost:8083/ |
| # unset TEST_HOST |
| # |
| ######################### |
| |
| # TODO: Consider removing old helper methods & fixtures for old CDMS tests that aren't used here (mainly insitu stuff) |
| |
| @pytest.fixture(scope="session") |
| def host(): |
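    """Base URL of the SDAP deployment under test; override with the TEST_HOST environment variable."""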
| return os.getenv('TEST_HOST', 'http://localhost:8083/') |
| |
| |
| @pytest.fixture(scope="session") |
| def eid(): |
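    """Session-shared record of matchup execution IDs and their parameters for downstream result tests."""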
| return { |
| 'successful': False, |
| 'eid': [], |
| 'params': [] |
| } |
| |
| |
| start_time = None |
| |
| |
| @pytest.fixture(scope="session") |
| def start(): |
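    """Session-wide timestamp used to group saved responses under responses/<timestamp>/."""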
| global start_time |
| |
| if start_time is None: |
        start_time = datetime.now().strftime("%Y%m%d%H%M%S")  # %Y (calendar year), not %G (ISO week-numbering year)
| |
| return start_time |
| |
| |
| @pytest.fixture() |
| def timeouts(): |
| connect_timeout = 9.05 # Recommended to be just above a multiple of 3 seconds |
| read_timeout = 303 # Just above current gateway timeout |
| timeouts = (connect_timeout, read_timeout) |
| |
| return timeouts |
| |
| |
| @pytest.fixture() |
| def fail_on_miscount(request): |
    # The option requests warn-on-miscount behavior, so failing is the default when it is absent
    return not request.config.getoption('--matchup-warn-on-miscount', default=False)
| |
| |
| @pytest.fixture(scope='session') |
| def distance_vs_time_query(host, start): |
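    """
    Run a small matchup query and determine which secondary point is nearest in distance and which is
    nearest in time to the primary point, for use by prioritization tests. If the query fails, the
    hardcoded backup values are used and 'success' stays False.
    """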
| result = { |
| 'distances': { # Tuples: sec_lat, sec_lon, sec_time |
| 'min_dist': (), |
| 'min_time': () |
| }, |
| 'backup': { # Tuples: sec_lat, sec_lon, sec_time |
| 'min_dist': ("26.6141296", "-130.0827904", 1522637640), |
| 'min_time': ("26.6894016", "-130.0547072", 1522626840) |
| }, |
| 'success': False |
| } |
| |
| url = urljoin(host, 'match_spark') |
| |
| params = { |
| "primary": "JPL-L4-MRVA-CHLA-GLOB-v3.0", |
| "secondary": "shark-2018", |
| "startTime": "2018-04-01T00:00:00Z", |
| "endTime": "2018-04-01T23:59:59Z", |
| "b": "-131,26,-130,27", |
| "depthMin": -5, |
| "depthMax": 5, |
| "tt": 86400, |
| "rt": 10000, |
| "matchOnce": False, |
| "resultSizeLimit": 0, |
| "platforms": "3B", |
| "parameter": "mass_concentration_of_chlorophyll_in_sea_water", |
| } |
| |
| try: |
| body = run_matchup(url, params) |
| |
| data = body['data'] |
| |
| assert body['count'] == len(data) |
| check_count(len(data), 1, True) |
| |
| primary_point = data[0] |
| |
| def compute_distance(primary, secondary): |
| return geodesic((primary['lat'], primary['lon']), (secondary['lat'], secondary['lon'])).m |
| |
| def compute_time(primary, secondary): |
| return abs(primary['time'] - secondary['time']) |
| |
| distances = [ |
| (s['lat'], s['lon'], s['time'], compute_distance(primary_point, s), compute_time(primary_point, s)) |
| for s in primary_point['matches'] |
| ] |
| |
| try_save('computed_distances', start, distances) |
| |
| min_dist = min(distances, key=lambda x: x[3]) |
| min_time = min(distances, key=lambda x: x[4]) |
| |
| result['distances']['min_dist'] = min_dist[:3] |
| result['distances']['min_time'] = min_time[:3] |
| |
| result['success'] = True |
    except Exception as e:
        warnings.warn(f'Could not determine point distances for prioritization tests ({e}); using backup values instead')
| |
| return result |
| |
| |
| @pytest.fixture() |
| def matchup_params(): |
| return { |
| 'gridded_to_gridded': { |
| "primary": "MUR25-JPL-L4-GLOB-v04.2_test", |
| "secondary": "SMAP_JPL_L3_SSS_CAP_8DAY-RUNNINGMEAN_V5_test", |
| "startTime": "2018-08-01T00:00:00Z", |
| "endTime": "2018-08-02T00:00:00Z", |
| "b": "-100,20,-90,30", |
| "depthMin": -20, |
| "depthMax": 10, |
| "tt": 43200, |
| "rt": 1000, |
| "matchOnce": True, |
| "resultSizeLimit": 7000, |
| "platforms": "42" |
| }, |
| 'gridded_to_swath': { |
| "primary": "MUR25-JPL-L4-GLOB-v04.2_test", |
| "secondary": "ASCATB-L2-Coastal_test", |
| "startTime": "2018-07-05T00:00:00Z", |
| "endTime": "2018-07-05T23:59:59Z", |
| "b": "-127,32,-120,40", |
| "depthMin": -20, |
| "depthMax": 10, |
| "tt": 12000, |
| "rt": 1000, |
| "matchOnce": True, |
| "resultSizeLimit": 7000, |
| "platforms": "42" |
| }, |
| 'swath_to_gridded': { |
| "primary": "ASCATB-L2-Coastal_test", |
| "secondary": "MUR25-JPL-L4-GLOB-v04.2_test", |
| "startTime": "2018-08-01T00:00:00Z", |
| "endTime": "2018-08-02T00:00:00Z", |
| "b": "-100,20,-90,30", |
| "depthMin": -20, |
| "depthMax": 10, |
| "tt": 43200, |
| "rt": 1000, |
| "matchOnce": True, |
| "resultSizeLimit": 7000, |
| "platforms": "65" |
| }, |
| 'swath_to_swath': { |
| "primary": "VIIRS_NPP-2018_Heatwave_test", |
| "secondary": "ASCATB-L2-Coastal_test", |
| "startTime": "2018-07-05T00:00:00Z", |
| "endTime": "2018-07-05T23:59:59Z", |
| "b": "-120,28,-118,30", |
| "depthMin": -20, |
| "depthMax": 10, |
| "tt": 43200, |
| "rt": 1000, |
| "matchOnce": True, |
| "resultSizeLimit": 7000, |
| "platforms": "42" |
| }, |
| 'long': { # TODO: Find something for this; it's copied atm |
| "primary": "VIIRS_NPP-2018_Heatwave_test", |
| "secondary": "ASCATB-L2-Coastal_test", |
| "startTime": "2018-07-05T00:00:00Z", |
| "endTime": "2018-07-05T23:59:59Z", |
| "b": "-120,28,-118,30", |
| "depthMin": -20, |
| "depthMax": 10, |
| "tt": 43200, |
| "rt": 1000, |
| "matchOnce": True, |
| "resultSizeLimit": 7000, |
| "platforms": "42" |
| }, |
| } |
| |
| |
| def skip(msg=""): |
| raise pytest.skip(msg) |
| |
| |
| def b_to_polygon(b): |
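    """Convert a 'west,south,east,north' bounding-box string into a shapely Polygon."""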
| west, south, east, north = [float(p) for p in b.split(",")] |
| polygon = Polygon([(west, south), (east, south), (east, north), (west, north), (west, south)]) |
| return polygon |
| |
| |
| def iso_time_to_epoch(str_time): |
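    """Convert an ISO-8601 UTC timestamp ('%Y-%m-%dT%H:%M:%SZ') to Unix epoch seconds."""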
| epoch = timezone('UTC').localize(datetime(1970, 1, 1)) |
| |
| return (datetime.strptime(str_time, "%Y-%m-%dT%H:%M:%SZ").replace( |
| tzinfo=UTC) - epoch).total_seconds() |
| |
| |
| def verify_secondary_in_tolerance(primary, secondary, rt): |
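    """Assert the geodesic distance between the primary and secondary points is within rt meters."""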
| distance = geodesic((primary['lat'], primary['lon']), (secondary['lat'], secondary['lon'])).m |
| |
| assert distance <= rt |
| |
| |
| def translate_global_rows(rows): |
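    """Convert the global-metadata CSV rows ('key,value' per row) into a dict keyed by metadata name."""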
| translated = {} |
| |
| for row in rows: |
| parts = row.split(',', 1) |
| translated[parts[0]] = parts[1] |
| |
| return translated |
| |
| |
| def translate_matchup_rows(rows): |
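    """Convert matchup CSV rows (header + data) into dicts; repeated headers get a '_secondary' suffix."""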
| headers = rows[0].split(',') |
| |
| translated_rows = [] |
| |
| for row in rows[1:]: |
| translated_row = {} |
| |
| buf = io.StringIO(row) |
| reader = csv.reader(buf) |
| fields = list(reader)[0] |
| |
| assert len(headers) == len(fields) |
| |
| for i, field in enumerate(fields): |
| header = headers[i] |
| |
| if header not in translated_row: |
| translated_row[header] = field |
| else: |
| translated_row[f"{header}_secondary"] = field |
| |
| translated_rows.append(translated_row) |
| |
| return translated_rows |
| |
| |
| def lat_lon_to_point(lat, lon): |
| return wkt.loads(f"Point({lon} {lat})") |
| |
| |
| def format_time(timestamp): |
| t = parse(timestamp) |
| return t.strftime('%Y-%m-%dT%H:%M:%SZ') |
| |
| |
| def verify_match(match, point, time, s_point, s_time, params, bounding_poly): |
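    """Assert a match has the expected primary/secondary points and lies within the query's bounds and tolerances."""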
| # Check primary point is as expected |
| assert match['point'] == point |
| assert match['time'] == time |
| |
| # Check primary point within search bounds |
| assert iso_time_to_epoch(params['startTime']) \ |
| <= match['time'] \ |
| <= iso_time_to_epoch(params['endTime']) |
| assert bounding_poly.intersects(wkt.loads(match['point'])) |
| |
| secondary = match['matches'][0] |
| |
| # Check secondary point is as expected |
| assert secondary['point'] == s_point |
| assert secondary['time'] == s_time |
| |
| # Check secondary point within specified spatial & temporal tolerances for matched primary |
| verify_secondary_in_tolerance(match, secondary, params['rt']) |
| |
| assert (match['time'] - params['tt']) \ |
| <= secondary['time'] \ |
| <= (match['time'] + params['tt']) |
| |
| |
| def verify_match_consistency(match, params, bounding_poly): |
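    """Assert a match's primary point is within the query bounds and all its secondaries are within tolerances."""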
| # Check primary point within search bounds |
| assert iso_time_to_epoch(params['startTime']) \ |
| <= match['time'] \ |
| <= iso_time_to_epoch(params['endTime']) |
| assert bounding_poly.intersects(wkt.loads(match['point'])) |
| |
| for secondary in match['matches']: |
| # Check secondary point within specified spatial & temporal tolerances for matched primary |
| verify_secondary_in_tolerance(match, secondary, params['rt']) |
| |
| assert (match['time'] - params['tt']) \ |
| <= secondary['time'] \ |
| <= (match['time'] + params['tt']) |
| |
| |
| def validate_insitu(body, params, test): |
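    """Check insitu response paging counts and that each result is within the bbox, depth, and time bounds."""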
| if body['total'] <= params['itemsPerPage']: |
| assert body['total'] == len(body['results']) |
| else: |
| assert len(body['results']) == params['itemsPerPage'] |
| |
| if len(body['results']) == 0: |
| warnings.warn(f'Insitu test ({test}) returned no results!') |
| |
| bounding_poly = b_to_polygon(params['bbox']) |
| |
| for result in body['results']: |
| assert bounding_poly.intersects( |
| wkt.loads(f"Point({result['longitude']} {result['latitude']})") |
| ) |
| |
| if result['depth'] != -99999.0: |
| assert params['minDepth'] <= result['depth'] <= params['maxDepth'] |
| |
| assert params['startTime'] <= result['time'] <= params['endTime'] |
| |
| |
| def try_save(name, time, response, ext='json', mode='w'): |
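    """Save a response to responses/<time>/<name>.<ext> (JSON object, CSV text, or raw bytes); warn on failure."""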
| Path(f'responses/{time}/').mkdir(parents=True, exist_ok=True) |
| |
| try: |
| with open(f'responses/{time}/{name}.{ext}', mode=mode) as f: |
| if ext == 'json': |
| json.dump(response, f, indent=4) |
| elif ext == 'csv': |
| f.write(response.text) |
| else: |
| f.write(response.content) |
| except Exception as e: |
| warnings.warn(f"Failed to save response for {name}\n{e}", RuntimeWarning) |
| |
| |
| def uniq_primaries(primaries, xfail=False, case=None): |
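    """Check that no primary points in the matchup output are duplicated; fail (or xfail) if any are."""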
| class Primary: |
| def __init__(self, p): |
| self.platform = p['platform'] |
| self.device = p['device'] |
| self.lon = p['lon'] |
| self.lat = p['lat'] |
| self.point = p['point'] |
| self.time = p['time'] |
| self.depth = p['depth'] |
| self.fileurl = p['fileurl'] |
| self.id = p['id'] |
| self.source = p['source'] |
| self.primary = p['primary'] |
| self.matches = p['matches'] |
| |
| def __eq__(self, other): |
| if not isinstance(other, Primary): |
| return False |
| |
| return self.platform == other.platform and \ |
| self.device == other.device and \ |
| self.lon == other.lon and \ |
| self.lat == other.lat and \ |
| self.point == other.point and \ |
| self.time == other.time and \ |
| self.depth == other.depth and \ |
| self.fileurl == other.fileurl and \ |
| self.id == other.id and \ |
| self.source == other.source and \ |
| self.primary == other.primary |
| |
| def __str__(self): |
| primary = { |
| "platform": self.platform, |
| "device": self.device, |
| "lon": self.lon, |
| "lat": self.lat, |
| "point": self.point, |
| "time": self.time, |
| "depth": self.depth, |
| "fileurl": self.fileurl, |
| "id": self.id, |
| "source": self.source, |
| "primary": self.primary, |
| } |
| |
| return json.dumps(primary, indent=4) |
| |
| points = [Primary(p) for p in primaries] |
| |
| checked = [] |
| duplicates = {} |
| |
| for p in points: |
| for c in checked: |
| if p == c: |
| if p.id not in duplicates: |
| duplicates[p.id] = [p, c] |
| else: |
| duplicates[p.id].append(p) |
| break |
| checked.append(p) |
| |
| if len(duplicates) > 0: |
| m = print if not xfail else warnings.warn |
| |
| msg = f'Duplicate point(s) found ({len(duplicates)} total)' |
| |
| if case is not None: |
| msg += f' for case {case}' |
| |
| msg += '\n\n-----\n\n' |
| |
        for d in duplicates.values():
| |
| msg += 'Primary point:\n' + str(d[0]) + '\n\n' |
| |
| matches = [p.matches for p in d] |
| |
| msg += f'Matches to ({len(matches)}):\n' |
| msg += json.dumps(matches, indent=4) |
| msg += '\n\n' |
| |
| m(msg) |
| |
| if xfail: |
| pytest.xfail('Duplicate points found') |
| else: |
| assert False, 'Duplicate points found' |
| |
| |
| def check_count(count, expected, fail_on_mismatch): |
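    """Compare a result count to the expected value; raise if fail_on_mismatch is set, otherwise warn."""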
| if count == expected: |
| return |
| elif fail_on_mismatch: |
| raise AssertionError(f'Incorrect count: Expected {expected}, got {count}') |
| else: |
| warnings.warn(f'Incorrect count: Expected {expected}, got {count}') |
| |
| |
| def url_scheme(scheme, url): |
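    """Return url with its scheme replaced by the given scheme (e.g., to keep http/https consistent)."""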
    parsed = urlparse(url)

    if parsed.scheme == scheme:
        return url

    return urlunparse(parsed._replace(scheme=scheme))
| |
| |
# Run a matchup query and return the JSON output.
# Works whether the match_spark endpoint is synchronous or asynchronous (job-based).
| def run_matchup(url, params, page_size=3500): |
| TIMEOUT = 60 * 60 |
| # TIMEOUT = float('inf') |
| |
| response = requests.get(url, params=params) |
| |
| scheme = urlparse(url).scheme |
| |
| assert response.status_code == 200, 'Initial match_spark query failed' |
| response_json = response.json() |
| |
| asynchronous = 'status' in response_json |
| |
| if not asynchronous: |
| return response_json |
| else: |
| start = datetime.utcnow() |
| job_url = [link for link in response_json['links'] if link['rel'] == 'self'][0]['href'] |
| |
| job_url = url_scheme(scheme, job_url) |
| |
| retries = 3 |
| timeouts = [2, 5, 10] |
| |
| while response_json['status'] == 'running' and (datetime.utcnow() - start).total_seconds() <= TIMEOUT: |
            status_response = requests.get(job_url)
            status_code = status_response.status_code

            # /job poll may fail internally. This does not necessarily indicate job failure (e.g., a Cassandra
            # read timed out). Retry it a couple of times and fail the test if it persists.
            if status_code == 500 and retries > 0:
                warnings.warn('/job poll failed; retrying')
                sleep(timeouts[3 - retries])
                retries -= 1
                continue
| |
| assert status_response.status_code == 200, '/job status polling failed' |
| response_json = status_response.json() |
| |
| if response_json['status'] == 'running': |
| sleep(10) |
| |
| job_status = response_json['status'] |
| |
| if job_status == 'running': |
| skip(f'Job has been running too long ({(datetime.utcnow() - start)}), skipping to run other tests') |
| elif job_status in ['cancelled', 'failed']: |
| raise ValueError(f'Async matchup job finished with incomplete status ({job_status})') |
| else: |
| stac_url = [ |
| link for link in response_json['links'] if 'STAC' in link['title'] |
| ][0]['href'] |
| |
| stac_url = url_scheme(scheme, stac_url) |
| |
| catalogue_response = requests.get(stac_url) |
| assert catalogue_response.status_code == 200, 'Catalogue fetch failed' |
| |
| catalogue_response = catalogue_response.json() |
| |
| json_cat_url = [ |
| link for link in catalogue_response['links'] if 'JSON' in link['title'] |
| ][0]['href'] |
| |
| json_cat_url = url_scheme(scheme, json_cat_url) |
| |
| catalogue_response = requests.get(json_cat_url) |
| assert catalogue_response.status_code == 200, 'Catalogue fetch failed' |
| |
| catalogue_response = catalogue_response.json() |
| |
| results_urls = [ |
| url_scheme(scheme, link['href']) for link in |
| catalogue_response['links'] if 'output=JSON' in link['href'] |
| # link['href'] for link in response_json['links'] if link['type'] == 'application/json' |
| ] |
| |
        def get_results(url):
            retries = 3
            retry_delay = 1.5

            while retries > 0:
                response = requests.get(url)

                try:
                    response.raise_for_status()
                    result = response.json()

                    assert result['count'] == len(result['data'])

                    return result
                except Exception:
                    retries -= 1
                    sleep(retry_delay)
                    retry_delay *= 2

            raise AssertionError(f'Could not fetch results from {url} after retrying')
| |
| assert len(results_urls) > 0, 'STAC catalogue returned no result queries' |
| |
| matchup_result = get_results(results_urls[0]) |
| |
| for url in results_urls[1:]: |
| matchup_result['data'].extend(get_results(url)['data']) |
| |
| return matchup_result |
| |
| |
| @pytest.mark.integration |
| def test_version(host, start): |
| url = urljoin(host, 'version') |
| |
| response = requests.get(url) |
| |
| assert response.status_code == 200 |
| assert re.match(r'^\d+\.\d+\.\d+(-.+)?$', response.text) |
| |
| |
| @pytest.mark.integration |
| def test_capabilities(host, start): |
| url = urljoin(host, 'capabilities') |
| |
| response = requests.get(url) |
| |
| assert response.status_code == 200 |
| |
| capabilities = response.json() |
| |
| try_save('test_capabilities', start, capabilities) |
| |
| assert len(capabilities) > 0 |
| |
| for capability in capabilities: |
| assert all([k in capability for k in ['name', 'path', 'description', 'parameters']]) |
        assert all([isinstance(capability[k], str) for k in ['name', 'path', 'description']])
| |
| assert isinstance(capability['parameters'], (dict, list)) |
| |
| for param in capability['parameters']: |
| if isinstance(capability['parameters'], dict): |
| param = capability['parameters'][param] |
| |
| assert isinstance(param, dict) |
| assert all([k in param and isinstance(param[k], str) for k in ['name', 'type', 'description']]) |
| |
| |
| @pytest.mark.integration |
| def test_endpoints(host, start): |
| url = urljoin(host, 'capabilities') |
| |
| response = requests.get(url) |
| |
| if response.status_code != 200: |
| skip('Could not get endpoints list. Expected if test_capabilities has failed') |
| |
| capabilities = response.json() |
| |
| endpoints = [c['path'] for c in capabilities] |
| |
| non_existent_endpoints = [] |
| |
| for endpoint in endpoints: |
| status = requests.head(urljoin(host, endpoint)).status_code |
| |
| if status == 404: |
| # Strip special characters because some endpoints have wildcards/regex characters |
| # This may not work forever though |
| stripped_endpoint = re.sub(r'[^a-zA-Z0-9/_-]', '', endpoint) |
| |
| status = requests.head(urljoin(host, stripped_endpoint)).status_code |
| |
| if status == 404: |
| non_existent_endpoints.append(([endpoint, stripped_endpoint], status)) |
| |
| assert len(non_existent_endpoints) == 0, non_existent_endpoints |
| |
| |
| @pytest.mark.integration |
| def test_heartbeat(host, start): |
| url = urljoin(host, 'heartbeat') |
| |
| response = requests.get(url) |
| |
| assert response.status_code == 200 |
| heartbeat = response.json() |
| |
| assert isinstance(heartbeat, dict) |
| assert all(heartbeat.values()) |
| |
| |
| @pytest.mark.integration |
| def test_swaggerui_sdap(host): |
| url = urljoin(host, 'apidocs/') |
| |
| response = requests.get(url) |
| |
| assert response.status_code == 200 |
| assert 'swagger-ui' in response.text |
| |
| try: |
| # There's probably a better way to do this, but extract the .yml file for the docs from the returned text |
| soup = BeautifulSoup(response.text, 'html.parser') |
| |
| script = str([tag for tag in soup.find_all('script') if tag.attrs == {}][0]) |
| |
| start_index = script.find('url:') |
| end_index = script.find('",\n', start_index) |
| |
| script = script[start_index:end_index] |
| |
| yml_filename = script.split('"')[1] |
| |
| url = urljoin(url, yml_filename) |
| |
| response = requests.get(url) |
| |
| assert response.status_code == 200 |
| except AssertionError: |
| raise |
    except Exception:
| try: |
| url = urljoin(url, 'openapi.yml') |
| |
| response = requests.get(url) |
| |
| assert response.status_code == 200 |
| |
| warnings.warn("Could not extract documentation yaml filename from response text, " |
| "but using an assumed value worked successfully") |
        except Exception:
| raise ValueError("Could not verify documentation yaml file, assumed value also failed") |
| |
| |
| @pytest.mark.integration |
| def test_list(host, start): |
| url = urljoin(host, 'list') |
| |
| response = requests.get(url) |
| |
| assert response.status_code == 200 |
| |
| body = response.json() |
| try_save("test_list", start, body) |
| |
| assert isinstance(body, list) |
| |
| if len(body) == 0: |
| warnings.warn('/list returned no datasets. This could be correct if SDAP has no data ingested, otherwise ' |
| 'this should be considered a failure') |
| |
| |
| @pytest.mark.integration |
| @pytest.mark.parametrize( |
| ['collection'], |
| [('MUR25-JPL-L4-GLOB-v04.2_test',), ('OISSS_L4_multimission_7day_v1_test',)] |
| ) |
| def test_subset_L4(host, start, collection): |
| url = urljoin(host, 'datainbounds') |
| |
| params = { |
| "ds": collection, |
| "startTime": "2018-09-24T00:00:00Z", |
| "endTime": "2018-09-30T00:00:00Z", |
| "b": "160,-30,180,-25", |
| } |
| |
| response = requests.get(url, params=params) |
| assert response.status_code == 200 |
| |
| data = response.json() |
| try_save(f"test_datainbounds_L4_{collection}", start, data) |
| |
| bounding_poly = b_to_polygon(params['b']) |
| |
| epoch = datetime(1970, 1, 1, tzinfo=UTC) |
| |
    start_epoch = (datetime.strptime(params['startTime'], '%Y-%m-%dT%H:%M:%SZ').replace(tzinfo=UTC) - epoch).total_seconds()
    end_epoch = (datetime.strptime(params['endTime'], '%Y-%m-%dT%H:%M:%SZ').replace(tzinfo=UTC) - epoch).total_seconds()

    for p in data:
        assert bounding_poly.intersects(Point(float(p['longitude']), float(p['latitude'])))
        assert start_epoch <= p['time'] <= end_epoch


@pytest.mark.integration
| def test_subset_L2(host, start): |
| url = urljoin(host, 'datainbounds') |
| |
| params = { |
| "ds": "ASCATB-L2-Coastal_test", |
| "startTime": "2018-09-24T00:00:00Z", |
| "endTime": "2018-09-30T00:00:00Z", |
| "b": "160,-30,180,-25", |
| } |
| |
| response = requests.get(url, params=params) |
| assert response.status_code == 200 |
| |
| data = response.json() |
| try_save("test_datainbounds_L2", start, data) |
| |
| bounding_poly = b_to_polygon(params['b']) |
| |
| epoch = datetime(1970, 1, 1, tzinfo=UTC) |
| |
    start_epoch = (datetime.strptime(params['startTime'], '%Y-%m-%dT%H:%M:%SZ').replace(tzinfo=UTC) - epoch).total_seconds()
    end_epoch = (datetime.strptime(params['endTime'], '%Y-%m-%dT%H:%M:%SZ').replace(tzinfo=UTC) - epoch).total_seconds()

    for p in data:
        assert bounding_poly.intersects(Point(float(p['longitude']), float(p['latitude'])))
        assert start_epoch <= p['time'] <= end_epoch


@pytest.mark.integration
| def test_timeseries_spark(host, start): |
| url = urljoin(host, 'timeSeriesSpark') |
| |
| params = { |
| "ds": "MUR25-JPL-L4-GLOB-v04.2_test", |
| "b": "-135,-10,-80,10", |
| "startTime": "2018-07-05T00:00:00Z", |
| "endTime": "2018-09-30T23:59:59Z", |
| } |
| |
| response = requests.get(url, params=params) |
| |
| assert response.status_code == 200 |
| |
| data = response.json() |
| try_save('test_timeseries_spark', start, data) |
| |
| assert len(data['data']) == len(pd.date_range(params['startTime'], params['endTime'], freq='D')) |
| |
| epoch = datetime(1970, 1, 1, tzinfo=UTC) |
| |
    start_epoch = (datetime.strptime(params['startTime'], '%Y-%m-%dT%H:%M:%SZ').replace(tzinfo=UTC) - epoch).total_seconds()
    end_epoch = (datetime.strptime(params['endTime'], '%Y-%m-%dT%H:%M:%SZ').replace(tzinfo=UTC) - epoch).total_seconds()

    for p in data['data']:
        assert start_epoch <= p[0]['time'] <= end_epoch
| |
| |
| @pytest.mark.integration |
| def test_cdmslist(host, start): |
| url = urljoin(host, 'cdmslist') |
| |
| response = requests.get(url) |
| |
| assert response.status_code == 200 |
| |
| body = response.json() |
| try_save("test_cdmslist", start, body) |
| |
| data = body['data'] |
| |
| num_satellite = len(data['satellite']) |
| num_insitu = len(data['insitu']) |
| |
| if num_satellite == 0: |
| warnings.warn('/cdmslist returned no satellite datasets. This could be correct if SDAP has no data ingested, ' |
| 'otherwise this should be considered a failure') |
| |
| if num_insitu == 0: |
| warnings.warn('/cdmslist returned no insitu datasets. This could be correct if SDAP has no insitu data ' |
| 'ingested, otherwise this should be considered a failure') |
| |
| |
| @pytest.mark.integration |
| @pytest.mark.parametrize( |
| ['collection'], |
| [('MUR25-JPL-L4-GLOB-v04.2_test',), ('OISSS_L4_multimission_7day_v1_test',)] |
| ) |
| def test_cdmssubset_L4(host, start, collection): |
| url = urljoin(host, 'cdmssubset') |
| |
| params = { |
| "dataset": collection, |
| "parameter": "sst", |
| "startTime": "2018-09-24T00:00:00Z", |
| "endTime": "2018-09-30T00:00:00Z", |
| "b": "160,-30,180,-25", |
| "output": "ZIP" |
| } |
| |
| response = requests.get(url, params=params) |
| |
| assert response.status_code == 200 |
| |
| try_save(f"test_cdmssubset_L4_{collection}", start, response, "zip", 'wb') |
| |
| bounding_poly = b_to_polygon(params['b']) |
| |
| response_buf = io.BytesIO(response.content) |
| |
| with ZipFile(response_buf) as data: |
| namelist = data.namelist() |
| |
| assert namelist == [f'{collection}.csv'] |
| |
| csv_buf = io.StringIO(data.read(namelist[0]).decode('utf-8')) |
| csv_data = pd.read_csv(csv_buf) |
| |
| def validate_row_bounds(row): |
| assert bounding_poly.intersects(Point(float(row['longitude']), float(row['latitude']))) |
| assert params['startTime'] <= row['time'] <= params['endTime'] |
| |
    for _, row in csv_data.iterrows():
        validate_row_bounds(row)
| |
| |
| @pytest.mark.integration |
| def test_cdmssubset_L2(host, start): |
| url = urljoin(host, 'cdmssubset') |
| |
| params = { |
| "dataset": "ASCATB-L2-Coastal_test", |
| "startTime": "2018-09-24T00:00:00Z", |
| "endTime": "2018-09-30T00:00:00Z", |
| "b": "160,-30,180,-25", |
| "output": "ZIP" |
| } |
| |
| response = requests.get(url, params=params) |
| |
| assert response.status_code == 200 |
| |
| try_save("test_cdmssubset_L2", start, response, "zip", 'wb') |
| |
| bounding_poly = b_to_polygon(params['b']) |
| |
| response_buf = io.BytesIO(response.content) |
| |
| with ZipFile(response_buf) as data: |
| namelist = data.namelist() |
| |
| assert namelist == ['ASCATB-L2-Coastal_test.csv'] |
| |
| csv_buf = io.StringIO(data.read(namelist[0]).decode('utf-8')) |
| csv_data = pd.read_csv(csv_buf) |
| |
| def validate_row_bounds(row): |
| assert bounding_poly.intersects(Point(float(row['longitude']), float(row['latitude']))) |
| assert params['startTime'] <= row['time'] <= params['endTime'] |
| |
    for _, row in csv_data.iterrows():
        validate_row_bounds(row)
| |
| |
| @pytest.mark.integration |
| @pytest.mark.parametrize( |
| ['match', 'expected'], |
| list(zip( |
| ['gridded_to_gridded', 'gridded_to_swath', 'swath_to_gridded', 'swath_to_swath'], |
| [1058, 6, 21, 4026] |
| )) |
| ) |
| def test_match_spark(host, start, fail_on_miscount, matchup_params, match, expected): |
| url = urljoin(host, 'match_spark') |
| |
| params = matchup_params[match] |
| |
| bounding_poly = b_to_polygon(params['b']) |
| |
| body = run_matchup(url, params) |
| try_save(f"test_matchup_spark_{match}", start, body) |
| data = body['data'] |
| |
    # Use a distinct loop variable so the 'match' case name isn't shadowed below
    for m in data:
        verify_match_consistency(m, params, bounding_poly)

    uniq_primaries(data, case=f"test_matchup_spark_{match}")
| check_count(len(data), expected, fail_on_miscount) |
| |
| |
| @pytest.mark.integration |
| def test_match_spark_job_cancellation(host, start, matchup_params): |
| url = urljoin(host, 'match_spark') |
| |
| params = matchup_params['long'] |
| |
| response = requests.get(url, params=params) |
| |
| assert response.status_code == 200, 'Initial match_spark query failed' |
| response_json = response.json() |
| |
| asynchronous = 'status' in response_json |
| |
| if not asynchronous: |
| skip('Deployed SDAP version does not have asynchronous matchup') |
| else: |
| sleep(1) # Time to allow spark workers to start working |
| |
| if response_json['status'] != 'running': |
| skip('Job finished before it could be cancelled') |
| else: |
| cancel_url = [link for link in response_json['links'] if link['rel'] == 'cancel'][0]['href'] |
| |
| cancel_url = url_scheme( |
| urlparse(url).scheme, |
| cancel_url |
| ) |
| |
| cancel_response = requests.get(cancel_url) |
| assert cancel_response.status_code == 200, 'Cancellation query failed' |
| |
| cancel_json = cancel_response.json() |
| |
| assert cancel_json['status'] != 'running', 'Job did not cancel' |
| |
| if cancel_json['status'] in ['success', 'failed']: |
| warnings.warn(f'Job status after cancellation is not \'cancelled\' ({cancel_json["status"]}), passing ' |
| f'case because it is no longer \'running\', but actual cancellation could not be tested ' |
| f'here.') |
| |
| |
| @pytest.mark.integration |
| @pytest.mark.skip('Test not re-implemented yet') |
| def test_cdmsresults_json(host, eid, start): |
| url = urljoin(host, 'cdmsresults') |
| |
| # Skip the test automatically if the matchup request was not successful |
| if not eid['successful']: |
        skip('Matchup request was unsuccessful so there are no results to get from cdmsresults')
| |
| def fetch_result(execution_id, output): |
| return requests.get(url, params={"id": execution_id, "output": output}) |
| |
| eid_list = eid['eid'] |
| param_list = eid['params'] |
| |
| response = fetch_result(eid_list[0], "JSON") |
| |
| assert response.status_code == 200 |
| |
| body = response.json() |
| try_save("test_cdmsresults_json_A", start, body) |
| |
| data = body['data'] |
| assert len(data) == 5 |
| |
| for m in data: |
| m['point'] = f"Point({m['lon']} {m['lat']})" |
| for s in m['matches']: |
| s['point'] = f"Point({s['lon']} {s['lat']})" |
| |
| data.sort(key=lambda e: e['point']) |
| |
| params = param_list[0] |
| bounding_poly = b_to_polygon(params['b']) |
| |
| verify_match( |
| data[0], 'Point(-86.125 27.625)', |
| 1535360400, 'Point(-86.13 27.63)', |
| 1535374800, params, bounding_poly |
| ) |
| |
| verify_match( |
| data[1], 'Point(-88.875 27.875)', |
| 1534669200, 'Point(-88.88 27.88)', |
| 1534698000, params, bounding_poly |
| ) |
| |
| verify_match( |
| data[2], 'Point(-90.125 27.625)', |
| 1534496400, 'Point(-90.13 27.63)', |
| 1534491000, params, bounding_poly |
| ) |
| |
| verify_match( |
| data[3], 'Point(-90.125 28.125)', |
| 1534928400, 'Point(-90.13 28.12)', |
| 1534899600, params, bounding_poly |
| ) |
| |
| verify_match( |
| data[4], 'Point(-90.375 28.125)', |
| 1534842000, 'Point(-90.38 28.12)', |
| 1534813200, params, bounding_poly |
| ) |
| |
| response = fetch_result(eid_list[1], "JSON") |
| |
| assert response.status_code == 200 |
| |
| body = response.json() |
| try_save("test_cdmsresults_json_B", start, body) |
| |
| data = body['data'] |
| assert len(data) == 5 |
| |
| for m in data: |
| m['point'] = f"Point({m['lon']} {m['lat']})" |
| for s in m['matches']: |
| s['point'] = f"Point({s['lon']} {s['lat']})" |
| |
| data.sort(key=lambda e: e['point']) |
| |
| params = param_list[1] |
| bounding_poly = b_to_polygon(params['b']) |
| |
| verify_match( |
| data[0], 'Point(-86.125 27.625)', |
| 1535371200, 'Point(-86.13 27.63)', |
| 1535374800, params, bounding_poly |
| ) |
| |
| verify_match( |
| data[1], 'Point(-88.875 27.875)', |
| 1534680000, 'Point(-88.88 27.88)', |
| 1534698000, params, bounding_poly |
| ) |
| |
| verify_match( |
| data[2], 'Point(-90.125 27.625)', |
| 1534507200, 'Point(-90.13 27.63)', |
| 1534491000, params, bounding_poly |
| ) |
| |
| verify_match( |
| data[3], 'Point(-90.125 28.125)', |
| 1534939200, 'Point(-90.13 28.12)', |
| 1534899600, params, bounding_poly |
| ) |
| |
| verify_match( |
| data[4], 'Point(-90.375 28.125)', |
| 1534852800, 'Point(-90.38 28.12)', |
| 1534813200, params, bounding_poly |
| ) |
| |
| |
| @pytest.mark.integration |
| @pytest.mark.skip('Test not re-implemented yet') |
| def test_cdmsresults_csv(host, eid, start): |
| url = urljoin(host, 'cdmsresults') |
| |
| # Skip the test automatically if the matchup request was not successful |
| if not eid['successful']: |
        skip('Matchup request was unsuccessful so there are no results to get from cdmsresults')
| |
| def fetch_result(execution_id, output): |
| return requests.get(url, params={"id": execution_id, "output": output}) |
| |
| eid_list = eid['eid'] |
| param_list = eid['params'] |
| |
| response = fetch_result(eid_list[0], "CSV") |
| params = param_list[0] |
| bounding_poly = b_to_polygon(params['b']) |
| |
| assert response.status_code == 200 |
| |
| try_save("test_cdmsresults_csv_A", start, response, "csv") |
| |
| rows = response.text.split('\r\n') |
| index = rows.index('') |
| |
| global_rows = rows[:index] |
| matchup_rows = rows[index + 1:-1] # Drop trailing empty string from trailing newline |
| |
| global_rows = translate_global_rows(global_rows) |
| matchup_rows = translate_matchup_rows(matchup_rows) |
| |
| assert len(matchup_rows) == int(global_rows['CDMS_num_primary_matched']) |
| |
| for row in matchup_rows: |
| primary_point = lat_lon_to_point(row['lat'], row['lon']) |
| |
| assert bounding_poly.intersects(primary_point) |
| assert params['startTime'] <= format_time(row['time']) <= params['endTime'] |
| |
| verify_secondary_in_tolerance( |
| {'lat': row['lat'], 'lon': row['lon']}, |
| {'lat': row['lat_secondary'], 'lon': row['lon_secondary']}, |
| params['rt'] |
| ) |
| assert (iso_time_to_epoch(params['startTime']) - params['tt']) \ |
| <= iso_time_to_epoch(format_time(row['time_secondary'])) \ |
| <= (iso_time_to_epoch(params['endTime']) + params['tt']) |
| |
| response = fetch_result(eid_list[1], "CSV") |
| params = param_list[1] |
| bounding_poly = b_to_polygon(params['b']) |
| |
| assert response.status_code == 200 |
| |
| try_save("test_cdmsresults_csv_B", start, response, "csv") |
| |
| rows = response.text.split('\r\n') |
| index = rows.index('') |
| |
| global_rows = rows[:index] |
| matchup_rows = rows[index + 1:-1] # Drop trailing empty string from trailing newline |
| |
| global_rows = translate_global_rows(global_rows) |
| matchup_rows = translate_matchup_rows(matchup_rows) |
| |
| assert len(matchup_rows) == int(global_rows['CDMS_num_primary_matched']) |
| |
| for row in matchup_rows: |
| primary_point = lat_lon_to_point(row['lat'], row['lon']) |
| |
| assert bounding_poly.intersects(primary_point) |
| assert params['startTime'] <= format_time(row['time']) <= params['endTime'] |
| |
| verify_secondary_in_tolerance( |
| {'lat': row['lat'], 'lon': row['lon']}, |
| {'lat': row['lat_secondary'], 'lon': row['lon_secondary']}, |
| params['rt'] |
| ) |
| assert (iso_time_to_epoch(params['startTime']) - params['tt']) \ |
| <= iso_time_to_epoch(format_time(row['time_secondary'])) \ |
| <= (iso_time_to_epoch(params['endTime']) + params['tt']) |
| |
| |
| @pytest.mark.integration |
| @pytest.mark.skip('Test not re-implemented yet') |
| def test_cdmsresults_netcdf(host, eid, start): |
| warnings.filterwarnings('ignore') |
| |
| url = urljoin(host, 'cdmsresults') |
| |
| # Skip the test automatically if the matchup request was not successful |
| if not eid['successful']: |
        skip('Matchup request was unsuccessful so there are no results to get from cdmsresults')
| |
| def fetch_result(execution_id, output): |
| return requests.get(url, params={"id": execution_id, "output": output}) |
| |
| eid_list = eid['eid'] |
| param_list = eid['params'] |
| |
| temp_file = Temp(mode='wb+', suffix='.csv.tmp', prefix='CDMSReader_') |
| |
| response = fetch_result(eid_list[0], "NETCDF") |
| params = param_list[0] |
| bounding_poly = b_to_polygon(params['b']) |
| |
| assert response.status_code == 200 |
| |
| try_save("test_cdmsresults_netcdf_A", start, response, "nc", 'wb') |
| |
| temp_file.write(response.content) |
| temp_file.flush() |
| temp_file.seek(0) |
| |
| matches = cdms_reader.assemble_matches(temp_file.name) |
| |
| cdms_reader.matches_to_csv(matches, temp_file.name) |
| |
| with open(temp_file.name) as f: |
| reader = csv.DictReader(f) |
| rows = list(reader) |
| |
| for row in rows: |
| primary_point = lat_lon_to_point(row['PrimaryData_lat'], row['PrimaryData_lon']) |
| |
| assert bounding_poly.intersects(primary_point) |
| assert iso_time_to_epoch(params['startTime']) \ |
| <= float(row['PrimaryData_time']) \ |
| <= iso_time_to_epoch(params['endTime']) |
| |
| verify_secondary_in_tolerance( |
| {'lat': row['PrimaryData_lat'], 'lon': row['PrimaryData_lon']}, |
| {'lat': row['SecondaryData_lat'], 'lon': row['SecondaryData_lon']}, |
| params['rt'] |
| ) |
| assert (iso_time_to_epoch(params['startTime']) - params['tt']) \ |
| <= float(row['SecondaryData_time']) \ |
| <= (iso_time_to_epoch(params['endTime']) + params['tt']) |
| |
| response = fetch_result(eid_list[1], "NETCDF") |
| params = param_list[1] |
| bounding_poly = b_to_polygon(params['b']) |
| |
| assert response.status_code == 200 |
| |
| try_save("test_cdmsresults_netcdf_B", start, response, "nc", 'wb') |
| |
| temp_file.write(response.content) |
| temp_file.flush() |
| temp_file.seek(0) |
| |
| matches = cdms_reader.assemble_matches(temp_file.name) |
| |
| cdms_reader.matches_to_csv(matches, temp_file.name) |
| |
| with open(temp_file.name) as f: |
| reader = csv.DictReader(f) |
| rows = list(reader) |
| |
| for row in rows: |
| primary_point = lat_lon_to_point(row['PrimaryData_lat'], row['PrimaryData_lon']) |
| |
| assert bounding_poly.intersects(primary_point) |
| assert iso_time_to_epoch(params['startTime']) \ |
| <= float(row['PrimaryData_time']) \ |
| <= iso_time_to_epoch(params['endTime']) |
| |
| verify_secondary_in_tolerance( |
| {'lat': row['PrimaryData_lat'], 'lon': row['PrimaryData_lon']}, |
| {'lat': row['SecondaryData_lat'], 'lon': row['SecondaryData_lon']}, |
| params['rt'] |
| ) |
| assert (iso_time_to_epoch(params['startTime']) - params['tt']) \ |
| <= float(row['SecondaryData_time']) \ |
| <= (iso_time_to_epoch(params['endTime']) + params['tt']) |
| |
| temp_file.close() |
| warnings.filterwarnings('default') |