| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * <p> |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * <p> |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.hadoop.fs.cosn; |
| |
| import java.io.FileNotFoundException; |
| import java.io.IOException; |
| import java.io.InputStream; |
| import java.net.URI; |
| import java.util.ArrayList; |
| import java.util.EnumSet; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.HashMap; |
| import java.util.Set; |
| import java.util.TreeSet; |
| import java.util.concurrent.ExecutorService; |
| import java.util.concurrent.TimeUnit; |
| |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import org.apache.hadoop.classification.InterfaceAudience; |
| import org.apache.hadoop.classification.InterfaceStability; |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.fs.BufferedFSInputStream; |
| import org.apache.hadoop.fs.CreateFlag; |
| import org.apache.hadoop.fs.FSDataInputStream; |
| import org.apache.hadoop.fs.FSDataOutputStream; |
| import org.apache.hadoop.fs.FileAlreadyExistsException; |
| import org.apache.hadoop.fs.FileStatus; |
| import org.apache.hadoop.fs.FileSystem; |
| import org.apache.hadoop.fs.Path; |
| import org.apache.hadoop.fs.PathIOException; |
| import org.apache.hadoop.fs.permission.FsPermission; |
| import org.apache.hadoop.io.retry.RetryPolicies; |
| import org.apache.hadoop.io.retry.RetryPolicy; |
| import org.apache.hadoop.io.retry.RetryProxy; |
| import org.apache.hadoop.util.BlockingThreadPoolExecutorService; |
| import org.apache.hadoop.util.Progressable; |
| |
| /** |
| * The core CosN Filesystem implementation. |
| */ |
| @InterfaceAudience.Private |
| @InterfaceStability.Stable |
| public class CosNFileSystem extends FileSystem { |
| static final Logger LOG = LoggerFactory.getLogger(CosNFileSystem.class); |
| |
| public static final String SCHEME = "cosn"; |
| public static final String PATH_DELIMITER = Path.SEPARATOR; |
| |
| private URI uri; |
| private String bucket; |
| private NativeFileSystemStore store; |
| private Path workingDir; |
| private String owner = "Unknown"; |
| private String group = "Unknown"; |
| |
| private ExecutorService boundedIOThreadPool; |
| private ExecutorService boundedCopyThreadPool; |
| |
| public CosNFileSystem() { |
| } |
| |
| public CosNFileSystem(NativeFileSystemStore store) { |
| this.store = store; |
| } |
| |
| /** |
| * Return the protocol scheme for the FileSystem. |
| * |
| * @return <code>cosn</code> |
| */ |
| @Override |
| public String getScheme() { |
| return CosNFileSystem.SCHEME; |
| } |
| |
| @Override |
| public void initialize(URI name, Configuration conf) throws IOException { |
| super.initialize(name, conf); |
| this.bucket = name.getHost(); |
| if (this.store == null) { |
| this.store = createDefaultStore(conf); |
| } |
| this.store.initialize(name, conf); |
| setConf(conf); |
| this.uri = URI.create(name.getScheme() + "://" + name.getAuthority()); |
| this.workingDir = new Path("/user", |
| System.getProperty("user.name")).makeQualified( |
| this.uri, |
| this.getWorkingDirectory()); |
| this.owner = getOwnerId(); |
| this.group = getGroupId(); |
| LOG.debug("owner:" + owner + ", group:" + group); |
| |
| BufferPool.getInstance().initialize(this.getConf()); |
| |
| // initialize the thread pool |
| int uploadThreadPoolSize = this.getConf().getInt( |
| CosNConfigKeys.UPLOAD_THREAD_POOL_SIZE_KEY, |
| CosNConfigKeys.DEFAULT_UPLOAD_THREAD_POOL_SIZE |
| ); |
| int readAheadPoolSize = this.getConf().getInt( |
| CosNConfigKeys.READ_AHEAD_QUEUE_SIZE, |
| CosNConfigKeys.DEFAULT_READ_AHEAD_QUEUE_SIZE |
| ); |
| int ioThreadPoolSize = uploadThreadPoolSize + readAheadPoolSize / 3; |
| long threadKeepAlive = this.getConf().getLong( |
| CosNConfigKeys.THREAD_KEEP_ALIVE_TIME_KEY, |
| CosNConfigKeys.DEFAULT_THREAD_KEEP_ALIVE_TIME |
| ); |
| this.boundedIOThreadPool = BlockingThreadPoolExecutorService.newInstance( |
| ioThreadPoolSize / 2, ioThreadPoolSize, |
| threadKeepAlive, TimeUnit.SECONDS, |
| "cos-transfer-thread-pool"); |
| int copyThreadPoolSize = this.getConf().getInt( |
| CosNConfigKeys.COPY_THREAD_POOL_SIZE_KEY, |
| CosNConfigKeys.DEFAULT_COPY_THREAD_POOL_SIZE |
| ); |
| this.boundedCopyThreadPool = BlockingThreadPoolExecutorService.newInstance( |
| CosNConfigKeys.DEFAULT_COPY_THREAD_POOL_SIZE, copyThreadPoolSize, |
| 60L, TimeUnit.SECONDS, |
| "cos-copy-thread-pool"); |
| } |
| |
| private static NativeFileSystemStore createDefaultStore(Configuration conf) { |
| NativeFileSystemStore store = new CosNativeFileSystemStore(); |
| RetryPolicy basePolicy = RetryPolicies.retryUpToMaximumCountWithFixedSleep( |
| conf.getInt(CosNConfigKeys.COSN_MAX_RETRIES_KEY, |
| CosNConfigKeys.DEFAULT_MAX_RETRIES), |
| conf.getLong(CosNConfigKeys.COSN_RETRY_INTERVAL_KEY, |
| CosNConfigKeys.DEFAULT_RETRY_INTERVAL), |
| TimeUnit.SECONDS); |
| Map<Class<? extends Exception>, RetryPolicy> exceptionToPolicyMap = |
| new HashMap<>(); |
| |
| exceptionToPolicyMap.put(IOException.class, basePolicy); |
| RetryPolicy methodPolicy = RetryPolicies.retryByException( |
| RetryPolicies.TRY_ONCE_THEN_FAIL, |
| exceptionToPolicyMap); |
| Map<String, RetryPolicy> methodNameToPolicyMap = new HashMap<>(); |
| methodNameToPolicyMap.put("storeFile", methodPolicy); |
| methodNameToPolicyMap.put("rename", methodPolicy); |
| |
| return (NativeFileSystemStore) RetryProxy.create( |
| NativeFileSystemStore.class, store, methodNameToPolicyMap); |
| } |
| |
| private String getOwnerId() { |
| return System.getProperty("user.name"); |
| } |
| |
| private String getGroupId() { |
| return System.getProperty("user.name"); |
| } |
| |
| private String getOwnerInfo(boolean getOwnerId) { |
| String ownerInfoId = ""; |
| try { |
| String userName = System.getProperty("user.name"); |
| String command = "id -u " + userName; |
| if (!getOwnerId) { |
| command = "id -g " + userName; |
| } |
| Process child = Runtime.getRuntime().exec(command); |
| child.waitFor(); |
| |
| // Get the input stream and read from it |
| InputStream in = child.getInputStream(); |
| StringBuilder strBuffer = new StringBuilder(); |
| int c; |
| while ((c = in.read()) != -1) { |
| strBuffer.append((char) c); |
| } |
| in.close(); |
| ownerInfoId = strBuffer.toString(); |
| } catch (IOException | InterruptedException e) { |
| LOG.error("Getting owner info occurs a exception", e); |
| } |
| return ownerInfoId; |
| } |
| |
| private static String pathToKey(Path path) { |
| if (path.toUri().getScheme() != null && path.toUri().getPath().isEmpty()) { |
| // allow uris without trailing slash after bucket to refer to root, |
| // like cosn://mybucket |
| return ""; |
| } |
| if (!path.isAbsolute()) { |
| throw new IllegalArgumentException("Path must be absolute: " + path); |
| } |
| String ret = path.toUri().getPath(); |
| if (ret.endsWith("/") && (ret.indexOf("/") != ret.length() - 1)) { |
| ret = ret.substring(0, ret.length() - 1); |
| } |
| return ret; |
| } |
| |
| private static Path keyToPath(String key) { |
| if (!key.startsWith(PATH_DELIMITER)) { |
| return new Path("/" + key); |
| } else { |
| return new Path(key); |
| } |
| } |
| |
| private Path makeAbsolute(Path path) { |
| if (path.isAbsolute()) { |
| return path; |
| } |
| return new Path(workingDir, path); |
| } |
| |
| /** |
| * This optional operation is not yet supported. |
| */ |
| @Override |
| public FSDataOutputStream append(Path f, int bufferSize, |
| Progressable progress) throws IOException { |
| throw new IOException("Not supported"); |
| } |
| |
| @Override |
| public FSDataOutputStream create(Path f, FsPermission permission, |
| boolean overwrite, int bufferSize, short replication, long blockSize, |
| Progressable progress) throws IOException { |
| FileStatus fileStatus; |
| |
| try { |
| fileStatus = getFileStatus(f); |
| if (fileStatus.isDirectory()) { |
| throw new FileAlreadyExistsException(f + " is a directory"); |
| } |
| if (!overwrite) { |
| // path references a file and overwrite is disabled |
| throw new FileAlreadyExistsException(f + " already exists"); |
| } |
| |
| } catch (FileNotFoundException e) { |
| LOG.debug("Creating a new file: [{}] in COS.", f); |
| } |
| |
| Path absolutePath = makeAbsolute(f); |
| String key = pathToKey(absolutePath); |
| return new FSDataOutputStream( |
| new CosNOutputStream(getConf(), store, key, blockSize, |
| this.boundedIOThreadPool), statistics); |
| } |
| |
| private boolean rejectRootDirectoryDelete(boolean isEmptyDir, |
| boolean recursive) throws PathIOException { |
| if (isEmptyDir) { |
| return true; |
| } |
| if (recursive) { |
| return false; |
| } else { |
| throw new PathIOException(this.bucket, "Can not delete root path"); |
| } |
| } |
| |
| @Override |
| public FSDataOutputStream createNonRecursive(Path f, FsPermission permission, |
| EnumSet<CreateFlag> flags, int bufferSize, short replication, |
| long blockSize, Progressable progress) throws IOException { |
| Path parent = f.getParent(); |
| if (null != parent) { |
| if (!getFileStatus(parent).isDirectory()) { |
| throw new FileAlreadyExistsException("Not a directory: " + parent); |
| } |
| } |
| |
| return create(f, permission, flags.contains(CreateFlag.OVERWRITE), |
| bufferSize, replication, blockSize, progress); |
| } |
| |
| @Override |
| public boolean delete(Path f, boolean recursive) throws IOException { |
| LOG.debug("Ready to delete path: [{}]. recursive: [{}].", f, recursive); |
| FileStatus status; |
| try { |
| status = getFileStatus(f); |
| } catch (FileNotFoundException e) { |
| LOG.debug("Ready to delete the file: [{}], but it does not exist.", f); |
| return false; |
| } |
| Path absolutePath = makeAbsolute(f); |
| String key = pathToKey(absolutePath); |
| if (key.compareToIgnoreCase("/") == 0) { |
| FileStatus[] fileStatuses = listStatus(f); |
| return this.rejectRootDirectoryDelete( |
| fileStatuses.length == 0, recursive); |
| } |
| |
| if (status.isDirectory()) { |
| if (!key.endsWith(PATH_DELIMITER)) { |
| key += PATH_DELIMITER; |
| } |
| if (!recursive && listStatus(f).length > 0) { |
| String errMsg = String.format("Can not delete the directory: [%s], as" |
| + " it is not empty and option recursive is false.", f); |
| throw new IOException(errMsg); |
| } |
| |
| createParent(f); |
| |
| String priorLastKey = null; |
| do { |
| PartialListing listing = store.list( |
| key, |
| Constants.COS_MAX_LISTING_LENGTH, |
| priorLastKey, |
| true); |
| for (FileMetadata file : listing.getFiles()) { |
| store.delete(file.getKey()); |
| } |
| for (FileMetadata commonPrefix : listing.getCommonPrefixes()) { |
| store.delete(commonPrefix.getKey()); |
| } |
| priorLastKey = listing.getPriorLastKey(); |
| } while (priorLastKey != null); |
| try { |
| store.delete(key); |
| } catch (Exception e) { |
| LOG.error("Deleting the COS key: [{}] occurs an exception.", key, e); |
| } |
| |
| } else { |
| LOG.debug("Delete the file: {}", f); |
| createParent(f); |
| store.delete(key); |
| } |
| return true; |
| } |
| |
| @Override |
| public FileStatus getFileStatus(Path f) throws IOException { |
| Path absolutePath = makeAbsolute(f); |
| String key = pathToKey(absolutePath); |
| |
| if (key.length() == 0) { |
| // root always exists |
| return newDirectory(absolutePath); |
| } |
| |
| LOG.debug("Call the getFileStatus to obtain the metadata for " |
| + "the file: [{}].", f); |
| |
| FileMetadata meta = store.retrieveMetadata(key); |
| if (meta != null) { |
| if (meta.isFile()) { |
| LOG.debug("Path: [{}] is a file. COS key: [{}]", f, key); |
| return newFile(meta, absolutePath); |
| } else { |
| LOG.debug("Path: [{}] is a dir. COS key: [{}]", f, key); |
| return newDirectory(meta, absolutePath); |
| } |
| } |
| |
| if (!key.endsWith(PATH_DELIMITER)) { |
| key += PATH_DELIMITER; |
| } |
| |
| // Considering that the object store's directory is a common prefix in |
| // the object key, it needs to check the existence of the path by listing |
| // the COS key. |
| LOG.debug("List COS key: [{}] to check the existence of the path.", key); |
| PartialListing listing = store.list(key, 1); |
| if (listing.getFiles().length > 0 |
| || listing.getCommonPrefixes().length > 0) { |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("Path: [{}] is a directory. COS key: [{}]", f, key); |
| } |
| return newDirectory(absolutePath); |
| } |
| |
| throw new FileNotFoundException( |
| "No such file or directory '" + absolutePath + "'"); |
| } |
| |
| @Override |
| public URI getUri() { |
| return uri; |
| } |
| |
| /** |
| * <p> |
| * If <code>f</code> is a file, this method will make a single call to COS. |
| * If <code>f</code> is a directory, |
| * this method will make a maximum of ( <i>n</i> / 199) + 2 calls to cos, |
| * where <i>n</i> is the total number of files |
| * and directories contained directly in <code>f</code>. |
| * </p> |
| */ |
| @Override |
| public FileStatus[] listStatus(Path f) throws IOException { |
| Path absolutePath = makeAbsolute(f); |
| String key = pathToKey(absolutePath); |
| |
| if (key.length() > 0) { |
| FileStatus fileStatus = this.getFileStatus(f); |
| if (fileStatus.isFile()) { |
| return new FileStatus[]{fileStatus}; |
| } |
| } |
| |
| if (!key.endsWith(PATH_DELIMITER)) { |
| key += PATH_DELIMITER; |
| } |
| |
| URI pathUri = absolutePath.toUri(); |
| Set<FileStatus> status = new TreeSet<>(); |
| String priorLastKey = null; |
| do { |
| PartialListing listing = store.list( |
| key, Constants.COS_MAX_LISTING_LENGTH, priorLastKey, false); |
| for (FileMetadata fileMetadata : listing.getFiles()) { |
| Path subPath = keyToPath(fileMetadata.getKey()); |
| if (fileMetadata.getKey().equals(key)) { |
| // this is just the directory we have been asked to list. |
| LOG.debug("The file list contains the COS key [{}] to be listed.", |
| key); |
| } else { |
| status.add(newFile(fileMetadata, subPath)); |
| } |
| } |
| |
| for (FileMetadata commonPrefix : listing.getCommonPrefixes()) { |
| Path subPath = keyToPath(commonPrefix.getKey()); |
| String relativePath = pathUri.relativize(subPath.toUri()).getPath(); |
| status.add( |
| newDirectory(commonPrefix, new Path(absolutePath, relativePath))); |
| } |
| priorLastKey = listing.getPriorLastKey(); |
| } while (priorLastKey != null); |
| |
| return status.toArray(new FileStatus[status.size()]); |
| } |
| |
| private FileStatus newFile(FileMetadata meta, Path path) { |
| return new FileStatus(meta.getLength(), false, 1, getDefaultBlockSize(), |
| meta.getLastModified(), 0, null, this.owner, this.group, |
| path.makeQualified(this.getUri(), this.getWorkingDirectory())); |
| } |
| |
| private FileStatus newDirectory(Path path) { |
| return new FileStatus(0, true, 1, 0, 0, 0, null, this.owner, this.group, |
| path.makeQualified(this.getUri(), this.getWorkingDirectory())); |
| } |
| |
| private FileStatus newDirectory(FileMetadata meta, Path path) { |
| if (meta == null) { |
| return newDirectory(path); |
| } |
| return new FileStatus(0, true, 1, 0, meta.getLastModified(), |
| 0, null, this.owner, this.group, |
| path.makeQualified(this.getUri(), this.getWorkingDirectory())); |
| } |
| |
| /** |
| * Validate the path from the bottom up. |
| * |
| * @param path The path to be validated |
| * @throws FileAlreadyExistsException The specified path is an existing file |
| * @throws IOException Getting the file status of the |
| * specified path occurs |
| * an IOException. |
| */ |
| private void validatePath(Path path) throws IOException { |
| Path parent = path.getParent(); |
| do { |
| try { |
| FileStatus fileStatus = getFileStatus(parent); |
| if (fileStatus.isDirectory()) { |
| break; |
| } else { |
| throw new FileAlreadyExistsException(String.format( |
| "Can't make directory for path '%s', it is a file.", parent)); |
| } |
| } catch (FileNotFoundException e) { |
| LOG.debug("The Path: [{}] does not exist.", path); |
| } |
| parent = parent.getParent(); |
| } while (parent != null); |
| } |
| |
| @Override |
| public boolean mkdirs(Path f, FsPermission permission) throws IOException { |
| try { |
| FileStatus fileStatus = getFileStatus(f); |
| if (fileStatus.isDirectory()) { |
| return true; |
| } else { |
| throw new FileAlreadyExistsException("Path is a file: " + f); |
| } |
| } catch (FileNotFoundException e) { |
| validatePath(f); |
| } |
| |
| return mkDirRecursively(f, permission); |
| } |
| |
| /** |
| * Recursively create a directory. |
| * |
| * @param f Absolute path to the directory. |
| * @param permission Directory permissions. Permission does not work for |
| * the CosN filesystem currently. |
| * @return Return true if the creation was successful, throw a IOException. |
| * @throws IOException The specified path already exists or an error |
| * creating the path. |
| */ |
| public boolean mkDirRecursively(Path f, FsPermission permission) |
| throws IOException { |
| Path absolutePath = makeAbsolute(f); |
| List<Path> paths = new ArrayList<>(); |
| do { |
| paths.add(absolutePath); |
| absolutePath = absolutePath.getParent(); |
| } while (absolutePath != null); |
| |
| for (Path path : paths) { |
| if (path.equals(new Path(CosNFileSystem.PATH_DELIMITER))) { |
| break; |
| } |
| try { |
| FileStatus fileStatus = getFileStatus(path); |
| if (fileStatus.isFile()) { |
| throw new FileAlreadyExistsException( |
| String.format("Can't make directory for path: %s, " |
| + "since it is a file.", f)); |
| } |
| if (fileStatus.isDirectory()) { |
| break; |
| } |
| } catch (FileNotFoundException e) { |
| LOG.debug("Making dir: [{}] in COS", f); |
| |
| String folderPath = pathToKey(makeAbsolute(f)); |
| if (!folderPath.endsWith(PATH_DELIMITER)) { |
| folderPath += PATH_DELIMITER; |
| } |
| store.storeEmptyFile(folderPath); |
| } |
| } |
| return true; |
| } |
| |
| private boolean mkdir(Path f) throws IOException { |
| try { |
| FileStatus fileStatus = getFileStatus(f); |
| if (fileStatus.isFile()) { |
| throw new FileAlreadyExistsException( |
| String.format( |
| "Can't make directory for path '%s' since it is a file.", f)); |
| } |
| } catch (FileNotFoundException e) { |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("Make directory: [{}] in COS.", f); |
| } |
| |
| String folderPath = pathToKey(makeAbsolute(f)); |
| if (!folderPath.endsWith(PATH_DELIMITER)) { |
| folderPath += PATH_DELIMITER; |
| } |
| store.storeEmptyFile(folderPath); |
| } |
| return true; |
| } |
| |
| @Override |
| public FSDataInputStream open(Path f, int bufferSize) throws IOException { |
| FileStatus fs = getFileStatus(f); // will throw if the file doesn't |
| // exist |
| if (fs.isDirectory()) { |
| throw new FileNotFoundException("'" + f + "' is a directory"); |
| } |
| LOG.info("Open the file: [{}] for reading.", f); |
| Path absolutePath = makeAbsolute(f); |
| String key = pathToKey(absolutePath); |
| long fileSize = store.getFileLength(key); |
| return new FSDataInputStream(new BufferedFSInputStream( |
| new CosNInputStream(this.getConf(), store, statistics, key, fileSize, |
| this.boundedIOThreadPool), bufferSize)); |
| } |
| |
| @Override |
| public boolean rename(Path src, Path dst) throws IOException { |
| LOG.debug("Rename source path: [{}] to dest path: [{}].", src, dst); |
| |
| // Renaming the root directory is not allowed |
| if (src.isRoot()) { |
| LOG.debug("Cannot rename the root directory of a filesystem."); |
| return false; |
| } |
| |
| // check the source path whether exists or not |
| FileStatus srcFileStatus = this.getFileStatus(src); |
| |
| // Source path and destination path are not allowed to be the same |
| if (src.equals(dst)) { |
| LOG.debug("Source path and dest path refer to " |
| + "the same file or directory: [{}].", dst); |
| throw new IOException("Source path and dest path refer " |
| + "the same file or directory"); |
| } |
| |
| // It is not allowed to rename a parent directory to its subdirectory |
| Path dstParentPath; |
| for (dstParentPath = dst.getParent(); |
| null != dstParentPath && !src.equals(dstParentPath); |
| dstParentPath = dstParentPath.getParent()) { |
| // Recursively find the common parent path of the source and |
| // destination paths. |
| LOG.debug("Recursively find the common parent directory of the source " |
| + "and destination paths. The currently found parent path: {}", |
| dstParentPath); |
| } |
| |
| if (null != dstParentPath) { |
| LOG.debug("It is not allowed to rename a parent directory:[{}] " |
| + "to its subdirectory:[{}].", src, dst); |
| throw new IOException(String.format( |
| "It is not allowed to rename a parent directory: %s " |
| + "to its subdirectory: %s", src, dst)); |
| } |
| |
| FileStatus dstFileStatus; |
| try { |
| dstFileStatus = this.getFileStatus(dst); |
| |
| // The destination path exists and is a file, |
| // and the rename operation is not allowed. |
| if (dstFileStatus.isFile()) { |
| throw new FileAlreadyExistsException(String.format( |
| "File: %s already exists", dstFileStatus.getPath())); |
| } else { |
| // The destination path is an existing directory, |
| // and it is checked whether there is a file or directory |
| // with the same name as the source path under the destination path |
| dst = new Path(dst, src.getName()); |
| FileStatus[] statuses; |
| try { |
| statuses = this.listStatus(dst); |
| } catch (FileNotFoundException e) { |
| statuses = null; |
| } |
| if (null != statuses && statuses.length > 0) { |
| LOG.debug("Cannot rename source file: [{}] to dest file: [{}], " |
| + "because the file already exists.", src, dst); |
| throw new FileAlreadyExistsException( |
| String.format( |
| "File: %s already exists", dst |
| ) |
| ); |
| } |
| } |
| } catch (FileNotFoundException e) { |
| // destination path not exists |
| Path tempDstParentPath = dst.getParent(); |
| FileStatus dstParentStatus = this.getFileStatus(tempDstParentPath); |
| if (!dstParentStatus.isDirectory()) { |
| throw new IOException(String.format( |
| "Cannot rename %s to %s, %s is a file", src, dst, dst.getParent() |
| )); |
| } |
| // The default root directory is definitely there. |
| } |
| |
| boolean result; |
| if (srcFileStatus.isDirectory()) { |
| result = this.copyDirectory(src, dst); |
| } else { |
| result = this.copyFile(src, dst); |
| } |
| |
| if (!result) { |
| //Since rename is a non-atomic operation, after copy fails, |
| // it is not allowed to delete the data of the original path. |
| return false; |
| } else { |
| return this.delete(src, true); |
| } |
| } |
| |
| private boolean copyFile(Path srcPath, Path dstPath) throws IOException { |
| String srcKey = pathToKey(srcPath); |
| String dstKey = pathToKey(dstPath); |
| this.store.copy(srcKey, dstKey); |
| return true; |
| } |
| |
| private boolean copyDirectory(Path srcPath, Path dstPath) throws IOException { |
| String srcKey = pathToKey(srcPath); |
| if (!srcKey.endsWith(PATH_DELIMITER)) { |
| srcKey += PATH_DELIMITER; |
| } |
| String dstKey = pathToKey(dstPath); |
| if (!dstKey.endsWith(PATH_DELIMITER)) { |
| dstKey += PATH_DELIMITER; |
| } |
| |
| if (dstKey.startsWith(srcKey)) { |
| throw new IOException( |
| "can not copy a directory to a subdirectory of self"); |
| } |
| |
| this.store.storeEmptyFile(dstKey); |
| CosNCopyFileContext copyFileContext = new CosNCopyFileContext(); |
| |
| int copiesToFinishes = 0; |
| String priorLastKey = null; |
| do { |
| PartialListing objectList = this.store.list( |
| srcKey, Constants.COS_MAX_LISTING_LENGTH, priorLastKey, true); |
| for (FileMetadata file : objectList.getFiles()) { |
| this.boundedCopyThreadPool.execute(new CosNCopyFileTask( |
| this.store, |
| file.getKey(), |
| dstKey.concat(file.getKey().substring(srcKey.length())), |
| copyFileContext)); |
| copiesToFinishes++; |
| if (!copyFileContext.isCopySuccess()) { |
| break; |
| } |
| } |
| priorLastKey = objectList.getPriorLastKey(); |
| } while (null != priorLastKey); |
| |
| copyFileContext.lock(); |
| try { |
| copyFileContext.awaitAllFinish(copiesToFinishes); |
| } catch (InterruptedException e) { |
| LOG.warn("interrupted when wait copies to finish"); |
| } finally { |
| copyFileContext.lock(); |
| } |
| |
| return copyFileContext.isCopySuccess(); |
| } |
| |
| private void createParent(Path path) throws IOException { |
| Path parent = path.getParent(); |
| if (parent != null) { |
| String parentKey = pathToKey(parent); |
| LOG.debug("Create parent key: {}", parentKey); |
| if (!parentKey.equals(PATH_DELIMITER)) { |
| String key = pathToKey(makeAbsolute(parent)); |
| if (key.length() > 0) { |
| try { |
| store.storeEmptyFile(key + PATH_DELIMITER); |
| } catch (IOException e) { |
| LOG.debug("Store a empty file in COS failed.", e); |
| throw e; |
| } |
| } |
| } |
| } |
| } |
| |
| @Override |
| @SuppressWarnings("deprecation") |
| public long getDefaultBlockSize() { |
| return getConf().getLong( |
| CosNConfigKeys.COSN_BLOCK_SIZE_KEY, |
| CosNConfigKeys.DEFAULT_BLOCK_SIZE); |
| } |
| |
| /** |
| * Set the working directory to the given directory. |
| */ |
| @Override |
| public void setWorkingDirectory(Path newDir) { |
| workingDir = newDir; |
| } |
| |
| @Override |
| public Path getWorkingDirectory() { |
| return workingDir; |
| } |
| |
| @Override |
| public String getCanonicalServiceName() { |
| // Does not support Token |
| return null; |
| } |
| |
| @Override |
| public void close() throws IOException { |
| try { |
| this.store.close(); |
| this.boundedIOThreadPool.shutdown(); |
| this.boundedCopyThreadPool.shutdown(); |
| } finally { |
| super.close(); |
| } |
| } |
| } |