Compute a multi-epoch (multi-day) climatology from daily SST Level-3 grids.
Simple code to be run on Spark cluster, or using multi-core parallelism on single machine.
import sys, os, calendar, urlparse, urllib
from datetime import datetime
import numpy as N
from variables import getVariables, close
#from timePartitions import partitionFilesByKey
from split import fixedSplit
#from stats import Stats
from pathos.multiprocessing import ProcessingPool as Pool
from plotlib import imageMap, makeMovie
#from gaussInterp import gaussInterp
# Possible execution modes
# Multicore & cluster modes use pathos; Spark mode uses PySpark cluster.
ExecutionModes = ['sequential', 'multicore', 'cluster', 'spark']
# SST L3m 4.6km Metadata
# SST calues are scaled integers in degrees Celsius, lat/lon is 4320 x 8640
# Variable = 'sst', Mask = 'qual_sst', Coordinates = ['lat', 'lon']
# Generate algorithmic name for N-day Climatology product
SSTClimatologyTemplate = 'SST.L3.Global.Clim.%(period)s.%(date)s.%(version)' #??
# Simple mask and average functions to get us started, then add gaussian interpolation.
# MODIS L3 SST product, qual_sst is [-1, 2] - =0 is best data, can add =1 for better coverage
def qcMask(var, mask): return,
#def qcMask(var, mask): return != 0, var)
def average(a): return, axis=0)
#AveragingFunctions = {'pixelAverage': average, 'gaussInterp': gaussInterp}
AveragingFunctions = {'pixelAverage': average, 'gaussInterp': average}
def climByAveragingPeriods(urls, # list of (daily) granule URLs for a long time period (e.g. a year)
nEpochs, # compute a climatology for every N epochs (days) by 'averaging'
nWindow, # number of epochs in window needed for averaging
variable, # name of primary variable in file
mask, # name of mask variable
coordinates, # names of coordinate arrays to read and pass on (e.g. 'lat' and 'lon')
maskFn=qcMask, # mask function to compute mask from mask variable
averager='pixelAverage', # averaging function to use, one of ['pixelAverage', 'gaussInterp']
mode='sequential', # Map across time periods of N-days for concurrent work, executed by:
# 'sequential' map, 'multicore' using, 'cluster' using pathos,
# or 'spark' using PySpark
numNodes=1, # number of cluster nodes to use
nWorkers=4, # number of parallel workers per node
averagingFunctions=AveragingFunctions, # dict of possible averaging functions
legalModes=ExecutionModes # list of possiblel execution modes
'''Compute a climatology every N days by applying a mask and averaging function.
Writes the averaged variable grid, attributes of the primary variable, and the coordinate arrays in a dictionary.
***Assumption: This routine assumes that the N grids will fit in memory.***
averageFn = averagingFunctions[averager]
except :
averageFn = average
print >>sys.stderr, 'climatology: Error, Averaging function must be one of: %s' % str(averagingFunctions)
urlSplits = [s for s in fixedSplit(urls, nEpochs)]
if VERBOSE: print >>sys.stderr, urlSplits
def climsContoured(urls):
n = len(urls)
var = climByAveraging(urls, variable, mask, coordinates, maskFn, averageFn)
return contourMap(var, variable, coordinates, n, urls[0])
if mode == 'sequential':
plots = map(climsContoured, urlSplits)
elif mode == 'multicore':
pool = Pool(nWorkers)
plots =, urlSplits)
elif mode == 'cluster':
elif mode == 'spark':
plots = map(climsContoured, urlSplits)
print plots
return plots
# return makeMovie(plots, 'clim.mpg')
def climByAveraging(urls, # list of granule URLs for a time period
variable, # name of primary variable in file
mask, # name of mask variable
coordinates, # names of coordinate arrays to read and pass on (e.g. 'lat' and 'lon')
maskFn=qcMask, # mask function to compute mask from mask variable
averageFn=average # averaging function to use
'''Compute a climatology over N arrays by applying a mask and averaging function.
Returns the averaged variable grid, attributes of the primary variable, and the coordinate arrays in a dictionary.
***Assumption: This routine assumes that the N grids will fit in memory.***
n = len(urls)
varList = [variable, mask]
for i, url in enumerate(urls):
fn = retrieveFile(url, '~/cache')
if VERBOSE: print >>sys.stderr, 'Read variables and mask ...'
var, fh = getVariables(fn, varList) # return dict of variable objects by name
if i == 0:
dtype = var[variable].dtype
shape = (n,) + var[variable].shape
accum =, dtype)
v = maskFn(var[variable], var[mask]) # apply quality mask variable to get numpy MA
# v = var[variable][:]
accum[i] = v # accumulate N arrays for 'averaging'
if i+1 != len(urls): # keep var dictionary from last file to grab metadata
close(fh) # REMEMBER: closing fh loses in-memory data structures
if VERBOSE: print >>sys.stderr, 'Averaging ...'
coord, fh = getVariables(fn, coordinates) # read coordinate arrays and add to dict
for c in coordinates: var[c] = coord[c][:]
if averageFn == average:
avg = averageFn(accum) # call averaging function
var[variable] = accum
if averageFn == gaussInterp:
varNames = variable + coordinates
avg, vweight, status = \
gaussInterp(var, varNames, latGrid, lonGrid, wlat, wlon, slat, slon, stime, vfactor, missingValue)
var['attributes'] = var[variable].__dict__ # save attributes of primary variable
var[variable] = avg # return primary variable & mask arrays in dict
var[mask] =
# close(fh) # Can't close, lose netCDF4.Variable objects, leaking two fh
return var
def contourMap(var, variable, coordinates, n, url):
p = urlparse.urlparse(url)
filename = os.path.split(p.path)[1]
return filename
outFile = filename + '.png'
# Downscale variable array (SST) before contouring, matplotlib is TOO slow on large arrays
vals = var[variable][:]
# Fixed color scale, write file, turn off auto borders, set title, reverse lat direction so monotonically increasing??
imageMap(var[coordinates[1]][:], var[coordinates[0]][:], var[variable][:],
vmin=-2., vmax=45., outFile=outFile, autoBorders=False,
title='%s %d-day Mean from %s' % (variable.upper(), n, filename))
print >>sys.stderr, 'Writing contour plot to %s' % outFile
return outFile
def isLocalFile(url):
'''Check if URL is a local path.'''
u = urlparse.urlparse(url)
if u.scheme == '' or u.scheme == 'file':
if not path.exists(u.path):
print >>sys.stderr, 'isLocalFile: File at local path does not exist: %s' % u.path
return (True, u.path)
return (False, u.path)
def retrieveFile(url, dir=None):
'''Retrieve a file from a URL, or if it is a local path then verify it exists.'''
if dir is None: dir = './'
ok, path = isLocalFile(url)
fn = os.path.split(path)[1]
outPath = os.path.join(dir, fn)
if not ok:
if os.path.exists(outPath):
print >>sys.stderr, 'retrieveFile: Using cached file: %s' % outPath
print >>sys.stderr, 'retrieveFile: Retrieving (URL) %s to %s' % (url, outPath)
urllib.urlretrieve(url, outPath)
print >>sys.stderr, 'retrieveFile: Cannot retrieve file at URL: %s' % url
return None
return outPath
def dailyFile2date(path, offset=1):
'''Convert YYYYDOY string in filename to date.'''
fn = os.path.split(path)[1]
year = int(fn[offset:offset+4])
doy = int(fn[offset+5:offset+8])
return fn[5:15].replace('.', '/')
def formatRegion(r):
"""Format lat/lon region specifier as string suitable for file name."""
if isinstance(r, str):
return r
strs = [str(i).replace('-', 'm') for i in r]
return 'region-%s-%sby%s-%s' % tuple(strs)
def formatGrid(r):
"""Format lat/lon grid resolution specifier as string suitable for file name."""
if isinstance(r, str):
return r
return str(r[0]) + 'by' + str(r[1])
def main(args):
nEpochs = int(args[0])
nWindow = int(args[1])
averager = args[2]
mode = args[3]
nWorkers = int(args[4])
urlFile = args[5]
urls = [s.strip() for s in open(urlFile, 'r')]
return climByAveragingPeriods(urls, nEpochs, nWindow, 'sst', 'qual_sst', ['lat', 'lon'],
averager=averager, mode=mode, nWorkers=nWorkers)
if __name__ == '__main__':
print main(sys.argv[1:])
# python 5 5 pixelAverage sequential 1 urls_sst_10days.txt
# python 5 5 gaussianInterp multicore 8 urls_sst_40days.txt