# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
climatology3Spark.py
Compute a multi-epoch (multi-day) climatology from daily SST Level-3 grids.
Simple code to be run on a Spark cluster, or with multi-core parallelism on a single machine.
"""
import sys, os, re, time, urllib.parse, urllib.request, urllib.error
import numpy as N
import matplotlib
matplotlib.use('Agg')
import matplotlib.pylab as M
from .variables import getVariables, close
from .cache import retrieveFile, CachePath
from .split import fixedSplit, splitByNDays
from netCDF4 import Dataset, default_fillvals
from .plotlib import imageMap, makeMovie
from .spatialFilter import spatialFilter
from .gaussInterp import gaussInterp # calls into Fortran version gaussInterp_f.so
#from gaussInterp_slow import gaussInterp_slow as gaussInterp # pure python, slow debuggable version
VERBOSE = 1
# Possible execution modes
ExecutionModes = ['multicore', 'spark']
# SST L3m 4.6km Metadata
# SST values are scaled integers in degrees Celsius; the lon/lat grid is 8640 x 4320.
# Variable = 'sst', Mask = 'qual_sst', Coordinates = ['lon', 'lat']
# Generate algorithmic name for N-day Climatology product
SSTClimatologyTemplate = 'SST.L3.Global.Clim.%(period)s.%(date)s.%(version)s.nc' #??
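# e.g. SSTClimatologyTemplate % {'period': '5day', 'date': '2015001', 'version': 'v1.0'}
#   -> 'SST.L3.Global.Clim.5day.2015001.v1.0.nc'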
# Simple mask and average functions to get us started; gaussian interpolation is added later.
# MODIS L3 SST product: qual_sst values lie in [-1, 2]; 0 marks the best data, and
# also accepting 1 improves coverage at some cost in quality.
#def qcMask(var, mask): return N.ma.array(var, mask=N.ma.make_mask(mask == 0))
def qcMask(var, mask): return N.ma.masked_where(mask != 0, var)
#def qcMask(var, mask): return N.ma.masked_where(mask < 0, var)
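# Illustrative masking semantics (a sketch, not from the original file): keep
# pixels where qual_sst == 0, mask everything else.
#   >>> qcMask(N.array([[20., 21.], [22., 23.]]), N.array([[0, 1], [2, 0]]))
#   masked_array(data=[[20.0, --], [--, 23.0]], ...)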
def splitModisSst(seq, n):
for chunk in splitByNDays(seq, n, re.compile(r'(...).L3m')):
yield chunk
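# Illustrative: for MODIS names like 'A2015001.L3m_DAY_SST_sst_4km.nc' the regex
# captures the three day-of-year digits before '.L3m', which splitByNDays uses to
# group the URL stream into consecutive n-day windows.
def mean(a):
    '''Masked pixel-wise mean over the time (first) axis. Minimal sketch: the
    'mean' referenced by AveragingFunctions below is not defined in this file,
    so a masked mean over axis 0 is assumed here.'''
    return N.ma.mean(a, axis=0)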
AveragingFunctions = {'pixelMean': mean, 'gaussInterp': gaussInterp, 'spatialFilter': spatialFilter}
PixelMeanConfig = {'name': 'pixelMean'}
GaussInterpConfig = {'name': 'gaussInterp',
'latGrid': None, 'lonGrid': None, # None means use input lat/lon grid
'wlat': 3, 'wlon': 3,
'slat': 0.15, 'slon': 0.15, 'stime': 1,
'vfactor': -0.6931, 'missingValue': default_fillvals['f4']}
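# Note: vfactor = -0.6931 is -ln(2); presumably chosen so the Gaussian interpolation
# weight is halved per sigma-scaled unit of squared distance in the kernel exponent.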
GaussInterpConfig1a = {'name': 'gaussInterp',
'latGrid': None, 'lonGrid': None, # None means use input lat/lon grid
'wlat': 0.30, 'wlon': 0.30,
'slat': 0.15, 'slon': 0.15, 'stime': 1,
'vfactor': -0.6931, 'missingValue': default_fillvals['f4']}
GaussInterpConfig1b = {'name': 'gaussInterp',
'latGrid': None, 'lonGrid': None, # None means use input lat/lon grid
'wlat': 0.08, 'wlon': 0.08,
'slat': 0.15, 'slon': 0.15, 'stime': 1,
'vfactor': -0.6931, 'missingValue': default_fillvals['f4']}
GaussInterpConfig2 = {'name': 'gaussInterp',
'latGrid': (89.5, -89.5, -0.25), 'lonGrid': (-180., 179., 0.25),
'wlat': 2., 'wlon': 2.,
'slat': 0.15, 'slon': 0.15, 'stime': 1,
'vfactor': -0.6931, 'missingValue': default_fillvals['f4']}
FilterGaussian = [[1, 2, 1], [2, 4, 2], [1, 2, 1]] # divide by 16
FilterLowPass = [[1, 1, 1], [1, 1, 1], [1, 1, 1]] # divide by 9
SpatialFilterConfig1 = {'name': 'spatialFilter', 'normalization': 16.,
'spatialFilter': N.array(FilterGaussian, dtype=N.int32),
'missingValue': default_fillvals['f4']}
SpatialFilterConfig2 = {'name': 'spatialFilter', 'normalization': 9.,
'spatialFilter': N.array(FilterLowPass, dtype=N.int32),
'missingValue': default_fillvals['f4']}
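# Hedged sketch (not the Fortran spatialFilter itself) of what a 3x3 filter pass
# presumably computes on one 2-D grid: shift-and-add with the kernel weights, then
# divide by the normalization. Illustrative only; the real routine also handles
# missing values.
def applyFilter3x3(grid, kernel, norm):
    '''Apply a 3x3 smoothing kernel to a 2-D array (illustrative sketch).'''
    out = N.zeros(grid.shape, dtype=N.float32)
    for di in (-1, 0, 1):
        for dj in (-1, 0, 1):
            out += kernel[di + 1][dj + 1] * N.roll(N.roll(grid, di, axis=0), dj, axis=1)
    return out / norm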
# Directory to cache retrieved files in (overrides the CachePath imported from .cache)
CachePath = '~/cache'
def climByAveragingPeriods(urls, # list of (daily) granule URLs for a long time period (e.g. a year)
nEpochs, # compute a climatology for every N epochs (days) by 'averaging'
                           nWindow,                # number of epochs in window needed for averaging
                           nNeighbors,             # neighborhood size for gaussInterp (sets wlat/wlon below)
variable, # name of primary variable in file
mask, # name of mask variable
coordinates, # names of coordinate arrays to read and pass on (e.g. 'lat' and 'lon')
splitFn=splitModisSst, # split function to use to partition the input URL list
maskFn=qcMask, # mask function to compute mask from mask variable
averager='pixelMean', # averaging function to use, one of ['pixelMean', 'gaussInterp', 'spatialFilter']
averagingConfig={}, # dict of parameters to control the averaging function (e.g. gaussInterp)
optimization='fortran', # optimization mode (fortran or cython)
mode='multicore', # Map across time periods of N-days for concurrent work, executed by:
# 'multicore' using pysparkling or 'spark' using Spark/Mesos cluster
numNodes=1, # number of cluster nodes to use
nWorkers=4, # number of parallel workers per node
averagingFunctions=AveragingFunctions, # dict of possible averaging functions
legalModes=ExecutionModes, # list of possible execution modes
cachePath=CachePath # directory to cache retrieved files in
):
'''Compute a climatology every N days by applying a mask and averaging function.
Writes the averaged variable grid, attributes of the primary variable, and the coordinate arrays in a dictionary.
***Assumption: This routine assumes that the N grids will fit in memory.***
'''
if averagingConfig['name'] == 'gaussInterp':
averagingConfig['wlat'] = nNeighbors
averagingConfig['wlon'] = nNeighbors
    try:
        averageFn = averagingFunctions[averager]
    except KeyError:
        print('climatology: Error, averaging function must be one of: %s' % str(list(averagingFunctions.keys())), file=sys.stderr)
        sys.exit(1)
    urlSplits = [s for s in splitFn(urls, nEpochs)]
    # The dispatch over execution modes happens below, once the per-period worker
    # climsContoured has been defined.
def climsContoured(urls, plot=None, fillValue=default_fillvals['f4'], format='NETCDF4', cachePath=cachePath):
n = len(urls)
if VERBOSE: print(urls, file=sys.stderr)
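        # NOTE: climByAveraging is assumed to wrap accumulateClim (below), reducing the
        # per-granule (vcount, vsum) accumulators into the output 'var' dict; it is not
        # defined in this file.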
var = climByAveraging(urls, variable, mask, coordinates, maskFn, averageFn, averagingConfig, optimization, cachePath)
fn = os.path.split(urls[0])[1]
inFile = os.path.join(cachePath, fn)
method = averagingConfig['name']
fn = os.path.splitext(fn)[0]
day = fn[5:8]
nDays = int(var['time'][0])
if 'wlat' in averagingConfig:
wlat = averagingConfig['wlat']
else:
wlat = 1
if int(wlat) == wlat:
outFile = 'A%s.L3m_%dday_clim_sst_4km_%s_%dnbrs.nc' % (day, nDays, method, int(wlat)) # mark each file with first day in period
else:
outFile = 'A%s.L3m_%dday_clim_sst_4km_%s_%4.2fnbrs.nc' % (day, nDays, method, wlat) # mark each file with first day in period
outFile = writeOutNetcdfVars(var, variable, mask, coordinates, inFile, outFile, fillValue, format)
if plot == 'contour':
figFile = contourMap(var, variable, coordinates, n, outFile)
elif plot == 'histogram':
# figFile = histogram(var, variable, n, outFile)
figFile = None
else:
figFile = None
return (outFile, figFile)
    if mode == 'multicore':
        # Assumed multicore path via pysparkling, per the signature comment above; the
        # original dispatch code is not shown here (a worker pool can be passed to
        # Context for true parallelism).
        from pysparkling import Context
        results = Context().parallelize(urlSplits, nWorkers).map(climsContoured).collect()
    elif mode == 'spark':
        # Assumed Spark path: parallelize the N-day splits across the cluster.
        from pyspark import SparkContext
        sc = SparkContext(appName='climatology3Spark')
        results = sc.parallelize(urlSplits, numNodes * nWorkers).map(climsContoured).collect()
    else:
        results = list(map(climsContoured, urlSplits))
    return results
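# Illustrative call (inputs assumed): a 5-day pixel-mean climatology from a file of
# granule URLs, one per line:
#   urls = [s.strip() for s in open('urls_sst_10days.txt')]
#   results = climByAveragingPeriods(urls, 5, 5, 0, 'sst', 'qual_sst', ['lat', 'lon'],
#                                    averagingConfig=PixelMeanConfig)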
def accumulateClim(urls, # list of granule URLs for a time period
variable, # name of primary variable in file
mask, # name of mask variable
coordinates, # names of coordinate arrays to read and pass on (e.g. 'lat' and 'lon')
maskFn=qcMask, # mask function to compute mask from mask variable
averageFn=mean, # averaging function to use
averagingConfig={}, # parameters to control averaging function (e.g. gaussInterp)
optimization='fortran', # optimization mode (fortran or cython)
cachePath=CachePath
):
'''Compute a climatology over N arrays by applying a mask and averaging function.
Returns the averaged variable grid, attributes of the primary variable, and the coordinate arrays in a dictionary.
***Assumption: This routine assumes that the N grids will fit in memory.***
'''
print('accumulateClim: Doing %s ...' % averagingConfig['name'], file=sys.stderr)
varList = [variable, mask]
for i, url in enumerate(urls):
try:
path = retrieveFile(url, cachePath)
fn = os.path.split(path)[1]
vtime = int(fn[5:8]) # KLUDGE: extract DOY from filename
        except Exception:
            print('accumulateClim: Error, continuing without file %s' % url, file=sys.stderr)
continue
if path is None: continue
try:
print('Reading file %s ...' % path, file=sys.stderr)
var, fh = getVariables(path, varList, arrayOnly=True, order='F', set_auto_mask=False) # return dict of variable objects by name
        except Exception:
            print('accumulateClim: Error, cannot read file %s' % path, file=sys.stderr)
continue
if i == 0:
dtype = var[variable].dtype
if 'int' in dtype.name: dtype = N.float32
shape = var[variable].shape
            vsum = N.ma.zeros(shape, dtype, order='F')      # accumulators must start at zero, not at garbage from empty()
            vcount = N.ma.zeros(shape, dtype, order='F')
emptyVar = N.array(N.ma.masked_all(var[variable].shape, dtype), order='F') # totally masked variable array for missing or bad file reads
print('Read coordinates ...', file=sys.stderr)
var, fh = getVariables(path, coordinates, var, arrayOnly=True, order='F') # read coordinate arrays and add to dict
        var[variable] = maskFn(var[variable], var[mask])    # apply the quality mask to get a numpy MA (netCDF4 auto-masking was turned off above)
# Echo variable range for sanity check
vals = var[variable].compressed()
print('Variable Range: min, max:', vals.min(), vals.max(), file=sys.stderr)
        if averagingConfig['name'] == 'pixelMean':
            vsum += var[variable].filled(0.)                # update accumulators; filled() keeps masked cells from poisoning the sum
            vcount += ~var[variable].mask                   # count valid (unmasked) pixels per cell
elif averagingConfig['name'] == 'gaussInterp':
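            # 'accum' below is assumed to be the (time, lat, lon) stack of the window's
            # daily grids, built by code not shown in this file.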
var[variable] = accum
c = averagingConfig
latGrid = c['latGrid']; lonGrid = c['lonGrid']
if latGrid is not None and lonGrid is not None:
outlat = N.arange(latGrid[0], latGrid[1]+latGrid[2], latGrid[2], dtype=N.float32, order='F')
outlon = N.arange(lonGrid[0], lonGrid[1]+lonGrid[2], lonGrid[2], dtype=N.float32, order='F')
else:
                outlat = N.array(var[coordinates[0]], dtype=N.float32, order='F')   # coordinates is ['lat', 'lon']
                outlon = N.array(var[coordinates[1]], dtype=N.float32, order='F')
varNames = [variable] + coordinates
start = time.time()
avg, weight, status = \
gaussInterp(var, varNames, outlat, outlon, c['wlat'], c['wlon'],
c['slat'], c['slon'], c['stime'], c['vfactor'], c['missingValue'],
VERBOSE, optimization)
end = time.time()
vcount = weight.astype(N.float32)
vsum = avg
print('gaussInterp execution time:', (end - start), file=sys.stderr)
elif averagingConfig['name'] == 'spatialFilter':
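            # 'accum' again: assumed stack of daily grids, as in the gaussInterp branch.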
var[variable] = accum
c = averagingConfig
varNames = [variable] + coordinates
start = time.time()
avg, vcount, status = \
spatialFilter(var, varNames, c['spatialFilter'], c['normalization'],
c['missingValue'], VERBOSE, optimization)
vsum = avg
end = time.time()
print('spatialFilter execution time:', (end - start), file=sys.stderr)
close(fh)
return (vcount, vsum)
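# Hedged sketch of the reduce step implied by accumulateClim's (vcount, vsum) return
# value: partial accumulators from each partition are summed, and the climatological
# mean is the summed grid divided by the count (cells never observed stay masked).
# The names combineClim/finalizeClim are illustrative, not from the original file.
def combineClim(a, b):
    '''Combine two (vcount, vsum) partial accumulators.'''
    return (a[0] + b[0], a[1] + b[1])
def finalizeClim(vcount, vsum):
    '''Form the mean grid, masking cells with zero observations.'''
    return vsum / N.ma.masked_less(vcount, 1)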
def writeOutNetcdfVars(var, variable, mask, coordinates, inFile, outFile, fillValue=None, format='NETCDF4'):
'''Construct output bundle of arrays with NetCDF dimensions and attributes.
Output variables and attributes will have same names as the input file.
'''
din = Dataset(inFile, 'r')
dout = Dataset(outFile, 'w', format=format)
print('Writing %s ...' % outFile, file=sys.stderr)
# Transfer global attributes from input file
for a in din.ncattrs():
dout.setncattr(a, din.getncattr(a))
# Add dimensions and variables, copying data
coordDim = [dout.createDimension(coord, var['out'+coord].shape[0]) for coord in coordinates] # here output lon, lat, etc.
for coord in coordinates:
v = dout.createVariable(coord, var['out'+coord].dtype, (coord,))
v[:] = var['out'+coord][:]
primaryVar = dout.createVariable(variable, var['out'+variable].dtype, coordinates, fill_value=fillValue)
primaryVar[:] = var['out'+variable][:] # transfer array
maskVar = dout.createVariable(mask, 'i1', coordinates)
maskVar[:] = var['out'+mask].astype('i1')[:]
# Transfer variable attributes from input file
for k,v in dout.variables.items():
for a in din.variables[k].ncattrs():
if a == 'scale_factor' or a == 'add_offset' or a == '_FillValue': continue
v.setncattr(a, din.variables[k].getncattr(a))
if k == variable:
try:
# if fillValue == None: fillValue = din.variables[k].getncattr('_FillValue') # total kludge
                if fillValue is None: fillValue = default_fillvals['f4']
# print >>sys.stderr, default_fillvals
# v.setncattr('_FillValue', fillValue) # set proper _FillValue for climatology array
v.setncattr('missing_value', fillValue)
print('Setting missing_value for primary variable %s to %f' % (variable, fillValue), file=sys.stderr)
            except Exception:
print('writeOutNetcdfVars: Warning, for variable %s no fill value specified or derivable from inputs.' % variable, file=sys.stderr)
din.close()
dout.close()
return outFile
def contourMap(var, variable, coordinates, n, outFile):
    figFile = os.path.splitext(outFile)[0] + '_contour.png'    # distinct from histogram()'s '_hist.png'
# TODO: Downscale variable array (SST) before contouring, matplotlib is TOO slow on large arrays
vals = var[variable][:]
# Fixed color scale, write file, turn off auto borders, set title, reverse lat direction so monotonically increasing??
imageMap(var[coordinates[1]][:], var[coordinates[0]][:], var[variable][:],
vmin=-2., vmax=45., outFile=figFile, autoBorders=False,
title='%s %d-day Mean from %s' % (variable.upper(), n, outFile))
print('Writing contour plot to %s' % figFile, file=sys.stderr)
return figFile
def histogram(vals, variable, n, outFile):
figFile = os.path.splitext(outFile)[0] + '_hist.png'
M.clf()
# M.hist(vals, 47, (-2., 45.))
M.hist(vals, 94)
M.xlim(-5, 45)
M.xlabel('SST in degrees Celsius')
M.ylim(0, 300000)
M.ylabel('Count')
M.title('Histogram of %s %d-day Mean from %s' % (variable.upper(), n, outFile))
    print('Writing histogram plot to %s' % figFile, file=sys.stderr)
    M.savefig(figFile)
return figFile
def dailyFile2date(path, offset=1):
    '''Convert the YYYYDOY string in a filename like 'A2015001.*' to a 'YYYY/DOY' date string.'''
    fn = os.path.split(path)[1]
    year = int(fn[offset:offset+4])         # e.g. 2015
    doy = int(fn[offset+4:offset+7])        # e.g. 001; matches the fn[5:8] DOY slice used elsewhere
    return '%4d/%03d' % (year, doy)
def formatRegion(r):
"""Format lat/lon region specifier as string suitable for file name."""
if isinstance(r, str):
return r
else:
strs = [str(i).replace('-', 'm') for i in r]
return 'region-%s-%sby%s-%s' % tuple(strs)
def formatGrid(r):
"""Format lat/lon grid resolution specifier as string suitable for file name."""
if isinstance(r, str):
return r
else:
return str(r[0]) + 'by' + str(r[1])
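# Illustrative examples (argument shapes assumed):
#   formatRegion((-10, 10, -20, 20))  ->  'region-m10-10bym20-20'
#   formatGrid((0.25, 0.25))          ->  '0.25by0.25'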
def main(args):
    if len(args) != 8:
        print('Usage: climatology3Spark.py nEpochs nWindow nNeighbors averager optimization mode nWorkers urlFile', file=sys.stderr)
        sys.exit(1)
nEpochs = int(args[0])
nWindow = int(args[1])
nNeighbors = int(args[2])
averager = args[3]
optimization = args[4]
mode = args[5]
nWorkers = int(args[6])
urlFile = args[7]
urls = [s.strip() for s in open(urlFile, 'r')]
if averager == 'gaussInterp':
results = climByAveragingPeriods(urls, nEpochs, nWindow, nNeighbors, 'sst', 'qual_sst', ['lat', 'lon'],
averager=averager, optimization=optimization, averagingConfig=GaussInterpConfig,
mode=mode, nWorkers=nWorkers)
elif averager == 'spatialFilter':
results = climByAveragingPeriods(urls, nEpochs, nWindow, nNeighbors, 'sst', 'qual_sst', ['lat', 'lon'],
averager=averager, optimization=optimization, averagingConfig=SpatialFilterConfig1,
mode=mode, nWorkers=nWorkers)
elif averager == 'pixelMean':
results = climByAveragingPeriods(urls, nEpochs, nWindow, nNeighbors, 'sst', 'qual_sst', ['lat', 'lon'],
averager=averager, optimization=optimization, averagingConfig=PixelMeanConfig,
mode=mode, nWorkers=nWorkers)
else:
        print('climatology3Spark: Error, averager must be one of', list(AveragingFunctions.keys()), file=sys.stderr)
sys.exit(1)
if results[0][1] is not None:
makeMovie([r[1] for r in results], 'clim.mpg')
return results
if __name__ == '__main__':
print(main(sys.argv[1:]))
# Old Tests:
# python climatology2.py 5 5 0 pixelMean fortran sequential 1 urls_sst_10days.txt
# python climatology2.py 5 5 3 gaussInterp fortran sequential 1 urls_sst_10days.txt
# python climatology2.py 5 5 0 pixelMean fortran sequential 1 urls_sst_40days.txt
# python climatology2.py 5 5 0 pixelMean fortran multicore 8 urls_sst_40days.txt
# python climatology2.py 5 5 3 gaussInterp fortran multicore 8 urls_sst_40days.txt
# Old Production:
# python climatology2.py 5 5 0 pixelMean fortran multicore 16 urls_sst_2015.txt >& log &
# python climatology2.py 10 10 0 pixelMean fortran multicore 16 urls_sst_2015.txt >& log &
# python climatology2.py 5 5 3 gaussInterp fortran multicore 16 urls_sst_2015.txt >& log &
# Tests:
# python climatology2.py 5 5 0 pixelMean fortran sequential 1 urls_sst_daynight_5days_sorted.txt
# python climatology2.py 5 5 0 pixelMean fortran multicore 4 urls_sst_daynight_5days_sorted.txt
# python climatology2.py 5 5 3 gaussInterp fortran sequential 1 urls_sst_daynight_5days_sorted.txt
# python climatology2.py 5 5 3 gaussInterp fortran multicore 4 urls_sst_daynight_5days_sorted.txt
# python climatology2.py 5 7 1 spatialFilter fortran sequential 1 urls_sst_daynight_5days_sorted.txt
# Test number of neighbors needed:
# python climatology2.py 5 7 3 gaussInterp fortran multicore 4 urls_sst_daynight_20days_sorted.txt
# Production:
# python climatology2.py 5 7 0 pixelMean fortran multicore 4 urls_sst_daynight_2003_2015_sorted.txt
# python climatology2.py 5 7 3 gaussInterp fortran sequential 1 urls_sst_daynight_2003_2015_sorted.txt
# python climatology2.py 5 7 3 gaussInterp fortran multicore 4 urls_sst_daynight_2003_2015_sorted.txt
# python climatology2.py 5 7 1 spatialFilter fortran multicore 4 urls_sst_daynight_2003_2015_sorted.txt