/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.ozone.om;
import com.google.common.base.Preconditions;
import com.google.protobuf.RpcController;
import org.apache.hadoop.ozone.om.exceptions.OMException;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.PathIsNotEmptyDirectoryException;
import org.apache.hadoop.hdds.utils.db.Table;
import org.apache.hadoop.hdds.utils.db.TableIterator;
import org.apache.hadoop.hdds.utils.db.cache.CacheKey;
import org.apache.hadoop.hdds.utils.db.cache.CacheValue;
import org.apache.hadoop.ozone.OFSPath;
import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
import org.apache.hadoop.ozone.om.helpers.OzoneFileStatus;
import org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerRatisUtils;
import org.apache.hadoop.ozone.om.request.OMClientRequest;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
import org.apache.hadoop.ozone.om.helpers.OMRatisHelper;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.Progressable;
import org.apache.ratis.protocol.ClientId;
import org.apache.ratis.protocol.Message;
import org.apache.ratis.protocol.RaftClientRequest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.net.InetAddress;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Iterator;
import java.util.concurrent.atomic.AtomicLong;
import static org.apache.hadoop.ozone.OzoneConsts.OZONE_URI_DELIMITER;
import static org.apache.hadoop.ozone.om.helpers.OzoneFSUtils.addTrailingSlashIfNeeded;
import static org.apache.hadoop.ozone.om.helpers.OzoneFSUtils.pathToKey;
/**
* FileSystem to be used by the Trash Emptier.
* Only the APIs used by the trash emptier are implemented.
*/
public class TrashOzoneFileSystem extends FileSystem {
private static final RpcController NULL_RPC_CONTROLLER = null;
private static final int OZONE_FS_ITERATE_BATCH_SIZE = 100;
private final OzoneManager ozoneManager;
private final String userName;
private final AtomicLong runCount;
private static final ClientId CLIENT_ID = ClientId.randomId();
private static final Logger LOG =
LoggerFactory.getLogger(TrashOzoneFileSystem.class);
public TrashOzoneFileSystem(OzoneManager ozoneManager) throws IOException {
this.ozoneManager = ozoneManager;
this.userName =
UserGroupInformation.getCurrentUser().getShortUserName();
this.runCount = new AtomicLong(0);
}
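// A minimal usage sketch (hypothetical, not part of this class): the trash
// emptier obtains an instance of this FileSystem and restricts itself to the
// implemented calls, e.g.
//   TrashOzoneFileSystem trashFs = new TrashOzoneFileSystem(ozoneManager);
//   for (FileStatus trashRoot : trashFs.getTrashRoots(true)) {
//     trashFs.listStatus(trashRoot.getPath());  // then rename()/delete()
//   }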
private RaftClientRequest getRatisRequest(
OzoneManagerProtocolProtos.OMRequest omRequest) {
return RaftClientRequest.newBuilder()
.setClientId(CLIENT_ID)
.setServerId(ozoneManager.getOmRatisServer().getRaftPeerId())
.setGroupId(ozoneManager.getOmRatisServer().getRaftGroupId())
.setCallId(runCount.getAndIncrement())
.setMessage(
Message.valueOf(
OMRatisHelper.convertRequestToByteString(omRequest)))
.setType(RaftClientRequest.writeRequestType())
.build();
}
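// Submits the generated OM write request: when Ratis is enabled the request
// is run through its preExecute step and sent to the OM Ratis server,
// otherwise it is handed directly to the OM server protocol.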
private void submitRequest(OzoneManagerProtocolProtos.OMRequest omRequest)
throws Exception {
ozoneManager.getMetrics().incNumTrashWriteRequests();
if (ozoneManager.isRatisEnabled()) {
OMClientRequest omClientRequest =
OzoneManagerRatisUtils.createClientRequest(omRequest);
omRequest = omClientRequest.preExecute(ozoneManager);
RaftClientRequest req = getRatisRequest(omRequest);
ozoneManager.getOmRatisServer().submitRequest(omRequest, req);
} else {
ozoneManager.getOmServerProtocol().
submitRequest(NULL_RPC_CONTROLLER, omRequest);
}
}
@Override
public URI getUri() {
throw new UnsupportedOperationException(
"fs.getUri() not implemented in TrashOzoneFileSystem");
}
@Override
public FSDataInputStream open(Path path, int i) {
throw new UnsupportedOperationException(
"fs.open() not implemented in TrashOzoneFileSystem");
}
@Override
public FSDataOutputStream create(Path path,
FsPermission fsPermission,
boolean b, int i, short i1,
long l, Progressable progressable){
throw new UnsupportedOperationException(
"fs.create() not implemented in TrashOzoneFileSystem");
}
@Override
public FSDataOutputStream append(Path path, int i,
Progressable progressable) {
throw new UnsupportedOperationException(
"fs.append() not implemented in TrashOzoneFileSystem");
}
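// Renames every key under the source path to the destination path using a
// RenameIterator; src and dst must belong to the same bucket and trash root.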
@Override
public boolean rename(Path src, Path dst) throws IOException {
ozoneManager.getMetrics().incNumTrashRenames();
LOG.trace("Src:" + src + "Dst:" + dst);
// check whether the src and dst belong to the same bucket & trashroot.
OFSPath srcPath = new OFSPath(src);
OFSPath dstPath = new OFSPath(dst);
Preconditions.checkArgument(srcPath.getBucketName().
equals(dstPath.getBucketName()));
Preconditions.checkArgument(srcPath.getTrashRoot().
toString().equals(dstPath.getTrashRoot().toString()));
RenameIterator iterator = new RenameIterator(src, dst);
iterator.iterate();
return true;
}
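// Deletes every key under the given path using a DeleteIterator, which
// submits one DeleteKeys request per key.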
@Override
public boolean delete(Path path, boolean b) throws IOException {
ozoneManager.getMetrics().incNumTrashDeletes();
DeleteIterator iterator = new DeleteIterator(path, true);
iterator.iterate();
return true;
}
@Override
public FileStatus[] listStatus(Path path) throws IOException {
ozoneManager.getMetrics().incNumTrashListStatus();
List<FileStatus> fileStatuses = new ArrayList<>();
OmKeyArgs keyArgs = constructOmKeyArgs(path);
List<OzoneFileStatus> list = ozoneManager.
listStatus(keyArgs, false, null, Integer.MAX_VALUE);
for (OzoneFileStatus status : list) {
FileStatus fileStatus = convertToFileStatus(status);
fileStatuses.add(fileStatus);
}
return fileStatuses.toArray(new FileStatus[0]);
}
/**
* Converts an OzoneFileStatus object to a FileStatus.
*/
private FileStatus convertToFileStatus(OzoneFileStatus status) {
Path temp = new Path(OZONE_URI_DELIMITER +
status.getKeyInfo().getVolumeName() +
OZONE_URI_DELIMITER +
status.getKeyInfo().getBucketName() +
OZONE_URI_DELIMITER +
status.getKeyInfo().getKeyName());
return new FileStatus(
status.getKeyInfo().getDataSize(),
status.isDirectory(),
status.getKeyInfo().getFactor().getNumber(),
status.getBlockSize(),
status.getKeyInfo().getModificationTime(),
temp
);
}
@Override
public void setWorkingDirectory(Path path) {
throw new UnsupportedOperationException(
"fs.setWorkingDirectory() not implemented in TrashOzoneFileSystem");
}
@Override
public Path getWorkingDirectory() {
throw new UnsupportedOperationException(
"fs.getWorkingDirectory() not implemented in TrashOzoneFileSystem");
}
@Override
public boolean mkdirs(Path path,
FsPermission fsPermission) {
throw new UnsupportedOperationException(
"fs.mkdirs() not implemented in TrashOzoneFileSystem");
}
@Override
public FileStatus getFileStatus(Path path) throws IOException {
ozoneManager.getMetrics().incNumGetFileStatus();
OmKeyArgs keyArgs = constructOmKeyArgs(path);
OzoneFileStatus ofs = ozoneManager.getKeyManager().getFileStatus(keyArgs);
return convertToFileStatus(ofs);
}
private OmKeyArgs constructOmKeyArgs(Path path) {
OFSPath ofsPath = new OFSPath(path);
String volume = ofsPath.getVolumeName();
String bucket = ofsPath.getBucketName();
String key = ofsPath.getKeyName();
OmKeyArgs keyArgs = new OmKeyArgs.Builder()
.setVolumeName(volume)
.setBucketName(bucket)
.setKeyName(key)
.build();
return keyArgs;
}
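// Scans the bucket table cache; for every bucket whose /volume/bucket/.Trash
// path exists, the directories directly under it (the per-user trash roots)
// are returned.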
@Override
public Collection<FileStatus> getTrashRoots(boolean allUsers) {
Preconditions.checkArgument(allUsers);
ozoneManager.getMetrics().incNumTrashGetTrashRoots();
Iterator<Map.Entry<CacheKey<String>,
CacheValue<OmBucketInfo>>> bucketIterator =
ozoneManager.getMetadataManager().getBucketIterator();
List<FileStatus> ret = new ArrayList<>();
while (bucketIterator.hasNext()){
Map.Entry<CacheKey<String>, CacheValue<OmBucketInfo>> entry =
bucketIterator.next();
OmBucketInfo omBucketInfo = entry.getValue().getCacheValue();
Path volumePath = new Path(OZONE_URI_DELIMITER,
omBucketInfo.getVolumeName());
Path bucketPath = new Path(volumePath, omBucketInfo.getBucketName());
Path trashRoot = new Path(bucketPath, FileSystem.TRASH_PREFIX);
try {
if (exists(trashRoot)) {
FileStatus[] list = this.listStatus(trashRoot);
for (FileStatus candidate : list) {
if (exists(candidate.getPath()) && candidate.isDirectory()) {
ret.add(candidate);
}
}
}
} catch (Exception e){
LOG.error("Couldn't perform fs operation " +
"fs.listStatus()/fs.exists()", e);
}
}
return ret;
}
@Override
public boolean exists(Path f) throws IOException {
ozoneManager.getMetrics().incNumTrashExists();
try {
this.getFileStatus(f);
return true;
} catch (OMException e) {
if (e.getResult() == OMException.ResultCodes.FILE_NOT_FOUND) {
LOG.trace("Couldn't execute getFileStatus()", e);
return false;
} else {
throw e;
}
}
}
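/**
* Iterates over all keys in the OM key table that are prefixed with the
* given path's key and hands the matching key paths to processKeyPath()
* in batches of up to OZONE_FS_ITERATE_BATCH_SIZE.
*/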
private abstract class OzoneListingIterator {
private final Path path;
private final FileStatus status;
private String pathKey;
private TableIterator<String, ? extends Table.KeyValue<String, OmKeyInfo>>
keyIterator;
OzoneListingIterator(Path path)
throws IOException {
this.path = path;
this.status = getFileStatus(path);
this.pathKey = pathToKey(path);
if (status.isDirectory()) {
this.pathKey = addTrailingSlashIfNeeded(pathKey);
}
keyIterator = ozoneManager.getMetadataManager().getKeyIterator();
}
/**
* The return value of processKeyPath determines whether further iteration
* through the keys should be done or not.
*
* @return true if we should continue iteration of keys, false otherwise.
* @throws IOException
*/
abstract boolean processKeyPath(List<String> keyPathList)
throws IOException;
/**
* Iterates through all the keys prefixed with the input path's key and
* processes them through processKeyPath().
* If processKeyPath() returns false for any batch, the iteration is stopped
* and false is returned, indicating that not all keys could be processed
* successfully.
*
* @return true if all keys are processed successfully, false otherwise.
* @throws IOException
*/
boolean iterate() throws IOException {
LOG.trace("Iterating path: {}", path);
List<String> keyPathList = new ArrayList<>();
if (status.isDirectory()) {
LOG.trace("Iterating directory: {}", pathKey);
OFSPath ofsPath = new OFSPath(pathKey);
String ofsPathPrefix =
ofsPath.getNonKeyPathNoPrefixDelim() + OZONE_URI_DELIMITER;
while (keyIterator.hasNext()) {
Table.KeyValue<String, OmKeyInfo> kv = keyIterator.next();
String keyPath = ofsPathPrefix + kv.getValue().getKeyName();
LOG.trace("iterating key path: {}", keyPath);
if (!kv.getValue().getKeyName().isEmpty()
&& kv.getKey().startsWith("/" + pathKey)) {
keyPathList.add(keyPath);
}
if (keyPathList.size() >= OZONE_FS_ITERATE_BATCH_SIZE) {
if (!processKeyPath(keyPathList)) {
return false;
} else {
keyPathList.clear();
}
}
}
if (!keyPathList.isEmpty()) {
if (!processKeyPath(keyPathList)) {
return false;
}
}
return true;
} else {
LOG.trace("iterating file: {}", path);
keyPathList.add(pathKey);
return processKeyPath(keyPathList);
}
}
FileStatus getStatus() {
return status;
}
}
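/**
* Listing iterator that, for every key under the source path, submits a
* RenameKey request rewriting the source prefix to the destination prefix.
*/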
private class RenameIterator extends OzoneListingIterator {
private final String srcPath;
private final String dstPath;
RenameIterator(Path srcPath, Path dstPath)
throws IOException {
super(srcPath);
this.srcPath = pathToKey(srcPath);
this.dstPath = pathToKey(dstPath);
LOG.trace("rename from:{} to:{}", this.srcPath, this.dstPath);
}
@Override
boolean processKeyPath(List<String> keyPathList) {
for (String keyPath : keyPathList) {
String newPath = dstPath.concat(keyPath.substring(srcPath.length()));
OFSPath src = new OFSPath(keyPath);
OFSPath dst = new OFSPath(newPath);
OzoneManagerProtocolProtos.OMRequest omRequest =
getRenameKeyRequest(src, dst);
try {
ozoneManager.getMetrics().incNumTrashFilesRenames();
submitRequest(omRequest);
} catch (Throwable e) {
LOG.error("Couldn't send rename request.", e);
}
}
return true;
}
private OzoneManagerProtocolProtos.OMRequest
getRenameKeyRequest(
OFSPath src, OFSPath dst) {
String volumeName = src.getVolumeName();
String bucketName = src.getBucketName();
String keyName = src.getKeyName();
OzoneManagerProtocolProtos.KeyArgs keyArgs =
OzoneManagerProtocolProtos.KeyArgs.newBuilder()
.setKeyName(keyName)
.setVolumeName(volumeName)
.setBucketName(bucketName)
.build();
String toKeyName = dst.getKeyName();
OzoneManagerProtocolProtos.RenameKeyRequest renameKeyRequest =
OzoneManagerProtocolProtos.RenameKeyRequest.newBuilder()
.setKeyArgs(keyArgs)
.setToKeyName(toKeyName)
.build();
OzoneManagerProtocolProtos.OMRequest omRequest =
null;
try {
omRequest = OzoneManagerProtocolProtos.OMRequest.newBuilder()
.setClientId(CLIENT_ID.toString())
.setUserInfo(getUserInfo())
.setRenameKeyRequest(renameKeyRequest)
.setCmdType(OzoneManagerProtocolProtos.Type.RenameKey)
.build();
} catch (IOException e) {
LOG.error("Couldn't get userinfo", e);
}
return omRequest;
}
}
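/**
* Listing iterator that submits a DeleteKeys request (carrying a single key)
* for every key under the path being deleted.
*/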
private class DeleteIterator extends OzoneListingIterator {
private final boolean recursive;
private List<String> keysList;
DeleteIterator(Path f, boolean recursive)
throws IOException {
super(f);
this.recursive = recursive;
keysList = new ArrayList<>();
if (getStatus().isDirectory()
&& !this.recursive
&& listStatus(f).length != 0) {
throw new PathIsNotEmptyDirectoryException(f.toString());
}
}
@Override
boolean processKeyPath(List<String> keyPathList) {
LOG.trace("Deleting keys: {}", keyPathList);
for (String keyPath : keyPathList) {
OFSPath path = new OFSPath(keyPath);
OzoneManagerProtocolProtos.OMRequest omRequest =
getDeleteKeyRequest(path);
try {
ozoneManager.getMetrics().incNumTrashFilesDeletes();
submitRequest(omRequest);
} catch (Throwable e) {
LOG.error("Couldn't send rename request.", e);
}
}
return true;
}
private OzoneManagerProtocolProtos.OMRequest
getDeleteKeyRequest(
OFSPath keyPath) {
String volumeName = keyPath.getVolumeName();
String bucketName = keyPath.getBucketName();
String keyName = keyPath.getKeyName();
keysList.clear();
// Keys List will have only 1 entry.
keysList.add(keyName);
OzoneManagerProtocolProtos.DeleteKeyArgs.Builder deleteKeyArgs =
OzoneManagerProtocolProtos.DeleteKeyArgs.newBuilder()
.setBucketName(bucketName)
.setVolumeName(volumeName);
deleteKeyArgs.addAllKeys(keysList);
OzoneManagerProtocolProtos.DeleteKeysRequest deleteKeysRequest =
OzoneManagerProtocolProtos.DeleteKeysRequest.newBuilder()
.setDeleteKeys(deleteKeyArgs)
.build();
OzoneManagerProtocolProtos.OMRequest omRequest =
null;
try {
omRequest = OzoneManagerProtocolProtos.OMRequest.newBuilder()
.setClientId(CLIENT_ID.toString())
.setUserInfo(getUserInfo())
.setDeleteKeysRequest(deleteKeysRequest)
.setCmdType(OzoneManagerProtocolProtos.Type.DeleteKeys)
.build();
} catch (IOException e) {
LOG.error("Couldn't get userinfo", e);
}
return omRequest;
}
}
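// Builds the UserInfo proto for the generated OM requests from the current
// UGI and the OM RPC server address.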
OzoneManagerProtocolProtos.UserInfo getUserInfo() throws IOException {
UserGroupInformation user = UserGroupInformation.getCurrentUser();
InetAddress remoteAddress = ozoneManager.getOmRpcServerAddr().getAddress();
OzoneManagerProtocolProtos.UserInfo.Builder userInfo =
OzoneManagerProtocolProtos.UserInfo.newBuilder();
if (user != null) {
userInfo.setUserName(user.getUserName());
}
if (remoteAddress != null) {
userInfo.setHostName(remoteAddress.getHostName());
userInfo.setRemoteAddress(remoteAddress.getHostAddress());
}
return userInfo.build();
}
}