blob: 02088c81734ab98f280dc0f12a718a2aee8612a8 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.aliyun.oss;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileAlreadyExistsException;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.fs.PathIOException;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.BlockingThreadPoolExecutorService;
import org.apache.hadoop.util.Progressable;
import com.aliyun.oss.model.OSSObjectSummary;
import com.aliyun.oss.model.ObjectListing;
import com.aliyun.oss.model.ObjectMetadata;
import org.apache.hadoop.util.SemaphoredDelegatingExecutor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.apache.hadoop.fs.aliyun.oss.AliyunOSSUtils.intOption;
import static org.apache.hadoop.fs.aliyun.oss.AliyunOSSUtils.longOption;
import static org.apache.hadoop.fs.aliyun.oss.AliyunOSSUtils.objectRepresentsDirectory;
import static org.apache.hadoop.fs.aliyun.oss.Constants.*;
/**
* Implementation of {@link FileSystem} for <a href="https://oss.aliyun.com">
* Aliyun OSS</a>, used to access OSS blob system in a filesystem style.
*/
public class AliyunOSSFileSystem extends FileSystem {
private static final Logger LOG =
LoggerFactory.getLogger(AliyunOSSFileSystem.class);
private URI uri;
private String bucket;
private String username;
private Path workingDir;
private int blockOutputActiveBlocks;
private AliyunOSSFileSystemStore store;
private int maxKeys;
private int maxReadAheadPartNumber;
private int maxConcurrentCopyTasksPerDir;
private ListeningExecutorService boundedThreadPool;
private ListeningExecutorService boundedCopyThreadPool;
private static final PathFilter DEFAULT_FILTER = new PathFilter() {
@Override
public boolean accept(Path file) {
return true;
}
};
@Override
public FSDataOutputStream append(Path path, int bufferSize,
Progressable progress) throws IOException {
throw new IOException("Append is not supported!");
}
@Override
public void close() throws IOException {
try {
store.close();
boundedThreadPool.shutdown();
boundedCopyThreadPool.shutdown();
} finally {
super.close();
}
}
@Override
public FSDataOutputStream create(Path path, FsPermission permission,
boolean overwrite, int bufferSize, short replication, long blockSize,
Progressable progress) throws IOException {
String key = pathToKey(path);
FileStatus status = null;
try {
// get the status or throw a FNFE
status = getFileStatus(path);
// if the thread reaches here, there is something at the path
if (status.isDirectory()) {
// path references a directory
throw new FileAlreadyExistsException(path + " is a directory");
}
if (!overwrite) {
// path references a file and overwrite is disabled
throw new FileAlreadyExistsException(path + " already exists");
}
LOG.debug("Overwriting file {}", path);
} catch (FileNotFoundException e) {
// this means the file is not found
}
long uploadPartSize = AliyunOSSUtils.getMultipartSizeProperty(getConf(),
MULTIPART_UPLOAD_PART_SIZE_KEY, MULTIPART_UPLOAD_PART_SIZE_DEFAULT);
return new FSDataOutputStream(
new AliyunOSSBlockOutputStream(getConf(),
store,
key,
uploadPartSize,
new SemaphoredDelegatingExecutor(boundedThreadPool,
blockOutputActiveBlocks, true)), statistics);
}
/**
* {@inheritDoc}
* @throws FileNotFoundException if the parent directory is not present -or
* is not a directory.
*/
@Override
public FSDataOutputStream createNonRecursive(Path path,
FsPermission permission,
EnumSet<CreateFlag> flags,
int bufferSize,
short replication,
long blockSize,
Progressable progress) throws IOException {
Path parent = path.getParent();
if (parent != null) {
// expect this to raise an exception if there is no parent
if (!getFileStatus(parent).isDirectory()) {
throw new FileAlreadyExistsException("Not a directory: " + parent);
}
}
return create(path, permission, flags.contains(CreateFlag.OVERWRITE),
bufferSize, replication, blockSize, progress);
}
@Override
public boolean delete(Path path, boolean recursive) throws IOException {
try {
return innerDelete(getFileStatus(path), recursive);
} catch (FileNotFoundException e) {
LOG.debug("Couldn't delete {} - does not exist", path);
return false;
}
}
/**
* Delete an object. See {@link #delete(Path, boolean)}.
*
* @param status fileStatus object
* @param recursive if path is a directory and set to
* true, the directory is deleted else throws an exception. In
* case of a file the recursive can be set to either true or false.
* @return true if delete is successful else false.
* @throws IOException due to inability to delete a directory or file.
*/
private boolean innerDelete(FileStatus status, boolean recursive)
throws IOException {
Path f = status.getPath();
String p = f.toUri().getPath();
FileStatus[] statuses;
// indicating root directory "/".
if (p.equals("/")) {
statuses = listStatus(status.getPath());
boolean isEmptyDir = statuses.length <= 0;
return rejectRootDirectoryDelete(isEmptyDir, recursive);
}
String key = pathToKey(f);
if (status.isDirectory()) {
if (!recursive) {
// Check whether it is an empty directory or not
statuses = listStatus(status.getPath());
if (statuses.length > 0) {
throw new IOException("Cannot remove directory " + f +
": It is not empty!");
} else {
// Delete empty directory without '-r'
key = AliyunOSSUtils.maybeAddTrailingSlash(key);
store.deleteObject(key);
}
} else {
store.deleteDirs(key);
}
} else {
store.deleteObject(key);
}
createFakeDirectoryIfNecessary(f);
return true;
}
/**
* Implements the specific logic to reject root directory deletion.
* The caller must return the result of this call, rather than
* attempt to continue with the delete operation: deleting root
* directories is never allowed. This method simply implements
* the policy of when to return an exit code versus raise an exception.
* @param isEmptyDir empty directory or not
* @param recursive recursive flag from command
* @return a return code for the operation
* @throws PathIOException if the operation was explicitly rejected.
*/
private boolean rejectRootDirectoryDelete(boolean isEmptyDir,
boolean recursive) throws IOException {
LOG.info("oss delete the {} root directory of {}", bucket, recursive);
if (isEmptyDir) {
return true;
}
if (recursive) {
return false;
} else {
// reject
throw new PathIOException(bucket, "Cannot delete root path");
}
}
private void createFakeDirectoryIfNecessary(Path f) throws IOException {
String key = pathToKey(f);
if (StringUtils.isNotEmpty(key) && !exists(f)) {
LOG.debug("Creating new fake directory at {}", f);
mkdir(pathToKey(f.getParent()));
}
}
@Override
public FileStatus getFileStatus(Path path) throws IOException {
Path qualifiedPath = path.makeQualified(uri, workingDir);
String key = pathToKey(qualifiedPath);
// Root always exists
if (key.length() == 0) {
return new OSSFileStatus(0, true, 1, 0, 0, qualifiedPath, username);
}
ObjectMetadata meta = store.getObjectMetadata(key);
// If key not found and key does not end with "/"
if (meta == null && !key.endsWith("/")) {
// In case of 'dir + "/"'
key += "/";
meta = store.getObjectMetadata(key);
}
if (meta == null) {
ObjectListing listing = store.listObjects(key, 1, null, false);
do {
if (CollectionUtils.isNotEmpty(listing.getObjectSummaries()) ||
CollectionUtils.isNotEmpty(listing.getCommonPrefixes())) {
return new OSSFileStatus(0, true, 1, 0, 0, qualifiedPath, username);
} else if (listing.isTruncated()) {
listing = store.listObjects(key, 1000, listing.getNextMarker(),
false);
} else {
throw new FileNotFoundException(
path + ": No such file or directory!");
}
} while (true);
} else if (objectRepresentsDirectory(key, meta.getContentLength())) {
return new OSSFileStatus(0, true, 1, 0, meta.getLastModified().getTime(),
qualifiedPath, username);
} else {
return new OSSFileStatus(meta.getContentLength(), false, 1,
getDefaultBlockSize(path), meta.getLastModified().getTime(),
qualifiedPath, username);
}
}
@Override
public String getScheme() {
return "oss";
}
@Override
public URI getUri() {
return uri;
}
@Override
public int getDefaultPort() {
return Constants.OSS_DEFAULT_PORT;
}
@Override
public Path getWorkingDirectory() {
return workingDir;
}
@Deprecated
public long getDefaultBlockSize() {
return getConf().getLong(FS_OSS_BLOCK_SIZE_KEY, FS_OSS_BLOCK_SIZE_DEFAULT);
}
@Override
public String getCanonicalServiceName() {
// Does not support Token
return null;
}
/**
* Initialize new FileSystem.
*
* @param name the uri of the file system, including host, port, etc.
* @param conf configuration of the file system
* @throws IOException IO problems
*/
public void initialize(URI name, Configuration conf) throws IOException {
super.initialize(name, conf);
bucket = name.getHost();
uri = java.net.URI.create(name.getScheme() + "://" + name.getAuthority());
// Username is the current user at the time the FS was instantiated.
username = UserGroupInformation.getCurrentUser().getShortUserName();
workingDir = new Path("/user", username).makeQualified(uri, null);
long keepAliveTime = longOption(conf,
KEEPALIVE_TIME_KEY, KEEPALIVE_TIME_DEFAULT, 0);
blockOutputActiveBlocks = intOption(conf,
UPLOAD_ACTIVE_BLOCKS_KEY, UPLOAD_ACTIVE_BLOCKS_DEFAULT, 1);
store = new AliyunOSSFileSystemStore();
store.initialize(name, conf, username, statistics);
maxKeys = conf.getInt(MAX_PAGING_KEYS_KEY, MAX_PAGING_KEYS_DEFAULT);
int threadNum = AliyunOSSUtils.intPositiveOption(conf,
Constants.MULTIPART_DOWNLOAD_THREAD_NUMBER_KEY,
Constants.MULTIPART_DOWNLOAD_THREAD_NUMBER_DEFAULT);
int totalTasks = AliyunOSSUtils.intPositiveOption(conf,
Constants.MAX_TOTAL_TASKS_KEY, Constants.MAX_TOTAL_TASKS_DEFAULT);
maxReadAheadPartNumber = AliyunOSSUtils.intPositiveOption(conf,
Constants.MULTIPART_DOWNLOAD_AHEAD_PART_MAX_NUM_KEY,
Constants.MULTIPART_DOWNLOAD_AHEAD_PART_MAX_NUM_DEFAULT);
this.boundedThreadPool = BlockingThreadPoolExecutorService.newInstance(
threadNum, totalTasks, keepAliveTime, TimeUnit.SECONDS,
"oss-transfer-shared");
maxConcurrentCopyTasksPerDir = AliyunOSSUtils.intPositiveOption(conf,
Constants.MAX_CONCURRENT_COPY_TASKS_PER_DIR_KEY,
Constants.MAX_CONCURRENT_COPY_TASKS_PER_DIR_DEFAULT);
int maxCopyThreads = AliyunOSSUtils.intPositiveOption(conf,
Constants.MAX_COPY_THREADS_NUM_KEY,
Constants.MAX_COPY_THREADS_DEFAULT);
int maxCopyTasks = AliyunOSSUtils.intPositiveOption(conf,
Constants.MAX_COPY_TASKS_KEY,
Constants.MAX_COPY_TASKS_DEFAULT);
this.boundedCopyThreadPool = BlockingThreadPoolExecutorService.newInstance(
maxCopyThreads, maxCopyTasks, 60L,
TimeUnit.SECONDS, "oss-copy-unbounded");
setConf(conf);
}
/**
* Turn a path (relative or otherwise) into an OSS key.
*
* @param path the path of the file.
* @return the key of the object that represents the file.
*/
private String pathToKey(Path path) {
if (!path.isAbsolute()) {
path = new Path(workingDir, path);
}
return path.toUri().getPath().substring(1);
}
private Path keyToPath(String key) {
return new Path("/" + key);
}
@Override
public RemoteIterator<LocatedFileStatus> listFiles(
final Path f, final boolean recursive) throws IOException {
Path qualifiedPath = f.makeQualified(uri, workingDir);
final FileStatus status = getFileStatus(qualifiedPath);
PathFilter filter = new PathFilter() {
@Override
public boolean accept(Path path) {
return status.isFile() || !path.equals(f);
}
};
FileStatusAcceptor acceptor =
new FileStatusAcceptor.AcceptFilesOnly(qualifiedPath);
return innerList(f, status, filter, acceptor, recursive);
}
@Override
public RemoteIterator<LocatedFileStatus> listLocatedStatus(Path f)
throws IOException {
return listLocatedStatus(f, DEFAULT_FILTER);
}
@Override
public RemoteIterator<LocatedFileStatus> listLocatedStatus(final Path f,
final PathFilter filter) throws IOException {
Path qualifiedPath = f.makeQualified(uri, workingDir);
final FileStatus status = getFileStatus(qualifiedPath);
FileStatusAcceptor acceptor =
new FileStatusAcceptor.AcceptAllButSelf(qualifiedPath);
return innerList(f, status, filter, acceptor, false);
}
private RemoteIterator<LocatedFileStatus> innerList(final Path f,
final FileStatus status,
final PathFilter filter,
final FileStatusAcceptor acceptor,
final boolean recursive) throws IOException {
Path qualifiedPath = f.makeQualified(uri, workingDir);
String key = pathToKey(qualifiedPath);
if (status.isFile()) {
LOG.debug("{} is a File", qualifiedPath);
final BlockLocation[] locations = getFileBlockLocations(status,
0, status.getLen());
return store.singleStatusRemoteIterator(filter.accept(f) ? status : null,
locations);
} else {
return store.createLocatedFileStatusIterator(key, maxKeys, this, filter,
acceptor, recursive ? null : "/");
}
}
@Override
public FileStatus[] listStatus(Path path) throws IOException {
String key = pathToKey(path);
if (LOG.isDebugEnabled()) {
LOG.debug("List status for path: " + path);
}
final List<FileStatus> result = new ArrayList<FileStatus>();
final FileStatus fileStatus = getFileStatus(path);
if (fileStatus.isDirectory()) {
if (LOG.isDebugEnabled()) {
LOG.debug("listStatus: doing listObjects for directory " + key);
}
ObjectListing objects = store.listObjects(key, maxKeys, null, false);
while (true) {
for (OSSObjectSummary objectSummary : objects.getObjectSummaries()) {
String objKey = objectSummary.getKey();
if (objKey.equals(key + "/")) {
if (LOG.isDebugEnabled()) {
LOG.debug("Ignoring: " + objKey);
}
continue;
} else {
Path keyPath = keyToPath(objectSummary.getKey())
.makeQualified(uri, workingDir);
if (LOG.isDebugEnabled()) {
LOG.debug("Adding: fi: " + keyPath);
}
result.add(new OSSFileStatus(objectSummary.getSize(), false, 1,
getDefaultBlockSize(keyPath),
objectSummary.getLastModified().getTime(), keyPath, username));
}
}
for (String prefix : objects.getCommonPrefixes()) {
if (prefix.equals(key + "/")) {
if (LOG.isDebugEnabled()) {
LOG.debug("Ignoring: " + prefix);
}
continue;
} else {
Path keyPath = keyToPath(prefix).makeQualified(uri, workingDir);
if (LOG.isDebugEnabled()) {
LOG.debug("Adding: rd: " + keyPath);
}
result.add(getFileStatus(keyPath));
}
}
if (objects.isTruncated()) {
if (LOG.isDebugEnabled()) {
LOG.debug("listStatus: list truncated - getting next batch");
}
String nextMarker = objects.getNextMarker();
objects = store.listObjects(key, maxKeys, nextMarker, false);
} else {
break;
}
}
} else {
if (LOG.isDebugEnabled()) {
LOG.debug("Adding: rd (not a dir): " + path);
}
result.add(fileStatus);
}
return result.toArray(new FileStatus[result.size()]);
}
/**
* Used to create an empty file that represents an empty directory.
*
* @param key directory path
* @return true if directory is successfully created
* @throws IOException
*/
private boolean mkdir(final String key) throws IOException {
String dirName = key;
if (StringUtils.isNotEmpty(key)) {
if (!key.endsWith("/")) {
dirName += "/";
}
store.storeEmptyFile(dirName);
}
return true;
}
@Override
public boolean mkdirs(Path path, FsPermission permission)
throws IOException {
try {
FileStatus fileStatus = getFileStatus(path);
if (fileStatus.isDirectory()) {
return true;
} else {
throw new FileAlreadyExistsException("Path is a file: " + path);
}
} catch (FileNotFoundException e) {
validatePath(path);
String key = pathToKey(path);
return mkdir(key);
}
}
/**
* Check whether the path is a valid path.
*
* @param path the path to be checked.
* @throws IOException
*/
private void validatePath(Path path) throws IOException {
Path fPart = path.getParent();
do {
try {
FileStatus fileStatus = getFileStatus(fPart);
if (fileStatus.isDirectory()) {
// If path exists and a directory, exit
break;
} else {
throw new FileAlreadyExistsException(String.format(
"Can't make directory for path '%s', it is a file.", fPart));
}
} catch (FileNotFoundException fnfe) {
}
fPart = fPart.getParent();
} while (fPart != null);
}
@Override
public FSDataInputStream open(Path path, int bufferSize) throws IOException {
final FileStatus fileStatus = getFileStatus(path);
if (fileStatus.isDirectory()) {
throw new FileNotFoundException("Can't open " + path +
" because it is a directory");
}
return new FSDataInputStream(new AliyunOSSInputStream(getConf(),
new SemaphoredDelegatingExecutor(
boundedThreadPool, maxReadAheadPartNumber, true),
maxReadAheadPartNumber, store, pathToKey(path), fileStatus.getLen(),
statistics));
}
@Override
public boolean rename(Path srcPath, Path dstPath) throws IOException {
if (srcPath.isRoot()) {
// Cannot rename root of file system
if (LOG.isDebugEnabled()) {
LOG.debug("Cannot rename the root of a filesystem");
}
return false;
}
Path parent = dstPath.getParent();
while (parent != null && !srcPath.equals(parent)) {
parent = parent.getParent();
}
if (parent != null) {
return false;
}
FileStatus srcStatus = getFileStatus(srcPath);
FileStatus dstStatus;
try {
dstStatus = getFileStatus(dstPath);
} catch (FileNotFoundException fnde) {
dstStatus = null;
}
if (dstStatus == null) {
// If dst doesn't exist, check whether dst dir exists or not
dstStatus = getFileStatus(dstPath.getParent());
if (!dstStatus.isDirectory()) {
throw new IOException(String.format(
"Failed to rename %s to %s, %s is a file", srcPath, dstPath,
dstPath.getParent()));
}
} else {
if (srcStatus.getPath().equals(dstStatus.getPath())) {
return !srcStatus.isDirectory();
} else if (dstStatus.isDirectory()) {
// If dst is a directory
dstPath = new Path(dstPath, srcPath.getName());
FileStatus[] statuses;
try {
statuses = listStatus(dstPath);
} catch (FileNotFoundException fnde) {
statuses = null;
}
if (statuses != null && statuses.length > 0) {
// If dst exists and not a directory / not empty
throw new FileAlreadyExistsException(String.format(
"Failed to rename %s to %s, file already exists or not empty!",
srcPath, dstPath));
}
} else {
// If dst is not a directory
throw new FileAlreadyExistsException(String.format(
"Failed to rename %s to %s, file already exists!", srcPath,
dstPath));
}
}
boolean succeed;
if (srcStatus.isDirectory()) {
succeed = copyDirectory(srcPath, dstPath);
} else {
succeed = copyFile(srcPath, srcStatus.getLen(), dstPath);
}
return srcPath.equals(dstPath) || (succeed && delete(srcPath, true));
}
/**
* Copy file from source path to destination path.
* (the caller should make sure srcPath is a file and dstPath is valid)
*
* @param srcPath source path.
* @param srcLen source path length if it is a file.
* @param dstPath destination path.
* @return true if file is successfully copied.
*/
private boolean copyFile(Path srcPath, long srcLen, Path dstPath) {
String srcKey = pathToKey(srcPath);
String dstKey = pathToKey(dstPath);
return store.copyFile(srcKey, srcLen, dstKey);
}
/**
* Copy a directory from source path to destination path.
* (the caller should make sure srcPath is a directory, and dstPath is valid)
*
* @param srcPath source path.
* @param dstPath destination path.
* @return true if directory is successfully copied.
*/
private boolean copyDirectory(Path srcPath, Path dstPath) throws IOException {
String srcKey = AliyunOSSUtils
.maybeAddTrailingSlash(pathToKey(srcPath));
String dstKey = AliyunOSSUtils
.maybeAddTrailingSlash(pathToKey(dstPath));
if (dstKey.startsWith(srcKey)) {
if (LOG.isDebugEnabled()) {
LOG.debug("Cannot rename a directory to a subdirectory of self");
}
return false;
}
store.storeEmptyFile(dstKey);
AliyunOSSCopyFileContext copyFileContext = new AliyunOSSCopyFileContext();
ExecutorService executorService = MoreExecutors.listeningDecorator(
new SemaphoredDelegatingExecutor(boundedCopyThreadPool,
maxConcurrentCopyTasksPerDir, true));
ObjectListing objects = store.listObjects(srcKey, maxKeys, null, true);
// Copy files from src folder to dst
int copiesToFinish = 0;
while (true) {
for (OSSObjectSummary objectSummary : objects.getObjectSummaries()) {
String newKey =
dstKey.concat(objectSummary.getKey().substring(srcKey.length()));
//copy operation just copies metadata, oss will support shallow copy
executorService.execute(new AliyunOSSCopyFileTask(
store, objectSummary.getKey(),
objectSummary.getSize(), newKey, copyFileContext));
copiesToFinish++;
// No need to call lock() here.
// It's ok to copy one more file if the rename operation failed
// Reduce the call of lock() can also improve our performance
if (copyFileContext.isCopyFailure()) {
//some error occurs, break
break;
}
}
if (objects.isTruncated()) {
String nextMarker = objects.getNextMarker();
objects = store.listObjects(srcKey, maxKeys, nextMarker, true);
} else {
break;
}
}
//wait operations in progress to finish
copyFileContext.lock();
try {
copyFileContext.awaitAllFinish(copiesToFinish);
} catch (InterruptedException e) {
LOG.warn("interrupted when wait copies to finish");
} finally {
copyFileContext.unlock();
}
return !copyFileContext.isCopyFailure();
}
@Override
public void setWorkingDirectory(Path dir) {
this.workingDir = dir;
}
public AliyunOSSFileSystemStore getStore() {
return store;
}
}