/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.hdfs.server.namenode;

import java.io.File;
import java.io.FilenameFilter;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.EnumSet;
import java.util.Iterator;
import java.util.List;
import java.util.TreeSet;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.server.namenode.FSImageStorageInspector.FSImageFile;
import org.apache.hadoop.hdfs.server.namenode.FileJournalManager.EditLogFile;
import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeFile;
import org.apache.hadoop.hdfs.util.MD5FileUtils;
import com.google.common.base.Preconditions;
import com.google.common.collect.ComparisonChain;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;

/**
* The NNStorageRetentionManager is responsible for inspecting the storage
* directories of the NN and enforcing a retention policy on checkpoints
* and edit logs.
*
* It delegates the actual removal of files to a StoragePurger
* implementation, which might delete the files or instead copy them to
* a filer or HDFS for later analysis.
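*
* A minimal usage sketch (illustrative only; in practice FSImage supplies
* the NNStorage, an FSEditLog as the LogsPurgeable, and the default
* DeletionStoragePurger):
* <pre>
* NNStorageRetentionManager manager =
*     new NNStorageRetentionManager(conf, storage, editLog);
* manager.purgeOldStorage(NameNodeFile.IMAGE);
* </pre>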
*/
public class NNStorageRetentionManager {
private final int numCheckpointsToRetain;
private final long numExtraEditsToRetain;
private final int maxExtraEditsSegmentsToRetain;
private static final Log LOG = LogFactory.getLog(
NNStorageRetentionManager.class);
private final NNStorage storage;
private final StoragePurger purger;
private final LogsPurgeable purgeableLogs;
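/**
* Creates a retention manager whose policy is read from the given
* configuration. A sketch of tightening the policy up front (illustrative
* values only; the shipped defaults live in DFSConfigKeys):
* <pre>
* Configuration conf = new Configuration();
* conf.setInt(DFSConfigKeys.DFS_NAMENODE_NUM_CHECKPOINTS_RETAINED_KEY, 3);
* conf.setLong(DFSConfigKeys.DFS_NAMENODE_NUM_EXTRA_EDITS_RETAINED_KEY, 500000L);
* conf.setInt(DFSConfigKeys.DFS_NAMENODE_MAX_EXTRA_EDITS_SEGMENTS_RETAINED_KEY, 100);
* NNStorageRetentionManager manager =
*     new NNStorageRetentionManager(conf, storage, editLog, purger);
* </pre>
*/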
public NNStorageRetentionManager(
Configuration conf,
NNStorage storage,
LogsPurgeable purgeableLogs,
StoragePurger purger) {
this.numCheckpointsToRetain = conf.getInt(
DFSConfigKeys.DFS_NAMENODE_NUM_CHECKPOINTS_RETAINED_KEY,
DFSConfigKeys.DFS_NAMENODE_NUM_CHECKPOINTS_RETAINED_DEFAULT);
this.numExtraEditsToRetain = conf.getLong(
DFSConfigKeys.DFS_NAMENODE_NUM_EXTRA_EDITS_RETAINED_KEY,
DFSConfigKeys.DFS_NAMENODE_NUM_EXTRA_EDITS_RETAINED_DEFAULT);
this.maxExtraEditsSegmentsToRetain = conf.getInt(
DFSConfigKeys.DFS_NAMENODE_MAX_EXTRA_EDITS_SEGMENTS_RETAINED_KEY,
DFSConfigKeys.DFS_NAMENODE_MAX_EXTRA_EDITS_SEGMENTS_RETAINED_DEFAULT);
Preconditions.checkArgument(numCheckpointsToRetain > 0,
"Must retain at least one checkpoint");
Preconditions.checkArgument(numExtraEditsToRetain >= 0,
DFSConfigKeys.DFS_NAMENODE_NUM_EXTRA_EDITS_RETAINED_KEY +
" must not be negative");
this.storage = storage;
this.purgeableLogs = purgeableLogs;
this.purger = purger;
}
public NNStorageRetentionManager(Configuration conf, NNStorage storage,
LogsPurgeable purgeableLogs) {
this(conf, storage, purgeableLogs, new DeletionStoragePurger());
}
void purgeCheckpoints(NameNodeFile nnf) throws IOException {
purgeCheckpointsAfter(nnf, -1);
}
void purgeCheckpointsAfter(NameNodeFile nnf, long fromTxId)
throws IOException {
FSImageTransactionalStorageInspector inspector =
new FSImageTransactionalStorageInspector(EnumSet.of(nnf));
storage.inspectStorageDirs(inspector);
for (FSImageFile image : inspector.getFoundImages()) {
if (image.getCheckpointTxId() > fromTxId) {
purger.purgeImage(image);
}
}
}
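/**
* Purges checkpoints and edit logs of the given type that fall outside the
* retention policy: the newest {@code numCheckpointsToRetain} images are
* kept, along with enough older edit segments to cover a cushion of
* {@code numExtraEditsToRetain} transactions, capped at
* {@code maxExtraEditsSegmentsToRetain} segments. Edits are never purged
* here for {@link NameNodeFile#IMAGE_ROLLBACK}.
*/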
void purgeOldStorage(NameNodeFile nnf) throws IOException {
FSImageTransactionalStorageInspector inspector =
new FSImageTransactionalStorageInspector(EnumSet.of(nnf));
storage.inspectStorageDirs(inspector);
long minImageTxId = getImageTxIdToRetain(inspector);
purgeCheckpointsOlderThan(inspector, minImageTxId);
if (nnf == NameNodeFile.IMAGE_ROLLBACK) {
// do not purge edits for IMAGE_ROLLBACK.
return;
}
// If fsimage_N is the image we want to keep, then we need to keep
// all txns > N. We can remove anything < N+1, since fsimage_N
// reflects the state up to and including N. However, we also
// provide a "cushion" of older txns that we keep, which is
// handy for HA, where a remote node may not have as many
// new images.
//
// First, determine the target number of extra transactions to retain based
// on the configured amount.
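// Worked example (illustrative numbers): if the retained image is
// fsimage_1000 and numExtraEditsToRetain is 200, then minimumRequiredTxId
// is 1001 and purgeLogsFrom starts at max(0, 1001 - 200) = 801, keeping
// the edits covering txids 801..1000 as the cushion.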
long minimumRequiredTxId = minImageTxId + 1;
long purgeLogsFrom = Math.max(0, minimumRequiredTxId - numExtraEditsToRetain);
ArrayList<EditLogInputStream> editLogs = new ArrayList<EditLogInputStream>();
purgeableLogs.selectInputStreams(editLogs, purgeLogsFrom, false, false);
Collections.sort(editLogs, new Comparator<EditLogInputStream>() {
@Override
public int compare(EditLogInputStream a, EditLogInputStream b) {
return ComparisonChain.start()
.compare(a.getFirstTxId(), b.getFirstTxId())
.compare(a.getLastTxId(), b.getLastTxId())
.result();
}
});
// Remove from consideration any edit logs that are in fact required.
while (editLogs.size() > 0 &&
editLogs.get(editLogs.size() - 1).getFirstTxId() >= minimumRequiredTxId) {
editLogs.remove(editLogs.size() - 1);
}
// Next, adjust the number of transactions to retain if doing so would mean
// keeping too many segments around.
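// For example (illustrative numbers): with maxExtraEditsSegmentsToRetain = 2
// and three remaining segments [801,850], [851,900] and [901,1000], the
// oldest segment is dropped from the cushion and purgeLogsFrom advances to 851.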
while (editLogs.size() > maxExtraEditsSegmentsToRetain) {
purgeLogsFrom = editLogs.get(0).getLastTxId() + 1;
editLogs.remove(0);
}
// Finally, ensure that we're not trying to purge any transactions that we
// actually need.
if (purgeLogsFrom > minimumRequiredTxId) {
throw new AssertionError("Should not purge more edits than required to "
+ "restore: " + purgeLogsFrom + " should be <= "
+ minimumRequiredTxId);
}
purgeableLogs.purgeLogsOlderThan(purgeLogsFrom);
}
private void purgeCheckpointsOlderThan(
FSImageTransactionalStorageInspector inspector,
long minTxId) {
for (FSImageFile image : inspector.getFoundImages()) {
if (image.getCheckpointTxId() < minTxId) {
purger.purgeImage(image);
}
}
}
/**
* @param inspector inspector that has already inspected all storage dirs
* @return the transaction ID corresponding to the oldest checkpoint
* that should be retained.
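* For example, with images at txids 100, 200 and 300 and
* numCheckpointsToRetain = 2, this returns 200, leaving the image at
* txid 100 eligible for purging.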
*/
private long getImageTxIdToRetain(FSImageTransactionalStorageInspector inspector) {
List<FSImageFile> images = inspector.getFoundImages();
TreeSet<Long> imageTxIds = Sets.newTreeSet();
for (FSImageFile image : images) {
imageTxIds.add(image.getCheckpointTxId());
}
List<Long> imageTxIdsList = Lists.newArrayList(imageTxIds);
if (imageTxIdsList.isEmpty()) {
return 0;
}
Collections.reverse(imageTxIdsList);
int toRetain = Math.min(numCheckpointsToRetain, imageTxIdsList.size());
long minTxId = imageTxIdsList.get(toRetain - 1);
LOG.info("Going to retain " + toRetain + " images with txid >= " +
minTxId);
return minTxId;
}
/**
* Interface responsible for disposing of old checkpoints and edit logs.
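*
* The default implementation deletes files; an alternative purger could
* archive them instead, e.g. (hypothetical sketch, not part of HDFS):
* <pre>
* class ArchivingStoragePurger implements StoragePurger {
*   public void purgeLog(EditLogFile log) {
*     // move log.getFile() into an archive directory instead of deleting it
*   }
*   public void purgeImage(FSImageFile image) {
*     // move image.getFile() and its MD5 companion file into the archive
*   }
* }
* </pre>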
*/
interface StoragePurger {
void purgeLog(EditLogFile log);
void purgeImage(FSImageFile image);
}
static class DeletionStoragePurger implements StoragePurger {
@Override
public void purgeLog(EditLogFile log) {
LOG.info("Purging old edit log " + log);
deleteOrWarn(log.getFile());
}
@Override
public void purgeImage(FSImageFile image) {
LOG.info("Purging old image " + image);
deleteOrWarn(image.getFile());
deleteOrWarn(MD5FileUtils.getDigestFileForFile(image.getFile()));
}
private static void deleteOrWarn(File file) {
if (!file.delete()) {
// It's OK if we fail to delete something -- we'll catch it
// next time we swing through this directory.
LOG.warn("Could not delete " + file);
}
}
}
/**
* Delete old OIV fsimages. Since the target dir is not a full-blown
* storage directory, we simply list and keep the latest ones. For the
* same reason, no storage inspector is used.
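* For example, with numCheckpointsToRetain = 2 and OIV images for txids
* 100, 200 and 300 present, only the image for txid 100 is deleted.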
*/
void purgeOldLegacyOIVImages(String dir, long txid) {
File oivImageDir = new File(dir);
final String oivImagePrefix = NameNodeFile.IMAGE_LEGACY_OIV.getName();
// List the OIV image files in the target directory.
String[] filesInStorage = oivImageDir.list(new FilenameFilter() {
@Override
public boolean accept(File dir, String name) {
return name.matches(oivImagePrefix + "_(\\d+)");
}
});
// File.list() returns null when the directory does not exist or cannot be
// read; in that case there is nothing to purge.
if (filesInStorage == null) {
return;
}
// Check whether there is any work to do.
if (filesInStorage.length <= numCheckpointsToRetain) {
return;
}
// Create a sorted list of txids from the file names.
TreeSet<Long> sortedTxIds = new TreeSet<Long>();
for (String fName : filesInStorage) {
// Extract the transaction id from the file name.
long fTxId;
try {
fTxId = Long.parseLong(fName.substring(oivImagePrefix.length() + 1));
} catch (NumberFormatException nfe) {
// This should not happen since we have already filtered it.
// Log and continue.
LOG.warn("Invalid file name. Skipping " + fName);
continue;
}
sortedTxIds.add(Long.valueOf(fTxId));
}
int numFilesToDelete = sortedTxIds.size() - numCheckpointsToRetain;
Iterator<Long> iter = sortedTxIds.iterator();
while (numFilesToDelete > 0 && iter.hasNext()) {
long txIdVal = iter.next().longValue();
String fileName = NNStorage.getLegacyOIVImageFileName(txIdVal);
LOG.info("Deleting " + fileName);
File fileToDelete = new File(oivImageDir, fileName);
if (!fileToDelete.delete()) {
// deletion failed.
LOG.warn("Failed to delete image file: " + fileToDelete);
}
numFilesToDelete--;
}
}
}