| # Licensed to the Apache Software Foundation (ASF) under one or more |
| # contributor license agreements. See the NOTICE file distributed with |
| # this work for additional information regarding copyright ownership. |
| # The ASF licenses this file to You under the Apache License, Version 2.0 |
| # (the "License"); you may not use this file except in compliance with |
| # the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| |
| |
| import json |
| import os |
| import pickle |
| from datetime import datetime, timezone |
| |
| import mock |
| import numpy as np |
| import pytest |
| import webservice.algorithms_spark.Matchup as matchup |
| from nexustiles.model.nexusmodel import Tile |
| from pyspark.sql import SparkSession |
| from shapely import wkt |
| from shapely.geometry import box |
| from webservice.algorithms_spark.Matchup import DomsPoint, Matchup |
| |
| |
@pytest.fixture(scope='function')
def test_dir():
    """
    Provide the path of the 'data' directory located one level above
    the directory that contains this test module.
    """
    module_dir = os.path.dirname(os.path.realpath(__file__))
    yield os.path.join(module_dir, '..', 'data')
| |
| |
def test_doms_point_is_pickleable():
    """
    Build a DomsPoint from a representative edge point record and
    assert that pickling it produces a result.
    """
    raw_edge_point = {
        'id': 'argo-profiles-5903995(46, 0)',
        'time': '2012-10-15T14:24:04Z',
        'point': '-33.467 29.728',
        'sea_water_temperature': 24.5629997253,
        'sea_water_temperature_depth': 2.9796258642,
        'wind_speed': None,
        'sea_water_salinity': None,
        'sea_water_salinity_depth': None,
        'platform': 4,
        'device': 3,
        'fileurl': 'ftp://podaac-ftp.jpl.nasa.gov/allData/argo-profiles-5903995.nc',
    }
    serialized = pickle.dumps(DomsPoint.from_edge_point(raw_edge_point))
    assert serialized is not None
| |
| |
def test_calc():
    """
    Run Matchup.calc with its external collaborators mocked.

    Two things are checked: 'spark_matchup_driver' is invoked exactly
    once with the parsed request arguments, and the mocked driver output
    is rendered into the expected JSON structure.
    """
    # Stand-ins for everything that would reach an external service
    # (Solr, Cassandra, ...)
    service = mock.MagicMock()
    service_factory = mock.MagicMock()
    service_factory.return_value = service
    sc = SparkSession.builder.appName('nexus-analysis').getOrCreate().sparkContext
    fake_request = mock.MagicMock()
    fake_request.get_argument.return_value = '1,2,3,4'

    # Values handed back by the stubbed-out parse_arguments; the dict
    # order is the order in which calc receives them
    window_start = datetime(2020, 1, 1, tzinfo=timezone.utc)
    window_end = datetime(2020, 2, 1, tzinfo=timezone.utc)
    polygon_wkt = 'POLYGON((-34.98 29.54, -30.1 29.54, -30.1 31.00, -34.98 31.00, -34.98 29.54))'
    args = {
        'bounding_polygon': wkt.loads(polygon_wkt),
        'primary_ds_name': 'primary-ds-name',
        'matchup_ds_names': 'matchup-ds-name',
        'parameter_s': 'sst',
        'start_time': window_start,
        'start_seconds_from_epoch': window_start.timestamp(),
        'end_time': window_end,
        'end_seconds_from_epoch': window_end.timestamp(),
        'depth_min': 1.0,
        'depth_max': 2.0,
        'time_tolerance': 3.0,
        'radius_tolerance': 4.0,
        'platforms': '1,2,3,4,5,6,7,8,9',
        'match_once': True,
        'result_size_limit': 10,
    }

    # Ten empty tiles that differ only by id
    fake_tiles = []
    for tile_id in range(10):
        fake_tile = Tile()
        fake_tile.tile_id = tile_id
        fake_tiles.append(fake_tile)
    service.find_tiles_in_polygon.return_value = fake_tiles

    def make_point(value):
        """Create a DomsPoint at (-180, -90) carrying the given value."""
        point = DomsPoint(longitude=-180, latitude=-90, time='2020-01-15T00:00:00Z')
        point.satellite_var_name = 'sea_surface_temperature'
        point.satellite_var_value = value
        return point

    # Mocked driver output: each primary point maps to the list of
    # points matched to it
    primary_a = make_point(10.0)
    primary_b = make_point(20.0)
    secondary_a = make_point(30.0)
    secondary_b = make_point(40.0)
    fake_spark_result = {
        primary_a: [secondary_a, secondary_b],
        primary_b: [secondary_a, secondary_b],
    }

    matchup_obj = Matchup(tile_service_factory=service_factory, sc=sc)
    matchup_obj.parse_arguments = lambda _: list(args.values())

    with mock.patch('webservice.algorithms_spark.Matchup.ResultsStorage') as mock_rs:
        with mock.patch('webservice.algorithms_spark.Matchup.spark_matchup_driver') as mock_matchup_driver:
            mock_rs.insertExecution.return_value = 1
            mock_matchup_driver.return_value = fake_spark_result
            matchup_result = matchup_obj.calc(fake_request)

            # The driver must have been invoked exactly once, with the
            # parsed arguments forwarded in the expected positions
            assert mock_matchup_driver.call_count == 1
            driver_args, driver_kwargs = mock_matchup_driver.call_args
            assert driver_args[0] == [tile.tile_id for tile in fake_tiles]
            assert wkt.loads(driver_args[1]).equals(wkt.loads(polygon_wkt))
            forwarded_keys = (
                'primary_ds_name',
                'matchup_ds_names',
                'parameter_s',
                'depth_min',
                'depth_max',
                'time_tolerance',
                'radius_tolerance',
                'platforms',
                'match_once',
            )
            for position, key in enumerate(forwarded_keys, start=2):
                assert driver_args[position] == args[key]
            assert driver_args[11] == service_factory
            assert driver_kwargs['sc'] == sc

            # The JSON rendering must mirror the mocked driver output
            rendered = json.loads(matchup_result.toJson())
            primaries = rendered['data']
            assert len(primaries) == 2
            for primary in primaries:
                assert len(primary['matches']) == 2

            for primary in primaries:
                assert primary['x'] == '-180'
                assert primary['y'] == '-90'
                for match in primary['matches']:
                    assert match['x'] == '-180'
                    assert match['y'] == '-90'

            for primary, primary_value in zip(primaries, (10.0, 20.0)):
                assert primary['sea_surface_temperature'] == primary_value
                first_match, second_match = primary['matches']
                assert first_match['sea_surface_temperature'] == 30.0
                assert second_match['sea_surface_temperature'] == 40.0

            assert rendered['details']['numInSituMatched'] == 4
            assert rendered['details']['numGriddedMatched'] == 2
| |
| |
def test_match_satellite_to_insitu(test_dir):
    """
    Test match_satellite_to_insitu and ensure the matchup is done as
    expected, where the tile points and in-situ points are all known
    and the expected matchup points have been hand-calculated.

    This test case mocks out all external dependencies, so Solr,
    Cassandra, HTTP insitu requests, etc are all mocked.

    The test points are as follows (coordinates are (lon, lat)):

        X (0, 20)                       X (20, 20)

                O (5, 15)



                        O (10, 10)




                                    O (18, 3)

        X (0, 0)                        X (20, 0)

    The 'X' points are the primary satellite points and the 'O' points
    are the secondary satellite or insitu points

    Visual inspection reveals that primary point (0, 20) should match
    with secondary point (5, 15) and primary point (20, 0) should match
    with (18, 3)

    :param test_dir: fixture giving the directory that holds
        'edge_response.json'
    """
    tile = Tile()
    tile.tile_id = 1
    tile.tile_min_lat = 0
    tile.tile_max_lat = 20
    tile.tile_min_lon = 0
    tile.tile_max_lon = 20
    tile.dataset = 'test-dataset'
    tile.dataset_id = 123
    tile.granule = 'test-granule-name'
    tile.min_time = '2020-07-28T00:00:00'
    tile.max_time = '2020-07-28T00:00:00'
    tile.section_spec = 'test-section-spec'
    tile.var_name = 'sea_surface_temperature'
    tile.latitudes = np.array([0, 20], dtype=np.float32)
    tile.longitudes = np.array([0, 20], dtype=np.float32)
    tile.times = [1627490285]
    tile.data = np.array([[[11.0, 21.0], [31.0, 41.0]]])
    tile.get_indices = lambda: [[0, 0, 0], [0, 0, 1], [0, 1, 0], [0, 1, 1]]
    tile.meta_data = {'wind_type': []}

    tile_service_factory = mock.MagicMock()
    tile_service = mock.MagicMock()
    tile_service_factory.return_value = tile_service
    tile_service.get_bounding_box.return_value = box(-90, -45, 90, 45)
    tile_service.get_min_time.return_value = 1627490285
    tile_service.get_max_time.return_value = 1627490285
    tile_service.mask_tiles_to_polygon.return_value = [tile]

    tile_ids = [1]
    polygon_wkt = 'POLYGON((-34.98 29.54, -30.1 29.54, -30.1 31.00, -34.98 31.00, -34.98 29.54))'
    primary_ds_name = 'primary-ds-name'
    matchup_ds_names = 'test'
    parameter = 'sst'
    depth_min = 0.0
    depth_max = 1.0
    time_tolerance = 3.0
    radius_tolerance = 1000000.0
    platforms = '1,2,3,4,5,6,7,8,9'

    class MockSparkParam:
        """Minimal stand-in exposing only the '.value' attribute."""

        def __init__(self, value):
            self.value = value

    edge_response_path = os.path.join(test_dir, 'edge_response.json')

    def load_edge_response(*args, **kwargs):
        """
        Replacement for Matchup.query_edge: ignore the query and return
        the canned edge response, freshly parsed on every call. The file
        is closed before returning so no handle is leaked.
        """
        with open(edge_response_path) as edge_file:
            return json.load(edge_file)

    # patch.object restores the real query_edge when the block exits, so
    # the stub cannot leak into tests that run after this one
    with mock.patch('webservice.algorithms_spark.Matchup.edge_endpoints.getEndpointByName') as mock_edge_endpoints, \
            mock.patch.object(matchup, 'query_edge', load_edge_response):
        # Test the satellite->insitu branch
        # By mocking the getEndpointByName function we are forcing
        # Matchup to think this dummy matchup dataset is an insitu
        # dataset
        mock_edge_endpoints.return_value = {'url': 'http://test-edge-url'}

        match_args = dict(
            tile_ids=tile_ids,
            primary_b=MockSparkParam(primary_ds_name),
            matchup_b=MockSparkParam(matchup_ds_names),
            parameter_b=MockSparkParam(parameter),
            tt_b=MockSparkParam(time_tolerance),
            rt_b=MockSparkParam(radius_tolerance),
            platforms_b=MockSparkParam(platforms),
            bounding_wkt_b=MockSparkParam(polygon_wkt),
            depth_min_b=MockSparkParam(depth_min),
            depth_max_b=MockSparkParam(depth_max),
            tile_service_factory=tile_service_factory
        )

        generator = matchup.match_satellite_to_insitu(**match_args)

        def validate_matchup_result(matchup_result, insitu_matchup):
            """
            The matchup results for satellite->insitu vs
            satellite->satellite are almost exactly the same so they
            can be validated using the same logic. They are the same
            because they represent the same data, except one test is in
            tile format (sat to sat) and one is in edge point format
            (insitu). The only difference is the data field is different
            for satellite data.

            :param matchup_result: list of (primary point, matched point)
                pairs produced by match_satellite_to_insitu
            :param insitu_matchup: True when the matched points are
                insitu points (value in 'sst'), False when they are
                satellite points (value in 'satellite_var_value')
            """
            # There should be two primary matchup points
            assert len(matchup_result) == 2
            # Each result is a (primary point, matched point) pair
            assert len(matchup_result[0]) == 2
            assert len(matchup_result[1]) == 2
            # Check that the satellite point was matched to the expected insitu point
            assert matchup_result[0][1].latitude == 3.0
            assert matchup_result[0][1].longitude == 18.0
            assert matchup_result[1][1].latitude == 15.0
            assert matchup_result[1][1].longitude == 5.0
            # Check that the insitu points have the expected values
            if insitu_matchup:
                assert matchup_result[0][1].sst == 30.0
                assert matchup_result[1][1].sst == 10.0
            else:
                assert matchup_result[0][1].satellite_var_value == 30.0
                assert matchup_result[1][1].satellite_var_value == 10.0
            # Check that the satellite points have the expected values
            assert matchup_result[0][0].satellite_var_value == 21.0
            assert matchup_result[1][0].satellite_var_value == 31.0

        insitu_matchup_result = list(generator)
        validate_matchup_result(insitu_matchup_result, insitu_matchup=True)

        # Test the satellite->satellite branch
        # By mocking the getEndpointByName function to return None we
        # are forcing Matchup to think this dummy matchup dataset is
        # satellite dataset
        mock_edge_endpoints.return_value = None

        # Open the edge response json. We want to convert these points
        # to tile points so we can test sat to sat matchup
        edge_json = load_edge_response()
        points = [wkt.loads(result['point']) for result in edge_json['results']]

        matchup_tile = Tile()
        matchup_tile.tile_id = 1
        matchup_tile.tile_min_lat = 3
        matchup_tile.tile_max_lat = 15
        matchup_tile.tile_min_lon = 5
        matchup_tile.tile_max_lon = 18
        matchup_tile.dataset = 'test-dataset'
        matchup_tile.dataset_id = 123
        matchup_tile.granule = 'test-granule-name'
        matchup_tile.min_time = '2020-07-28T00:00:00'
        matchup_tile.max_time = '2020-07-28T00:00:00'
        matchup_tile.section_spec = 'test-section-spec'
        matchup_tile.var_name = 'sea_surface_temperature'
        matchup_tile.latitudes = np.array([point.y for point in points], dtype=np.float32)
        matchup_tile.longitudes = np.array([point.x for point in points], dtype=np.float32)
        matchup_tile.times = [edge_json['results'][0]['time']]
        # Only the diagonal cells are selected by get_indices below, so
        # each edge point contributes exactly one valued tile point
        matchup_tile.data = np.array([[[10.0, 0, 0], [0, 20.0, 0], [0, 0, 30.0]]])
        matchup_tile.get_indices = lambda: [[0, 0, 0], [0, 1, 1], [0, 2, 2]]
        matchup_tile.meta_data = {'wind_type': []}

        tile_service.find_tiles_in_polygon.return_value = [matchup_tile]

        generator = matchup.match_satellite_to_insitu(**match_args)

        sat_matchup_result = list(generator)
        validate_matchup_result(sat_matchup_result, insitu_matchup=False)