blob: fcaef62daa66cb2829d0a76b6a599bd27689ff36 [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
cache.py
Utilities to retrieve files and cache them at deterministic paths.
"""
import sys, os, urllib.parse, urllib.request, urllib.parse, urllib.error
# Default directory in which retrieved files are cached; used as the default
# value of the ``cachePath`` argument of the retrieval functions in this module.
CachePath = '/tmp/cache'
def isLocalFile(url):
    '''Check if URL is a local path.

    A URL is considered local when it has no scheme (a plain filesystem
    path) or uses the ``file`` scheme.

    Returns a tuple ``(isLocal, path)``:
      * local:  ``(True, <filesystem path>)``. A warning is printed to stderr
        if nothing exists at that path, but the path is still returned.
      * remote: ``(False, <path component of the URL>)``.
    '''
    u = urllib.parse.urlparse(url)
    if u.scheme not in ('', 'file'):
        return (False, u.path)

    if u.scheme == 'file':
        # Percent-escapes (e.g. %20) are URL syntax, not part of the file
        # name on disk, so decode them before touching the filesystem.
        path = urllib.parse.unquote(u.path)
    else:
        # A scheme-less string is a plain filesystem path, not a URL: use it
        # verbatim so that '#', '?' or ';' in a file name are not stripped
        # off as a URL fragment / query / params.
        path = url

    if not os.path.exists(path):
        print('isLocalFile: File at local path does not exist: %s' % path, file=sys.stderr)
    return (True, path)
def retrieveFile(url, cachePath=CachePath, hdfsPath=None, retries=3):
    '''Retrieve a file from a URL or from an HDFS path.

    url       -- URL (or local path) of the file; when ``hdfsPath`` is given
                 it is instead joined onto ``hdfsPath`` to form the HDFS
                 source path.
    cachePath -- local directory to place the file in; ``None`` means the
                 current directory.
    hdfsPath  -- if truthy, copy from HDFS instead of fetching over the web.
    retries   -- number of download attempts; only used for web retrieval.

    Returns whatever the underlying retrieval function returns (a local
    path, or None when a web download fails).
    '''
    if not hdfsPath:
        return retrieveFileWeb(url, cachePath, retries)

    # retrieveFileWeb treats cachePath=None as './'; apply the same rule here
    # so the HDFS branch does not fail inside os.path.join(None, ...).
    if cachePath is None:
        cachePath = './'
    hdfsFile = os.path.join(hdfsPath, url)
    return hdfsCopyToLocal(hdfsFile, cachePath)
def retrieveFileWeb(url, cachePath=CachePath, retries=3):
    '''Retrieve a file from a URL, or if it is a local path then return that path.

    url       -- URL to download, or a local path / ``file:`` URL.
    cachePath -- directory in which the download is cached under the last
                 component of the URL path; ``None`` means the current
                 directory.
    retries   -- maximum number of download attempts.

    Returns the local path of the file, or None if the URL has no file name
    or every download attempt failed. For local inputs the path is returned
    as reported by isLocalFile (which only warns if it does not exist).
    '''
    if cachePath is None:
        cachePath = './'

    ok, path = isLocalFile(url)
    if ok:
        return path

    fn = os.path.split(path)[1]
    if not fn:
        # e.g. 'http://host/' -- without a file name outPath would be the
        # cache directory itself and would be mistaken for a cached file.
        print('retrieveFile: Fatal error, Cannot derive a file name from URL: %s' % url, file=sys.stderr)
        return None

    outPath = os.path.join(cachePath, fn)
    if os.path.exists(outPath):
        print('retrieveFile: Using cached file: %s' % outPath, file=sys.stderr)
        return outPath

    print('retrieveFile: Retrieving (URL) %s to %s' % (url, outPath), file=sys.stderr)
    # Download to a per-process temporary name and rename into place only on
    # success, so an interrupted transfer can never be picked up later as a
    # valid cached file.
    tmpPath = '%s.part.%d' % (outPath, os.getpid())
    for attempt in range(retries):
        try:
            if cachePath:
                os.makedirs(cachePath, exist_ok=True)
            urllib.request.urlretrieve(url, tmpPath)
            os.replace(tmpPath, outPath)
            return outPath
        except Exception:
            # Deliberately broad (best-effort retry), but unlike a bare
            # 'except:' this lets KeyboardInterrupt / SystemExit propagate.
            print('retrieveFile: Error retrieving file at URL: %s' % url, file=sys.stderr)
            try:
                os.remove(tmpPath)
            except OSError:
                pass  # nothing was written, or it is already gone
            if attempt + 1 < retries:
                print('retrieveFile: Retrying ...', file=sys.stderr)

    print('retrieveFile: Fatal error, Cannot retrieve file at URL: %s' % url, file=sys.stderr)
    return None
def hdfsCopyFromLocal(src, dest):
    '''Copy local file into HDFS directory, overwriting using force switch.

    src  -- path of the local file.
    dest -- HDFS destination directory.

    Runs ``hadoop fs -copyFromLocal -f src dest``. Returns ``dest`` joined
    with the base name of ``src``; the path is returned whether or not the
    command succeeded, but failures are reported on stderr.
    '''
    import shlex
    import subprocess

    outPath = os.path.join(dest, os.path.split(src)[1])
    # Pass an argument list (no shell) so that spaces or shell metacharacters
    # in the paths cannot split or inject commands.
    argv = ['hadoop', 'fs', '-copyFromLocal', '-f', src, dest]
    cmd = ' '.join(shlex.quote(str(arg)) for arg in argv)
    print("Exec overwrite: %s" % cmd, file=sys.stderr)
    try:
        status = subprocess.call(argv)
    except OSError as err:
        # e.g. the hadoop executable is not on PATH; os.system would not
        # have raised here either, so report and carry on.
        print("hdfsCopyFromLocal: Cannot execute hadoop: %s" % err, file=sys.stderr)
    else:
        if status != 0:
            print("hdfsCopyFromLocal: Command failed with exit status %s" % status, file=sys.stderr)
    return outPath
def hdfsCopyToLocal(src, dest):
    '''Copy HDFS file to local path, overwriting.

    src  -- HDFS path of the file.
    dest -- local destination directory.

    Any existing local copy is deleted first, then
    ``hadoop fs -copyToLocal src dest`` is run. Returns ``dest`` joined with
    the base name of ``src``; the path is returned whether or not the command
    succeeded, but failures are reported on stderr.
    '''
    import shlex
    import subprocess

    outPath = os.path.join(dest, os.path.split(src)[1])
    # Remove a stale local copy (presumably because copyToLocal will not
    # overwrite an existing file -- TODO confirm). The file is normally
    # absent on first retrieval, which must not be an error.
    try:
        os.unlink(outPath)
    except FileNotFoundError:
        pass

    # Pass an argument list (no shell) so that spaces or shell metacharacters
    # in the paths cannot split or inject commands.
    argv = ['hadoop', 'fs', '-copyToLocal', src, dest]
    cmd = ' '.join(shlex.quote(str(arg)) for arg in argv)
    print("Exec overwrite: %s" % cmd, file=sys.stderr)
    try:
        status = subprocess.call(argv)
    except OSError as err:
        # e.g. the hadoop executable is not on PATH; os.system would not
        # have raised here either, so report and carry on.
        print("hdfsCopyToLocal: Cannot execute hadoop: %s" % err, file=sys.stderr)
    else:
        if status != 0:
            print("hdfsCopyToLocal: Command failed with exit status %s" % status, file=sys.stderr)
    return outPath