blob: 321b9057baa4a82917bbe8bacbc8583a60bd22eb [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#!/usr/bin/env python
"""
split.py == Some utility functions to split lists of URLs into chunks or time periods.
"""
import sys, os, re, json
import datetime
def fixedSplit(seq, n):
    '''Flatten the items of SEQ into chunks that each cover N consecutive items.

    Every item of SEQ must itself be iterable: its elements are appended to
    the chunk under construction, so a chunk holds the elements of N items
    rather than the N items themselves.  The final chunk covers whatever
    items are left over and is only yielded when it is non-empty.
    '''
    current = []
    for count, item in enumerate(seq, 1):
        current.extend(item)
        if count % n:
            continue
        # N items consumed: hand the chunk over and start a fresh one.
        yield current
        current = []
    # Leftover elements from the last, incomplete run of items.
    if current:
        yield current
def _doy2month(year, doy):
    '''Return the calendar month (1-12) that contains day-of-year DOY of YEAR.

    Both arguments may be strings (as captured by a regex) or integers.
    '''
    firstDay = datetime.date(int(year), 1, 1)
    return (firstDay + datetime.timedelta(days=int(doy) - 1)).month


def splitByMonth(seq, timeFromFilename={'get': 'doy', 'regex': re.compile(r'\/A.(....)(...)')}, transformer=None, keyed=True):
    '''Split URLs into months using a regex to extract the date from each filename.

    seq              -- iterable of URL strings; consecutive URLs sharing a key
                        form one group (grouping is done by splitByKeys).
    timeFromFilename -- dict with two entries:
                        'regex': pattern whose groups are the keys; for the
                                 'doy' mode it must capture (year, day-of-year).
                        'get':   'doy' selects the day-of-year -> month
                                 conversion.  A sequence whose second element
                                 is 'doy' is accepted as well.
    transformer      -- key transformer handed to splitByKeys; it is replaced
                        by the month conversion in 'doy' mode.
    keyed            -- passed to splitByKeys: True gives (key, urls) pairs,
                        False gives bare url lists.

    Returns the groups as a list.  In 'doy' mode each key is
    (year-as-captured, month-as-int).
    '''
    mode = timeFromFilename['get']
    # The previous test, mode[1] == 'doy', compared the single character 'o'
    # with 'doy' for the default mode string and so never enabled the month
    # conversion.  The sequence form that test did match is still honoured.
    if mode == 'doy' or list(mode)[1:2] == ['doy']:
        # No doy2month is defined or imported in this module, so the
        # conversion is done by the private datetime-based helper above.
        transformer = lambda keys: (keys[0], _doy2month(*keys))
    return list(splitByKeys(seq, timeFromFilename['regex'], transformer, keyed))
def _packGroup(keys, chunk, keyed):
    '''Package one finished group in the form splitByKeys yields it.'''
    if not keyed:
        return chunk
    if len(keys) == 1:
        # A lone numeric key is reported as a plain int instead of a 1-tuple.
        try:
            return (int(keys[0]), chunk)
        except (TypeError, ValueError):
            pass
    return (keys, chunk)


def splitByKeys(seq, regex, transformer=None, keyed=True):
    '''Split a sequence of strings into chunks of consecutive items sharing a key.

    The key is extracted from each (stripped) string by matching REGEX against
    it and taking the matched groups, optionally altered by TRANSFORMER (see
    extractKeys, which exits the process when a string does not match).

    Only adjacent items are grouped: a key that reappears later starts a new
    chunk.  With KEYED true each result is (key, chunk), where a single key
    that converts to int is given as that int and anything else as the key
    tuple; with KEYED false each result is the bare chunk (a list of strings).
    '''
    regex = re.compile(regex)
    chunk = []
    keys = None
    for i, s in enumerate(seq):
        s = s.strip()
        keys1 = extractKeys(s, regex, transformer)
        if i == 0:
            keys = keys1
        if keys1 != keys:
            # Key changed: emit the finished group and start the next one.
            yield _packGroup(keys, chunk, keyed)
            chunk = [s]
            keys = keys1
        else:
            chunk.append(s)
    # Flush the last group through the same packaging as every other group.
    # The old flush had no branch for keyed multi-key groups and dropped them.
    # Yielding outside any try/except also keeps GeneratorExit from being
    # swallowed when the consumer stops early.
    if chunk:
        yield _packGroup(keys, chunk, keyed)
def extractKeys(s, regex, transformer=None):
    '''Extract keys from a string by searching it with a regular expression.

    s           -- string to search.
    regex       -- pattern string or compiled pattern; its groups are the keys.
    transformer -- optional callable applied to the tuple of matched groups;
                   whatever it returns becomes the result.

    Returns the tuple of matched groups, or the transformer's return value.
    When REGEX does not match S, an error is written to stderr and the process
    exits with status 1 (SystemExit is raised).
    '''
    pattern = re.compile(regex)
    mat = pattern.search(s)
    if not mat:
        # sys.stderr.write behaves the same under Python 2 and Python 3,
        # unlike the print-chevron statement it replaces.
        sys.stderr.write('extractKeys: Fatal error, regex %s does not match %s\n' % (pattern.pattern, s))
        sys.exit(1)
    keys = mat.groups()
    if transformer is not None:
        keys = transformer(keys)
    return keys
def splitByNDays(seq, n, regex, transformer=None, keyed=True):
    '''Split URLs into chunks that each merge N consecutive key groups.

    SEQ is first grouped with splitByKeys (REGEX, TRANSFORMER and KEYED are
    passed straight through); the resulting groups are then combined N at a
    time by fixedSplit, which flattens each group into the chunk.

    NOTE(review): with KEYED true the groups are (key, urls) pairs, so every
    merged chunk alternates keys and url lists instead of being a flat list
    of urls -- confirm that this is what callers expect.
    '''
    # Materialise all groups before merging, as fixedSplit consumes them.
    perKeyGroups = list(splitByKeys(seq, regex, transformer, keyed))
    for mergedChunk in fixedSplit(perKeyGroups, n):
        yield mergedChunk
def splitByNDaysKeyed(seq, n, regex, transformer=None, keyed=True):
    '''Split URLs into per-key groups labelled with the start of their N-day period.

    SEQ is grouped with splitByKeys (REGEX, TRANSFORMER and KEYED are passed
    through).  Each group is expected to be a (key, urls) pair, which is what
    splitByKeys produces when KEYED is true.  The numeric key is taken from the
    first element of a key tuple, or from the key itself when it is already a
    scalar.

    Yields (periodStart, urls), where periodStart is the first key value of the
    N-long period containing the key: 1, N+1, 2N+1, ...  Groups falling in the
    same period are yielded separately under the same periodStart.
    '''
    daily = list(splitByKeys(seq, regex, transformer, keyed))   # url groups keyed by DOY first
    for keys, urls in daily:
        try:
            key = int(keys[0])
        except Exception:
            # keys is a bare scalar (e.g. an int, which cannot be indexed).
            # Exception rather than a bare except, so KeyboardInterrupt and
            # SystemExit are not swallowed.
            key = int(keys)
        # Floor division gives the same period index under Python 2 and 3.
        periodStart = ((key - 1) // n) * n + 1
        yield (periodStart, urls)
def groupByKeys(seq):
    '''Merge entries that share a key by concatenating their lists.

    seq -- iterable of (key, chunk) pairs, where chunk is an iterable of items.

    Returns a list of (key, items) pairs sorted by key; items holds, in input
    order, the contents of every chunk that was supplied under that key.
    The input chunks are copied, never modified.
    '''
    merged = {}
    for key, chunk in seq:
        # setdefault starts each key with a fresh list, so extending it cannot
        # alter a chunk object owned by the caller.
        merged.setdefault(key, []).extend(chunk)
    return [(key, merged[key]) for key in sorted(merged)]
def windowSplit(seq, nEpochs, nWindow):
    '''Split a sequence (e.g. of daily files/urls) into nWindow-long chunks for climatology averaging.

    The window will usually be longer than the nEpochs the average is good for.

    Not implemented: the body is a placeholder, so the arguments are ignored
    and None is returned.
    '''
    return None
# Tests follow.
def test1(args):
    '''Print the fixedSplit chunks of a file, one chunk per output line.

    ARGS is [n, filename]: the number of input lines merged per chunk and the
    file to read.  Each chunk's elements are printed separated by spaces.
    '''
    chunkSize = int(args[0])
    path = args[1]
    with open(path, 'r') as lines:
        for group in fixedSplit(lines, chunkSize):
            print(' '.join(group))
def test2(args):
    '''Print the splitByKeys groups of a file, one group per output line.

    ARGS is [regex, filename]: the pattern whose groups form the key, and the
    file of strings to split.
    '''
    regex = re.compile(args[0])
    fn = args[1]
    with open(fn, 'r') as f:
        # The function is splitByKeys; the old call named a non-existent
        # splitByKey.  keyed=False gives plain lists of strings, which is what
        # the join below needs.
        for chunk in splitByKeys(f, regex, keyed=False):
            print(' '.join(chunk))
def test3(args):
    '''Print each raw splitByNDays chunk (flagged 'Broken!' by the original author).

    ARGS is [nDays, regex, filename].
    '''
    dayCount = int(args[0])
    pattern = re.compile(args[1])
    path = args[2]
    with open(path, 'r') as lines:
        for group in splitByNDays(lines, dayCount, pattern):
            print(group)
def test4(args):
    '''Print every N-day chunk of URLs, one URL per line, followed by its size.

    ARGS is [nDays, regex, filename].  Each chunk is preceded by a blank line.
    '''
    nDays = int(args[0])
    regex = re.compile(args[1])
    fn = args[2]
    with open(fn, 'r') as f:
        # keyed=False makes each chunk a flat list of URL strings.  With the
        # keyed default, chunks interleave keys and lists, and the join below
        # raises TypeError.
        for chunk in splitByNDays(f, nDays, regex, keyed=False):
            print('')
            print('\n'.join(chunk))
            print(len(chunk))
def test5(args):
    '''Emit multi-line JSON for pyspark: one JSON document per N-day chunk.

    ARGS is [nDays, regex, filename].
    '''
    dayCount = int(args[0])
    pattern = args[1]
    path = args[2]
    with open(path, 'r') as lines:
        for group in splitByNDays(lines, dayCount, pattern):
            print(json.dumps(group))
def test6(args):
    '''Print the keyed splitByMonth groups of a file, for use with spark.

    ARGS is [regex, filename].
    '''
    pattern = args[0]
    path = args[1]
    with open(path, 'r') as lines:
        timeSpec = {'get': 'doy', 'regex': re.compile(pattern)}
        for group in splitByMonth(lines, timeSpec):
            print(group)
def main(args):
    '''Run the currently selected test driver (test6) on the command-line ARGS.'''
    # The other drivers are kept here, disabled; enable one to run it instead.
    # test1(args)
    # test2(args)
    # test3(args)
    # test4(args)
    # test5(args)
    test6(args)
if __name__ == '__main__':
    # Script entry point: pass everything after the script name to main().
    import sys
    main(sys.argv[1:])
# python split.py 5 '(...).L3m' urls_sst_daynight_2003_2015_sorted.txt
# python split.py '\/A(....)(...)' urls_sst_daynight_2003_4months.txt
# python split.py '\/A(....)(...)' urls_sst_daynight_2003_2015.txt