blob: b1f019374a6fe32bf7f2be6c81d9469f3f5f6bd4 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.ozone;
import com.google.common.base.Preconditions;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileChecksum;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.InvalidPathException;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Options;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.fs.PathIsNotEmptyDirectoryException;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdds.annotation.InterfaceAudience;
import org.apache.hadoop.hdds.annotation.InterfaceStability;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.conf.StorageUnit;
import org.apache.hadoop.hdds.utils.LegacyHadoopConfigurationSource;
import org.apache.hadoop.ozone.OFSPath;
import org.apache.hadoop.ozone.client.OzoneBucket;
import org.apache.hadoop.ozone.client.OzoneVolume;
import org.apache.hadoop.ozone.om.exceptions.OMException;
import org.apache.hadoop.ozone.om.helpers.OzoneFSUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.Progressable;
import org.apache.http.client.utils.URIBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.EnumSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
import static org.apache.hadoop.fs.ozone.Constants.LISTING_PAGE_SIZE;
import static org.apache.hadoop.fs.ozone.Constants.OZONE_DEFAULT_USER;
import static org.apache.hadoop.fs.ozone.Constants.OZONE_USER_DIR;
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_FS_ITERATE_BATCH_SIZE;
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_FS_ITERATE_BATCH_SIZE_DEFAULT;
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE;
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE_DEFAULT;
import static org.apache.hadoop.ozone.OzoneConsts.OZONE_URI_DELIMITER;
import static org.apache.hadoop.ozone.OzoneConsts.OZONE_OFS_URI_SCHEME;
import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.BUCKET_NOT_EMPTY;
import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.VOLUME_NOT_EMPTY;
/**
* The minimal Rooted Ozone Filesystem implementation.
* <p>
* This is a basic version which doesn't extend
* KeyProviderTokenIssuer and doesn't include statistics. It can be used
* from older hadoop version. For newer hadoop version use the full featured
* BasicRootedOzoneFileSystem.
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
public class BasicRootedOzoneFileSystem extends FileSystem {
static final Logger LOG =
LoggerFactory.getLogger(BasicRootedOzoneFileSystem.class);
/**
* The Ozone client for connecting to Ozone server.
*/
private URI uri;
private String userName;
private Path workingDir;
private OzoneClientAdapter adapter;
private BasicRootedOzoneClientAdapterImpl adapterImpl;
private static final String URI_EXCEPTION_TEXT =
"URL should be one of the following formats: " +
"ofs://om-service-id/path/to/key OR " +
"ofs://om-host.example.com/path/to/key OR " +
"ofs://om-host.example.com:5678/path/to/key";
@Override
public void initialize(URI name, Configuration conf) throws IOException {
super.initialize(name, conf);
setConf(conf);
Preconditions.checkNotNull(name.getScheme(),
"No scheme provided in %s", name);
Preconditions.checkArgument(getScheme().equals(name.getScheme()),
"Invalid scheme provided in %s", name);
String authority = name.getAuthority();
if (authority == null) {
// authority is null when fs.defaultFS is not a qualified ofs URI and
// ofs:/// is passed to the client. matcher will NPE if authority is null
throw new IllegalArgumentException(URI_EXCEPTION_TEXT);
}
String omHostOrServiceId;
int omPort = -1;
// Parse hostname and port
String[] parts = authority.split(":");
if (parts.length > 2) {
throw new IllegalArgumentException(URI_EXCEPTION_TEXT);
}
omHostOrServiceId = parts[0];
if (parts.length == 2) {
try {
omPort = Integer.parseInt(parts[1]);
} catch (NumberFormatException e) {
throw new IllegalArgumentException(URI_EXCEPTION_TEXT);
}
}
try {
uri = new URIBuilder().setScheme(OZONE_OFS_URI_SCHEME)
.setHost(authority)
.build();
LOG.trace("Ozone URI for OFS initialization is " + uri);
ConfigurationSource source = getConfSource();
this.adapter = createAdapter(source, omHostOrServiceId, omPort);
this.adapterImpl = (BasicRootedOzoneClientAdapterImpl) this.adapter;
try {
this.userName =
UserGroupInformation.getCurrentUser().getShortUserName();
} catch (IOException e) {
this.userName = OZONE_DEFAULT_USER;
}
this.workingDir = new Path(OZONE_USER_DIR, this.userName)
.makeQualified(this.uri, this.workingDir);
} catch (URISyntaxException ue) {
final String msg = "Invalid Ozone endpoint " + name;
LOG.error(msg, ue);
throw new IOException(msg, ue);
}
}
protected OzoneClientAdapter createAdapter(ConfigurationSource conf,
String omHost, int omPort) throws IOException {
return new BasicRootedOzoneClientAdapterImpl(omHost, omPort, conf);
}
@Override
public void close() throws IOException {
try {
adapter.close();
} finally {
super.close();
}
}
@Override
public URI getUri() {
return uri;
}
@Override
public String getScheme() {
return OZONE_OFS_URI_SCHEME;
}
@Override
public FSDataInputStream open(Path path, int bufferSize) throws IOException {
incrementCounter(Statistic.INVOCATION_OPEN, 1);
statistics.incrementReadOps(1);
LOG.trace("open() path: {}", path);
final String key = pathToKey(path);
return new FSDataInputStream(createFSInputStream(adapter.readFile(key)));
}
protected InputStream createFSInputStream(InputStream inputStream) {
return new OzoneFSInputStream(inputStream, statistics);
}
protected void incrementCounter(Statistic statistic) {
incrementCounter(statistic, 1);
}
protected void incrementCounter(Statistic statistic, long count) {
//don't do anything in this default implementation.
}
@Override
public FSDataOutputStream create(Path f, FsPermission permission,
boolean overwrite, int bufferSize,
short replication, long blockSize,
Progressable progress) throws IOException {
LOG.trace("create() path:{}", f);
incrementCounter(Statistic.INVOCATION_CREATE, 1);
statistics.incrementWriteOps(1);
final String key = pathToKey(f);
return createOutputStream(key, replication, overwrite, true);
}
@Override
public FSDataOutputStream createNonRecursive(Path path,
FsPermission permission,
EnumSet<CreateFlag> flags,
int bufferSize,
short replication,
long blockSize,
Progressable progress) throws IOException {
incrementCounter(Statistic.INVOCATION_CREATE_NON_RECURSIVE, 1);
statistics.incrementWriteOps(1);
final String key = pathToKey(path);
return createOutputStream(key,
replication, flags.contains(CreateFlag.OVERWRITE), false);
}
private FSDataOutputStream createOutputStream(String key, short replication,
boolean overwrite, boolean recursive) throws IOException {
return new FSDataOutputStream(adapter.createFile(key,
replication, overwrite, recursive), statistics);
}
@Override
public FSDataOutputStream append(Path f, int bufferSize,
Progressable progress) throws IOException {
throw new UnsupportedOperationException("append() Not implemented by the "
+ getClass().getSimpleName() + " FileSystem implementation");
}
private class RenameIterator extends OzoneListingIterator {
private final String srcPath;
private final String dstPath;
private final OzoneBucket bucket;
private final BasicRootedOzoneClientAdapterImpl adapterImpl;
RenameIterator(Path srcPath, Path dstPath)
throws IOException {
super(srcPath);
this.srcPath = pathToKey(srcPath);
this.dstPath = pathToKey(dstPath);
LOG.trace("rename from:{} to:{}", this.srcPath, this.dstPath);
// Initialize bucket here to reduce number of RPC calls
OFSPath ofsPath = new OFSPath(srcPath);
// TODO: Refactor later.
adapterImpl = (BasicRootedOzoneClientAdapterImpl) adapter;
this.bucket = adapterImpl.getBucket(ofsPath, false);
}
@Override
boolean processKeyPath(List<String> keyPathList) throws IOException {
for (String keyPath : keyPathList) {
String newPath = dstPath.concat(keyPath.substring(srcPath.length()));
adapterImpl.rename(this.bucket, keyPath, newPath);
}
return true;
}
}
/**
* Check whether the source and destination path are valid and then perform
* rename from source path to destination path.
* <p>
* The rename operation is performed by renaming the keys with src as prefix.
* For such keys the prefix is changed from src to dst.
*
* @param src source path for rename
* @param dst destination path for rename
* @return true if rename operation succeeded or
* if the src and dst have the same path and are of the same type
* @throws IOException on I/O errors or if the src/dst paths are invalid.
*/
@Override
public boolean rename(Path src, Path dst) throws IOException {
incrementCounter(Statistic.INVOCATION_RENAME, 1);
statistics.incrementWriteOps(1);
if (src.equals(dst)) {
return true;
}
LOG.trace("rename() from: {} to: {}", src, dst);
if (src.isRoot()) {
// Cannot rename root of file system
LOG.trace("Cannot rename the root of a filesystem");
return false;
}
// src and dst should be in the same bucket
OFSPath ofsSrc = new OFSPath(src);
OFSPath ofsDst = new OFSPath(dst);
if (!ofsSrc.isInSameBucketAs(ofsDst)) {
throw new IOException("Cannot rename a key to a different bucket");
}
OzoneBucket bucket = adapterImpl.getBucket(ofsSrc, false);
if (bucket.getBucketLayout().isFileSystemOptimized()) {
return renameFSO(bucket, ofsSrc, ofsDst);
}
// Cannot rename a directory to its own subdirectory
Path dstParent = dst.getParent();
while (dstParent != null && !src.equals(dstParent)) {
dstParent = dstParent.getParent();
}
Preconditions.checkArgument(dstParent == null,
"Cannot rename a directory to its own subdirectory");
// Check if the source exists
FileStatus srcStatus;
try {
srcStatus = getFileStatus(src);
} catch (FileNotFoundException fnfe) {
// source doesn't exist, return
return false;
}
// Check if the destination exists
FileStatus dstStatus;
try {
dstStatus = getFileStatus(dst);
} catch (FileNotFoundException fnde) {
dstStatus = null;
}
if (dstStatus == null) {
// If dst doesn't exist, check whether dst parent dir exists or not
// if the parent exists, the source can still be renamed to dst path
dstStatus = getFileStatus(dst.getParent());
if (!dstStatus.isDirectory()) {
throw new IOException(String.format(
"Failed to rename %s to %s, %s is a file", src, dst,
dst.getParent()));
}
} else {
// if dst exists and source and destination are same,
// check both the src and dst are of same type
if (srcStatus.getPath().equals(dstStatus.getPath())) {
return !srcStatus.isDirectory();
} else if (dstStatus.isDirectory()) {
// If dst is a directory, rename source as subpath of it.
// for example rename /source to /dst will lead to /dst/source
dst = new Path(dst, src.getName());
FileStatus[] statuses;
try {
statuses = listStatus(dst);
} catch (FileNotFoundException fnde) {
statuses = null;
}
if (statuses != null && statuses.length > 0) {
// If dst exists and not a directory not empty
LOG.warn("Failed to rename {} to {}, file already exists" +
" or not empty!", src, dst);
return false;
}
} else {
// If dst is not a directory
LOG.warn("Failed to rename {} to {}, file already exists!", src, dst);
return false;
}
}
if (srcStatus.isDirectory()) {
if (dst.toString().startsWith(src.toString() + OZONE_URI_DELIMITER)) {
LOG.trace("Cannot rename a directory to a subdirectory of self");
return false;
}
}
RenameIterator iterator = new RenameIterator(src, dst);
boolean result = iterator.iterate();
if (result) {
createFakeParentDirectory(src);
}
return result;
}
private boolean renameFSO(OzoneBucket bucket,
OFSPath srcPath, OFSPath dstPath) throws IOException {
// construct src and dst key paths
String srcKeyPath = srcPath.getNonKeyPathNoPrefixDelim() +
OZONE_URI_DELIMITER + srcPath.getKeyName();
String dstKeyPath = dstPath.getNonKeyPathNoPrefixDelim() +
OZONE_URI_DELIMITER + dstPath.getKeyName();
try {
adapterImpl.rename(bucket, srcKeyPath, dstKeyPath);
} catch (OMException ome) {
LOG.error("rename key failed: {}. source:{}, destin:{}",
ome.getMessage(), srcKeyPath, dstKeyPath);
if (OMException.ResultCodes.KEY_ALREADY_EXISTS == ome.getResult() ||
OMException.ResultCodes.KEY_RENAME_ERROR == ome.getResult() ||
OMException.ResultCodes.KEY_NOT_FOUND == ome.getResult()) {
return false;
} else {
throw ome;
}
}
return true;
}
/**
* Intercept rename to trash calls from TrashPolicyDefault.
*/
@Deprecated
@Override
protected void rename(final Path src, final Path dst,
final Options.Rename... options) throws IOException {
boolean hasMoveToTrash = false;
if (options != null) {
for (Options.Rename option : options) {
if (option == Options.Rename.TO_TRASH) {
hasMoveToTrash = true;
break;
}
}
}
if (!hasMoveToTrash) {
// if doesn't have TO_TRASH option, just pass the call to super
super.rename(src, dst, options);
} else {
rename(src, dst);
}
}
private class DeleteIterator extends OzoneListingIterator {
private final boolean recursive;
private final OzoneBucket bucket;
private final BasicRootedOzoneClientAdapterImpl adapterImpl;
DeleteIterator(Path f, boolean recursive)
throws IOException {
super(f);
this.recursive = recursive;
if (getStatus().isDirectory()
&& !this.recursive
&& listStatus(f).length != 0) {
throw new PathIsNotEmptyDirectoryException(f.toString());
}
// Initialize bucket here to reduce number of RPC calls
OFSPath ofsPath = new OFSPath(f);
// TODO: Refactor later.
adapterImpl = (BasicRootedOzoneClientAdapterImpl) adapter;
this.bucket = adapterImpl.getBucket(ofsPath, false);
}
@Override
boolean processKeyPath(List<String> keyPathList) {
LOG.trace("Deleting keys: {}", keyPathList);
boolean succeed = adapterImpl.deleteObjects(this.bucket, keyPathList);
// if recursive delete is requested ignore the return value of
// deleteObject and issue deletes for other keys.
return recursive || succeed;
}
}
/**
* To be used only by recursiveBucketDelete().
*/
private class DeleteIteratorWithFSO extends OzoneListingIterator {
private final OzoneBucket bucket;
private final BasicRootedOzoneClientAdapterImpl adapterImpl;
private boolean recursive;
private Path f;
DeleteIteratorWithFSO(Path f, boolean recursive)
throws IOException {
super(f, true);
this.f = f;
this.recursive = recursive;
// Initialize bucket here to reduce number of RPC calls
OFSPath ofsPath = new OFSPath(f);
adapterImpl = (BasicRootedOzoneClientAdapterImpl) adapter;
this.bucket = adapterImpl.getBucket(ofsPath, false);
LOG.debug("Deleting bucket with name {} is via DeleteIteratorWithFSO.",
bucket.getName());
}
@Override
boolean processKeyPath(List<String> keyPathList) throws IOException {
LOG.trace("Deleting keys: {}", keyPathList);
boolean succeed = keyPathList.isEmpty();
if (recursive && !succeed) {
succeed = adapterImpl.deleteObjects(keyPathList);
} else {
// Non empty paths cannot be deleted when recursive flag is false
if (!keyPathList.isEmpty()) {
throw new PathIsNotEmptyDirectoryException(f.toString());
}
}
return succeed;
}
}
private class DeleteIteratorFactory {
private Path path;
private boolean recursive;
private OFSPath ofsPath;
DeleteIteratorFactory(Path f, boolean recursive) {
this.path = f;
this.recursive = recursive;
this.ofsPath = new OFSPath(f);
}
OzoneListingIterator getDeleteIterator()
throws IOException {
OzoneListingIterator deleteIterator;
if (ofsPath.isBucket() &&
isFSObucket(ofsPath.getVolumeName(), ofsPath.getBucketName())) {
deleteIterator = new DeleteIteratorWithFSO(path, recursive);
} else {
deleteIterator = new DeleteIterator(path, recursive);
}
return deleteIterator;
}
}
/**
* Deletes the children of the input dir path by iterating though the
* DeleteIterator.
*
* @param f directory path to be deleted
* @return true if successfully deletes all required keys, false otherwise
* @throws IOException
*/
private boolean innerDelete(Path f, boolean recursive) throws IOException {
LOG.trace("delete() path:{} recursive:{}", f, recursive);
try {
OzoneListingIterator iterator =
new DeleteIteratorFactory(f, recursive).getDeleteIterator();
return iterator.iterate();
} catch (FileNotFoundException e) {
if (LOG.isDebugEnabled()) {
LOG.debug("Couldn't delete {} - does not exist", f);
}
return false;
}
}
/**
* {@inheritDoc}
*
* OFS supports volume and bucket deletion, recursive or non-recursive.
* e.g. delete(new Path("/volume1"), true)
* But root deletion is explicitly disallowed for safety concerns.
*/
@Override
public boolean delete(Path f, boolean recursive) throws IOException {
incrementCounter(Statistic.INVOCATION_DELETE, 1);
statistics.incrementWriteOps(1);
LOG.debug("Delete path {} - recursive {}", f, recursive);
FileStatus status;
try {
status = getFileStatus(f);
} catch (FileNotFoundException ex) {
LOG.warn("delete: Path does not exist: {}", f);
return false;
}
if (status == null) {
return false;
}
String key = pathToKey(f);
boolean result;
if (status.isDirectory()) {
LOG.debug("delete: Path is a directory: {}", f);
OFSPath ofsPath = new OFSPath(key);
// Handle rm root
if (ofsPath.isRoot()) {
// Intentionally drop support for rm root
// because it is too dangerous and doesn't provide much value
LOG.warn("delete: OFS does not support rm root. "
+ "To wipe the cluster, please re-init OM instead.");
return false;
}
if (!ofsPath.isVolume() && !ofsPath.isBucket()) {
OzoneBucket bucket = adapterImpl.getBucket(ofsPath, false);
if (bucket.getBucketLayout().isFileSystemOptimized()) {
String ofsKeyPath = ofsPath.getNonKeyPathNoPrefixDelim() +
OZONE_URI_DELIMITER + ofsPath.getKeyName();
return adapterImpl.deleteObject(ofsKeyPath, recursive);
}
}
// Handle delete volume
if (ofsPath.isVolume()) {
String volumeName = ofsPath.getVolumeName();
if (recursive) {
// Delete all buckets first
OzoneVolume volume =
adapterImpl.getObjectStore().getVolume(volumeName);
Iterator<? extends OzoneBucket> it = volume.listBuckets("");
String prefixVolumePathStr = addTrailingSlashIfNeeded(f.toString());
while (it.hasNext()) {
OzoneBucket bucket = it.next();
String nextBucket = prefixVolumePathStr + bucket.getName();
delete(new Path(nextBucket), true);
}
}
try {
adapterImpl.getObjectStore().deleteVolume(volumeName);
return true;
} catch (OMException ex) {
// volume is not empty
if (ex.getResult() == VOLUME_NOT_EMPTY) {
throw new PathIsNotEmptyDirectoryException(f.toString());
} else {
throw ex;
}
}
}
boolean isBucketLink = false;
// check for bucket link
if (ofsPath.isBucket()) {
isBucketLink = adapterImpl.getBucket(ofsPath, false)
.isLink();
}
// if link, don't delete contents
if (isBucketLink) {
result = true;
} else {
result = innerDelete(f, recursive);
}
// Handle delete bucket
if (ofsPath.isBucket()) {
OzoneVolume volume =
adapterImpl.getObjectStore().getVolume(ofsPath.getVolumeName());
try {
volume.deleteBucket(ofsPath.getBucketName());
return result;
} catch (OMException ex) {
// bucket is not empty
if (ex.getResult() == BUCKET_NOT_EMPTY) {
throw new PathIsNotEmptyDirectoryException(f.toString());
} else {
throw ex;
}
}
}
} else {
LOG.debug("delete: Path is a file: {}", f);
result = adapter.deleteObject(key);
}
if (result) {
// If this delete operation removes all files/directories from the
// parent directory, then an empty parent directory must be created.
createFakeParentDirectory(f);
}
return result;
}
private boolean isFSObucket(String volumeName, String bucketName)
throws IOException {
OzoneVolume volume =
adapterImpl.getObjectStore().getVolume(volumeName);
OzoneBucket bucket = volume.getBucket(bucketName);
return bucket.getBucketLayout().isFileSystemOptimized();
}
/**
* Create a fake parent directory key if it does not already exist and no
* other child of this parent directory exists.
*
* @param f path to the fake parent directory
* @throws IOException
*/
private void createFakeParentDirectory(Path f) throws IOException {
Path parent = f.getParent();
if (parent != null && !parent.isRoot()) {
createFakeDirectoryIfNecessary(parent);
}
}
/**
* Create a fake directory key if it does not already exist.
*
* @param f path to the fake directory
* @throws IOException
*/
private void createFakeDirectoryIfNecessary(Path f) throws IOException {
String key = pathToKey(f);
if (!key.isEmpty() && !o3Exists(f)) {
LOG.debug("Creating new fake directory at {}", f);
String dirKey = addTrailingSlashIfNeeded(key);
adapter.createDirectory(dirKey);
}
}
/**
* Check if a file or directory exists corresponding to given path.
*
* @param f path to file/directory.
* @return true if it exists, false otherwise.
* @throws IOException
*/
private boolean o3Exists(final Path f) throws IOException {
Path path = makeQualified(f);
try {
getFileStatus(path);
return true;
} catch (FileNotFoundException ex) {
return false;
}
}
@Override
public FileStatus[] listStatus(Path f) throws IOException {
incrementCounter(Statistic.INVOCATION_LIST_STATUS, 1);
statistics.incrementReadOps(1);
LOG.trace("listStatus() path:{}", f);
int numEntries = LISTING_PAGE_SIZE;
LinkedList<FileStatus> statuses = new LinkedList<>();
List<FileStatus> tmpStatusList;
String startPath = "";
do {
tmpStatusList =
adapter.listStatus(pathToKey(f), false, startPath,
numEntries, uri, workingDir, getUsername())
.stream()
.map(this::convertFileStatus)
.collect(Collectors.toList());
if (!tmpStatusList.isEmpty()) {
if (startPath.isEmpty()) {
statuses.addAll(tmpStatusList);
} else {
statuses.addAll(tmpStatusList.subList(1, tmpStatusList.size()));
}
startPath = pathToKey(statuses.getLast().getPath());
}
// listStatus returns entries numEntries in size if available.
// Any lesser number of entries indicate that the required entries have
// exhausted.
} while (tmpStatusList.size() == numEntries);
return statuses.toArray(new FileStatus[0]);
}
@Override
public void setWorkingDirectory(Path newDir) {
workingDir = newDir;
}
@Override
public Path getWorkingDirectory() {
return workingDir;
}
@Override
public Token<?> getDelegationToken(String renewer) throws IOException {
return adapter.getDelegationToken(renewer);
}
/**
* Get a canonical service name for this file system. If the URI is logical,
* the hostname part of the URI will be returned.
*
* @return a service string that uniquely identifies this file system.
*/
@Override
public String getCanonicalServiceName() {
return adapter.getCanonicalServiceName();
}
/**
* Get the username of the FS.
*
* @return the short name of the user who instantiated the FS
*/
public String getUsername() {
return userName;
}
/**
* Get the root directory of Trash for a path in OFS.
* Returns /<volumename>/<bucketname>/.Trash/<username>
* Caller appends either Current or checkpoint timestamp for trash destination
* @param path the trash root of the path to be determined.
* @return trash root
*/
@Override
public Path getTrashRoot(Path path) {
OFSPath ofsPath = new OFSPath(path);
return ofsPath.getTrashRoot();
}
/**
* Get all the trash roots of OFS for current user or for all the users.
* @param allUsers return trashRoots of all users if true, used by emptier
* @return trash roots
*/
@Override
public Collection<FileStatus> getTrashRoots(boolean allUsers) {
// Since get all trash roots for one or more users requires listing all
// volumes and buckets, we will let adapter impl handle it.
return adapterImpl.getTrashRoots(allUsers, this);
}
/**
* Creates a directory. Directory is represented using a key with no value.
*
* @param path directory path to be created
* @return true if directory exists or created successfully.
* @throws IOException
*/
private boolean mkdir(Path path) throws IOException {
return adapter.createDirectory(pathToKey(path));
}
@Override
public boolean mkdirs(Path f, FsPermission permission) throws IOException {
incrementCounter(Statistic.INVOCATION_MKDIRS);
LOG.trace("mkdir() path:{} ", f);
String key = pathToKey(f);
if (isEmpty(key)) {
return false;
}
return mkdir(f);
}
@Override
public long getDefaultBlockSize() {
return (long) getConfSource().getStorageSize(
OZONE_SCM_BLOCK_SIZE, OZONE_SCM_BLOCK_SIZE_DEFAULT, StorageUnit.BYTES);
}
@Override
public FileStatus getFileStatus(Path f) throws IOException {
incrementCounter(Statistic.INVOCATION_GET_FILE_STATUS, 1);
statistics.incrementReadOps(1);
LOG.trace("getFileStatus() path:{}", f);
Path qualifiedPath = f.makeQualified(uri, workingDir);
String key = pathToKey(qualifiedPath);
// Handle DistCp /NONE path
if (key.equals("NONE")) {
throw new FileNotFoundException("File not found. path /NONE.");
}
FileStatus fileStatus = null;
try {
fileStatus = convertFileStatus(
adapter.getFileStatus(key, uri, qualifiedPath, getUsername()));
} catch (OMException ex) {
if (ex.getResult().equals(OMException.ResultCodes.KEY_NOT_FOUND) ||
ex.getResult().equals(OMException.ResultCodes.BUCKET_NOT_FOUND) ||
ex.getResult().equals(OMException.ResultCodes.VOLUME_NOT_FOUND)) {
throw new FileNotFoundException("File not found. path:" + f);
}
}
return fileStatus;
}
@Override
public BlockLocation[] getFileBlockLocations(FileStatus fileStatus,
long start, long len)
throws IOException {
if (fileStatus instanceof LocatedFileStatus) {
return ((LocatedFileStatus) fileStatus).getBlockLocations();
} else {
return super.getFileBlockLocations(fileStatus, start, len);
}
}
@Override
public short getDefaultReplication() {
return adapter.getDefaultReplication();
}
@Override
public void copyFromLocalFile(boolean delSrc, boolean overwrite, Path[] srcs,
Path dst) throws IOException {
incrementCounter(Statistic.INVOCATION_COPY_FROM_LOCAL_FILE);
super.copyFromLocalFile(delSrc, overwrite, srcs, dst);
}
@Override
public void copyFromLocalFile(boolean delSrc, boolean overwrite, Path src,
Path dst) throws IOException {
incrementCounter(Statistic.INVOCATION_COPY_FROM_LOCAL_FILE);
super.copyFromLocalFile(delSrc, overwrite, src, dst);
}
@Override
public boolean exists(Path f) throws IOException {
incrementCounter(Statistic.INVOCATION_EXISTS);
return super.exists(f);
}
@Override
public FileChecksum getFileChecksum(Path f, long length) throws IOException {
incrementCounter(Statistic.INVOCATION_GET_FILE_CHECKSUM);
String key = pathToKey(f);
return adapter.getFileChecksum(key, length);
}
@Override
public FileStatus[] globStatus(Path pathPattern) throws IOException {
incrementCounter(Statistic.INVOCATION_GLOB_STATUS);
return super.globStatus(pathPattern);
}
@Override
public FileStatus[] globStatus(Path pathPattern, PathFilter filter)
throws IOException {
incrementCounter(Statistic.INVOCATION_GLOB_STATUS);
return super.globStatus(pathPattern, filter);
}
@Override
@SuppressWarnings("deprecation")
public boolean isDirectory(Path f) throws IOException {
incrementCounter(Statistic.INVOCATION_IS_DIRECTORY);
return super.isDirectory(f);
}
@Override
@SuppressWarnings("deprecation")
public boolean isFile(Path f) throws IOException {
incrementCounter(Statistic.INVOCATION_IS_FILE);
return super.isFile(f);
}
@Override
public RemoteIterator<LocatedFileStatus> listFiles(Path f, boolean recursive)
throws IOException {
incrementCounter(Statistic.INVOCATION_LIST_FILES);
return super.listFiles(f, recursive);
}
@Override
public RemoteIterator<LocatedFileStatus> listLocatedStatus(Path f)
throws IOException {
incrementCounter(Statistic.INVOCATION_LIST_LOCATED_STATUS);
return super.listLocatedStatus(f);
}
/**
* Turn a path (relative or otherwise) into an Ozone key.
*
* @param path the path of the file.
* @return the key of the object that represents the file.
*/
public String pathToKey(Path path) {
Objects.requireNonNull(path, "Path can't be null!");
if (!path.isAbsolute()) {
path = new Path(workingDir, path);
}
String key = path.toUri().getPath();
if (!OzoneFSUtils.isValidName(key)) {
throw new InvalidPathException("Invalid path Name " + key);
}
// removing leading '/' char
key = key.substring(1);
LOG.trace("path for key: {} is: {}", key, path);
return key;
}
/**
* Add trailing delimiter to path if it is already not present.
*
* @param key the ozone Key which needs to be appended
* @return delimiter appended key
*/
private String addTrailingSlashIfNeeded(String key) {
if (!isEmpty(key) && !key.endsWith(OZONE_URI_DELIMITER)) {
return key + OZONE_URI_DELIMITER;
} else {
return key;
}
}
@Override
public String toString() {
return "RootedOzoneFileSystem{URI=" + uri + ", "
+ "workingDir=" + workingDir + ", "
+ "userName=" + userName + ", "
+ "statistics=" + statistics
+ "}";
}
public ConfigurationSource getConfSource() {
Configuration conf = super.getConf();
ConfigurationSource source;
if (conf instanceof OzoneConfiguration) {
source = (ConfigurationSource) conf;
} else {
source = new LegacyHadoopConfigurationSource(conf);
}
return source;
}
/**
* This class provides an interface to iterate through all the keys in the
* bucket prefixed with the input path key and process them.
* <p>
* Each implementing class should define how the keys should be processed
* through the processKeyPath() function.
*/
private abstract class OzoneListingIterator {
private final Path path;
private final FileStatus status;
private String pathKey;
private Iterator<BasicKeyInfo> keyIterator = null;
private boolean isFSO;
OzoneListingIterator(Path path, boolean isFSO)
throws IOException {
this.path = path;
this.status = getFileStatus(path);
this.pathKey = pathToKey(path);
this.isFSO = isFSO;
if (!isFSO) {
if (status.isDirectory()) {
this.pathKey = addTrailingSlashIfNeeded(pathKey);
}
keyIterator = adapter.listKeys(pathKey);
}
}
OzoneListingIterator(Path path) throws IOException {
// non FSO implementation
this(path, false);
}
/**
* The output of processKey determines if further iteration through the
* keys should be done or not.
*
* @return true if we should continue iteration of keys, false otherwise.
* @throws IOException
*/
abstract boolean processKeyPath(List<String> keyPathList)
throws IOException;
/**
* Iterates through all the keys prefixed with the input path's key and
* processes the key though processKey().
* If for any key, the processKey() returns false, then the iteration is
* stopped and returned with false indicating that all the keys could not
* be processed successfully.
*
* If isFSO is true, call is from DeleteIteratorWithFSO and the list of keys
* will only contain immediate children i.e top level dirs and files
*
* @return true if all keys are processed successfully, false otherwise.
* @throws IOException
*/
boolean iterate() throws IOException {
LOG.trace("Iterating path: {}", path);
List<String> keyPathList = new ArrayList<>();
int batchSize = getConf().getInt(OZONE_FS_ITERATE_BATCH_SIZE,
OZONE_FS_ITERATE_BATCH_SIZE_DEFAULT);
if (status.isDirectory()) {
LOG.trace("Iterating directory: {}", pathKey);
OFSPath ofsPath = new OFSPath(pathKey);
String ofsPathPrefix =
ofsPath.getNonKeyPathNoPrefixDelim() + OZONE_URI_DELIMITER;
if (isFSO) {
FileStatus[] fileStatuses;
fileStatuses = listStatus(path);
for (FileStatus fileStatus : fileStatuses) {
String keyName =
new OFSPath(fileStatus.getPath().toString()).getKeyName();
keyPathList.add(ofsPathPrefix + keyName);
}
if (keyPathList.size() >= batchSize) {
if (!processKeyPath(keyPathList)) {
return false;
} else {
keyPathList.clear();
}
}
} else {
while (keyIterator.hasNext()) {
BasicKeyInfo key = keyIterator.next();
// Convert key to full path before passing it to processKeyPath
// TODO: This conversion is redundant. But want to use only
// full path outside AdapterImpl. - Maybe a refactor later.
String keyPath = ofsPathPrefix + key.getName();
LOG.trace("iterating key path: {}", keyPath);
if (!key.getName().equals("")) {
keyPathList.add(keyPath);
}
if (keyPathList.size() >= batchSize) {
if (!processKeyPath(keyPathList)) {
return false;
} else {
keyPathList.clear();
}
}
}
}
if (keyPathList.size() > 0) {
if (!processKeyPath(keyPathList)) {
return false;
}
}
return true;
} else {
LOG.trace("iterating file: {}", path);
keyPathList.add(pathKey);
return processKeyPath(keyPathList);
}
}
String getPathKey() {
return pathKey;
}
boolean pathIsDirectory() {
return status.isDirectory();
}
FileStatus getStatus() {
return status;
}
}
public OzoneClientAdapter getAdapter() {
return adapter;
}
public boolean isEmpty(CharSequence cs) {
return cs == null || cs.length() == 0;
}
public boolean isNumber(String number) {
try {
Integer.parseInt(number);
} catch (NumberFormatException ex) {
return false;
}
return true;
}
FileStatus convertFileStatus(FileStatusAdapter fileStatusAdapter) {
Path symLink = null;
try {
fileStatusAdapter.getSymlink();
} catch (Exception ex) {
//NOOP: If not symlink symlink remains null.
}
FileStatus fileStatus = new FileStatus(
fileStatusAdapter.getLength(),
fileStatusAdapter.isDir(),
fileStatusAdapter.getBlockReplication(),
fileStatusAdapter.getBlocksize(),
fileStatusAdapter.getModificationTime(),
fileStatusAdapter.getAccessTime(),
new FsPermission(fileStatusAdapter.getPermission()),
fileStatusAdapter.getOwner(),
fileStatusAdapter.getGroup(),
symLink,
fileStatusAdapter.getPath()
);
BlockLocation[] blockLocations = fileStatusAdapter.getBlockLocations();
if (blockLocations == null || blockLocations.length == 0) {
return fileStatus;
}
return new LocatedFileStatus(fileStatus, blockLocations);
}
}