| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.hadoop.fs; |
| |
| |
| import java.io.FileNotFoundException; |
| import java.io.IOException; |
| import java.net.InetSocketAddress; |
| import java.net.URI; |
| import java.net.URISyntaxException; |
| import java.util.ArrayList; |
| import java.util.EnumSet; |
| import java.util.List; |
| import java.util.NoSuchElementException; |
| |
| import org.apache.hadoop.classification.InterfaceAudience; |
| import org.apache.hadoop.classification.InterfaceStability; |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.fs.permission.FsPermission; |
| import org.apache.hadoop.hdfs.CorruptFileBlockIterator; |
| import org.apache.hadoop.hdfs.DFSClient; |
| import org.apache.hadoop.hdfs.DFSUtil; |
| import org.apache.hadoop.hdfs.HdfsConfiguration; |
| import org.apache.hadoop.hdfs.protocol.DirectoryListing; |
| import org.apache.hadoop.hdfs.protocol.HdfsConstants; |
| import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; |
| import org.apache.hadoop.hdfs.protocol.HdfsLocatedFileStatus; |
| import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier; |
| import org.apache.hadoop.hdfs.server.namenode.NameNode; |
| import org.apache.hadoop.io.Text; |
| import org.apache.hadoop.security.AccessControlException; |
| import org.apache.hadoop.security.token.Token; |
| import org.apache.hadoop.security.token.SecretManager.InvalidToken; |
| import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier; |
| import org.apache.hadoop.util.Progressable; |
| |
| @InterfaceAudience.Private |
| @InterfaceStability.Evolving |
| public class Hdfs extends AbstractFileSystem { |
| |
| DFSClient dfs; |
| private boolean verifyChecksum = true; |
| |
| static { |
| HdfsConfiguration.init(); |
| } |
| |
/**
 * Builds an instance backed by a {@link DFSClient} connected to the
 * namenode address derived from the URI's authority.
 * This constructor has the signature needed by
 * {@link AbstractFileSystem#createFileSystem(URI, Configuration)}
 *
 * @param theUri
 *          which must be that of Hdfs and must contain a host
 * @param conf
 *          configuration passed on to the {@link DFSClient}
 * @throws IllegalArgumentException
 *           if the URI has no scheme or a scheme other than the Hdfs one
 * @throws IOException
 *           if the URI has no host, or propagated from the calls made here
 * @throws URISyntaxException
 *           declared in the signature; NOTE(review): presumably propagated
 *           from the superclass constructor - confirm
 */
Hdfs(final URI theUri, final Configuration conf) throws IOException, URISyntaxException {
  super(theUri, HdfsConstants.HDFS_URI_SCHEME, true, NameNode.DEFAULT_PORT);

  // Constant-first comparison: URI.getScheme() yields null for a URI that
  // has no scheme, and that case must be rejected, not dereferenced.
  if (!HdfsConstants.HDFS_URI_SCHEME.equalsIgnoreCase(theUri.getScheme())) {
    throw new IllegalArgumentException("Passed URI's scheme is not for Hdfs");
  }
  String host = theUri.getHost();
  if (host == null) {
    throw new IOException("Incomplete HDFS URI, no host: " + theUri);
  }

  InetSocketAddress namenode = NameNode.getAddress(theUri.getAuthority());
  this.dfs = new DFSClient(namenode, conf, getStatistics());
}
| |
| @Override |
| public int getUriDefaultPort() { |
| return NameNode.DEFAULT_PORT; |
| } |
| |
| @Override |
| public FSDataOutputStream createInternal(Path f, |
| EnumSet<CreateFlag> createFlag, FsPermission absolutePermission, |
| int bufferSize, short replication, long blockSize, Progressable progress, |
| int bytesPerChecksum, boolean createParent) throws IOException { |
| return new FSDataOutputStream(dfs.primitiveCreate(getUriPath(f), |
| absolutePermission, createFlag, createParent, replication, blockSize, |
| progress, bufferSize, bytesPerChecksum), getStatistics()); |
| } |
| |
| @Override |
| public boolean delete(Path f, boolean recursive) |
| throws IOException, UnresolvedLinkException { |
| return dfs.delete(getUriPath(f), recursive); |
| } |
| |
| @Override |
| public BlockLocation[] getFileBlockLocations(Path p, long start, long len) |
| throws IOException, UnresolvedLinkException { |
| return dfs.getBlockLocations(getUriPath(p), start, len); |
| } |
| |
| @Override |
| public FileChecksum getFileChecksum(Path f) |
| throws IOException, UnresolvedLinkException { |
| return dfs.getFileChecksum(getUriPath(f)); |
| } |
| |
| @Override |
| public FileStatus getFileStatus(Path f) |
| throws IOException, UnresolvedLinkException { |
| HdfsFileStatus fi = dfs.getFileInfo(getUriPath(f)); |
| if (fi != null) { |
| return makeQualified(fi, f); |
| } else { |
| throw new FileNotFoundException("File does not exist: " + f.toString()); |
| } |
| } |
| |
| @Override |
| public FileStatus getFileLinkStatus(Path f) |
| throws IOException, UnresolvedLinkException { |
| HdfsFileStatus fi = dfs.getFileLinkInfo(getUriPath(f)); |
| if (fi != null) { |
| return makeQualified(fi, f); |
| } else { |
| throw new FileNotFoundException("File does not exist: " + f); |
| } |
| } |
| |
| private FileStatus makeQualified(HdfsFileStatus f, Path parent) { |
| // NB: symlink is made fully-qualified in FileContext. |
| return new FileStatus(f.getLen(), f.isDir(), f.getReplication(), |
| f.getBlockSize(), f.getModificationTime(), |
| f.getAccessTime(), |
| f.getPermission(), f.getOwner(), f.getGroup(), |
| f.isSymlink() ? new Path(f.getSymlink()) : null, |
| (f.getFullPath(parent)).makeQualified( |
| getUri(), null)); // fully-qualify path |
| } |
| |
| private LocatedFileStatus makeQualifiedLocated( |
| HdfsLocatedFileStatus f, Path parent) { |
| return new LocatedFileStatus(f.getLen(), f.isDir(), f.getReplication(), |
| f.getBlockSize(), f.getModificationTime(), |
| f.getAccessTime(), |
| f.getPermission(), f.getOwner(), f.getGroup(), |
| f.isSymlink() ? new Path(f.getSymlink()) : null, |
| (f.getFullPath(parent)).makeQualified( |
| getUri(), null), // fully-qualify path |
| DFSUtil.locatedBlocks2Locations(f.getBlockLocations())); |
| } |
| |
| @Override |
| public FsStatus getFsStatus() throws IOException { |
| return dfs.getDiskStatus(); |
| } |
| |
| @Override |
| public FsServerDefaults getServerDefaults() throws IOException { |
| return dfs.getServerDefaults(); |
| } |
| |
| @Override |
| public RemoteIterator<LocatedFileStatus> listLocatedStatus( |
| final Path p) |
| throws FileNotFoundException, IOException { |
| return new DirListingIterator<LocatedFileStatus>(p, true) { |
| |
| @Override |
| public LocatedFileStatus next() throws IOException { |
| return makeQualifiedLocated((HdfsLocatedFileStatus)getNext(), p); |
| } |
| }; |
| } |
| |
| @Override |
| public RemoteIterator<FileStatus> listStatusIterator(final Path f) |
| throws AccessControlException, FileNotFoundException, |
| UnresolvedLinkException, IOException { |
| return new DirListingIterator<FileStatus>(f, false) { |
| |
| @Override |
| public FileStatus next() throws IOException { |
| return makeQualified(getNext(), f); |
| } |
| }; |
| } |
| |
| /** |
| * This class defines an iterator that returns |
| * the file status of each file/subdirectory of a directory |
| * |
| * if needLocation, status contains block location if it is a file |
| * throws a RuntimeException with the error as its cause. |
| * |
| * @param <T> the type of the file status |
| */ |
| abstract private class DirListingIterator<T extends FileStatus> |
| implements RemoteIterator<T> { |
| private DirectoryListing thisListing; |
| private int i; |
| final private String src; |
| final private boolean needLocation; // if status |
| |
| private DirListingIterator(Path p, boolean needLocation) |
| throws IOException { |
| this.src = Hdfs.this.getUriPath(p); |
| this.needLocation = needLocation; |
| |
| // fetch the first batch of entries in the directory |
| thisListing = dfs.listPaths( |
| src, HdfsFileStatus.EMPTY_NAME, needLocation); |
| if (thisListing == null) { // the directory does not exist |
| throw new FileNotFoundException("File " + src + " does not exist."); |
| } |
| } |
| |
| @Override |
| public boolean hasNext() throws IOException { |
| if (thisListing == null) { |
| return false; |
| } |
| if (i>=thisListing.getPartialListing().length |
| && thisListing.hasMore()) { |
| // current listing is exhausted & fetch a new listing |
| thisListing = dfs.listPaths(src, thisListing.getLastName(), |
| needLocation); |
| if (thisListing == null) { |
| return false; // the directory is deleted |
| } |
| i = 0; |
| } |
| return (i<thisListing.getPartialListing().length); |
| } |
| |
| /** |
| * Get the next item in the list |
| * @return the next item in the list |
| * |
| * @throws IOException if there is any error |
| * @throws NoSuchElmentException if no more entry is available |
| */ |
| public HdfsFileStatus getNext() throws IOException { |
| if (hasNext()) { |
| return thisListing.getPartialListing()[i++]; |
| } |
| throw new NoSuchElementException("No more entry in " + src); |
| } |
| } |
| |
| @Override |
| public FileStatus[] listStatus(Path f) |
| throws IOException, UnresolvedLinkException { |
| String src = getUriPath(f); |
| |
| // fetch the first batch of entries in the directory |
| DirectoryListing thisListing = dfs.listPaths( |
| src, HdfsFileStatus.EMPTY_NAME); |
| |
| if (thisListing == null) { // the directory does not exist |
| throw new FileNotFoundException("File " + f + " does not exist."); |
| } |
| |
| HdfsFileStatus[] partialListing = thisListing.getPartialListing(); |
| if (!thisListing.hasMore()) { // got all entries of the directory |
| FileStatus[] stats = new FileStatus[partialListing.length]; |
| for (int i = 0; i < partialListing.length; i++) { |
| stats[i] = makeQualified(partialListing[i], f); |
| } |
| return stats; |
| } |
| |
| // The directory size is too big that it needs to fetch more |
| // estimate the total number of entries in the directory |
| int totalNumEntries = |
| partialListing.length + thisListing.getRemainingEntries(); |
| ArrayList<FileStatus> listing = |
| new ArrayList<FileStatus>(totalNumEntries); |
| // add the first batch of entries to the array list |
| for (HdfsFileStatus fileStatus : partialListing) { |
| listing.add(makeQualified(fileStatus, f)); |
| } |
| |
| // now fetch more entries |
| do { |
| thisListing = dfs.listPaths(src, thisListing.getLastName()); |
| |
| if (thisListing == null) { |
| // the directory is deleted |
| throw new FileNotFoundException("File " + f + " does not exist."); |
| } |
| |
| partialListing = thisListing.getPartialListing(); |
| for (HdfsFileStatus fileStatus : partialListing) { |
| listing.add(makeQualified(fileStatus, f)); |
| } |
| } while (thisListing.hasMore()); |
| |
| return listing.toArray(new FileStatus[listing.size()]); |
| } |
| |
| /** |
| * {@inheritDoc} |
| */ |
| @Override |
| public RemoteIterator<Path> listCorruptFileBlocks(Path path) |
| throws IOException { |
| return new CorruptFileBlockIterator(dfs, path); |
| } |
| |
| @Override |
| public void mkdir(Path dir, FsPermission permission, boolean createParent) |
| throws IOException, UnresolvedLinkException { |
| dfs.mkdirs(getUriPath(dir), permission, createParent); |
| } |
| |
| @Override |
| public FSDataInputStream open(Path f, int bufferSize) |
| throws IOException, UnresolvedLinkException { |
| return new DFSClient.DFSDataInputStream(dfs.open(getUriPath(f), |
| bufferSize, verifyChecksum)); |
| } |
| |
| @Override |
| public void renameInternal(Path src, Path dst) |
| throws IOException, UnresolvedLinkException { |
| dfs.rename(getUriPath(src), getUriPath(dst), Options.Rename.NONE); |
| } |
| |
| @Override |
| public void renameInternal(Path src, Path dst, boolean overwrite) |
| throws IOException, UnresolvedLinkException { |
| dfs.rename(getUriPath(src), getUriPath(dst), |
| overwrite ? Options.Rename.OVERWRITE : Options.Rename.NONE); |
| } |
| |
| @Override |
| public void setOwner(Path f, String username, String groupname) |
| throws IOException, UnresolvedLinkException { |
| dfs.setOwner(getUriPath(f), username, groupname); |
| } |
| |
| @Override |
| public void setPermission(Path f, FsPermission permission) |
| throws IOException, UnresolvedLinkException { |
| dfs.setPermission(getUriPath(f), permission); |
| } |
| |
| @Override |
| public boolean setReplication(Path f, short replication) |
| throws IOException, UnresolvedLinkException { |
| return dfs.setReplication(getUriPath(f), replication); |
| } |
| |
| @Override |
| public void setTimes(Path f, long mtime, long atime) |
| throws IOException, UnresolvedLinkException { |
| dfs.setTimes(getUriPath(f), mtime, atime); |
| } |
| |
| @Override |
| public void setVerifyChecksum(boolean verifyChecksum) |
| throws IOException { |
| this.verifyChecksum = verifyChecksum; |
| } |
| |
| @Override |
| public boolean supportsSymlinks() { |
| return true; |
| } |
| |
| @Override |
| public void createSymlink(Path target, Path link, boolean createParent) |
| throws IOException, UnresolvedLinkException { |
| dfs.createSymlink(target.toString(), getUriPath(link), createParent); |
| } |
| |
| @Override |
| public Path getLinkTarget(Path p) throws IOException { |
| return new Path(dfs.getLinkTarget(getUriPath(p))); |
| } |
| |
| @Override //AbstractFileSystem |
| public List<Token<?>> getDelegationTokens(String renewer) throws IOException { |
| Token<DelegationTokenIdentifier> result = dfs |
| .getDelegationToken(renewer == null ? null : new Text(renewer)); |
| result.setService(new Text(this.getCanonicalServiceName())); |
| List<Token<?>> tokenList = new ArrayList<Token<?>>(); |
| tokenList.add(result); |
| return tokenList; |
| } |
| |
| /** |
| * Renew an existing delegation token. |
| * |
| * @param token delegation token obtained earlier |
| * @return the new expiration time |
| * @throws InvalidToken |
| * @throws IOException |
| * @deprecated Use Token.renew instead. |
| */ |
| @SuppressWarnings("unchecked") |
| public long renewDelegationToken( |
| Token<? extends AbstractDelegationTokenIdentifier> token) |
| throws InvalidToken, IOException { |
| return dfs.renewDelegationToken((Token<DelegationTokenIdentifier>) token); |
| } |
| |
| /** |
| * Cancel an existing delegation token. |
| * |
| * @param token delegation token |
| * @throws InvalidToken |
| * @throws IOException |
| * @deprecated Use Token.cancel instead. |
| */ |
| @SuppressWarnings("unchecked") |
| public void cancelDelegationToken( |
| Token<? extends AbstractDelegationTokenIdentifier> token) |
| throws InvalidToken, IOException { |
| dfs.cancelDelegationToken((Token<DelegationTokenIdentifier>) token); |
| } |
| } |