blob: f65713ebf26305072ba3cef64789e6bef8d00c10 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master.cleaner;
import static org.apache.hadoop.hbase.HConstants.HBASE_MASTER_LOGCLEANER_PLUGINS;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.conf.ConfigurationObserver;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureUtil;
import org.apache.hadoop.hbase.master.region.MasterRegionFactory;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
/**
 * This Chore, every time it runs, will attempt to delete the WALs and Procedure WALs in the old
 * logs folder. The WAL is only deleted if none of the cleaner delegates says otherwise.
 * <p>
 * Deletions are not performed by the calling thread: {@link #deleteFiles(Iterable)} puts one
 * {@link CleanerContext} per file on an internal queue and then waits (bounded by
 * {@value #OLD_WALS_CLEANER_THREAD_TIMEOUT_MSEC}) for the "OldWALsCleaner-N" daemon threads to
 * report the outcome of each delete. The number of those threads is controlled by
 * {@value #OLD_WALS_CLEANER_THREAD_SIZE}; both settings are re-read in
 * {@link #onConfigurationChange(Configuration)}.
 * @see BaseLogCleanerDelegate
 */
@InterfaceAudience.Private
public class LogCleaner extends CleanerChore<BaseLogCleanerDelegate>
  implements ConfigurationObserver {
  private static final Logger LOG = LoggerFactory.getLogger(LogCleaner.class);

  /** Configuration key: number of threads that delete old WALs. */
  public static final String OLD_WALS_CLEANER_THREAD_SIZE = "hbase.oldwals.cleaner.thread.size";
  public static final int DEFAULT_OLD_WALS_CLEANER_THREAD_SIZE = 2;

  /** Configuration key: how long, in milliseconds, to wait for the deletion of a single file. */
  public static final String OLD_WALS_CLEANER_THREAD_TIMEOUT_MSEC =
    "hbase.oldwals.cleaner.thread.timeout.msec";
  static final long DEFAULT_OLD_WALS_CLEANER_THREAD_TIMEOUT_MSEC = 60 * 1000L;

  /** Files handed over by {@link #deleteFiles(Iterable)} and not yet picked up by a cleaner. */
  private final LinkedBlockingQueue<CleanerContext> pendingDelete;
  /** The currently running cleaner threads; replaced when the configured size changes. */
  private List<Thread> oldWALsCleaner;
  private long cleanerThreadTimeoutMsec;

  /**
   * @param period the period of time to sleep between each run
   * @param stopper the stopper
   * @param conf configuration to use
   * @param fs handle to the FS
   * @param oldLogDir the path to the archived logs
   * @param pool the thread pool used to scan directories
   */
  public LogCleaner(final int period, final Stoppable stopper, Configuration conf, FileSystem fs,
    Path oldLogDir, DirScanPool pool) {
    super("LogsCleaner", period, stopper, conf, fs, oldLogDir, HBASE_MASTER_LOGCLEANER_PLUGINS,
      pool);
    // The queue must exist before the cleaner threads are started, they block on it right away.
    this.pendingDelete = new LinkedBlockingQueue<>();
    int size = conf.getInt(OLD_WALS_CLEANER_THREAD_SIZE, DEFAULT_OLD_WALS_CLEANER_THREAD_SIZE);
    this.oldWALsCleaner = createOldWalsCleaner(size);
    this.cleanerThreadTimeoutMsec = conf.getLong(OLD_WALS_CLEANER_THREAD_TIMEOUT_MSEC,
      DEFAULT_OLD_WALS_CLEANER_THREAD_TIMEOUT_MSEC);
  }

  /**
   * Accepts a file if its name is a valid WAL file name, a valid procedure WAL file name, or ends
   * with the archived master region WAL suffix.
   * @param file the candidate file
   * @return {@code true} if this cleaner is responsible for the file
   */
  @Override
  protected boolean validate(Path file) {
    final String name = file.getName();
    return AbstractFSWALProvider.validateWALFilename(name)
      || MasterProcedureUtil.validateProcedureWALFilename(name)
      || name.endsWith(MasterRegionFactory.ARCHIVED_WAL_SUFFIX);
  }

  /**
   * Re-reads the cleaner settings. The per-file timeout is always refreshed; the cleaner threads
   * are only interrupted and recreated when the configured thread count actually changed.
   * @param conf the updated configuration
   */
  @Override
  public void onConfigurationChange(Configuration conf) {
    // Refresh the timeout unconditionally: it must not be skipped just because the thread count
    // happens to be unchanged.
    cleanerThreadTimeoutMsec = conf.getLong(OLD_WALS_CLEANER_THREAD_TIMEOUT_MSEC,
      DEFAULT_OLD_WALS_CLEANER_THREAD_TIMEOUT_MSEC);

    int newSize = conf.getInt(OLD_WALS_CLEANER_THREAD_SIZE, DEFAULT_OLD_WALS_CLEANER_THREAD_SIZE);
    if (newSize == oldWALsCleaner.size()) {
      LOG.debug("Size from configuration is the same as previous which "
        + "is {}, no need to update.", newSize);
      return;
    }
    interruptOldWALsCleaner();
    oldWALsCleaner = createOldWalsCleaner(newSize);
  }

  /**
   * Queues the given files for deletion by the cleaner threads and waits for every outcome.
   * {@code null} entries are skipped. Each file is waited on for at most the configured timeout;
   * a file whose deletion failed, timed out, or whose wait was interrupted is not counted.
   * @param filesToDelete the files to delete
   * @return the number of files reported as deleted
   */
  @Override
  protected int deleteFiles(Iterable<FileStatus> filesToDelete) {
    List<CleanerContext> results = new ArrayList<>();
    for (FileStatus file : filesToDelete) {
      if (file != null) {
        LOG.trace("Scheduling file {} for deletion", file);
        results.add(new CleanerContext(file));
      }
    }
    if (results.isEmpty()) {
      return 0;
    }

    // Joining all names is wasted work unless debug logging is on.
    if (LOG.isDebugEnabled()) {
      LOG.debug("Old WALs for delete: {}", results.stream()
        .map(cc -> cc.target.getPath().getName()).collect(Collectors.joining(", ")));
    }
    pendingDelete.addAll(results);

    int deletedFiles = 0;
    for (CleanerContext res : results) {
      LOG.trace("Awaiting the results for deletion of old WAL file: {}", res);
      if (res.getResult(this.cleanerThreadTimeoutMsec)) {
        deletedFiles++;
      }
    }
    return deletedFiles;
  }

  /** Runs the parent cleanup and then interrupts the cleaner threads. */
  @Override
  public synchronized void cleanup() {
    super.cleanup();
    interruptOldWALsCleaner();
  }

  /** @return the number of cleaner threads currently tracked */
  int getSizeOfCleaners() {
    return oldWALsCleaner.size();
  }

  /** @return the per-file wait timeout, in milliseconds, currently in effect */
  long getCleanerThreadTimeoutMsec() {
    return cleanerThreadTimeoutMsec;
  }

  /**
   * Creates and starts {@code size} daemon threads named "OldWALsCleaner-N", each running
   * {@link #deleteFile()}.
   * @param size number of threads to start
   * @return the started threads
   */
  private List<Thread> createOldWalsCleaner(int size) {
    LOG.info("Creating {} old WALs cleaner threads", size);
    List<Thread> cleaners = new ArrayList<>(size);
    for (int i = 0; i < size; i++) {
      Thread cleaner = new Thread(this::deleteFile);
      cleaner.setName("OldWALsCleaner-" + i);
      cleaner.setDaemon(true);
      cleaner.start();
      cleaners.add(cleaner);
    }
    return cleaners;
  }

  /** Interrupts every tracked cleaner thread and forgets about them. */
  private void interruptOldWALsCleaner() {
    for (Thread cleaner : oldWALsCleaner) {
      LOG.trace("Interrupting thread: {}", cleaner);
      cleaner.interrupt();
    }
    oldWALsCleaner.clear();
  }

  /**
   * Body of a cleaner thread: repeatedly takes the next pending file from the queue, deletes it
   * (non-recursively) and publishes the outcome on its {@link CleanerContext}. Returns when the
   * thread is interrupted while waiting for work.
   */
  private void deleteFile() {
    while (true) {
      final CleanerContext context;
      try {
        context = pendingDelete.take();
      } catch (InterruptedException ite) {
        // It is most likely from configuration changing request
        LOG.warn("Interrupted while cleaning old WALs, will "
          + "try to clean it next round. Exiting.");
        // Restore interrupt status
        Thread.currentThread().interrupt();
        LOG.trace("Exiting");
        return;
      }
      Preconditions.checkNotNull(context);
      FileStatus oldWalFile = context.getTargetToClean();
      boolean succeed = false;
      try {
        LOG.debug("Deleting {}", oldWalFile);
        succeed = this.fs.delete(oldWalFile.getPath(), false);
      } catch (IOException e) {
        // fs.delete() fails.
        LOG.warn("Failed to delete old WAL file {}", oldWalFile, e);
      } finally {
        // Always publish an outcome, even if delete() threw something unexpected, so the thread
        // waiting in deleteFiles() is released right away instead of running into its timeout.
        context.setResult(succeed);
      }
    }
  }

  /**
   * Cancels the chore via the parent implementation and then interrupts the cleaner threads.
   * @param mayInterruptIfRunning passed through to the parent implementation
   */
  @Override
  public synchronized void cancel(boolean mayInterruptIfRunning) {
    LOG.debug("Cancelling LogCleaner");
    super.cancel(mayInterruptIfRunning);
    interruptOldWALsCleaner();
  }

  /**
   * Hand-off object between {@link #deleteFiles(Iterable)} and a cleaner thread: carries the file
   * to delete, the outcome of the deletion, and a latch that is released once the outcome is set.
   */
  private static final class CleanerContext {

    final FileStatus target;
    final AtomicBoolean result;
    final CountDownLatch remainingResults;

    private CleanerContext(FileStatus status) {
      this.target = status;
      this.result = new AtomicBoolean(false);
      this.remainingResults = new CountDownLatch(1);
    }

    /**
     * Records the outcome and releases anyone blocked in {@link #getResult(long)}.
     * @param res whether the file was deleted
     */
    void setResult(boolean res) {
      this.result.set(res);
      this.remainingResults.countDown();
    }

    /**
     * Waits for the outcome of the deletion.
     * @param waitIfNotFinished maximum time to wait, in milliseconds
     * @return the recorded outcome, or {@code false} if the wait timed out or was interrupted
     */
    boolean getResult(long waitIfNotFinished) {
      try {
        boolean completed = this.remainingResults.await(waitIfNotFinished,
          TimeUnit.MILLISECONDS);
        if (!completed) {
          LOG.warn("Spent too much time [{}ms] deleting old WAL file: {}",
            waitIfNotFinished, target);
          return false;
        }
      } catch (InterruptedException e) {
        LOG.warn("Interrupted while awaiting deletion of WAL file: {}", target);
        // Restore the interrupt status so the caller (and any further waits it performs) can see
        // that this thread has been asked to stop.
        Thread.currentThread().interrupt();
        return false;
      }
      return result.get();
    }

    FileStatus getTargetToClean() {
      return target;
    }

    @Override
    public String toString() {
      return "CleanerContext [target=" + target + ", result=" + result + "]";
    }
  }
}