/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hbase.hbck1;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.CorruptHFileException;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.mob.MobUtils;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.FSUtils.FamilyDirFilter;
import org.apache.hadoop.hbase.util.FSUtils.HFileFilter;
import org.apache.hadoop.hbase.util.FSUtils.RegionDirFilter;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * This class marches through all of the regions' hfiles and verifies that
 * they are all valid files. One just needs to instantiate the class, call
 * {@code checkTables(Collection<Path>)}, and then retrieve the corrupted hfiles
 * (and quarantined files if in quarantining mode).
*
* The implementation currently parallelizes at the regionDir level.
*
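 * A minimal usage sketch; the executor sizing and {@code tableDir} below are
 * illustrative, not mandated by this class:
 * <pre>
 *   ExecutorService executor = Executors.newFixedThreadPool(4);
 *   HFileCorruptionChecker checker = new HFileCorruptionChecker(conf, executor, true);
 *   checker.checkTables(Collections.singletonList(tableDir));
 *   Collection&lt;Path&gt; corrupt = checker.getCorrupted();
 *   Collection&lt;Path&gt; quarantined = checker.getQuarantined();
 *   executor.shutdown();
 * </pre>
 *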
* Copied over wholesale from hbase. Unaltered except for package and imports.
*/
@InterfaceAudience.Private
public class HFileCorruptionChecker {
private static final Logger LOG = LoggerFactory.getLogger(HFileCorruptionChecker.class);
final Configuration conf;
final FileSystem fs;
final CacheConfig cacheConf;
final ExecutorService executor;
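// Result sets are concurrent because region directories are checked in parallel on the executor.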
final Set<Path> corrupted = new ConcurrentSkipListSet<>();
final Set<Path> failures = new ConcurrentSkipListSet<>();
final Set<Path> quarantined = new ConcurrentSkipListSet<>();
final Set<Path> missing = new ConcurrentSkipListSet<>();
final Set<Path> corruptedMobFiles = new ConcurrentSkipListSet<>();
final Set<Path> failureMobFiles = new ConcurrentSkipListSet<>();
final Set<Path> missedMobFiles = new ConcurrentSkipListSet<>();
final Set<Path> quarantinedMobFiles = new ConcurrentSkipListSet<>();
final boolean inQuarantineMode;
final AtomicInteger hfilesChecked = new AtomicInteger();
final AtomicInteger mobFilesChecked = new AtomicInteger();
public HFileCorruptionChecker(Configuration conf, ExecutorService executor,
boolean quarantine) throws IOException {
this.conf = conf;
this.fs = FileSystem.get(conf);
this.cacheConf = CacheConfig.DISABLED;
this.executor = executor;
this.inQuarantineMode = quarantine;
}
/**
* Checks a path to see if it is a valid hfile.
*
 * @param p full Path to an HFile
 * @throws IOException on a connectivity-related failure; corruption itself is
 *           recorded rather than thrown
*/
protected void checkHFile(Path p) throws IOException {
HFile.Reader r = null;
try {
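// Opening the reader is the actual check: createReader parses the HFile trailer,
// and a corrupt or truncated trailer surfaces as CorruptHFileException.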
r = HFile.createReader(fs, p, cacheConf, true, conf);
} catch (CorruptHFileException che) {
LOG.warn("Found corrupt HFile " + p, che);
corrupted.add(p);
if (inQuarantineMode) {
Path dest = createQuarantinePath(p);
LOG.warn("Quarantining corrupt HFile " + p + " into " + dest);
boolean success = fs.mkdirs(dest.getParent());
success = success ? fs.rename(p, dest) : false;
if (!success) {
failures.add(p);
} else {
quarantined.add(dest);
}
}
return;
} catch (FileNotFoundException fnfe) {
LOG.warn("HFile " + p + " was missing. Likely removed due to compaction/split?");
missing.add(p);
} finally {
hfilesChecked.addAndGet(1);
if (r != null) {
r.close(true);
}
}
}
/**
* Given a path, generates a new path to where we move a corrupted hfile (bad
* trailer, no trailer).
*
 * @param hFile Path to a corrupt hfile (assumes that it is
 *          HBASE_DIR/table/region/cf/file)
 * @return path to where corrupted files are stored. This should be
 *         HBASE_DIR/.corrupt/table/region/cf/file.
*/
Path createQuarantinePath(Path hFile) throws IOException {
// extract the normal dirs structure
Path cfDir = hFile.getParent();
Path regionDir = cfDir.getParent();
Path tableDir = regionDir.getParent();
// build up the corrupted dirs structure
Path corruptBaseDir = new Path(FSUtils.getRootDir(conf), HConstants.CORRUPT_DIR_NAME);
if (conf.get("hbase.hfile.quarantine.dir") != null) {
LOG.warn("hbase.hfile.quarantine.dir is deprecated. Default to " + corruptBaseDir);
}
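// Mirror the table/region/cf layout under the corrupt dir so a quarantined file
// can be traced back to its original location.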
Path corruptTableDir = new Path(corruptBaseDir, tableDir.getName());
Path corruptRegionDir = new Path(corruptTableDir, regionDir.getName());
Path corruptFamilyDir = new Path(corruptRegionDir, cfDir.getName());
Path corruptHfile = new Path(corruptFamilyDir, hFile.getName());
return corruptHfile;
}
/**
* Check all files in a column family dir.
*/
protected void checkColFamDir(Path cfDir) throws IOException {
FileStatus[] statuses = null;
try {
statuses = fs.listStatus(cfDir); // use same filter as scanner.
} catch (FileNotFoundException fnfe) {
// With Hadoop 0.23+ listStatus semantics, FileNotFoundException is thrown if the path does not exist.
LOG.warn("Colfam Directory " + cfDir +
" does not exist. Likely due to concurrent split/compaction. Skipping.");
missing.add(cfDir);
return;
}
List<FileStatus> hfs = FSUtils.filterFileStatuses(statuses, new HFileFilter(fs));
// Hadoop 1.0 listStatus does not throw an exception if the path does not exist.
if (hfs.isEmpty() && !fs.exists(cfDir)) {
LOG.warn("Colfam Directory " + cfDir +
" does not exist. Likely due to concurrent split/compaction. Skipping.");
missing.add(cfDir);
return;
}
for (FileStatus hfFs : hfs) {
Path hf = hfFs.getPath();
checkHFile(hf);
}
}
/**
* Check all files in a mob column family dir.
*/
protected void checkMobColFamDir(Path cfDir) throws IOException {
FileStatus[] statuses = null;
try {
statuses = fs.listStatus(cfDir); // use same filter as scanner.
} catch (FileNotFoundException fnfe) {
// With Hadoop 0.23+ listStatus semantics, FileNotFoundException is thrown if the path does not exist.
LOG.warn("Mob colfam Directory " + cfDir +
" does not exist. Likely the table is deleted. Skipping.");
missedMobFiles.add(cfDir);
return;
}
List<FileStatus> hfs = FSUtils.filterFileStatuses(statuses, new HFileFilter(fs));
// Hadoop 1.0 listStatus does not throw an exception if the path does not exist.
if (hfs.isEmpty() && !fs.exists(cfDir)) {
LOG.warn("Mob colfam Directory " + cfDir +
" does not exist. Likely the table is deleted. Skipping.");
missedMobFiles.add(cfDir);
return;
}
for (FileStatus hfFs : hfs) {
Path hf = hfFs.getPath();
checkMobFile(hf);
}
}
/**
* Checks a path to see if it is a valid mob file.
*
 * @param p full Path to a mob file
 * @throws IOException on a connectivity-related failure; corruption itself is
 *           recorded rather than thrown
*/
protected void checkMobFile(Path p) throws IOException {
HFile.Reader r = null;
try {
r = HFile.createReader(fs, p, cacheConf, true, conf);
} catch (CorruptHFileException che) {
LOG.warn("Found corrupt mob file " + p, che);
corruptedMobFiles.add(p);
if (inQuarantineMode) {
Path dest = createQuarantinePath(p);
LOG.warn("Quarantining corrupt mob file " + p + " into " + dest);
boolean success = fs.mkdirs(dest.getParent());
success = success ? fs.rename(p, dest) : false;
if (!success) {
failureMobFiles.add(p);
} else {
quarantinedMobFiles.add(dest);
}
}
return;
} catch (FileNotFoundException fnfe) {
LOG.warn("Mob file " + p + " was missing. Likely removed due to compaction?");
missedMobFiles.add(p);
} finally {
mobFilesChecked.addAndGet(1);
if (r != null) {
r.close(true);
}
}
}
/**
* Checks all the mob files of a table.
*/
private void checkMobRegionDir(Path regionDir) throws IOException {
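// All MOB files for a table live under a single dedicated mob region directory,
// with one subdirectory per mob-enabled column family.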
if (!fs.exists(regionDir)) {
return;
}
FileStatus[] hfs = null;
try {
hfs = fs.listStatus(regionDir, new FamilyDirFilter(fs));
} catch (FileNotFoundException fnfe) {
// With Hadoop 0.23+ listStatus semantics, FileNotFoundException is thrown if the path does not exist.
LOG.warn("Mob directory " + regionDir
+ " does not exist. Likely the table is deleted. Skipping.");
missedMobFiles.add(regionDir);
return;
}
// Hadoop 1.0 listStatus does not throw an exception if the path does not exist.
if (hfs.length == 0 && !fs.exists(regionDir)) {
LOG.warn("Mob directory " + regionDir
+ " does not exist. Likely the table is deleted. Skipping.");
missedMobFiles.add(regionDir);
return;
}
for (FileStatus hfFs : hfs) {
Path hf = hfFs.getPath();
checkMobColFamDir(hf);
}
}
/**
* Check all column families in a region dir.
*/
protected void checkRegionDir(Path regionDir) throws IOException {
FileStatus[] statuses = null;
try {
statuses = fs.listStatus(regionDir);
} catch (FileNotFoundException fnfe) {
// With Hadoop 0.23+ listStatus semantics, FileNotFoundException is thrown if the path does not exist.
LOG.warn("Region Directory " + regionDir +
" does not exist. Likely due to concurrent split/compaction. Skipping.");
missing.add(regionDir);
return;
}
List<FileStatus> cfs = FSUtils.filterFileStatuses(statuses, new FamilyDirFilter(fs));
// Hadoop 1.0 listStatus does not throw an exception if the path does not exist.
if (cfs.isEmpty() && !fs.exists(regionDir)) {
LOG.warn("Region Directory " + regionDir +
" does not exist. Likely due to concurrent split/compaction. Skipping.");
missing.add(regionDir);
return;
}
for (FileStatus cfFs : cfs) {
Path cfDir = cfFs.getPath();
checkColFamDir(cfDir);
}
}
/**
* Check all the regiondirs in the specified tableDir
*/
void checkTableDir(Path tableDir) throws IOException {
List<FileStatus> rds =
FSUtils.listStatusWithStatusFilter(fs, tableDir, new RegionDirFilter(fs));
if (rds == null) {
if (!fs.exists(tableDir)) {
LOG.warn("Table Directory " + tableDir +
" does not exist. Likely due to concurrent delete. Skipping.");
missing.add(tableDir);
}
return;
}
// Parallelize check at the region dir level
List<RegionDirChecker> rdcs = new ArrayList<>(rds.size() + 1);
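// The extra capacity slot is for the table's mob region checker added below.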
List<Future<Void>> rdFutures;
for (FileStatus rdFs : rds) {
Path rdDir = rdFs.getPath();
RegionDirChecker work = new RegionDirChecker(rdDir);
rdcs.add(work);
}
// add mob region
rdcs.add(createMobRegionDirChecker(tableDir));
// Submit and wait for completion
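// invokeAll blocks until every checker has completed; the futures below are
// inspected only to surface and rethrow failures.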
try {
rdFutures = executor.invokeAll(rdcs);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
LOG.warn("Region dirs checking interrupted!", ie);
return;
}
for (int i = 0; i < rdFutures.size(); i++) {
Future<Void> f = rdFutures.get(i);
try {
f.get();
} catch (ExecutionException e) {
LOG.warn("Failed to quarantine an HFile in regiondir "
+ rdcs.get(i).regionDir, e.getCause());
// rethrow IOExceptions
if (e.getCause() instanceof IOException) {
throw (IOException) e.getCause();
}
// rethrow RuntimeExceptions
if (e.getCause() instanceof RuntimeException) {
throw (RuntimeException) e.getCause();
}
// this should never happen
LOG.error("Unexpected exception encountered", e);
return; // bailing out.
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
LOG.warn("Region dirs check interrupted!", ie);
// bailing out
return;
}
}
}
/**
* An individual work item for parallelized regiondir processing. This is
* intentionally an inner class so it can use the shared error sets and fs.
*/
private class RegionDirChecker implements Callable<Void> {
final Path regionDir;
RegionDirChecker(Path regionDir) {
this.regionDir = regionDir;
}
@Override
public Void call() throws IOException {
checkRegionDir(regionDir);
return null;
}
}
/**
* An individual work item for parallelized mob dir processing. This is
* intentionally an inner class so it can use the shared error sets and fs.
*/
private class MobRegionDirChecker extends RegionDirChecker {
MobRegionDirChecker(Path regionDir) {
super(regionDir);
}
@Override
public Void call() throws IOException {
checkMobRegionDir(regionDir);
return null;
}
}
/**
* Creates an instance of MobRegionDirChecker.
* @param tableDir The current table directory.
* @return An instance of MobRegionDirChecker.
*/
private MobRegionDirChecker createMobRegionDirChecker(Path tableDir) {
TableName tableName = FSUtils.getTableName(tableDir);
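// The mob "region" lives in a fixed per-table location outside the normal table directory layout.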
Path mobDir = MobUtils.getMobRegionPath(conf, tableName);
return new MobRegionDirChecker(mobDir);
}
/**
* Check the specified table dirs for bad hfiles.
*/
public void checkTables(Collection<Path> tables) throws IOException {
for (Path t : tables) {
checkTableDir(t);
}
}
/**
* @return the set of check failure file paths after checkTables is called.
*/
public Collection<Path> getFailures() {
return new HashSet<>(failures);
}
/**
* @return the set of corrupted file paths after checkTables is called.
*/
public Collection<Path> getCorrupted() {
return new HashSet<>(corrupted);
}
/**
 * @return number of hfiles checked in the last HFileCorruptionChecker run
*/
public int getHFilesChecked() {
return hfilesChecked.get();
}
/**
* @return the set of successfully quarantined paths after checkTables is called.
*/
public Collection<Path> getQuarantined() {
return new HashSet<>(quarantined);
}
/**
* @return the set of paths that were missing. Likely due to deletion/moves from
* compaction or flushes.
*/
public Collection<Path> getMissing() {
return new HashSet<>(missing);
}
/**
* @return the set of check failure mob file paths after checkTables is called.
*/
public Collection<Path> getFailureMobFiles() {
return new HashSet<>(failureMobFiles);
}
/**
* @return the set of corrupted mob file paths after checkTables is called.
*/
public Collection<Path> getCorruptedMobFiles() {
return new HashSet<>(corruptedMobFiles);
}
/**
 * @return number of mob files checked in the last HFileCorruptionChecker run
*/
public int getMobFilesChecked() {
return mobFilesChecked.get();
}
/**
 * @return the set of successfully quarantined mob file paths after checkTables is called.
*/
public Collection<Path> getQuarantinedMobFiles() {
return new HashSet<>(quarantinedMobFiles);
}
/**
* @return the set of paths that were missing. Likely due to table deletion or deletion/moves
* from compaction.
*/
public Collection<Path> getMissedMobFiles() {
return new HashSet<>(missedMobFiles);
}
/**
 * Print a human-readable summary of hfile quarantining operations.
*/
public void report(HBaseFsck.ErrorReporter out) {
out.print("Checked " + hfilesChecked.get() + " hfiles for corruption");
out.print(" Corrupt HFiles: " + corrupted.size());
if (inQuarantineMode) {
out.print(" Successfully Quarantined HFiles: " + quarantined.size());
for (Path sq : quarantined) {
out.print(" " + sq);
}
out.print(" Failed Quarantine HFiles: " + failures.size());
for (Path fq : failures) {
out.print(" " + fq);
}
}
out.print(" HFiles moved while checking: " + missing.size());
for (Path mq : missing) {
out.print(" " + mq);
}
String initialState = (corrupted.isEmpty()) ? "OK" : "CORRUPTED";
String fixedState = (corrupted.size() == quarantined.size()) ? "OK"
: "CORRUPTED";
if (inQuarantineMode) {
out.print("HFile Summary: " + initialState + " => " + fixedState);
} else {
out.print("HFile Summary: " + initialState);
}
// print mob-related report
out.print("Checked " + mobFilesChecked.get() + " MOB files for corruption");
out.print(" Corrupt MOB files: " + corruptedMobFiles.size());
if (inQuarantineMode) {
out.print(" Successfully Quarantined MOB files: " + quarantinedMobFiles.size());
for (Path sq : quarantinedMobFiles) {
out.print(" " + sq);
}
out.print(" Failed Quarantine MOB files: " + failureMobFiles.size());
for (Path fq : failureMobFiles) {
out.print(" " + fq);
}
}
out.print(" MOB files moved while checking: " + missedMobFiles.size());
for (Path mq : missedMobFiles) {
out.print(" " + mq);
}
String initialMobState = (corruptedMobFiles.isEmpty()) ? "OK" : "CORRUPTED";
String fixedMobState = (corruptedMobFiles.size() == quarantinedMobFiles.size()) ? "OK"
: "CORRUPTED";
if (inQuarantineMode) {
out.print("MOB summary: " + initialMobState + " => " + fixedMobState);
} else {
out.print("MOB summary: " + initialMobState);
}
}
}