blob: f0f9b4383a20e09aca9441eee8f5c8bdb940817b [file] [log] [blame]
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Random;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.util.Shell;
import org.apache.pig.ExecType;
import org.apache.pig.PigConfiguration;
import org.apache.pig.PigException;
import org.apache.pig.backend.datastorage.ContainerDescriptor;
import org.apache.pig.backend.datastorage.DataStorage;
import org.apache.pig.backend.datastorage.DataStorageException;
import org.apache.pig.backend.datastorage.ElementDescriptor;
import org.apache.pig.backend.datastorage.SeekableInputStream;
import org.apache.pig.backend.datastorage.SeekableInputStream.FLAGS;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
import org.apache.pig.backend.hadoop.datastorage.HDataStorage;
import org.apache.pig.backend.hadoop.datastorage.HDirectory;
import org.apache.pig.backend.hadoop.datastorage.HPath;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.util.Utils;
public class FileLocalizer {
private static final Log log = LogFactory.getLog(FileLocalizer.class);

/** Prefix that marks a file spec as living on the local file system. */
public static final String LOCAL_PREFIX = "file:";

/** Path style for {@code parseCygPath}: Unix convention (runs {@code cygpath -u}). */
public static final int STYLE_UNIX = 0;
/** Path style for {@code parseCygPath}: Windows convention (runs {@code cygpath -w}). */
public static final int STYLE_WINDOWS = 1;

/** Owner-only permissions (rwx------) applied to the temporary containers this class creates. */
public static FsPermission OWNER_ONLY_PERMS =
        new FsPermission(FsAction.ALL, FsAction.NONE, FsAction.NONE);
public static class DataStorageInputStreamIterator extends InputStream {
InputStream current;
ElementDescriptor[] elements;
int currentElement;
public DataStorageInputStreamIterator(ElementDescriptor[] elements) {
this.elements = elements;
private boolean isEOF() throws IOException {
if (current == null) {
if (currentElement == elements.length) {
return true;
current = elements[ currentElement++ ].open();
return false;
private void doNext() throws IOException {
current = null;
public int read() throws IOException {
while (!isEOF()) {
int rc =;
if (rc != -1)
return rc;
return -1;
public int available() throws IOException {
if (isEOF())
return 0;
return current.available();
public void close() throws IOException {
if (current != null) {
current = null;
currentElement = elements.length;
public int read(byte[] b, int off, int len) throws IOException {
int count = 0;
while (!isEOF() && len > 0) {
int rc =, off, len);
if (rc <= 0) {
off += rc;
len -= rc;
count += rc;
return count == 0 ? (isEOF() ? -1 : 0) : count;
public int read(byte[] b) throws IOException {
return read(b, 0, b.length);
public long skip(long n) throws IOException {
while (!isEOF() && n > 0) {
n -= current.skip(n);
return n;
static String checkDefaultPrefix(ExecType execType, String fileSpec) {
if (fileSpec.startsWith(LOCAL_PREFIX))
return fileSpec;
return (execType == ExecType.LOCAL ? LOCAL_PREFIX : "") + fileSpec;
* This function is meant to be used if the mappers/reducers want to access any HDFS file
* @param fileName
* @return InputStream of the open file.
* @throws IOException
public static InputStream openDFSFile(String fileName) throws IOException {
Configuration conf = PigMapReduce.sJobConfInternal.get();
if (conf == null) {
throw new RuntimeException(
"can't open DFS file while executing locally");
return openDFSFile(fileName, ConfigurationUtil.toProperties(conf));
public static InputStream openDFSFile(String fileName, Properties properties) throws IOException{
DataStorage dds = new HDataStorage(properties);
ElementDescriptor elem = dds.asElement(fileName);
return openDFSFile(elem);
public static long getSize(String fileName) throws IOException {
Configuration conf = PigMapReduce.sJobConfInternal.get();
if (conf == null) {
throw new RuntimeException(
"can't open DFS file while executing locally");
return getSize(fileName, ConfigurationUtil.toProperties(conf));
public static long getSize(String fileName, Properties properties) throws IOException {
DataStorage dds = new HDataStorage(properties);
ElementDescriptor elem = dds.asElement(fileName);
// recursively get all the files under this path
ElementDescriptor[] allElems = getFileElementDescriptors(elem);
long size = 0;
// add up the sizes of all files found
for (int i=0; i<allElems.length; i++) {
Map<String, Object> stats = allElems[i].getStatistics();
size += (Long) (stats.get(ElementDescriptor.LENGTH_KEY));
return size;
private static InputStream openDFSFile(ElementDescriptor elem) throws IOException{
ElementDescriptor[] elements = null;
if (elem.exists()) {
try {
if(! elem.getDataStorage().isContainer(elem.toString())) {
if (elem.systemElement())
throw new IOException ("Attempt is made to open system file " + elem.toString());
catch (DataStorageException e) {
throw new IOException("Failed to determine if elem=" + elem + " is container", e);
// elem is a directory - recursively get all files in it
elements = getFileElementDescriptors(elem);
} else {
// It might be a glob
if (!globMatchesFiles(elem, elem.getDataStorage())) {
throw new IOException(elem.toString() + " does not exist");
} else {
elements = getFileElementDescriptors(elem);
return new DataStorageInputStreamIterator(elements);
return new DataStorageInputStreamIterator(elements);
* recursively get all "File" element descriptors present in the input element descriptor
* @param elem input element descriptor
* @return an array of Element descriptors for files present (found by traversing all levels of dirs)
* in the input element descriptor
* @throws DataStorageException
private static ElementDescriptor[] getFileElementDescriptors(ElementDescriptor elem) throws DataStorageException {
DataStorage store = elem.getDataStorage();
ElementDescriptor[] elems = store.asCollection(elem.toString());
// elems could have directories in it, if so
// get the files out so that it contains only files
List<ElementDescriptor> paths = new ArrayList<ElementDescriptor>();
List<ElementDescriptor> filePaths = new ArrayList<ElementDescriptor>();
for (int m = 0; m < elems.length; m++) {
for (int j = 0; j < paths.size(); j++) {
ElementDescriptor fullPath = store.asElement(store
.getActiveContainer(), paths.get(j));
// Skip hadoop's private/meta files ...
if (fullPath.systemElement()) {
if (fullPath instanceof ContainerDescriptor) {
for (ElementDescriptor child : ((ContainerDescriptor) fullPath)) {
} else {
// this is a file, add it to filePaths
elems = new ElementDescriptor[filePaths.size()];
return elems;
private static InputStream openLFSFile(ElementDescriptor elem) throws IOException{
// IMPORTANT NOTE: Currently we use HXXX classes to represent
// files and dirs in local mode - so we can just delegate this
// call to openDFSFile(elem). When we have true local mode files
return openDFSFile(elem);
* This function returns an input stream to a local file system file or
* a file residing on Hadoop's DFS
* @param fileName The filename to open
* @param execType execType indicating whether executing in local mode or MapReduce mode (Hadoop)
* @param storage The DataStorage object used to open the fileSpec
* @return InputStream to the fileSpec
* @throws IOException
* @deprecated Use {@link #open(String, PigContext)} instead
static public InputStream open(String fileName, ExecType execType, DataStorage storage) throws IOException {
fileName = checkDefaultPrefix(execType, fileName);
if (!fileName.startsWith(LOCAL_PREFIX)) {
ElementDescriptor elem = storage.asElement(fullPath(fileName, storage));
return openDFSFile(elem);
else {
fileName = fileName.substring(LOCAL_PREFIX.length());
ElementDescriptor elem = storage.asElement(fullPath(fileName, storage));
return openLFSFile(elem);
* @deprecated Use {@link #fullPath(String, PigContext)} instead
public static String fullPath(String fileName, DataStorage storage) {
String fullPath;
try {
if (fileName.charAt(0) != '/') {
ElementDescriptor currentDir = storage.getActiveContainer();
ElementDescriptor elem = storage.asElement(currentDir.toString(), fileName);
fullPath = elem.toString();
} else {
fullPath = fileName;
} catch (DataStorageException e) {
fullPath = fileName;
return fullPath;
static public InputStream open(String fileSpec, PigContext pigContext) throws IOException {
fileSpec = checkDefaultPrefix(pigContext.getExecType(), fileSpec);
if (!fileSpec.startsWith(LOCAL_PREFIX)) {
ElementDescriptor elem = pigContext.getDfs().asElement(fullPath(fileSpec, pigContext));
return openDFSFile(elem);
else {
fileSpec = fileSpec.substring(LOCAL_PREFIX.length());
//buffering because we only want buffered streams to be passed to load functions.
/*return new BufferedInputStream(new FileInputStream(fileSpec));*/
ElementDescriptor elem = pigContext.getLfs().asElement(fullPath(fileSpec, pigContext));
return openLFSFile(elem);
* @param fileSpec
* @param offset
* @param pigContext
* @return SeekableInputStream
* @throws IOException
* This is an overloaded version of open where there is a need to seek in stream. Currently seek is supported
* only in file, not in directory or glob.
static public SeekableInputStream open(String fileSpec, long offset, PigContext pigContext) throws IOException {
fileSpec = checkDefaultPrefix(pigContext.getExecType(), fileSpec);
ElementDescriptor elem;
if (!fileSpec.startsWith(LOCAL_PREFIX))
elem = pigContext.getDfs().asElement(fullPath(fileSpec, pigContext));
fileSpec = fileSpec.substring(LOCAL_PREFIX.length());
elem = pigContext.getLfs().asElement(fullPath(fileSpec, pigContext));
if (elem.exists() && (!elem.getDataStorage().isContainer(elem.toString()))) {
try {
if (elem.systemElement())
throw new IOException ("Attempt is made to open system file " + elem.toString());
SeekableInputStream sis = elem.sopen();, FLAGS.SEEK_SET);
return sis;
catch (DataStorageException e) {
throw new IOException("Failed to determine if elem=" + elem + " is container", e);
// Either a directory or a glob.
throw new IOException("Currently seek is supported only in a file, not in glob or directory.");
static public OutputStream create(String fileSpec, PigContext pigContext) throws IOException{
return create(fileSpec,false,pigContext);
static public OutputStream create(String fileSpec, boolean append, PigContext pigContext) throws IOException {
fileSpec = checkDefaultPrefix(pigContext.getExecType(), fileSpec);
if (!fileSpec.startsWith(LOCAL_PREFIX)) {
ElementDescriptor elem = pigContext.getDfs().asElement(fileSpec);
return elem.create();
else {
fileSpec = fileSpec.substring(LOCAL_PREFIX.length());
// TODO probably this should be replaced with the local file system
File f = (new File(fileSpec)).getParentFile();
if (f!=null){
boolean res = f.mkdirs();
if (!res)
log.warn("FileLocalizer.create: failed to create " + f);
return new FileOutputStream(fileSpec,append);
static public boolean delete(String fileSpec, PigContext pigContext) throws IOException{
fileSpec = checkDefaultPrefix(pigContext.getExecType(), fileSpec);
ElementDescriptor elem = null;
if (!fileSpec.startsWith(LOCAL_PREFIX)) {
elem = pigContext.getDfs().asElement(fileSpec);
} else {
elem = pigContext.getLfs().asElement(fileSpec);
return true;
static Random r = new Random();
* Thread local relativeRoot ContainerDescriptor. Do not access this object
* directly, since it's lazy initialized in the relativeRoot(PigContext)
* method, which should be used instead.
private static ThreadLocal<ContainerDescriptor> relativeRoot =
new ThreadLocal<ContainerDescriptor>() {
private static ContainerDescriptor resourcePath;
* This method is only used by test code to reset state.
* @param initialized
public static void setInitialized(boolean initialized) {
if (!initialized) {
* Accessor method to get the root ContainerDescriptor used for temporary
* files bound to this thread. Calling this method lazy-initialized the
* relativeRoot object.
* @param pigContext
* @return
* @throws DataStorageException
private static synchronized ContainerDescriptor relativeRoot(final PigContext pigContext)
throws DataStorageException {
if (relativeRoot.get() == null) {
ContainerDescriptor relative = getTempContainer(pigContext);
return relativeRoot.get();
* Accessor method to get the resource ContainerDescriptor used for tez resource
* path bound to this thread. Calling this method lazy-initialized the
* resourcePath object. This path is different than relativeRoot in that
* calling PigServer.shutdown will only remove relativeRoot but not resourthPath
* since resourthPath should be available in the entire session
* @param pigContext
* @return temporary resource path
* @throws DataStorageException
public static synchronized Path getTemporaryResourcePath(final PigContext pigContext)
throws DataStorageException {
if (resourcePath == null) {
resourcePath = getTempContainer(pigContext);
return ((HPath)resourcePath).getPath();
private static synchronized ContainerDescriptor getTempContainer(final PigContext pigContext)
throws DataStorageException {
ContainerDescriptor tempContainer = null;
String tdir= Utils.substituteVars(pigContext.getProperties().getProperty(PigConfiguration.PIG_TEMP_DIR, "/tmp"));
try {
do {
tempContainer = pigContext.getDfs().asContainer(tdir + "/temp" + r.nextInt());
} while (tempContainer.exists());
catch (IOException e) {
// try one last time in case this was due IO Exception caused by dir
// operations on directory created by another JVM at the same instant
tempContainer = pigContext.getDfs().asContainer(tdir + "/temp" + r.nextInt());
try {
catch (IOException e1) {
throw new DataStorageException(e1);
return tempContainer;
private static void createContainer(ContainerDescriptor container) throws IOException {
if (container instanceof HDirectory) {
((HDirectory) container).setPermission(OWNER_ONLY_PERMS);
public static void deleteTempFiles() {
if (relativeRoot.get() != null) {
try {
} catch (IOException e) {
public static void deleteTempResourceFiles() {
if (resourcePath != null) {
try {
} catch (IOException e) {
public static Path getTemporaryPath(PigContext pigContext) throws IOException {
return getTemporaryPath(pigContext, "");
public static Path getTemporaryPath(PigContext pigContext, String suffix) throws IOException {
ElementDescriptor relative = relativeRoot(pigContext);
ElementDescriptor elem=
pigContext.getDfs().asElement(relative.toString(), "tmp" + r.nextInt() + suffix);
return ((HPath)elem).getPath();
public static String hadoopify(String filename, PigContext pigContext) throws IOException {
if (Shell.WINDOWS){
filename = filename.replace('\\','/');
if (filename.startsWith(LOCAL_PREFIX)) {
filename = filename.substring(LOCAL_PREFIX.length());
ElementDescriptor localElem =
if (!localElem.exists()) {
throw new FileNotFoundException(filename);
ElementDescriptor distribElem = pigContext.getDfs().asElement(
int suffixStart = filename.lastIndexOf('.');
if (suffixStart != -1) {
distribElem = pigContext.getDfs().asElement(distribElem.toString() +
// TODO: currently the copy method in Data Storage does not allow to specify overwrite
// so the work around is to delete the dst file first, if it exists
if (distribElem.exists()) {
localElem.copy(distribElem, null, false);
return distribElem.toString();
public static String fullPath(String filename, PigContext pigContext) throws IOException {
try {
if (filename.charAt(0) != '/') {
ElementDescriptor currentDir = pigContext.getDfs().getActiveContainer();
ElementDescriptor elem = pigContext.getDfs().asElement(currentDir.toString(),
return elem.toString();
return filename;
catch (DataStorageException e) {
return filename;
public static boolean fileExists(String filename, PigContext context)
throws IOException {
return fileExists(filename, context.getFs());
* @deprecated Use {@link #fileExists(String, PigContext)} instead
public static boolean fileExists(String filename, DataStorage store)
throws IOException {
ElementDescriptor elem = store.asElement(filename);
return elem.exists() || globMatchesFiles(elem, store);
public static boolean isFile(String filename, PigContext context)
throws IOException {
return !isDirectory(filename, context.getDfs());
* @deprecated Use {@link #isFile(String, PigContext)} instead
public static boolean isFile(String filename, DataStorage store)
throws IOException {
return !isDirectory(filename, store);
public static boolean isDirectory(String filename, PigContext context)
throws IOException {
return isDirectory(filename, context.getDfs());
* @deprecated Use {@link #isDirectory(String, PigContext)} instead.
public static boolean isDirectory(String filename, DataStorage store)
throws IOException {
ElementDescriptor elem = store.asElement(filename);
return (elem instanceof ContainerDescriptor);
private static boolean globMatchesFiles(ElementDescriptor elem,
DataStorage fs)
throws IOException
try {
// Currently, if you give a glob with non-special glob characters, hadoop
// returns an array with your file name in it. So check for that.
ElementDescriptor[] elems = fs.asCollection(elem.toString());
switch (elems.length) {
case 0:
return false;
case 1:
return !elems[0].equals(elem);
return true;
catch (DataStorageException e) {
throw e;
public static void setR(Random r) {
FileLocalizer.r = r;
* Convert path from Windows convention to Unix convention. Invoked under
* cygwin.
* @param path
* path in Windows convention
* @return path in Unix convention, null if fail
static public String parseCygPath(String path, int style) {
String[] command;
if (style==STYLE_WINDOWS)
command = new String[] { "cygpath", "-w", path };
command = new String[] { "cygpath", "-u", path };
Process p = null;
try {
p = Runtime.getRuntime().exec(command);
} catch (IOException e) {
return null;
int exitVal = 0;
try {
exitVal = p.waitFor();
} catch (InterruptedException e) {
return null;
if (exitVal != 0)
return null;
String line = null;
BufferedReader br = null;
try {
InputStreamReader isr = new InputStreamReader(p.getInputStream());
br = new BufferedReader(isr);
line = br.readLine();
} catch (IOException e) {
return null;
} finally {
if (br != null) try {br.close();} catch (Exception e) {}
return line;
/** Local directory into which remote files and class-path resources are fetched. */
static File localTempDir = null;
static {
    File f;
    boolean success = true;
    IOException failure = null;
    try {
        // createTempFile reserves a unique name; replace that file with a directory
        f = File.createTempFile("pig", "tmp");
        success &= f.delete();
        success &= f.mkdir();
        localTempDir = f;
    } catch (IOException e) {
        // previously swallowed: localTempDir stayed null while success remained true
        success = false;
        failure = e;
    }
    if (!success) {
        throw new RuntimeException("Error creating FileLocalizer temp directory.", failure);
    }
}
public static class FetchFileRet {
public FetchFileRet(File file, boolean didFetch) {
this.file = file;
this.didFetch = didFetch;
public File file;
public boolean didFetch;
* Ensures that the passed path is on the local file system, fetching it
* to the if necessary. If is true
* and dfs is not null, then a relative path is assumed to be relative to the passed
* dfs active directory. Else they are assumed to be relative to the local working
* directory.
public static FetchFileRet fetchFile(Properties properties, String filePath) throws IOException {
return fetchFilesInternal(properties, filePath, false)[0];
* Ensures that the passed files pointed to by path are on the local file system,
* fetching them to the if necessary. If is true
* and dfs is not null, then a relative path is assumed to be relative to the passed
* dfs active directory. Else they are assumed to be relative to the local working
* directory.
public static FetchFileRet[] fetchFiles(Properties properties, String filePath) throws IOException {
return fetchFilesInternal(properties, filePath, true);
* Copies the files from remote to local filesystem.
* When 'multipleFiles' is set the path could point to multiple files
* through globs or a directory. In this case, return array contains multiple
* files, otherwise a single file is returned.
* If is true then a relative path is assumed to be
* relative to the default filesystem's active directory.
* Else they are assumed to be relative to the local working directory.
* @param properties
* @param filePath
* @param multipleFiles
* @return
private static FetchFileRet[] fetchFilesInternal(Properties properties,
String filePath,
boolean multipleFiles) throws IOException {
Path path = new Path(filePath);
if (path.getName().isEmpty()) {
return new FetchFileRet[0];
URI uri = path.toUri();
Configuration conf = new Configuration();
ConfigurationUtil.mergeConf(conf, ConfigurationUtil.toConfiguration(properties));
// if there is no schema or if the schema is "local", then it is
// expected to be a local path.
FileSystem localFs = FileSystem.getLocal(conf);
FileSystem srcFs;
if ( (!"true".equals(properties.getProperty(""))
&& uri.getScheme() == null )||
// For Windows local files
(uri.getScheme() == null && uri.getPath().matches("^/[A-Za-z]:.*")) ||
(uri.getScheme() != null && uri.getScheme().equals("local"))
) {
srcFs = localFs;
} else {
srcFs = path.getFileSystem(conf);
FileStatus[] files;
if (multipleFiles) {
files = srcFs.globStatus(path);
} else {
files = new FileStatus[]{ srcFs.getFileStatus(path) };
if (files == null || files.length == 0) {
throw new ExecException("file '" + filePath + "' does not exist.", 101, PigException.INPUT);
FetchFileRet[] fetchFiles = new FetchFileRet[files.length];
int idx = 0;
for(FileStatus file : files) {
// should throw an exception if this is not a file?
String pathname = file.getPath().toUri().getPath();
String filename = file.getPath().getName();
if (srcFs == localFs) {
fetchFiles[idx++] = new FetchFileRet(new File(pathname), false);
} else {
// fetch from remote:
File dest = new File(localTempDir, filename);
try {
srcFs.copyToLocalFile(file.getPath(), new Path(dest.getAbsolutePath()));
} catch (IOException e) {
throw new ExecException("Could not copy " + filePath + " to local destination " + dest, 101, PigException.INPUT, e);
fetchFiles[idx++] = new FetchFileRet(dest, true);
return fetchFiles;
* Ensures that the passed resource is available from the local file system, fetching
* it to a temporary directory.
* @throws ResourceNotFoundException
public static FetchFileRet fetchResource(String name) throws IOException, ResourceNotFoundException {
FetchFileRet localFileRet = null;
InputStream resourceStream = PigContext.getClassLoader().getResourceAsStream(name);
if (resourceStream != null) {
File dest = new File(localTempDir, name);
OutputStream outputStream = null;
try {
outputStream = new BufferedOutputStream(new FileOutputStream(dest));
byte[] buffer = new byte[1024];
int len;
while (( > 0) {
} finally {
if (outputStream != null) {
localFileRet = new FetchFileRet(dest,false);
throw new ResourceNotFoundException(name);
return localFileRet;