| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.hadoop.fs; |
| |
| import java.io.BufferedOutputStream; |
| import java.io.DataOutput; |
| import java.io.File; |
| import java.io.FileInputStream; |
| import java.io.FileNotFoundException; |
| import java.io.FileOutputStream; |
| import java.io.IOException; |
| import java.io.OutputStream; |
| import java.net.URI; |
| import java.nio.ByteBuffer; |
| import java.util.EnumSet; |
| import java.util.StringTokenizer; |
| |
| import org.apache.hadoop.classification.InterfaceAudience; |
| import org.apache.hadoop.classification.InterfaceStability; |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.fs.permission.FsPermission; |
| import org.apache.hadoop.util.Progressable; |
| import org.apache.hadoop.util.Shell; |
| import org.apache.hadoop.util.StringUtils; |
| |
| /**************************************************************** |
| * Implement the FileSystem API for the raw local filesystem. |
| * |
| *****************************************************************/ |
| @InterfaceAudience.Public |
| @InterfaceStability.Stable |
| public class RawLocalFileSystem extends FileSystem { |
| static final URI NAME = URI.create("file:///"); |
| private Path workingDir; |
| |
| public RawLocalFileSystem() { |
| workingDir = getInitialWorkingDirectory(); |
| } |
| |
| private Path makeAbsolute(Path f) { |
| if (f.isAbsolute()) { |
| return f; |
| } else { |
| return new Path(workingDir, f); |
| } |
| } |
| |
| /** Convert a path to a File. */ |
| public File pathToFile(Path path) { |
| checkPath(path); |
| if (!path.isAbsolute()) { |
| path = new Path(getWorkingDirectory(), path); |
| } |
| return new File(path.toUri().getPath()); |
| } |
| |
| public URI getUri() { return NAME; } |
| |
| public void initialize(URI uri, Configuration conf) throws IOException { |
| super.initialize(uri, conf); |
| setConf(conf); |
| } |
| |
| class TrackingFileInputStream extends FileInputStream { |
| public TrackingFileInputStream(File f) throws IOException { |
| super(f); |
| } |
| |
| public int read() throws IOException { |
| int result = super.read(); |
| if (result != -1) { |
| statistics.incrementBytesRead(1); |
| } |
| return result; |
| } |
| |
| public int read(byte[] data) throws IOException { |
| int result = super.read(data); |
| if (result != -1) { |
| statistics.incrementBytesRead(result); |
| } |
| return result; |
| } |
| |
| public int read(byte[] data, int offset, int length) throws IOException { |
| int result = super.read(data, offset, length); |
| if (result != -1) { |
| statistics.incrementBytesRead(result); |
| } |
| return result; |
| } |
| } |
| |
| /******************************************************* |
| * For open()'s FSInputStream. |
| *******************************************************/ |
| class LocalFSFileInputStream extends FSInputStream { |
| private FileInputStream fis; |
| private long position; |
| |
| public LocalFSFileInputStream(Path f) throws IOException { |
| this.fis = new TrackingFileInputStream(pathToFile(f)); |
| } |
| |
| public void seek(long pos) throws IOException { |
| fis.getChannel().position(pos); |
| this.position = pos; |
| } |
| |
| public long getPos() throws IOException { |
| return this.position; |
| } |
| |
| public boolean seekToNewSource(long targetPos) throws IOException { |
| return false; |
| } |
| |
| /* |
| * Just forward to the fis |
| */ |
| public int available() throws IOException { return fis.available(); } |
| public void close() throws IOException { fis.close(); } |
| @Override |
| public boolean markSupported() { return false; } |
| |
| public int read() throws IOException { |
| try { |
| int value = fis.read(); |
| if (value >= 0) { |
| this.position++; |
| } |
| return value; |
| } catch (IOException e) { // unexpected exception |
| throw new FSError(e); // assume native fs error |
| } |
| } |
| |
| public int read(byte[] b, int off, int len) throws IOException { |
| try { |
| int value = fis.read(b, off, len); |
| if (value > 0) { |
| this.position += value; |
| } |
| return value; |
| } catch (IOException e) { // unexpected exception |
| throw new FSError(e); // assume native fs error |
| } |
| } |
| |
| public int read(long position, byte[] b, int off, int len) |
| throws IOException { |
| ByteBuffer bb = ByteBuffer.wrap(b, off, len); |
| try { |
| return fis.getChannel().read(bb, position); |
| } catch (IOException e) { |
| throw new FSError(e); |
| } |
| } |
| |
| public long skip(long n) throws IOException { |
| long value = fis.skip(n); |
| if (value > 0) { |
| this.position += value; |
| } |
| return value; |
| } |
| } |
| |
| public FSDataInputStream open(Path f, int bufferSize) throws IOException { |
| if (!exists(f)) { |
| throw new FileNotFoundException(f.toString()); |
| } |
| return new FSDataInputStream(new BufferedFSInputStream( |
| new LocalFSFileInputStream(f), bufferSize)); |
| } |
| |
| /********************************************************* |
| * For create()'s FSOutputStream. |
| *********************************************************/ |
| class LocalFSFileOutputStream extends OutputStream { |
| private FileOutputStream fos; |
| |
| private LocalFSFileOutputStream(Path f, boolean append) throws IOException { |
| this.fos = new FileOutputStream(pathToFile(f), append); |
| } |
| |
| /* |
| * Just forward to the fos |
| */ |
| public void close() throws IOException { fos.close(); } |
| public void flush() throws IOException { fos.flush(); } |
| public void write(byte[] b, int off, int len) throws IOException { |
| try { |
| fos.write(b, off, len); |
| } catch (IOException e) { // unexpected exception |
| throw new FSError(e); // assume native fs error |
| } |
| } |
| |
| public void write(int b) throws IOException { |
| try { |
| fos.write(b); |
| } catch (IOException e) { // unexpected exception |
| throw new FSError(e); // assume native fs error |
| } |
| } |
| } |
| |
| /** {@inheritDoc} */ |
| public FSDataOutputStream append(Path f, int bufferSize, |
| Progressable progress) throws IOException { |
| if (!exists(f)) { |
| throw new FileNotFoundException("File " + f + " not found."); |
| } |
| if (getFileStatus(f).isDirectory()) { |
| throw new IOException("Cannot append to a diretory (=" + f + " )."); |
| } |
| return new FSDataOutputStream(new BufferedOutputStream( |
| new LocalFSFileOutputStream(f, true), bufferSize), statistics); |
| } |
| |
| /** {@inheritDoc} */ |
| public FSDataOutputStream create(Path f, boolean overwrite, int bufferSize, |
| short replication, long blockSize, Progressable progress) |
| throws IOException { |
| if (exists(f) && !overwrite) { |
| throw new IOException("File already exists:"+f); |
| } |
| Path parent = f.getParent(); |
| if (parent != null && !mkdirs(parent)) { |
| throw new IOException("Mkdirs failed to create " + parent.toString()); |
| } |
| return new FSDataOutputStream(new BufferedOutputStream( |
| new LocalFSFileOutputStream(f, false), bufferSize), statistics); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override |
| public FSDataOutputStream create(Path f, FsPermission permission, |
| boolean overwrite, int bufferSize, short replication, long blockSize, |
| Progressable progress) throws IOException { |
| |
| FSDataOutputStream out = create(f, |
| overwrite, bufferSize, replication, blockSize, progress); |
| setPermission(f, permission); |
| return out; |
| } |
| |
| |
| @Override |
| protected FSDataOutputStream primitiveCreate(Path f, |
| FsPermission absolutePermission, EnumSet<CreateFlag> flag, |
| int bufferSize, short replication, long blockSize, Progressable progress, |
| int bytesPerChecksum) throws IOException { |
| |
| if(flag.contains(CreateFlag.APPEND)){ |
| if (!exists(f)){ |
| if(flag.contains(CreateFlag.CREATE)) { |
| return create(f, false, bufferSize, replication, blockSize, null); |
| } |
| } |
| return append(f, bufferSize, null); |
| } |
| |
| FSDataOutputStream out = create(f, flag.contains(CreateFlag.OVERWRITE), |
| bufferSize, replication, blockSize, progress); |
| setPermission(f, absolutePermission); |
| return out; |
| } |
| |
| public boolean rename(Path src, Path dst) throws IOException { |
| if (pathToFile(src).renameTo(pathToFile(dst))) { |
| return true; |
| } |
| return FileUtil.copy(this, src, this, dst, true, getConf()); |
| } |
| |
| public boolean delete(Path p, boolean recursive) throws IOException { |
| File f = pathToFile(p); |
| if (f.isFile()) { |
| return f.delete(); |
| } else if ((!recursive) && f.isDirectory() && |
| (f.listFiles().length != 0)) { |
| throw new IOException("Directory " + f.toString() + " is not empty"); |
| } |
| return FileUtil.fullyDelete(f); |
| } |
| |
| public FileStatus[] listStatus(Path f) throws IOException { |
| File localf = pathToFile(f); |
| FileStatus[] results; |
| |
| if (!localf.exists()) { |
| throw new FileNotFoundException("File " + f + " does not exist."); |
| } |
| if (localf.isFile()) { |
| return new FileStatus[] { |
| new RawLocalFileStatus(localf, getDefaultBlockSize(), this) }; |
| } |
| |
| String[] names = localf.list(); |
| if (names == null) { |
| return null; |
| } |
| results = new FileStatus[names.length]; |
| for (int i = 0; i < names.length; i++) { |
| results[i] = getFileStatus(new Path(f, names[i])); |
| } |
| return results; |
| } |
| |
| /** |
| * Creates the specified directory hierarchy. Does not |
| * treat existence as an error. |
| */ |
| public boolean mkdirs(Path f) throws IOException { |
| if(f == null) { |
| throw new IllegalArgumentException("mkdirs path arg is null"); |
| } |
| Path parent = f.getParent(); |
| File p2f = pathToFile(f); |
| if(parent != null) { |
| File parent2f = pathToFile(parent); |
| if(parent2f != null && parent2f.exists() && !parent2f.isDirectory()) { |
| throw new FileAlreadyExistsException("Parent path is not a directory: " |
| + parent); |
| } |
| } |
| return (parent == null || mkdirs(parent)) && |
| (p2f.mkdir() || p2f.isDirectory()); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override |
| public boolean mkdirs(Path f, FsPermission permission) throws IOException { |
| boolean b = mkdirs(f); |
| if(b) { |
| setPermission(f, permission); |
| } |
| return b; |
| } |
| |
| |
| @Override |
| protected boolean primitiveMkdir(Path f, FsPermission absolutePermission) |
| throws IOException { |
| boolean b = mkdirs(f); |
| setPermission(f, absolutePermission); |
| return b; |
| } |
| |
| |
| @Override |
| public Path getHomeDirectory() { |
| return this.makeQualified(new Path(System.getProperty("user.home"))); |
| } |
| |
| /** |
| * Set the working directory to the given directory. |
| */ |
| @Override |
| public void setWorkingDirectory(Path newDir) { |
| workingDir = makeAbsolute(newDir); |
| checkPath(workingDir); |
| |
| } |
| |
| @Override |
| public Path getWorkingDirectory() { |
| return workingDir; |
| } |
| |
| @Override |
| protected Path getInitialWorkingDirectory() { |
| return this.makeQualified(new Path(System.getProperty("user.dir"))); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override |
| public FsStatus getStatus(Path p) throws IOException { |
| File partition = pathToFile(p == null ? new Path("/") : p); |
| //File provides getUsableSpace() and getFreeSpace() |
| //File provides no API to obtain used space, assume used = total - free |
| return new FsStatus(partition.getTotalSpace(), |
| partition.getTotalSpace() - partition.getFreeSpace(), |
| partition.getFreeSpace()); |
| } |
| |
| // In the case of the local filesystem, we can just rename the file. |
| public void moveFromLocalFile(Path src, Path dst) throws IOException { |
| rename(src, dst); |
| } |
| |
| // We can write output directly to the final location |
| public Path startLocalOutput(Path fsOutputFile, Path tmpLocalFile) |
| throws IOException { |
| return fsOutputFile; |
| } |
| |
| // It's in the right place - nothing to do. |
| public void completeLocalOutput(Path fsWorkingFile, Path tmpLocalFile) |
| throws IOException { |
| } |
| |
| public void close() throws IOException { |
| super.close(); |
| } |
| |
| public String toString() { |
| return "LocalFS"; |
| } |
| |
| public FileStatus getFileStatus(Path f) throws IOException { |
| File path = pathToFile(f); |
| if (path.exists()) { |
| return new RawLocalFileStatus(pathToFile(f), getDefaultBlockSize(), this); |
| } else { |
| throw new FileNotFoundException("File " + f + " does not exist."); |
| } |
| } |
| |
| static class RawLocalFileStatus extends FileStatus { |
| /* We can add extra fields here. It breaks at least CopyFiles.FilePair(). |
| * We recognize if the information is already loaded by check if |
| * onwer.equals(""). |
| */ |
| private boolean isPermissionLoaded() { |
| return !super.getOwner().equals(""); |
| } |
| |
| RawLocalFileStatus(File f, long defaultBlockSize, FileSystem fs) { |
| super(f.length(), f.isDirectory(), 1, defaultBlockSize, |
| f.lastModified(), fs.makeQualified(new Path(f.getPath()))); |
| } |
| |
| @Override |
| public FsPermission getPermission() { |
| if (!isPermissionLoaded()) { |
| loadPermissionInfo(); |
| } |
| return super.getPermission(); |
| } |
| |
| @Override |
| public String getOwner() { |
| if (!isPermissionLoaded()) { |
| loadPermissionInfo(); |
| } |
| return super.getOwner(); |
| } |
| |
| @Override |
| public String getGroup() { |
| if (!isPermissionLoaded()) { |
| loadPermissionInfo(); |
| } |
| return super.getGroup(); |
| } |
| |
| /// loads permissions, owner, and group from `ls -ld` |
| private void loadPermissionInfo() { |
| IOException e = null; |
| try { |
| StringTokenizer t = new StringTokenizer( |
| execCommand(new File(getPath().toUri()), |
| Shell.getGET_PERMISSION_COMMAND())); |
| //expected format |
| //-rw------- 1 username groupname ... |
| String permission = t.nextToken(); |
| if (permission.length() > 10) { //files with ACLs might have a '+' |
| permission = permission.substring(0, 10); |
| } |
| setPermission(FsPermission.valueOf(permission)); |
| t.nextToken(); |
| setOwner(t.nextToken()); |
| setGroup(t.nextToken()); |
| } catch (Shell.ExitCodeException ioe) { |
| if (ioe.getExitCode() != 1) { |
| e = ioe; |
| } else { |
| setPermission(null); |
| setOwner(null); |
| setGroup(null); |
| } |
| } catch (IOException ioe) { |
| e = ioe; |
| } finally { |
| if (e != null) { |
| throw new RuntimeException("Error while running command to get " + |
| "file permissions : " + |
| StringUtils.stringifyException(e)); |
| } |
| } |
| } |
| |
| @Override |
| public void write(DataOutput out) throws IOException { |
| if (!isPermissionLoaded()) { |
| loadPermissionInfo(); |
| } |
| super.write(out); |
| } |
| } |
| |
| /** |
| * Use the command chown to set owner. |
| */ |
| @Override |
| public void setOwner(Path p, String username, String groupname) |
| throws IOException { |
| if (username == null && groupname == null) { |
| throw new IOException("username == null && groupname == null"); |
| } |
| |
| if (username == null) { |
| execCommand(pathToFile(p), Shell.SET_GROUP_COMMAND, groupname); |
| } else { |
| //OWNER[:[GROUP]] |
| String s = username + (groupname == null? "": ":" + groupname); |
| execCommand(pathToFile(p), Shell.SET_OWNER_COMMAND, s); |
| } |
| } |
| |
| /** |
| * Use the command chmod to set permission. |
| */ |
| @Override |
| public void setPermission(Path p, FsPermission permission) |
| throws IOException { |
| execCommand(pathToFile(p), Shell.SET_PERMISSION_COMMAND, |
| String.format("%05o", permission.toShort())); |
| } |
| |
| private static String execCommand(File f, String... cmd) throws IOException { |
| String[] args = new String[cmd.length + 1]; |
| System.arraycopy(cmd, 0, args, 0, cmd.length); |
| args[cmd.length] = f.getCanonicalPath(); |
| String output = Shell.execCommand(args); |
| return output; |
| } |
| |
| } |