/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kylin.rest.job;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;
import javax.annotation.Nullable;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.OptionBuilder;
import org.apache.commons.cli.Options;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.fs.ContentSummary;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.AbstractApplication;
import org.apache.kylin.common.util.CliCommandExecutor;
import org.apache.kylin.common.util.HadoopUtil;
import org.apache.kylin.common.util.HiveCmdBuilder;
import org.apache.kylin.common.util.OptionsHelper;
import org.apache.kylin.common.util.Pair;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeManager;
import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.engine.mr.JobBuilderSupport;
import org.apache.kylin.job.engine.JobEngineConfig;
import org.apache.kylin.job.execution.AbstractExecutable;
import org.apache.kylin.job.execution.ExecutableManager;
import org.apache.kylin.job.execution.ExecutableState;
import org.apache.kylin.shaded.com.google.common.base.Predicate;
import org.apache.kylin.shaded.com.google.common.collect.Iterables;
import org.apache.kylin.shaded.com.google.common.collect.Lists;
import org.apache.kylin.shaded.com.google.common.collect.Maps;
import org.apache.kylin.source.ISourceMetadataExplorer;
import org.apache.kylin.source.SourceManager;
import org.apache.kylin.storage.hbase.HBaseConnection;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
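
/**
 * Cleans up storage left behind by Kylin jobs and cubes: intermediate Hive tables, HBase tables
 * that are no longer referenced by any cube segment, and job working directories on HDFS.
 * Unless the delete option is set, the job only performs a dry run and reports what it would remove.
 * <p>
 * A minimal programmatic dry-run sketch, assuming a properly configured Kylin/Hadoop environment
 * (for illustration only):
 * <pre>
 *   StorageCleanupJob job = new StorageCleanupJob();
 *   job.setDelete(false); // dry run, nothing is deleted
 *   job.cleanup();
 *   System.out.println(job.getHdfsGarbageFiles());
 * </pre>
 */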
public class StorageCleanupJob extends AbstractApplication {
@SuppressWarnings("static-access")
protected static final Option OPTION_DELETE = OptionBuilder.withArgName("delete").hasArg().isRequired(false)
.withDescription("Delete the unused storage").create("delete");
@SuppressWarnings("static-access")
protected static final Option OPTION_FORCE = OptionBuilder.withArgName("force").hasArg().isRequired(false)
.withDescription("Warning: will delete all kylin intermediate hive tables").create("force");
    @SuppressWarnings("static-access")
    protected static final Option THREAD_NUM = OptionBuilder.withArgName("thread").hasArg().isRequired(false)
            .withDescription("Number of threads to use for the storage cleanup").create("thread");
protected static final Logger logger = LoggerFactory.getLogger(StorageCleanupJob.class);
// ============================================================================
    protected final KylinConfig config;
    protected final FileSystem hbaseFs;
    protected final FileSystem defaultFs;
    protected final ExecutableManager executableManager;
protected boolean delete = false;
protected boolean force = false;
protected int threadsNum = 1;
private List<String> hiveGarbageTables = Collections.emptyList();
private List<String> hbaseGarbageTables = Collections.emptyList();
private List<String> hdfsGarbageFiles = Collections.emptyList();
private long hdfsGarbageFileBytes = 0;
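
    /**
     * Builds a cleanup job for the current environment, using the Hadoop working file system and,
     * when a separate HBase cluster is configured, the file system of that HBase cluster.
     */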
public StorageCleanupJob() throws IOException {
this(KylinConfig.getInstanceFromEnv(), HadoopUtil.getWorkingFileSystem(), HBaseConnection
.getFileSystemInHBaseCluster(KylinConfig.getInstanceFromEnv().getHdfsWorkingDirectory()));
}
protected StorageCleanupJob(KylinConfig config, FileSystem defaultFs, FileSystem hbaseFs) {
this.config = config;
this.defaultFs = defaultFs;
this.hbaseFs = hbaseFs;
this.executableManager = ExecutableManager.getInstance(config);
}
public void setDelete(boolean delete) {
this.delete = delete;
}
public void setForce(boolean force) {
this.force = force;
}
public List<String> getHiveGarbageTables() {
return hiveGarbageTables;
}
public List<String> getHbaseGarbageTables() {
return hbaseGarbageTables;
}
public List<String> getHdfsGarbageFiles() {
return hdfsGarbageFiles;
}
public long getHdfsFileGarbageBytes() {
return hdfsGarbageFileBytes;
}
@Override
protected Options getOptions() {
Options options = new Options();
options.addOption(OPTION_DELETE);
options.addOption(OPTION_FORCE);
options.addOption(THREAD_NUM);
return options;
}
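
    /**
     * Parses the delete, force and thread options and then runs {@link #cleanup()}.
     * An unparsable thread value falls back to a single cleanup thread.
     */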
@Override
protected void execute(OptionsHelper optionsHelper) throws Exception {
logger.info("options: '" + optionsHelper.getOptionsAsString() + "'");
logger.info("delete option value: '" + optionsHelper.getOptionValue(OPTION_DELETE) + "'");
logger.info("force option value: '" + optionsHelper.getOptionValue(OPTION_FORCE) + "'");
logger.info("thread option value: '" + optionsHelper.getOptionValue(THREAD_NUM) + "'");
delete = Boolean.parseBoolean(optionsHelper.getOptionValue(OPTION_DELETE));
force = Boolean.parseBoolean(optionsHelper.getOptionValue(OPTION_FORCE));
try {
String threads = optionsHelper.getOptionValue(THREAD_NUM);
if (threads != null) {
threadsNum = Integer.parseInt(threads);
}
        } catch (Exception e) {
            logger.warn("Failed to parse value '{}' for the thread option, falling back to {} thread(s)",
                    optionsHelper.getOptionValue(THREAD_NUM), threadsNum, e);
        }
cleanup();
}
    /**
     * Entry point of the cleanup. Runs the Hive, HBase and HDFS cleanup steps in turn and collects
     * failures, so that one failing step does not prevent the remaining steps from running.
     */
public void cleanup() throws Exception {
boolean error = false;
try {
cleanUnusedIntermediateHiveTable();
} catch (Exception e) {
logger.warn("cleanUnusedIntermediateHiveTable() error", e);
error = true;
}
try {
cleanUnusedHBaseTables();
} catch (Exception e) {
logger.warn("cleanUnusedHBaseTables() error", e);
error = true;
}
try {
cleanUnusedHdfsFiles();
} catch (Exception e) {
logger.warn("cleanUnusedHdfsFiles() error", e);
error = true;
}
if (error) {
throw new Exception("clean job has exception");
}
}
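
    /**
     * Cleans up HBase tables that are no longer needed. The work is delegated via reflection to
     * StorageCleanJobHbaseUtil so that this class still loads when HBase is not on the classpath.
     */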
protected void cleanUnusedHBaseTables() throws IOException {
if ("hbase".equals(config.getStorageUrl().getScheme()) && !"".equals(config.getMetadataUrl().getScheme())) {
final int deleteTimeoutMin = 2; // Unit minute
try {
// use reflection to isolate NoClassDef errors when HBase is not available
                Class<?> hbaseCleanUpUtil = Class.forName("org.apache.kylin.rest.job.StorageCleanJobHbaseUtil");
Method cleanUnusedHBaseTables = hbaseCleanUpUtil.getDeclaredMethod("cleanUnusedHBaseTables",
boolean.class, int.class, int.class);
hbaseGarbageTables = (List<String>) cleanUnusedHBaseTables.invoke(hbaseCleanUpUtil, delete,
deleteTimeoutMin, threadsNum);
} catch (Throwable e) {
logger.error("Error during HBase clean up", e);
}
}
}
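
    /**
     * Collects unused paths under the HDFS working directory, records their names and total size,
     * and deletes them when the delete option is set.
     */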
private void cleanUnusedHdfsFiles() throws IOException {
UnusedHdfsFileCollector collector = new UnusedHdfsFileCollector();
collectUnusedHdfsFiles(collector);
if (collector.list.isEmpty()) {
logger.info("No HDFS files to clean up");
return;
}
long garbageBytes = 0;
List<String> garbageList = new ArrayList<>();
for (Pair<FileSystem, String> entry : collector.list) {
FileSystem fs = entry.getKey();
String path = entry.getValue();
try {
garbageList.add(path);
ContentSummary sum = fs.getContentSummary(new Path(path));
if (sum != null)
garbageBytes += sum.getLength();
if (delete) {
logger.info("Deleting HDFS path " + path);
fs.delete(new Path(path), true);
} else {
logger.info("Dry run, pending delete HDFS path " + path);
}
} catch (IOException e) {
logger.error("Error dealing unused HDFS path " + path, e);
}
}
hdfsGarbageFileBytes = garbageBytes;
hdfsGarbageFiles = garbageList;
}
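
    /**
     * Collects candidates from the default file system and, when a dedicated HBase cluster file
     * system is configured, from that file system as well.
     */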
protected void collectUnusedHdfsFiles(UnusedHdfsFileCollector collector) throws IOException {
if (StringUtils.isNotEmpty(config.getHBaseClusterFs())) {
cleanUnusedHdfsFiles(hbaseFs, collector, true);
}
cleanUnusedHdfsFiles(defaultFs, collector, false);
}
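
    /**
     * Builds the deletion candidate list for one file system: every "kylin-*" directory under the
     * working directory, minus the working directories of jobs that are not yet in a final state
     * and of segments that are still referenced by a cube (unless a segment already exceeds the
     * max merging span).
     */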
private void cleanUnusedHdfsFiles(FileSystem fs, UnusedHdfsFileCollector collector, boolean hbaseFs)
throws IOException {
final JobEngineConfig engineConfig = new JobEngineConfig(config);
final CubeManager cubeMgr = CubeManager.getInstance(config);
List<String> allHdfsPathsNeedToBeDeleted = new ArrayList<String>();
try {
FileStatus[] fStatus = fs
.listStatus(Path.getPathWithoutSchemeAndAuthority(new Path(config.getHdfsWorkingDirectory())));
if (fStatus != null) {
for (FileStatus status : fStatus) {
String path = status.getPath().getName();
if (path.startsWith("kylin-")) {
allHdfsPathsNeedToBeDeleted.add(status.getPath().toString());
}
}
}
} catch (FileNotFoundException e) {
logger.error("Working Directory does not exist on HDFS.", e);
}
// only remove FINISHED and DISCARDED job intermediate files
List<String> allJobs = executableManager.getAllJobIds();
for (String jobId : allJobs) {
final ExecutableState state = executableManager.getOutput(jobId).getState();
if (!state.isFinalState()) {
String path = JobBuilderSupport.getJobWorkingDir(engineConfig.getHdfsWorkingDirectory(), jobId);
if (hbaseFs) {
path = HBaseConnection.makeQualifiedPathInHBaseCluster(path);
} else {//Compatible with local fs, unit tests, mockito
Path p = Path.getPathWithoutSchemeAndAuthority(new Path(path));
path = HadoopUtil.getFileSystem(path).makeQualified(p).toString();
}
allHdfsPathsNeedToBeDeleted.remove(path);
logger.info("Skip " + path + " from deletion list, as the path belongs to job " + jobId
+ " with status " + state);
}
}
long maxSegMergeSpan = KylinConfig.getInstanceFromEnv().getMaxSegmentMergeSpan();
// remove every segment working dir from deletion list
for (CubeInstance cube : cubeMgr.reloadAndListAllCubes()) {
for (CubeSegment seg : cube.getSegments()) {
String jobUuid = seg.getLastBuildJobID();
                if (StringUtils.isNotEmpty(jobUuid)) {
String path = JobBuilderSupport.getJobWorkingDir(engineConfig.getHdfsWorkingDirectory(), jobUuid);
if (hbaseFs) {
path = HBaseConnection.makeQualifiedPathInHBaseCluster(path);
} else {//Compatible with local fs, unit tests, mockito
Path p = Path.getPathWithoutSchemeAndAuthority(new Path(path));
path = HadoopUtil.getFileSystem(path).makeQualified(p).toString();
}
                    if (maxSegMergeSpan > 0 && seg.getTSRange().duration() >= maxSegMergeSpan) {
                        logger.info("Keep " + path + " on the deletion list, as segment " + seg + " of cube "
                                + cube.getName() + " already exceeds the max merging span");
                    } else {
                        allHdfsPathsNeedToBeDeleted.remove(path);
                        logger.info("Exclude " + path + " from deletion list, as it belongs to segment " + seg
                                + " of cube " + cube.getName());
                    }
}
}
}
// collect the garbage
for (String path : allHdfsPathsNeedToBeDeleted)
collector.add(fs, path);
}
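
    /**
     * Cleans up intermediate Hive tables; the step is skipped when Hive is not on the classpath.
     */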
private void cleanUnusedIntermediateHiveTable() throws Exception {
try {
cleanUnusedIntermediateHiveTableInternal();
} catch (NoClassDefFoundError e) {
if (e.getMessage().contains("HiveConf"))
logger.info("Skip cleanup of tntermediate Hive table, seems no Hive on classpath");
else
throw e;
}
}
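
    /**
     * Finds intermediate Hive tables whose names carry the configured prefix and end with a UUID
     * belonging to a finished job or an unused segment of this deployment, then drops them when
     * the delete option is set (or just reports them on a dry run). With the force option, every
     * table carrying the prefix is treated as garbage.
     */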
private void cleanUnusedIntermediateHiveTableInternal() throws Exception {
final int uuidLength = 36;
final String prefix = config.getHiveIntermediateTablePrefix();
final String uuidPattern = "[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}";
HiveCmdBuilder.getHiveTablePrefix().set(prefix);
List<String> hiveTableNames = null;
try {
hiveTableNames = getHiveTables();
} finally {
HiveCmdBuilder.getHiveTablePrefix().remove();
}
Iterable<String> kylinIntermediates = Iterables.filter(hiveTableNames, new Predicate<String>() {
@Override
public boolean apply(@Nullable String input) {
return input != null && input.startsWith(prefix);
}
});
List<String> allJobs = executableManager.getAllJobIds();
List<String> workingJobList = new ArrayList<String>();
List<String> allUuids = getAllUuids(allJobs);
Map<String, String> segmentId2JobId = Maps.newHashMap();
for (String jobId : allJobs) {
            // only remove intermediate tables of FINISHED and DISCARDED jobs
final ExecutableState state = executableManager.getOutput(jobId).getState();
if (!state.isFinalState()) {
workingJobList.add(jobId);
}
try {
String segmentId = getSegmentIdFromJobId(jobId);
if (segmentId != null) {//some jobs are not cubing jobs
segmentId2JobId.put(segmentId, jobId);
}
} catch (Exception ex) {
logger.warn("Failed to find segment ID from job ID " + jobId + ", ignore it");
// some older version job metadata may fail to read, ignore it
}
}
logger.debug("Working jobIDs: " + workingJobList);
        // filter tables to delete
        final Pattern uuidRegex = Pattern.compile(uuidPattern);
        List<String> allHiveTablesNeedToBeDeleted = new ArrayList<String>();
for (String tableName : kylinIntermediates) {
logger.debug("Checking if table is garbage -- " + tableName);
if (!tableName.startsWith(prefix))
continue;
if (force) {
logger.debug("Force include table " + tableName);
allHiveTablesNeedToBeDeleted.add(tableName);
continue;
}
boolean isNeedDel = true;
if (tableName.length() < prefix.length() + uuidLength) {
logger.debug("Skip table because length is not qualified, " + tableName);
continue;
}
String uuid = tableName.substring(tableName.length() - uuidLength, tableName.length());
uuid = uuid.replace("_", "-");
            if (!uuidRegex.matcher(uuid).matches()) {
                logger.debug("Skip table because its suffix is not a valid UUID, " + tableName);
continue;
}
if (!allUuids.contains(uuid)) {
logger.debug("Skip table because is not current deployment create, " + tableName);
continue;
}
            // Some intermediate tables end with the job's UUID rather than a segment ID
if (allJobs.contains(uuid)) {
isNeedDel = !workingJobList.contains(uuid);
} else if (isTableInUse(uuid, workingJobList)) {
logger.debug("Skip table because the table is in use, " + tableName);
isNeedDel = false;
}
if (isNeedDel) {
allHiveTablesNeedToBeDeleted.add(tableName);
}
}
// conclude hive tables to delete
hiveGarbageTables = allHiveTablesNeedToBeDeleted;
if (allHiveTablesNeedToBeDeleted.isEmpty()) {
logger.info("No Hive tables to clean up");
return;
}
if (delete) {
try {
List<List<String>> tablesList = Lists.partition(allHiveTablesNeedToBeDeleted, 20);
for (List<String> tables: tablesList) {
deleteHiveTables(tables, segmentId2JobId);
}
} catch (IOException e) {
logger.error("Error during deleting Hive tables", e);
}
} else {
for (String table : allHiveTablesNeedToBeDeleted) {
logger.info("Dry run, pending delete Hive table " + table);
}
}
}
// override by test
protected List<String> getHiveTables() throws Exception {
ISourceMetadataExplorer explr = SourceManager.getDefaultSource().getSourceMetadataExplorer();
return explr.listTables(config.getHiveDatabaseForIntermediateTable());
}
// override by test
protected CliCommandExecutor getCliCommandExecutor() throws IOException {
return config.getCliCommandExecutor();
}
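
    /**
     * Drops the given intermediate tables through the Hive CLI and removes their external data
     * paths under the owning job's working directory, when such a path still exists.
     */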
private void deleteHiveTables(List<String> allHiveTablesNeedToBeDeleted, Map<String, String> segmentId2JobId)
throws IOException {
final JobEngineConfig engineConfig = new JobEngineConfig(config);
final int uuidLength = 36;
final String useDatabaseHql = "USE " + config.getHiveDatabaseForIntermediateTable() + ";";
final HiveCmdBuilder hiveCmdBuilder = new HiveCmdBuilder();
hiveCmdBuilder.addStatement(useDatabaseHql);
for (String delHive : allHiveTablesNeedToBeDeleted) {
hiveCmdBuilder.addStatement("drop table if exists " + delHive + "; ");
logger.info("Deleting Hive table " + delHive);
}
getCliCommandExecutor().execute(hiveCmdBuilder.build());
        // If kylin.source.hive.keep-flat-table is enabled, some intermediate tables may have been kept.
        // Make sure their external data paths get deleted as well.
for (String tableToDelete : allHiveTablesNeedToBeDeleted) {
String uuid = tableToDelete.substring(tableToDelete.length() - uuidLength, tableToDelete.length());
String segmentId = uuid.replace("_", "-");
if (segmentId2JobId.containsKey(segmentId)) {
String path = JobBuilderSupport.getJobWorkingDir(engineConfig.getHdfsWorkingDirectory(),
segmentId2JobId.get(segmentId)) + "/" + tableToDelete;
Path externalDataPath = new Path(path);
if (defaultFs.exists(externalDataPath)) {
defaultFs.delete(externalDataPath, true);
logger.info("Hive table {}'s external path {} deleted", tableToDelete, path);
} else {
                    logger.info(
                            "Hive table {}'s external path {} does not exist. This is expected when kylin.source.hive.keep-flat-table is false (the default)",
                            tableToDelete, path);
}
} else {
logger.warn("Hive table {}'s job ID not found, segmentId2JobId: {}", tableToDelete,
segmentId2JobId.toString());
}
}
}
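
    /**
     * Returns all job IDs plus the segment IDs they built; these are the only UUIDs that
     * intermediate tables created by this deployment may end with.
     */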
private List<String> getAllUuids(List<String> allJobs) {
List<String> allUuids = new ArrayList<>();
for (String jobId : allJobs) {
allUuids.add(jobId);
try {
String segmentId = getSegmentIdFromJobId(jobId);
if (segmentId != null) {
allUuids.add(segmentId);
}
} catch (Exception ex) {
logger.warn("Failed to find segment ID from job ID " + jobId + ", ignore it");
}
}
return allUuids;
}
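
    /** Reads the "segmentId" parameter of a job, or returns null if the job or parameter is missing. */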
private String getSegmentIdFromJobId(String jobId) {
AbstractExecutable abstractExecutable = executableManager.getJob(jobId);
if (abstractExecutable == null) {
return null;
}
String segmentId = abstractExecutable.getParam("segmentId");
return segmentId;
}
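
    /** Returns true if the given segment UUID is still referenced by a job that has not reached a final state. */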
private boolean isTableInUse(String segUuid, List<String> workingJobList) {
for (String jobId : workingJobList) {
String segmentId = getSegmentIdFromJobId(jobId);
if (segUuid.equals(segmentId))
return true;
}
return false;
}
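
    /** Collects (file system, path) pairs of HDFS directories that are candidates for deletion. */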
protected class UnusedHdfsFileCollector {
LinkedHashSet<Pair<FileSystem, String>> list = new LinkedHashSet<>();
public void add(FileSystem fs, String path) {
list.add(Pair.newPair(fs, path));
}
}
}