wip: automatic reading processor detector
diff --git a/granule_ingester/granule_ingester/granule_loaders/GranuleLoader.py b/granule_ingester/granule_ingester/granule_loaders/GranuleLoader.py
index 0311f49..6377de0 100644
--- a/granule_ingester/granule_ingester/granule_loaders/GranuleLoader.py
+++ b/granule_ingester/granule_ingester/granule_loaders/GranuleLoader.py
@@ -56,6 +56,8 @@
granule_name = os.path.basename(self._resource)
try:
return xr.open_dataset(file_path, lock=False), granule_name
+ except FileNotFoundError:
+ raise GranuleLoadingError(f"The granule file {self._resource} does not exist.")
except Exception:
raise GranuleLoadingError(f"The granule {self._resource} is not a valid NetCDF file.")
diff --git a/granule_ingester/granule_ingester/pipeline/Modules.py b/granule_ingester/granule_ingester/pipeline/Modules.py
index 2cf2245..689b3b1 100644
--- a/granule_ingester/granule_ingester/pipeline/Modules.py
+++ b/granule_ingester/granule_ingester/pipeline/Modules.py
@@ -1,7 +1,10 @@
-from granule_ingester.processors import *
-from granule_ingester.processors.reading_processors import *
-from granule_ingester.slicers import *
-from granule_ingester.granule_loaders import *
+from granule_ingester.granule_loaders import GranuleLoader
+from granule_ingester.processors import (EmptyTileFilter, GenerateTileId,
+ KelvinToCelsius,
+ TileSummarizingProcessor)
+from granule_ingester.processors.reading_processors import (
+ EccoReadingProcessor, GridReadingProcessor)
+from granule_ingester.slicers import SliceFileByStepSize
modules = {
"granule": GranuleLoader,
@@ -11,5 +14,5 @@
"GridReadingProcessor": GridReadingProcessor,
"tileSummary": TileSummarizingProcessor,
"emptyTileFilter": EmptyTileFilter,
- "kelvinToCelsius": KelvinToCelsius
+ "kelvinToCelsius": KelvinToCelsius,
}
diff --git a/granule_ingester/granule_ingester/pipeline/Pipeline.py b/granule_ingester/granule_ingester/pipeline/Pipeline.py
index dabca81..03da05f 100644
--- a/granule_ingester/granule_ingester/pipeline/Pipeline.py
+++ b/granule_ingester/granule_ingester/pipeline/Pipeline.py
@@ -26,6 +26,7 @@
from aiomultiprocess.types import ProxyException
from granule_ingester.exceptions import PipelineBuildingError
from granule_ingester.granule_loaders import GranuleLoader
+from granule_ingester.processors import ReadingProcessorSelector
from granule_ingester.pipeline.Modules import \
modules as processor_module_mappings
from granule_ingester.processors.TileProcessor import TileProcessor
@@ -142,6 +143,8 @@
slicer_config = config['slicer']
slicer = cls._parse_module(slicer_config, module_mappings)
+ reading_processor_selector = ReadingProcessorSelector(**config['readingProcessorSelector'])
+
tile_processors = []
for processor_config in config['processors']:
module = cls._parse_module(processor_config, module_mappings)
diff --git a/granule_ingester/granule_ingester/processors/ReadingProcessorSelector.py b/granule_ingester/granule_ingester/processors/ReadingProcessorSelector.py
new file mode 100644
index 0000000..2138c98
--- /dev/null
+++ b/granule_ingester/granule_ingester/processors/ReadingProcessorSelector.py
@@ -0,0 +1,58 @@
+import re
+from typing import List, Optional, Type
+
+import xarray as xr
+
+from granule_ingester.processors.reading_processors import (TileReadingProcessor,
+ GridReadingProcessor,
+ EccoReadingProcessor,
+ SwathReadingProcessor,
+ TimeSeriesReadingProcessor)
+
+
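+# Reading processor classes that are asked to bid on a dataset when auto-detecting its grid type.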
+GRID_PROCESSORS = [GridReadingProcessor, EccoReadingProcessor, SwathReadingProcessor, TimeSeriesReadingProcessor]
+
+
+class ReadingProcessorSelector:
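+ """Select the most appropriate reading processor for a dataset.
+
+ Dimension names are detected from variable metadata, and each candidate
+ reading processor places a normalized bid; the highest bidder wins.
+ """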
+ def __init__(self, dataset: xr.Dataset, variable: str, *args, **kwargs):
+ self._dataset = dataset
+ self._variable = variable
+
+ def get_reading_processor(self):
+ # Minimal wiring of the two detection steps: detect the lat/lon/time names,
+ # then return the processor class with the highest bid.
+ lat, lon, time = self.detect_dimensions()
+ return self.detect_grid_type(lat, lon, time, GRID_PROCESSORS)
+
+ def detect_grid_type(self, lat: str, lon: str, time: str, processor_types: List[Type[TileReadingProcessor]]):
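+ """Ask each candidate processor type to bid on the dataset and return the class with the highest bid."""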
+ bids = []
+ for processor_type in processor_types:
+ bid = processor_type.bid(dataset=self._dataset,
+ variable=self._variable,
+ lat=lat,
+ lon=lon,
+ time=time)
+ bids.append((processor_type, bid))
+ highest_bidder = max(bids, key=lambda bidder: bidder[1])
+
+ return highest_bidder[0]
+
+ def detect_dimensions(self):
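+ """Find the names of the lat, lon, and time variables by matching their long_name attributes."""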
+ lat_regex = r'((.*\s+)?latitude(\s+.*)?)|((.*\s+)?lat(\s+.*)?)'
+ lon_regex = r'((.*\s+)?longitude(\s+.*)?)|((.*\s+)?lon(\s+.*)?)'
+ time_regex = r'(.*\s+)?time(\s+.*)?'
+
+ lat = self._detect_dimension_name(lat_regex)
+ lon = self._detect_dimension_name(lon_regex)
+ time = self._detect_dimension_name(time_regex)
+
+ return (lat, lon, time)
+
+ def _detect_dimension_name(self, pattern: str) -> Optional[str]:
+ candidates = []
+ for var_name in self._dataset.data_vars:
+ # Not every variable defines long_name; fall back to an empty string so it simply never matches.
+ long_name = self._dataset[var_name].attrs.get('long_name', '')
+ if re.match(pattern, long_name):
+ candidates.append(var_name)
+ if len(candidates) > 1:
+ raise Exception(f"Found multiple possibilities for dimension with pattern {pattern}.")
+
+ if len(candidates) == 0:
+ return None
+ return candidates[0]
diff --git a/granule_ingester/granule_ingester/processors/__init__.py b/granule_ingester/granule_ingester/processors/__init__.py
index 592d8ea..a05673a 100644
--- a/granule_ingester/granule_ingester/processors/__init__.py
+++ b/granule_ingester/granule_ingester/processors/__init__.py
@@ -1,5 +1,6 @@
+from granule_ingester.processors.ReadingProcessorSelector import ReadingProcessorSelector
from granule_ingester.processors.EmptyTileFilter import EmptyTileFilter
from granule_ingester.processors.GenerateTileId import GenerateTileId
+from granule_ingester.processors.kelvintocelsius import KelvinToCelsius
from granule_ingester.processors.TileProcessor import TileProcessor
from granule_ingester.processors.TileSummarizingProcessor import TileSummarizingProcessor
-from granule_ingester.processors.kelvintocelsius import KelvinToCelsius
diff --git a/granule_ingester/granule_ingester/processors/reading_processors/EccoReadingProcessor.py b/granule_ingester/granule_ingester/processors/reading_processors/EccoReadingProcessor.py
index 1876013..7c81e83 100644
--- a/granule_ingester/granule_ingester/processors/reading_processors/EccoReadingProcessor.py
+++ b/granule_ingester/granule_ingester/processors/reading_processors/EccoReadingProcessor.py
@@ -23,6 +23,18 @@
self.time = time
self.tile = tile
+ @staticmethod
+ def bid(dataset, variable, lat, lon, time):
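+ # Heuristics for ECCO native-grid output: curvilinear YC/XC coordinates,
+ # lat/lon absent from the variable's own dimensions, and a 'tile' dimension.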
+ bid = 0
+ if lat == 'YC' and lon == 'XC':
+ bid += 1
+ if lat not in dataset[variable].dims and lon not in dataset[variable].dims:
+ bid += 1
+ if 'tile' in dataset[variable].dims:
+ bid += 1
+
+ return bid / 3
+
def _generate_tile(self, ds: xr.Dataset, dimensions_to_slices: Dict[str, slice], input_tile):
new_tile = nexusproto.EccoTile()
diff --git a/granule_ingester/granule_ingester/processors/reading_processors/GridReadingProcessor.py b/granule_ingester/granule_ingester/processors/reading_processors/GridReadingProcessor.py
index 4354f9e..f1bc309 100644
--- a/granule_ingester/granule_ingester/processors/reading_processors/GridReadingProcessor.py
+++ b/granule_ingester/granule_ingester/processors/reading_processors/GridReadingProcessor.py
@@ -14,6 +14,17 @@
self.depth = depth
self.time = time
+ @staticmethod
+ def bid(dataset, variable, lat, lon, time):
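+ # Heuristics for a regular grid: no dimension of size <= 2, 1-D lat and lon
+ # coordinate variables, and at least two non-time dimensions on the variable.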
+ bid = 0
+ if all(dimension_size > 2 for dimension_size in dataset[variable].sizes.values()):
+ bid += 1
+ if len(dataset[lat].dims) == 1 and len(dataset[lon].dims) == 1:
+ bid += 1
+ if len(set(dataset[variable].dims) - {time}) >= 2:
+ bid += 1
+ return bid / 3
+
def _generate_tile(self, ds: xr.Dataset, dimensions_to_slices: Dict[str, slice], input_tile):
new_tile = nexusproto.GridTile()
diff --git a/granule_ingester/granule_ingester/processors/reading_processors/SwathReadingProcessor.py b/granule_ingester/granule_ingester/processors/reading_processors/SwathReadingProcessor.py
index fec28ca..fdc58f0 100644
--- a/granule_ingester/granule_ingester/processors/reading_processors/SwathReadingProcessor.py
+++ b/granule_ingester/granule_ingester/processors/reading_processors/SwathReadingProcessor.py
@@ -14,6 +14,14 @@
self.depth = depth
self.time = time
+ @staticmethod
+ def bid(dataset, variable, lat, lon, time):
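+ # Single heuristic: a dimension of size 2 on the variable is taken as a hint of swath-style data.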
+ bid = 0
+ if 2 in dataset[variable].sizes.values():
+ bid += 1
+
+ return bid / 1
+
def _generate_tile(self, ds: xr.Dataset, dimensions_to_slices: Dict[str, slice], input_tile):
new_tile = nexusproto.SwathTile()
diff --git a/granule_ingester/granule_ingester/processors/reading_processors/TileReadingProcessor.py b/granule_ingester/granule_ingester/processors/reading_processors/TileReadingProcessor.py
index 8b69ad2..4192c36 100644
--- a/granule_ingester/granule_ingester/processors/reading_processors/TileReadingProcessor.py
+++ b/granule_ingester/granule_ingester/processors/reading_processors/TileReadingProcessor.py
@@ -32,6 +32,11 @@
self.latitude = latitude
self.longitude = longitude
+ @staticmethod
+ @abstractmethod
+ def bid(dataset: xr.Dataset, variable: str, lat: str, lon: str, time: str) -> float:
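+ """Score how well this processor matches the dataset's layout.
+
+ Implementations return a value between 0 and 1: the fraction of their
+ layout heuristics that the dataset satisfies.
+ """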
+ pass
+
def process(self, tile, dataset: xr.Dataset, *args, **kwargs):
try:
dimensions_to_slices = self._convert_spec_to_slices(tile.summary.section_spec)
diff --git a/granule_ingester/granule_ingester/processors/reading_processors/TimeSeriesReadingProcessor.py b/granule_ingester/granule_ingester/processors/reading_processors/TimeSeriesReadingProcessor.py
index 2831c0c..c10586e 100644
--- a/granule_ingester/granule_ingester/processors/reading_processors/TimeSeriesReadingProcessor.py
+++ b/granule_ingester/granule_ingester/processors/reading_processors/TimeSeriesReadingProcessor.py
@@ -15,6 +15,16 @@
self.depth = depth
self.time = time
+ @staticmethod
+ def bid(dataset, variable, lat, lon, time):
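+ # Heuristics for time-series data: the variable has exactly two dimensions,
+ # one of which is time.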
+ bid = 0
+ if len(dataset[variable].dims) == 2:
+ bid += 1
+ if time in dataset[variable].dims:
+ bid += 1
+
+ return bid / 2
+
def _generate_tile(self, ds: xr.Dataset, dimensions_to_slices: Dict[str, slice], input_tile):
new_tile = nexusproto.TimeSeriesTile()
diff --git a/granule_ingester/tests/processors/test_ReadingProcessorSelector.py b/granule_ingester/tests/processors/test_ReadingProcessorSelector.py
new file mode 100644
index 0000000..c607071
--- /dev/null
+++ b/granule_ingester/tests/processors/test_ReadingProcessorSelector.py
@@ -0,0 +1,57 @@
+import unittest
+from os import path
+
+import xarray as xr
+
+from granule_ingester.processors import ReadingProcessorSelector
+from granule_ingester.processors.ReadingProcessorSelector import GRID_PROCESSORS
+from granule_ingester.processors.reading_processors import (EccoReadingProcessor,
+                                                            GridReadingProcessor,
+                                                            SwathReadingProcessor,
+                                                            TimeSeriesReadingProcessor)
+
+
+class TestReadingProcessorSelector(unittest.TestCase):
+
+ def test_detect_dimensions(self):
+ netcdf_path = path.join(path.dirname(__file__), '../granules/SMAP_L2B_SSS_04892_20160101T005507_R13080.h5')
+ with xr.open_dataset(netcdf_path, decode_cf=True) as dataset:
+ selector = ReadingProcessorSelector(dataset, 'smap_sss')
+ self.assertEqual(('lat', 'lon', 'row_time'), selector.detect_dimensions())
+
+ def test_detect_grid_type_smap(self):
+ netcdf_path = path.join(path.dirname(__file__), '../granules/SMAP_L2B_SSS_04892_20160101T005507_R13080.h5')
+ with xr.open_dataset(netcdf_path, decode_cf=True) as dataset:
+ selector = ReadingProcessorSelector(dataset, 'smap_sss')
+ processor = selector.detect_grid_type('lat', 'lon', 'time', GRID_PROCESSORS)
+ self.assertEqual(GridReadingProcessor, processor)
+
+ def test_detect_grid_type_ecco_native(self):
+ netcdf_path = path.join(path.dirname(__file__), '../granules/OBP_native_grid.nc')
+ with xr.open_dataset(netcdf_path, decode_cf=True) as dataset:
+ selector = ReadingProcessorSelector(dataset, 'OBP')
+ processor = selector.detect_grid_type('YC', 'XC', 'time', GRID_PROCESSORS)
+ self.assertEqual(EccoReadingProcessor, processor)
+
+ def test_detect_grid_type_ecco_interp(self):
+ netcdf_path = path.join(path.dirname(__file__), '../granules/OBP_2017_01.nc')
+ with xr.open_dataset(netcdf_path, decode_cf=True) as dataset:
+ selector = ReadingProcessorSelector(dataset, 'OBP')
+ processor = selector.detect_grid_type('latitude', 'longitude', 'time', GRID_PROCESSORS)
+ self.assertEqual(GridReadingProcessor, processor)
+
+ def test_detect_grid_type_time_series(self):
+ netcdf_path = path.join(path.dirname(__file__), '../granules/not_empty_wswm.nc')
+ with xr.open_dataset(netcdf_path, decode_cf=True) as dataset:
+ selector = ReadingProcessorSelector(dataset, 'Qout')
+ processor = selector.detect_grid_type('lat', 'lon', 'time', GRID_PROCESSORS)
+ self.assertEqual(TimeSeriesReadingProcessor, processor)
+
+ def test_detect_grid_type_swath(self):
+ netcdf_path = path.join(path.dirname(__file__), '../granules/not_empty_smap.h5')
+ with xr.open_dataset(netcdf_path, decode_cf=True) as dataset:
+ selector = ReadingProcessorSelector(dataset, 'smap_sss')
+ processor = selector.detect_grid_type('lat', 'lon', 'row_time', GRID_PROCESSORS)
+ self.assertEqual(SwathReadingProcessor, processor)
diff --git a/granule_ingester/tests/reading_processors/test_EccoReadingProcessor.py b/granule_ingester/tests/reading_processors/test_EccoReadingProcessor.py
index f2e9f29..ec3311f 100644
--- a/granule_ingester/tests/reading_processors/test_EccoReadingProcessor.py
+++ b/granule_ingester/tests/reading_processors/test_EccoReadingProcessor.py
@@ -62,3 +62,14 @@
self.assertEqual(output_tile.tile.ecco_tile.variable_data.shape, [15, 7])
self.assertEqual(output_tile.tile.ecco_tile.latitude.shape, [15, 7])
self.assertEqual(output_tile.tile.ecco_tile.longitude.shape, [15, 7])
+
+ def test_bid(self):
+ netcdf_path = path.join(path.dirname(__file__), '../granules/OBP_native_grid.nc')
+ with xr.open_dataset(netcdf_path, decode_cf=True) as dataset:
+ bid = EccoReadingProcessor.bid(
+ dataset=dataset,
+ variable='OBP',
+ lat='YC',
+ lon='XC',
+ time='time')
+ self.assertEqual(1, bid)
diff --git a/granule_ingester/tests/reading_processors/test_SwathReadingProcessor.py b/granule_ingester/tests/reading_processors/test_SwathReadingProcessor.py
index 55ac4fc..c9e76c3 100644
--- a/granule_ingester/tests/reading_processors/test_SwathReadingProcessor.py
+++ b/granule_ingester/tests/reading_processors/test_SwathReadingProcessor.py
@@ -72,3 +72,14 @@
self.assertEqual([38, 1], output_tile.tile.swath_tile.variable_data.shape)
self.assertEqual([38, 1], output_tile.tile.swath_tile.latitude.shape)
self.assertEqual([38, 1], output_tile.tile.swath_tile.longitude.shape)
+
+ def test_bid(self):
+ netcdf_path = path.join(path.dirname(__file__), '../granules/not_empty_ascatb.nc4')
+ with xr.open_dataset(netcdf_path, decode_cf=True) as dataset:
+ bid = SwathReadingProcessor.bid(
+ dataset=dataset,
+ variable='wind_speed',
+ lat='lat',
+ lon='lon',
+ time='time')
+ self.assertEqual(1, bid)