# blob: c1bac318219741e4514560715cb0035364cf9581 [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
'''
datasets.py -- Routines for dataset-specific capabilities: file handling, readers, etc.
One Class for each dataset containing static methods and constants/templates, etc.
'''
import sys, os, re, datetime
import numpy as np
from split import splitByNDaysKeyed, groupByKeys, extractKeys
def splitModisAod(seq, n):
    '''
    Split MODIS AOD entries in `seq` by delegating to splitByNDaysKeyed.

    The key regex captures three groups of 4, 2 and 2 characters, which the
    key function treats as year, month and day and converts to a day of year.
    '''
    dateRegex = re.compile(r'(....)(..)(..)')

    def toDoy(y, m, d):
        return ymd2doy(y, m, d)

    return splitByNDaysKeyed(seq, n, dateRegex, toDoy)
def splitAvhrrSst(seq, n):
    '''
    Split AVHRR SST entries in `seq` by delegating to splitByNDays_Avhrr.

    The regex anchors at the start of the string and captures three groups of
    4, 2 and 2 characters (presumably year, month, day -- confirm).
    '''
    # NOTE(review): splitByNDays_Avhrr is neither imported nor defined in the
    # visible part of this module -- confirm where it is expected to come from.
    leadingDateRegex = re.compile(r'^(....)(..)(..)')
    return splitByNDays_Avhrr(seq, n, leadingDateRegex)
class ModisSst:
    '''
    Constants and static helpers for the MODIS Aqua daily SST granules.

    Granule keys are extracted from the file name with GetKeysRegex and
    reordered by keysTransformer.
    '''
    ExpectedRunTime = "28m"
    UrlsPath = "/data/share/datasets/MODIS_L3_AQUA_11UM_V2014.0_4KM_DAILY/daily_data/A*SST*.nc"
    ExampleFileName = 'A2010303.L3m_DAY_NSST_sst_4km.nc'
    GetKeysRegex = r'A(....)(...).L3m_DAY_(.)S'

    VariableName = 'sst'
    # Every other dataset class in this module exposes `Variable`; keep the
    # historical `VariableName` and offer the common spelling as an alias.
    Variable = VariableName
    Mask = None
    Coordinates = ['lat', 'lon']

    OutputClimTemplate = ''

    @staticmethod
    def keysTransformer(s):
        '''Reorder the regex groups (year, doy, flag) into (doy, year, flag).'''
        return (s[1], s[0], s[2])  # DOY, YEAR, N=night / S=day

    @staticmethod
    def getKeys(url):
        '''Extract the transformed keys for `url` via extractKeys.'''
        return extractKeys(url, ModisSst.GetKeysRegex, ModisSst.keysTransformer)

    @staticmethod
    def split(seq, n):
        '''Return the groups produced by splitByNDaysKeyed for `seq` as a list.'''
        return list(splitByNDaysKeyed(seq, n, ModisSst.GetKeysRegex, ModisSst.keysTransformer))

    @staticmethod
    def genOutputName(doy, variable, nEpochs, averagingConfig):
        '''
        Build the climatology output file name.

        `doy` may be an int or a numeric string: keysTransformer passes the
        DOY regex group through unchanged, and '%03d' rejects strings, so it
        is coerced to int here.
        '''
        return 'A%03d.L3m_%s_%dday_clim_%s.nc' % (
            int(doy), variable, nEpochs, averagingConfig['name'])  # mark each file with first day in period
class ModisChlor:
    '''
    Constants and static helpers for the MODIS Aqua daily chlorophyll granules.

    Granule keys are extracted from the file name with GetKeysRegex and
    reordered by keysTransformer.
    '''
    ExpectedRunTime = "11m"
    # NOTE(review): this looks like a developer-local path -- confirm it is intended.
    UrlsPath = "/Users/greguska/githubprojects/nexus/nexus-ingest/developer-box/data/modis_aqua_chl/A*chlor*.nc"
    ExampleFileName = "A2013187.L3m_DAY_CHL_chlor_a_4km.nc"
    GetKeysRegex = r'A(....)(...).L3m.*CHL'

    Variable = 'chlor_a'
    Mask = None
    Coordinates = ['lat', 'lon']

    OutputClimTemplate = ''

    @staticmethod
    def keysTransformer(s):
        '''Reorder the regex groups (year, doy) into (doy, year).'''
        return (s[1], s[0])  # DOY, YEAR

    @staticmethod
    def getKeys(url):
        '''Extract the transformed keys for `url` via extractKeys.'''
        return extractKeys(url, ModisChlor.GetKeysRegex, ModisChlor.keysTransformer)

    @staticmethod
    def split(seq, n):
        '''Return the groups produced by splitByNDaysKeyed for `seq` as a list.'''
        return list(splitByNDaysKeyed(seq, n, ModisChlor.GetKeysRegex, ModisChlor.keysTransformer))

    @staticmethod
    def genOutputName(doy, variable, nEpochs, averagingConfig):
        '''
        Build the climatology output file name.

        `doy` may be an int or a numeric string: keysTransformer passes the
        DOY regex group through unchanged, and '%03d' rejects strings, so it
        is coerced to int here.
        '''
        return 'A%03d.L3m_%s_%dday_clim_%s.nc' % (
            int(doy), variable, nEpochs, averagingConfig['name'])  # mark each file with first day in period
class MeasuresSsh:
    '''
    Constants and static helpers for the MEaSUREs sea surface height grids.

    File names carry a year/month/day stamp that keysTransformer converts to
    a (day-of-year, year) key.
    '''
    ExpectedRunTime = "2m22s"
    UrlsPath = "/data/share/datasets/MEASURES_SLA_JPL_1603/daily_data/ssh_grids_v1609*12.nc"
    ExampleFileName = "ssh_grids_v1609_2006120812.nc"
    GetKeysRegex = r'ssh.*v1609_(....)(..)(..)12.nc'

    Variable = 'SLA'  # sea level anomaly estimate
    Mask = None
    Coordinates = ['Longitude', 'Latitude']  # Time is first (len=1) coordinate, will be removed

    OutputClimTemplate = ''

    @staticmethod
    def keysTransformer(s):
        '''Map the regex groups (year, month, day) to (doy, year).'''
        year, month, day = s[0], s[1], s[2]
        return (ymd2doy(year, month, day), year)  # DOY, YEAR

    @staticmethod
    def getKeys(url):
        '''Extract the transformed keys for `url` via extractKeys.'''
        return extractKeys(url, MeasuresSsh.GetKeysRegex, MeasuresSsh.keysTransformer)

    @staticmethod
    def split(seq, n):
        '''Return the groups produced by splitByNDaysKeyed for `seq` as a list.'''
        groups = splitByNDaysKeyed(seq, n, MeasuresSsh.GetKeysRegex, MeasuresSsh.keysTransformer)
        return list(groups)

    @staticmethod
    def genOutputName(doy, variable, nEpochs, averagingConfig):
        '''Build the climatology output file name; `variable` is not part of the name.'''
        configName = averagingConfig['name']
        firstDay = int(doy)
        return "ssh_grids_v1609_%03d_%dday_clim_%s.nc" % (firstDay, nEpochs, configName)
class CCMPWind:
    '''
    Constants and static helpers for the CCMP wind analysis granules.

    The climatology variable 'Wind_Magnitude' is not read directly; readAndMask
    derives it from the 'uwnd' and 'vwnd' variables of each file.
    '''
    ExpectedRunTime = "?"
    UrlsPath = "/data/share/datasets/CCMP_V2.0_L3.0/daily_data/CCMP_Wind*_V02.0_L3.0_RSS_uncompressed.nc"
    ExampleFileName = "CCMP_Wind_Analysis_20160522_V02.0_L3.0_RSS_uncompressed.nc"
    GetKeysRegex = r'CCMP_Wind_Analysis_(....)(..)(..)_V.*.nc'

    Variable = 'Wind_Magnitude'  # to be computed as sqrt(uwnd^2 + vwnd^2)
    Mask = None
    Coordinates = ['latitude', 'longitude']

    OutputClimTemplate = ''

    @staticmethod
    def keysTransformer(s):
        '''Map the regex groups (year, month, day) to (doy, year).'''
        return (ymd2doy(s[0], s[1], s[2]), s[0])  # DOY, YEAR

    @staticmethod
    def getKeys(url):
        '''Extract the transformed keys for `url` via extractKeys.'''
        return extractKeys(url, CCMPWind.GetKeysRegex, CCMPWind.keysTransformer)

    @staticmethod
    def split(seq, n):
        '''Return the groups produced by splitByNDaysKeyed for `seq` as a list.'''
        return list(splitByNDaysKeyed(seq, n, CCMPWind.GetKeysRegex, CCMPWind.keysTransformer))

    @staticmethod
    def genOutputName(doy, variable, nEpochs, averagingConfig):
        '''Build the climatology output file name; `variable` is not part of the name.'''
        return "CCMP_Wind_Analysis_V02.0_L3.0_RSS_%03d_%dday_clim_%s.nc" % (int(doy), nEpochs, averagingConfig['name'])

    @staticmethod
    def readAndMask(url, variable, mask=None, cachePath='/tmp/cache', hdfsPath=None):
        """
        Read a variable from a netCDF or HDF file and return a numpy masked array.
        If the URL is remote or HDFS, first retrieve the file into a cache directory.

        When `variable` (or `mask`) equals CCMPWind.Variable, 'uwnd' and 'vwnd'
        are read instead, each averaged over axis 0, and the magnitude
        sqrt(uwnd^2 + vwnd^2) is returned.

        Returns None if the file cannot be retrieved, or if reading an ordinary
        variable fails before a value was obtained. Errors while computing the
        wind magnitude propagate to the caller. Diagnostics go to stderr.
        """
        from variables import getVariables, close

        v = None
        variables = [variable, mask] if mask else [variable]

        try:
            from cache import retrieveFile
            path = retrieveFile(url, cachePath, hdfsPath)
        except Exception:
            # Best effort: a missing granule must not abort the whole run.
            sys.stderr.write('readAndMask: Error, continuing without file %s\n' % url)
            return v

        if CCMPWind.Variable in variables:
            var, fh = getVariables(path, ['uwnd', 'vwnd'], arrayOnly=True,
                                   set_auto_mask=True)  # return dict of variable objects by name
            try:
                uwnd_avg = np.average(var['uwnd'], axis=0)
                vwnd_avg = np.average(var['vwnd'], axis=0)
                wind_magnitude = np.sqrt(np.add(np.multiply(uwnd_avg, uwnd_avg),
                                                np.multiply(vwnd_avg, vwnd_avg)))
                v = wind_magnitude
                if v.shape[0] == 1:
                    v = v[0]  # throw away trivial time dimension for CF-style files
            finally:
                # Release the file handle even when the computation raises.
                close(fh)
        else:
            try:
                sys.stderr.write('Reading variable %s from %s\n' % (variable, path))
                var, fh = getVariables(path, variables, arrayOnly=True,
                                       set_auto_mask=True)  # return dict of variable objects by name
                try:
                    v = var[variable]  # could be masked array
                    if v.shape[0] == 1:
                        v = v[0]  # throw away trivial time dimension for CF-style files
                finally:
                    # Release the file handle even when the lookup raises.
                    close(fh)
            except Exception:
                sys.stderr.write(
                    'readAndMask: Error, cannot read variable %s from file %s\n' % (variable, path))
        return v
class MonthlyClimDataset:
    '''
    Base class with constants and static helpers for monthly datasets.

    The class attributes here are placeholders that subclasses override.
    Keys are (month, year) instead of (day-of-year, year).
    '''
    ExpectedRunTime = "2m"
    UrlsPath = ''
    ExampleFileName = ''
    GetKeysRegex = r'(YYYY)(MM)(DD)'  # Regex to extract year, month, day
    Variable = 'var'  # Variable name in granule
    Mask = None
    Coordinates = ['lat', 'lon']
    OutputClimTemplate = ''

    @staticmethod
    def keysTransformer(s):
        '''Reorder the regex groups (year, month, ...) into (month, year).'''
        return (s[1], s[0])  # MONTH, YEAR

    @staticmethod
    def getKeys(url):
        '''Extract the transformed keys for `url` via extractKeys.'''
        return extractKeys(url, MonthlyClimDataset.GetKeysRegex,
                           MonthlyClimDataset.keysTransformer)

    @staticmethod
    def split(seq, n):
        '''Return the groups produced by splitByNDaysKeyed for `seq` as a list.'''
        return list(splitByNDaysKeyed(seq, n,
                                      MonthlyClimDataset.GetKeysRegex,
                                      MonthlyClimDataset.keysTransformer))

    @staticmethod
    def genOutputName(month, variable, nEpochs, averagingConfig):
        '''
        Build the climatology output file name for `month`.

        `month` may be an int or a numeric string: keysTransformer passes the
        month regex group through unchanged, and '%02d' rejects strings, so it
        is coerced to int here.
        '''
        month = int(month)
        # Here we use the 15th of the month to get DOY and just use any
        # non-leap year.
        doy = datetime2doy(ymd2datetime(2017, month, 15))
        return 'monthly_clim_%s_%03d_month%02d_nepochs%d_%s.nc' % (
            variable, doy, month, nEpochs,
            averagingConfig['name'])  # mark each file with month
class SMAP_L3M_SSS(MonthlyClimDataset):
    '''
    Monthly SMAP sea surface salinity dataset.

    The file name carries year and month only, so GetKeysRegex has two groups.
    '''
    UrlsPath = "/data/share/datasets/SMAP_L3_SSS/monthly/RSS_smap_SSS_monthly_*.nc"
    ExampleFileName = 'RSS_smap_SSS_monthly_2015_04_v02.0.nc'
    GetKeysRegex = r'RSS_smap_SSS_monthly_(....)_(..)_v02'
    Variable = 'sss_smap'

    @staticmethod
    def getKeys(url):
        '''Extract the transformed keys for `url` via extractKeys.'''
        return extractKeys(url, SMAP_L3M_SSS.GetKeysRegex,
                           SMAP_L3M_SSS.keysTransformer)

    @staticmethod
    def split(seq, n):
        '''Return the groups produced by splitByNDaysKeyed for `seq` as a list.'''
        return list(splitByNDaysKeyed(seq, n,
                                      SMAP_L3M_SSS.GetKeysRegex,
                                      SMAP_L3M_SSS.keysTransformer))

    @staticmethod
    def genOutputName(month, variable, nEpochs, averagingConfig):
        '''
        Build the climatology output file name for `month`.

        `month` may be an int or a numeric string; it is coerced to int
        because the '%02d' conversion rejects strings.
        '''
        month = int(month)
        # Here we use the 15th of the month to get DOY and just use any
        # non-leap year.
        doy = datetime2doy(ymd2datetime(2017, month, 15))
        return '%s_L3m_clim_doy%03d_month%02d_nepochs%d_%s.nc' % (
            variable, doy, month, nEpochs,
            averagingConfig['name'])  # mark each file with month
class GRACE_Tellus(MonthlyClimDataset):
    '''
    Common settings for the monthly GRACE Tellus datasets.

    UrlsPath and ExampleFileName are not set here; the land and ocean
    subclasses provide them.
    '''
    GetKeysRegex = r'GRCTellus.JPL.(....)(..)(..).GLO'
    Variable = 'lwe_thickness'  # Liquid_Water_Equivalent_Thickness

    @staticmethod
    def getKeys(url):
        '''Extract the transformed keys for `url` via extractKeys.'''
        return extractKeys(url, GRACE_Tellus.GetKeysRegex,
                           GRACE_Tellus.keysTransformer)

    @staticmethod
    def split(seq, n):
        '''Return the groups produced by splitByNDaysKeyed for `seq` as a list.'''
        return list(splitByNDaysKeyed(seq, n,
                                      GRACE_Tellus.GetKeysRegex,
                                      GRACE_Tellus.keysTransformer))

    @staticmethod
    def genOutputName(month, variable, nEpochs, averagingConfig):
        '''
        Build the climatology output file name for `month`.

        `month` may be an int or a numeric string; it is coerced to int
        because the '%02d' conversion rejects strings.
        '''
        month = int(month)
        # Here we use the 15th of the month to get DOY and just use any
        # non-leap year.
        doy = datetime2doy(ymd2datetime(2017, month, 15))
        return 'GRACE_Tellus_monthly_%s_%03d_month%02d_nepochs%d_%s.nc' % (
            variable, doy, month, nEpochs,
            averagingConfig['name'])  # mark each file with month
class GRACE_Tellus_monthly_land(GRACE_Tellus):
    '''Monthly GRACE Tellus land granules; only paths and output name differ from GRACE_Tellus.'''
    UrlsPath = "/data/share/datasets/GRACE_Tellus/monthly_land/GRCTellus.JPL.*.nc"
    ExampleFileName = "GRCTellus.JPL.20150122.GLO.RL05M_1.MSCNv02CRIv02.land.nc"

    @staticmethod
    def genOutputName(month, variable, nEpochs, averagingConfig):
        '''
        Build the climatology output file name for `month`.

        `month` may be an int or a numeric string; it is coerced to int
        because the '%02d' conversion rejects strings.
        '''
        month = int(month)
        # Here we use the 15th of the month to get DOY and just use any
        # non-leap year.
        doy = datetime2doy(ymd2datetime(2017, month, 15))
        return 'GRACE_Tellus_monthly_land_%s_%03d_month%02d_nepochs%d_%s.nc' % (
            variable, doy, month, nEpochs,
            averagingConfig['name'])  # mark each file with month
class GRACE_Tellus_monthly_ocean(GRACE_Tellus):
    '''Monthly GRACE Tellus ocean granules; only paths and output name differ from GRACE_Tellus.'''
    UrlsPath = "/data/share/datasets/GRACE_Tellus/monthly_ocean/GRCTellus.JPL.*.nc"
    ExampleFileName = "GRCTellus.JPL.20150122.GLO.RL05M_1.MSCNv02CRIv02.ocean.nc"

    @staticmethod
    def genOutputName(month, variable, nEpochs, averagingConfig):
        '''
        Build the climatology output file name for `month`.

        `month` may be an int or a numeric string; it is coerced to int
        because the '%02d' conversion rejects strings.
        '''
        month = int(month)
        # Here we use the 15th of the month to get DOY and just use any
        # non-leap year.
        doy = datetime2doy(ymd2datetime(2017, month, 15))
        return 'GRACE_Tellus_monthly_ocean_%s_%03d_month%02d_nepochs%d_%s.nc'%(
            variable, doy, month, nEpochs,
            averagingConfig['name'])  # mark each file with month
# Registry of the selectable dataset classes, keyed by class name.
DatasetList = {
    datasetClass.__name__: datasetClass
    for datasetClass in (
        ModisSst,
        ModisChlor,
        MeasuresSsh,
        CCMPWind,
        SMAP_L3M_SSS,
        GRACE_Tellus_monthly_ocean,
        GRACE_Tellus_monthly_land,
    )
}
# Utils follow.
def ymd2doy(year, mon, day):
    '''Convert a year, month and day to the day of year as an int.'''
    asDatetime = ymd2datetime(year, mon, day)
    return datetime2doy(asDatetime)
def ymd2datetime(y, m, d):
    '''Build a datetime from year, month and day, each given as int or numeric string.'''
    year = int(y)
    month = int(m)
    day = int(d)
    return datetime.datetime(year, month, day)
def datetime2doy(dt):
    '''Return the day of year of `dt` as an int (1 for January 1st).'''
    julianDay = dt.strftime('%j')  # zero-padded day-of-year string
    return int(julianDay)
def doy2datetime(year, doy):
    '''Return the datetime for day `doy` (1-based) of `year`; both may be int or numeric string.'''
    newYearsDay = datetime.datetime(int(year), 1, 1)
    daysAfter = datetime.timedelta(int(doy) - 1)
    return newYearsDay + daysAfter
def doy2month(year, doy):
    '''Return the month containing day `doy` of `year` as a zero-padded two-digit string.'''
    when = doy2datetime(year, doy)
    return when.strftime('%m')