blob: 5a93a6d0c4da7b60151fde0d066b2ff9e74c0f08 [file] [log] [blame]
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package org.apache.hadoop.hbase.master.cleaner;
import java.util.LinkedList;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.ScheduledChore;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.ipc.RemoteException;
* Abstract Cleaner that uses a chain of delegates to clean a directory of files
* @param <T> Cleaner delegate class that is dynamically loaded from configuration
public abstract class CleanerChore<T extends FileCleanerDelegate> extends ScheduledChore {
private static final Log LOG = LogFactory.getLog(CleanerChore.class.getName());
private final FileSystem fs;
private final Path oldFileDir;
private final Configuration conf;
protected List<T> cleanersChain;
* @param name name of the chore being run
* @param sleepPeriod the period of time to sleep between each run
* @param s the stopper
* @param conf configuration to use
* @param fs handle to the FS
* @param oldFileDir the path to the archived files
* @param confKey configuration key for the classes to instantiate
public CleanerChore(String name, final int sleepPeriod, final Stoppable s, Configuration conf,
FileSystem fs, Path oldFileDir, String confKey) {
super(name, s, sleepPeriod);
this.fs = fs;
this.oldFileDir = oldFileDir;
this.conf = conf;
* Validate the file to see if it even belongs in the directory. If it is valid, then the file
* will go through the cleaner delegates, but otherwise the file is just deleted.
* @param file full {@link Path} of the file to be checked
* @return <tt>true</tt> if the file is valid, <tt>false</tt> otherwise
protected abstract boolean validate(Path file);
* Instantiate and initialize all the file cleaners set in the configuration
* @param confKey key to get the file cleaner classes from the configuration
private void initCleanerChain(String confKey) {
this.cleanersChain = new LinkedList<T>();
String[] logCleaners = conf.getStrings(confKey);
if (logCleaners != null) {
for (String className : logCleaners) {
T logCleaner = newFileCleaner(className, conf);
if (logCleaner != null) {
LOG.debug("initialize cleaner=" + className);
* A utility method to create new instances of LogCleanerDelegate based on the class name of the
* LogCleanerDelegate.
* @param className fully qualified class name of the LogCleanerDelegate
* @param conf
* @return the new instance
private T newFileCleaner(String className, Configuration conf) {
try {
Class<? extends FileCleanerDelegate> c = Class.forName(className).asSubclass(
T cleaner = (T) c.newInstance();
return cleaner;
} catch (Exception e) {
LOG.warn("Can NOT create CleanerDelegate: " + className, e);
// skipping if can't instantiate
return null;
protected void chore() {
try {
FileStatus[] files = FSUtils.listStatus(this.fs, this.oldFileDir);
} catch (IOException e) {
e = e instanceof RemoteException ?
((RemoteException)e).unwrapRemoteException() : e;
LOG.warn("Error while cleaning the logs", e);
* Loop over the given directory entries, and check whether they can be deleted.
* If an entry is itself a directory it will be recursively checked and deleted itself iff
* all subentries are deleted (and no new subentries are added in the mean time)
* @param entries directory entries to check
* @return true if all entries were successfully deleted
private boolean checkAndDeleteEntries(FileStatus[] entries) {
if (entries == null) {
return true;
boolean allEntriesDeleted = true;
List<FileStatus> files = Lists.newArrayListWithCapacity(entries.length);
for (FileStatus child : entries) {
Path path = child.getPath();
if (child.isDirectory()) {
// for each subdirectory delete it and all entries if possible
if (!checkAndDeleteDirectory(path)) {
allEntriesDeleted = false;
} else {
// collect all files to attempt to delete in one batch
if (!checkAndDeleteFiles(files)) {
allEntriesDeleted = false;
return allEntriesDeleted;
* Attempt to delete a directory and all files under that directory. Each child file is passed
* through the delegates to see if it can be deleted. If the directory has no children when the
* cleaners have finished it is deleted.
* <p>
* If new children files are added between checks of the directory, the directory will <b>not</b>
* be deleted.
* @param dir directory to check
* @return <tt>true</tt> if the directory was deleted, <tt>false</tt> otherwise.
@VisibleForTesting boolean checkAndDeleteDirectory(Path dir) {
if (LOG.isTraceEnabled()) {
LOG.trace("Checking directory: " + dir);
try {
FileStatus[] children = FSUtils.listStatus(fs, dir);
boolean allChildrenDeleted = checkAndDeleteEntries(children);
// if the directory still has children, we can't delete it, so we are done
if (!allChildrenDeleted) return false;
} catch (IOException e) {
e = e instanceof RemoteException ?
((RemoteException)e).unwrapRemoteException() : e;
LOG.warn("Error while listing directory: " + dir, e);
// couldn't list directory, so don't try to delete, and don't return success
return false;
// otherwise, all the children (that we know about) have been deleted, so we should try to
// delete this directory. However, don't do so recursively so we don't delete files that have
// been added since we last checked.
try {
return fs.delete(dir, false);
} catch (IOException e) {
if (LOG.isTraceEnabled()) {
LOG.trace("Couldn't delete directory: " + dir, e);
// couldn't delete w/o exception, so we can't return success.
return false;
* Run the given files through each of the cleaners to see if it should be deleted, deleting it if
* necessary.
* @param files List of FileStatus for the files to check (and possibly delete)
* @return true iff successfully deleted all files
private boolean checkAndDeleteFiles(List<FileStatus> files) {
// first check to see if the path is valid
List<FileStatus> validFiles = Lists.newArrayListWithCapacity(files.size());
List<FileStatus> invalidFiles = Lists.newArrayList();
for (FileStatus file : files) {
if (validate(file.getPath())) {
} else {
LOG.warn("Found a wrongly formatted file: " + file.getPath() + " - will delete it.");
Iterable<FileStatus> deletableValidFiles = validFiles;
// check each of the cleaners for the valid files
for (T cleaner : cleanersChain) {
if (cleaner.isStopped() || this.getStopper().isStopped()) {
LOG.warn("A file cleaner" + this.getName() + " is stopped, won't delete any more files in:"
+ this.oldFileDir);
return false;
Iterable<FileStatus> filteredFiles = cleaner.getDeletableFiles(deletableValidFiles);
// trace which cleaner is holding on to each file
if (LOG.isTraceEnabled()) {
ImmutableSet<FileStatus> filteredFileSet = ImmutableSet.copyOf(filteredFiles);
for (FileStatus file : deletableValidFiles) {
if (!filteredFileSet.contains(file)) {
LOG.trace(file.getPath() + " is not deletable according to:" + cleaner);
deletableValidFiles = filteredFiles;
Iterable<FileStatus> filesToDelete = Iterables.concat(invalidFiles, deletableValidFiles);
int deletedFileCount = 0;
for (FileStatus file : filesToDelete) {
Path filePath = file.getPath();
if (LOG.isDebugEnabled()) {
LOG.debug("Removing: " + filePath + " from archive");
try {
boolean success = this.fs.delete(filePath, false);
if (success) {
} else {
LOG.warn("Attempted to delete:" + filePath
+ ", but couldn't. Run cleaner chain and attempt to delete on next pass.");
} catch (IOException e) {
e = e instanceof RemoteException ?
((RemoteException)e).unwrapRemoteException() : e;
LOG.warn("Error while deleting: " + filePath, e);
return deletedFileCount == files.size();
public void cleanup() {
for (T lc : this.cleanersChain) {
try {
} catch (Throwable t) {
LOG.warn("Stopping", t);