blob: a483907f6304d37fc6073a6cb9b73bf994ec0757 [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import datetime
from collections import OrderedDict
from contextlib import contextmanager
from os import sep, path, remove
from urllib.request import urlopen
from nexusproto import DataTile_pb2 as nexusproto
import numpy
from netCDF4 import Dataset, num2date
from nexusproto.serialization import to_shaped_array, to_metadata
from pytz import timezone
from sdap.processors import NexusTileProcessor
EPOCH = timezone('UTC').localize(datetime.datetime(1970, 1, 1))
@contextmanager
def closing(thing):
try:
yield thing
finally:
thing.close()
def parse_input(the_input_tile, temp_dir):
specs = [the_input_tile.summary.section_spec]
# Generate a list of tuples, where each tuple is a (string, map) that represents a
# tile spec in the form (str(section_spec), { dimension_name : slice, dimension2_name : slice })
tile_specifications = [slices_from_spec(section_spec) for section_spec in specs]
file_path = the_input_tile.summary.granule
file_name = file_path.split(sep)[-1]
# If given a temporary directory location, copy the file to the temporary directory and return that path
if temp_dir is not None:
temp_file_path = path.join(temp_dir, file_name)
with closing(urlopen(file_path)) as original_granule:
with open(temp_file_path, 'wb') as temp_granule:
for chunk in iter((lambda: original_granule.read(512000)), ''):
temp_granule.write(chunk)
file_path = temp_file_path
# Remove file:// if it's there because netcdf lib doesn't like it
file_path = file_path[len('file:'):] if file_path.startswith('file:') else file_path
return tile_specifications, file_path
def slices_from_spec(spec):
dimtoslice = {}
for dimension in spec.split(','):
name, start, stop = dimension.split(':')
dimtoslice[name] = slice(int(start), int(stop))
return spec, dimtoslice
def to_seconds_from_epoch(date, timeunits=None, start_day=None, timeoffset=None):
try:
date = num2date(date, units=timeunits)
except ValueError:
assert isinstance(start_day, datetime.date), "start_day is not a datetime.date object"
the_datetime = datetime.datetime.combine(start_day, datetime.datetime.min.time())
date = the_datetime + datetime.timedelta(seconds=date)
if isinstance(date, datetime.datetime):
date = timezone('UTC').localize(date)
else:
date = timezone('UTC').localize(datetime.datetime.strptime(str(date), '%Y-%m-%d %H:%M:%S'))
if timeoffset is not None:
return int((date - EPOCH).total_seconds()) + timeoffset
else:
return int((date - EPOCH).total_seconds())
def get_ordered_slices(ds, variable, dimension_to_slice):
dimensions_for_variable = [str(dimension) for dimension in ds[variable].dimensions]
ordered_slices = OrderedDict()
for dimension in dimensions_for_variable:
ordered_slices[dimension] = dimension_to_slice[dimension]
return ordered_slices
class TileReadingProcessor(NexusTileProcessor):
def __init__(self, variable_to_read, latitude, longitude, *args, **kwargs):
super().__init__(*args, **kwargs)
# Required properties for all reader types
self.variable_to_read = variable_to_read
self.latitude = latitude
self.longitude = longitude
# Common optional properties
self.temp_dir = self.environ['TEMP_DIR']
self.metadata = self.environ['META']
self.start_of_day = self.environ['GLBLATTR_DAY']
self.start_of_day_pattern = self.environ['GLBLATTR_DAY_FORMAT']
self.time_offset = int(self.environ['TIME_OFFSET']) if self.environ['TIME_OFFSET'] is not None else None
def process_nexus_tile(self, input_tile):
tile_specifications, file_path = parse_input(input_tile, self.temp_dir)
output_tile = nexusproto.NexusTile()
output_tile.CopyFrom(input_tile)
for tile in self.read_data(tile_specifications, file_path, output_tile):
yield tile
# If temp dir is defined, delete the temporary file
if self.temp_dir is not None:
remove(file_path)
def read_data(self, tile_specifications, file_path, output_tile):
raise NotImplementedError
class GridReadingProcessor(TileReadingProcessor):
def read_data(self, tile_specifications, file_path, output_tile):
# Time is optional for Grid data
time = self.environ['TIME']
with Dataset(file_path) as ds:
for section_spec, dimtoslice in tile_specifications:
tile = nexusproto.GridTile()
tile.latitude.CopyFrom(
to_shaped_array(numpy.ma.filled(ds[self.latitude][dimtoslice[self.latitude]], numpy.NaN)))
tile.longitude.CopyFrom(
to_shaped_array(numpy.ma.filled(ds[self.longitude][dimtoslice[self.longitude]], numpy.NaN)))
# Before we read the data we need to make sure the dimensions are in the proper order so we don't have any
# indexing issues
ordered_slices = get_ordered_slices(ds, self.variable_to_read, dimtoslice)
# Read data using the ordered slices, replacing masked values with NaN
data_array = numpy.ma.filled(ds[self.variable_to_read][tuple(ordered_slices.values())], numpy.NaN)
tile.variable_data.CopyFrom(to_shaped_array(data_array))
if self.metadata is not None:
tile.meta_data.add().CopyFrom(
to_metadata(self.metadata, ds[self.metadata][tuple(ordered_slices.values())]))
if time is not None:
timevar = ds[time]
# Note assumption is that index of time is start value in dimtoslice
tile.time = to_seconds_from_epoch(timevar[dimtoslice[time].start],
timeunits=timevar.getncattr('units'),
timeoffset=self.time_offset)
output_tile.tile.grid_tile.CopyFrom(tile)
yield output_tile
class SwathReadingProcessor(TileReadingProcessor):
def __init__(self, variable_to_read, latitude, longitude, time, **kwargs):
super().__init__(variable_to_read, latitude, longitude, **kwargs)
# Time is required for swath data
self.time = time
def read_data(self, tile_specifications, file_path, output_tile):
with Dataset(file_path) as ds:
for section_spec, dimtoslice in tile_specifications:
tile = nexusproto.SwathTile()
# Time Lat Long Data and metadata should all be indexed by the same dimensions, order the incoming spec once using the data variable
ordered_slices = get_ordered_slices(ds, self.variable_to_read, dimtoslice)
tile.latitude.CopyFrom(
to_shaped_array(numpy.ma.filled(ds[self.latitude][tuple(ordered_slices.values())], numpy.NaN)))
tile.longitude.CopyFrom(
to_shaped_array(numpy.ma.filled(ds[self.longitude][tuple(ordered_slices.values())], numpy.NaN)))
timetile = ds[self.time][
tuple([ordered_slices[time_dim] for time_dim in ds[self.time].dimensions])].astype(
'float64',
casting='same_kind',
copy=False)
timeunits = ds[self.time].getncattr('units')
try:
start_of_day_date = datetime.datetime.strptime(ds.getncattr(self.start_of_day),
self.start_of_day_pattern)
except Exception:
start_of_day_date = None
for index in numpy.ndindex(timetile.shape):
timetile[index] = to_seconds_from_epoch(timetile[index].item(), timeunits=timeunits,
start_day=start_of_day_date, timeoffset=self.time_offset)
tile.time.CopyFrom(to_shaped_array(timetile))
# Read the data converting masked values to NaN
data_array = numpy.ma.filled(ds[self.variable_to_read][tuple(ordered_slices.values())], numpy.NaN)
tile.variable_data.CopyFrom(to_shaped_array(data_array))
if self.metadata is not None:
tile.meta_data.add().CopyFrom(
to_metadata(self.metadata, ds[self.metadata][tuple(ordered_slices.values())]))
output_tile.tile.swath_tile.CopyFrom(tile)
yield output_tile
class TimeSeriesReadingProcessor(TileReadingProcessor):
def __init__(self, variable_to_read, latitude, longitude, time, **kwargs):
super().__init__(variable_to_read, latitude, longitude, **kwargs)
# Time is required for swath data
self.time = time
def read_data(self, tile_specifications, file_path, output_tile):
with Dataset(file_path) as ds:
for section_spec, dimtoslice in tile_specifications:
tile = nexusproto.TimeSeriesTile()
instance_dimension = next(
iter([dim for dim in ds[self.variable_to_read].dimensions if dim != self.time]))
tile.latitude.CopyFrom(
to_shaped_array(numpy.ma.filled(ds[self.latitude][dimtoslice[instance_dimension]], numpy.NaN)))
tile.longitude.CopyFrom(
to_shaped_array(numpy.ma.filled(ds[self.longitude][dimtoslice[instance_dimension]], numpy.NaN)))
# Before we read the data we need to make sure the dimensions are in the proper order so we don't
# have any indexing issues
ordered_slices = get_ordered_slices(ds, self.variable_to_read, dimtoslice)
# Read data using the ordered slices, replacing masked values with NaN
data_array = numpy.ma.filled(ds[self.variable_to_read][tuple(ordered_slices.values())], numpy.NaN)
tile.variable_data.CopyFrom(to_shaped_array(data_array))
if self.metadata is not None:
tile.meta_data.add().CopyFrom(
to_metadata(self.metadata, ds[self.metadata][tuple(ordered_slices.values())]))
tile.time.CopyFrom(
to_shaped_array(numpy.ma.filled(ds[self.time][dimtoslice[self.time]], numpy.NaN)))
output_tile.tile.time_series_tile.CopyFrom(tile)
yield output_tile