blob: a4c292a3a6119e527e318eace35afbcb10c3a637 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.ozone.om;
import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.ServiceException;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.utils.BackgroundService;
import org.apache.hadoop.hdds.utils.BackgroundTask;
import org.apache.hadoop.hdds.utils.BackgroundTaskQueue;
import org.apache.hadoop.hdds.utils.BackgroundTaskResult;
import org.apache.hadoop.hdds.utils.db.Table;
import org.apache.hadoop.ozone.ClientVersion;
import org.apache.hadoop.ozone.om.helpers.OMRatisHelper;
import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
import org.apache.hadoop.util.Time;
import org.apache.ratis.protocol.ClientId;
import org.apache.ratis.protocol.Message;
import org.apache.ratis.protocol.RaftClientRequest;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import static org.apache.hadoop.ozone.OzoneConsts.OM_KEY_PREFIX;
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_PATH_DELETING_LIMIT_PER_TASK;
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_PATH_DELETING_LIMIT_PER_TASK_DEFAULT;
/**
* This is a background service to delete orphan directories and its
* sub paths(sub-dirs and sub-files).
*
* <p>
* This will scan the metadata of om periodically to get the orphan dirs from
* DeletedDirectoryTable and find its sub paths. It will fetch all sub-files
* from FileTable and move those to DeletedTable so that OM's
* KeyDeletingService will cleanup those files later. It will fetch all
* sub-directories from the DirectoryTable and move those to
* DeletedDirectoryTable so that these will be visited in next iterations.
*
* <p>
* After moving all sub-files and sub-dirs the parent orphan directory will be
* deleted by this service. It will continue traversing until all the leaf path
* components of an orphan directory is visited.
*/
public class DirectoryDeletingService extends BackgroundService {
private final OzoneManager ozoneManager;
private AtomicLong deletedDirsCount;
private AtomicLong deletedFilesCount;
private final AtomicLong runCount;
private static ClientId clientId = ClientId.randomId();
// Use only a single thread for DirDeletion. Multiple threads would read
// or write to same tables and can send deletion requests for same key
// multiple times.
private static final int DIR_DELETING_CORE_POOL_SIZE = 1;
// Number of items(dirs/files) to be batched in an iteration.
private final long pathLimitPerTask;
public DirectoryDeletingService(long interval, TimeUnit unit,
long serviceTimeout, OzoneManager ozoneManager,
OzoneConfiguration configuration) {
super("DirectoryDeletingService", interval, unit,
DIR_DELETING_CORE_POOL_SIZE, serviceTimeout);
this.ozoneManager = ozoneManager;
this.deletedDirsCount = new AtomicLong(0);
this.deletedFilesCount = new AtomicLong(0);
this.runCount = new AtomicLong(0);
this.pathLimitPerTask = configuration
.getInt(OZONE_PATH_DELETING_LIMIT_PER_TASK,
OZONE_PATH_DELETING_LIMIT_PER_TASK_DEFAULT);
}
private boolean shouldRun() {
if (ozoneManager == null) {
// OzoneManager can be null for testing
return true;
}
return ozoneManager.isLeaderReady();
}
private boolean isRatisEnabled() {
if (ozoneManager == null) {
return false;
}
return ozoneManager.isRatisEnabled();
}
@Override
public BackgroundTaskQueue getTasks() {
BackgroundTaskQueue queue = new BackgroundTaskQueue();
queue.add(new DirectoryDeletingService.DirDeletingTask());
return queue;
}
private class DirDeletingTask implements BackgroundTask {
@Override
public int getPriority() {
return 0;
}
@Override
public BackgroundTaskResult call() throws Exception {
if (shouldRun()) {
runCount.incrementAndGet();
long count = pathLimitPerTask;
try {
long startTime = Time.monotonicNow();
// step-1) Get one pending deleted directory
Table.KeyValue<String, OmKeyInfo> pendingDeletedDirInfo =
ozoneManager.getKeyManager().getPendingDeletionDir();
if (pendingDeletedDirInfo != null) {
if (LOG.isDebugEnabled()) {
LOG.debug("Pending deleted dir name: {}",
pendingDeletedDirInfo.getValue().getKeyName());
}
final String[] keys = pendingDeletedDirInfo.getKey()
.split(OM_KEY_PREFIX);
final long volumeId = Long.parseLong(keys[1]);
final long bucketId = Long.parseLong(keys[2]);
// step-1: get all sub directories under the deletedDir
List<OmKeyInfo> dirs = ozoneManager.getKeyManager()
.getPendingDeletionSubDirs(volumeId, bucketId,
pendingDeletedDirInfo.getValue(), count);
count = count - dirs.size();
List<OmKeyInfo> deletedSubDirList = new ArrayList<>();
for (OmKeyInfo dirInfo : dirs) {
deletedSubDirList.add(dirInfo);
if (LOG.isDebugEnabled()) {
LOG.debug("deleted sub dir name: {}",
dirInfo.getKeyName());
}
}
// step-2: get all sub files under the deletedDir
List<OmKeyInfo> purgeDeletedFiles = ozoneManager.getKeyManager()
.getPendingDeletionSubFiles(volumeId, bucketId,
pendingDeletedDirInfo.getValue(), count);
count = count - purgeDeletedFiles.size();
if (LOG.isDebugEnabled()) {
for (OmKeyInfo fileInfo : purgeDeletedFiles) {
LOG.debug("deleted sub file name: {}", fileInfo.getKeyName());
}
}
// step-3: Since there is a boundary condition of 'numEntries' in
// each batch, check whether the sub paths count reached batch size
// limit. If count reached limit then there can be some more child
// paths to be visited and will keep the parent deleted directory
// for one more pass.
final Optional<String> purgeDeletedDir = count > 0 ?
Optional.of(pendingDeletedDirInfo.getKey()) :
Optional.empty();
if (isRatisEnabled()) {
submitPurgePaths(volumeId, bucketId, purgeDeletedDir,
purgeDeletedFiles, deletedSubDirList);
}
// TODO: need to handle delete with non-ratis
deletedDirsCount.incrementAndGet();
deletedFilesCount.addAndGet(purgeDeletedFiles.size());
if (LOG.isDebugEnabled()) {
LOG.debug("Number of dirs deleted: {}, Number of files moved:" +
" {} to DeletedTable, elapsed time: {}ms",
deletedDirsCount, deletedFilesCount,
Time.monotonicNow() - startTime);
}
}
} catch (IOException e) {
LOG.error("Error while running delete directories and files " +
"background task. Will retry at next run.", e);
}
}
// place holder by returning empty results of this call back.
return BackgroundTaskResult.EmptyTaskResult.newResult();
}
}
/**
* Returns the number of dirs deleted by the background service.
*
* @return Long count.
*/
@VisibleForTesting
public long getDeletedDirsCount() {
return deletedDirsCount.get();
}
/**
* Returns the number of files moved to DeletedTable by the background
* service.
*
* @return Long count.
*/
@VisibleForTesting
public long getMovedFilesCount() {
return deletedFilesCount.get();
}
/**
* Returns the number of times this Background service has run.
*
* @return Long, run count.
*/
@VisibleForTesting
public long getRunCount() {
return runCount.get();
}
private int submitPurgePaths(final long volumeId, final long bucketId,
final Optional<String> purgeDeletedDir,
final List<OmKeyInfo> purgeDeletedFiles,
final List<OmKeyInfo> markDirsAsDeleted) {
// Put all keys to be purged in a list
int deletedCount = 0;
OzoneManagerProtocolProtos.PurgePathRequest.Builder purgePathsRequest =
OzoneManagerProtocolProtos.PurgePathRequest.newBuilder();
purgePathsRequest.setVolumeId(volumeId);
purgePathsRequest.setBucketId(bucketId);
purgeDeletedDir.ifPresent(purgePathsRequest::setDeletedDir);
for (OmKeyInfo purgeFile : purgeDeletedFiles) {
purgePathsRequest.addDeletedSubFiles(
purgeFile.getProtobuf(true, ClientVersion.CURRENT_VERSION));
}
// Add these directories to deletedDirTable, so that its sub-paths will be
// traversed in next iteration to ensure cleanup all sub-children.
for (OmKeyInfo dir : markDirsAsDeleted) {
purgePathsRequest.addMarkDeletedSubDirs(
dir.getProtobuf(ClientVersion.CURRENT_VERSION));
}
OzoneManagerProtocolProtos.PurgeDirectoriesRequest.Builder purgeDirRequest =
OzoneManagerProtocolProtos.PurgeDirectoriesRequest.newBuilder();
purgeDirRequest.addDeletedPath(purgePathsRequest.build());
OzoneManagerProtocolProtos.OMRequest omRequest =
OzoneManagerProtocolProtos.OMRequest.newBuilder()
.setCmdType(OzoneManagerProtocolProtos.Type.PurgeDirectories)
.setPurgeDirectoriesRequest(purgeDirRequest)
.setClientId(clientId.toString())
.build();
// Submit Purge paths request to OM
try {
RaftClientRequest raftClientRequest =
createRaftClientRequestForDelete(omRequest);
ozoneManager.getOmRatisServer().submitRequest(omRequest,
raftClientRequest);
} catch (ServiceException e) {
LOG.error("PurgePaths request failed. Will retry at next run.");
return 0;
}
return deletedCount;
}
private RaftClientRequest createRaftClientRequestForDelete(
OzoneManagerProtocolProtos.OMRequest omRequest) {
return RaftClientRequest.newBuilder()
.setClientId(clientId)
.setServerId(ozoneManager.getOmRatisServer().getRaftPeerId())
.setGroupId(ozoneManager.getOmRatisServer().getRaftGroupId())
.setCallId(runCount.get())
.setMessage(
Message.valueOf(
OMRatisHelper.convertRequestToByteString(omRequest)))
.setType(RaftClientRequest.writeRequestType())
.build();
}
}