blob: 2a772e2e0f44bac9253dba28571d15312983cffb [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hive.metastore;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.util.SystemClock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.security.PrivilegedExceptionAction;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
/**
 * Metastore background task that purges expired Hive proto event directories
 * (query/DAG/app data written by proto logging hooks) once their date partition
 * is older than the configured TTL.
 */
public class HiveProtoEventsCleanerTask implements MetastoreTaskThread {
  public static final Logger LOG = LoggerFactory.getLogger(HiveProtoEventsCleanerTask.class);

  // Sibling event directories under the common base dir; "query_data" is the
  // Hive events dir itself (asserted in setConf), the rest are Tez-side dirs.
  private final String[] eventsSubDirs = new String[] { "query_data", "dag_meta", "dag_data", "app_data" };
  private List<Path> eventsBasePaths = new ArrayList<>();
  private Configuration conf;
  // TTL in milliseconds after which a date partition is eligible for deletion.
  private long ttl;
  // Latest expired partition name ("date=YYYY-MM-DD"); recomputed on every run()
  // so the static path filter below always compares against the current cutoff.
  private static String expiredDatePtn = null;
  private static final SystemClock clock = SystemClock.getInstance();

  @Override
  public void setConf(Configuration conf) {
    this.conf = conf;
    // Reset so repeated setConf() calls do not accumulate duplicate paths,
    // which would make run() scan (and attempt to delete) each dir twice.
    eventsBasePaths.clear();
    String hiveEventsDir = HiveConf.getVar(conf, ConfVars.HIVE_PROTO_EVENTS_BASE_PATH);
    if (StringUtils.isBlank(hiveEventsDir)) {
      // Proto logging is not configured; run() becomes a no-op.
      return;
    }
    Path hiveEventsBasePath = new Path(hiveEventsDir);
    Path baseDir = hiveEventsBasePath.getParent();
    for (String subDir : eventsSubDirs) {
      eventsBasePaths.add(new Path(baseDir, subDir));
    }
    assert(eventsBasePaths.get(0).equals(hiveEventsBasePath));
    ttl = HiveConf.getTimeVar(conf, ConfVars.HIVE_PROTO_EVENTS_TTL, TimeUnit.MILLISECONDS);
  }

  @Override
  public Configuration getConf() {
    return conf;
  }

  @Override
  public long runFrequency(TimeUnit unit) {
    return HiveConf.getTimeVar(conf, ConfVars.HIVE_PROTO_EVENTS_CLEAN_FREQ, unit);
  }

  @Override
  public void run() {
    // If Hive proto logging is not enabled, then nothing to be cleaned-up.
    if (eventsBasePaths.isEmpty()) {
      return;
    }
    // Expired date should be computed each time we run cleaner thread.
    computeExpiredDatePtn(ttl);
    for (Path basePath : eventsBasePaths) {
      cleanupDir(basePath);
    }
  }

  /**
   * Computes the expired date partition name from the underlying clock minus
   * {@code ttl}, using UTC so the cutoff is identical across timezones.
   */
  private static void computeExpiredDatePtn(long ttl) {
    // Use UTC date to ensure reader date is same on all timezones.
    LocalDate expiredDate
            = LocalDateTime.ofEpochSecond((clock.getTime() - ttl) / 1000, 0, ZoneOffset.UTC).toLocalDate();
    expiredDatePtn = "date=" + DateTimeFormatter.ISO_LOCAL_DATE.format(expiredDate);
  }

  /**
   * Path filter to include only expired date partitions based on TTL.
   * Relies on "date=YYYY-MM-DD" names sorting lexicographically in date order.
   */
  private static final PathFilter expiredDatePartitionsFilter = new PathFilter() {
    @Override
    public boolean accept(Path path) {
      String dirName = path.getName();
      return ((dirName.startsWith("date="))
              && (dirName.compareTo(expiredDatePtn) <= 0));
    }
  };

  /**
   * Finds the expired date partitioned events directories under the given base
   * path and deletes them, continuing past per-directory failures.
   *
   * @param eventsBasePath base directory holding "date=..." partition dirs
   */
  private void cleanupDir(Path eventsBasePath) {
    LOG.debug("Trying to delete expired proto events from {}", eventsBasePath);
    try {
      FileSystem fs = FileSystem.get(eventsBasePath.toUri(), conf);
      if (!fs.exists(eventsBasePath)) {
        return;
      }
      FileStatus[] statuses = fs.listStatus(eventsBasePath, expiredDatePartitionsFilter);
      for (FileStatus dir : statuses) {
        try {
          deleteDirByOwner(fs, dir);
          LOG.info("Deleted expired proto events dir: {}", dir.getPath());
        } catch (IOException ioe) {
          // Log error and continue to delete other expired dirs.
          LOG.error("Error deleting expired proto events dir {}", dir.getPath(), ioe);
        }
      }
    } catch (IOException e) {
      LOG.error("Error while trying to delete expired proto events from {}", eventsBasePath, e);
    }
  }

  /**
   * Deletes the events dir, impersonating its owner via a proxy UGI when the
   * owner differs from the process user (the dir may not be writable by us).
   *
   * @param fs        filesystem used for the direct (non-proxy) delete
   * @param eventsDir directory to delete recursively
   * @throws IOException if the delete or proxy-user setup fails
   */
  private void deleteDirByOwner(FileSystem fs, FileStatus eventsDir) throws IOException {
    String owner = eventsDir.getOwner();
    if (owner.equals(System.getProperty("user.name"))) {
      fs.delete(eventsDir.getPath(), true);
    } else {
      LOG.info("Deleting {} as user {}", eventsDir.getPath(), owner);
      UserGroupInformation ugi = UserGroupInformation.createProxyUser(owner,
              UserGroupInformation.getLoginUser());
      try {
        ugi.doAs(new PrivilegedExceptionAction<Object>() {
          @Override
          public Object run() throws Exception {
            // New FileSystem object to be obtained in user context for doAs flow.
            try (FileSystem doAsFs = FileSystem.newInstance(eventsDir.getPath().toUri(), conf)) {
              doAsFs.delete(eventsDir.getPath(), true);
            }
            return null;
          }
        });
      } catch (InterruptedException ie) {
        // Restore the interrupt status so the hosting thread pool can observe it.
        Thread.currentThread().interrupt();
        LOG.error("Could not delete {} for UGI: {}", eventsDir.getPath(), ugi, ie);
      }
    }
  }
}