blob: 4de79386423bc57e9888f73179c619cc82a017a2 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.falcon.retention;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.GnuParser;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
import org.apache.commons.el.ExpressionEvaluatorImpl;
import org.apache.falcon.FalconException;
import org.apache.falcon.Pair;
import org.apache.falcon.catalog.CatalogPartition;
import org.apache.falcon.catalog.CatalogServiceFactory;
import org.apache.falcon.entity.CatalogStorage;
import org.apache.falcon.entity.FeedHelper;
import org.apache.falcon.entity.FileSystemStorage;
import org.apache.falcon.entity.Storage;
import org.apache.falcon.entity.common.FeedDataPath;
import org.apache.falcon.entity.common.FeedDataPath.VARS;
import org.apache.falcon.entity.v0.feed.Location;
import org.apache.falcon.expression.ExpressionHelper;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.servlet.jsp.el.ELException;
import javax.servlet.jsp.el.ExpressionEvaluator;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.PrintStream;
import java.text.DateFormat;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.TimeZone;
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
/**
* Feed Evictor is called only if the retention policy that applies
* to the feed is that of delete.
*/
public class FeedEvictor extends Configured implements Tool {
// SLF4J logger for eviction progress and diagnostics.
private static final Logger LOG = LoggerFactory.getLogger(FeedEvictor.class);
// EL evaluator/resolver pair used to turn a retention-limit expression
// (e.g. "days(90)") into a duration; see getDateRange().
private static final ExpressionEvaluator EVALUATOR = new ExpressionEvaluatorImpl();
private static final ExpressionHelper RESOLVER = ExpressionHelper.get();
// Destination of the final "instances=..." report line.  Held in an
// AtomicReference so the stream can be swapped — presumably by tests;
// NOTE(review): confirm against callers.
public static final AtomicReference<PrintStream> OUT = new AtomicReference<PrintStream>(System.out);
// Full year-to-minute date mask; prefixes of it are used when a path
// carries only coarser-grained date parts (see getDate()).
private static final String FORMAT = "yyyyMMddHHmm";
// constants to be used while preparing HCatalog partition filter query
private static final String FILTER_ST_BRACKET = "(";
private static final String FILTER_END_BRACKET = ")";
private static final String FILTER_QUOTE = "'";
private static final String FILTER_AND = " and ";
private static final String FILTER_OR = " or ";
private static final String FILTER_LESS_THAN = " < ";
private static final String FILTER_EQUALS = " = ";
/**
 * Entry point: loads the Oozie action configuration file into the Hadoop
 * configuration and delegates to {@link ToolRunner}. A non-zero exit code
 * from the tool is surfaced as an exception so the workflow action fails.
 */
public static void main(String[] args) throws Exception {
    Configuration configuration = new Configuration();
    Path actionConfXml = new Path("file:///" + System.getProperty("oozie.action.conf.xml"));
    LOG.info("{} found ? {}", actionConfXml, actionConfXml.getFileSystem(configuration).exists(actionConfXml));
    configuration.addResource(actionConfXml);
    int exitCode = ToolRunner.run(configuration, new FeedEvictor(), args);
    if (exitCode != 0) {
        throw new Exception("Unable to perform eviction action args: " + Arrays.toString(args));
    }
}
// Date-part values keyed by VARS; a TreeMap so iteration is ordered
// YEAR..MINUTE.  Rebuilt per path by populateDatePartMap().
private final Map<VARS, String> map = new TreeMap<VARS, String>();
// Accumulates deleted instance paths; written out to the log file.
private final StringBuffer instancePaths = new StringBuffer("instancePaths=");
// Accumulates deleted instance dates/partitions for the "instances=" output.
private final StringBuffer buffer = new StringBuffer();
/**
 * Parses eviction arguments, applies retention to the feed's storage and
 * emits the evicted instance list on {@link #OUT}.
 *
 * @return always 0; failures are raised as exceptions
 */
@Override
public int run(String[] args) throws Exception {
    CommandLine commandLine = getCommand(args);
    String feedPattern = commandLine.getOptionValue("feedBasePath")
            .replaceAll(Storage.QUESTION_EXPR_START_REGEX, Storage.DOLLAR_EXPR_START_REGEX);
    String retentionType = commandLine.getOptionValue("retentionType");
    String retentionLimit = commandLine.getOptionValue("retentionLimit");
    String timeZone = commandLine.getOptionValue("timeZone");
    String frequency = commandLine.getOptionValue("frequency"); //to write out smart path filters
    String logFile = commandLine.getOptionValue("logFile");
    String feedStorageType = commandLine.getOptionValue("falconFeedStorageType");

    LOG.info("Applying retention on {} type: {}, Limit: {}, timezone: {}, frequency: {}, storage: {}",
            feedPattern, retentionType, retentionLimit, timeZone, frequency, feedStorageType);

    Storage storage = FeedHelper.createStorage(feedStorageType, feedPattern);
    evict(storage, retentionLimit, timeZone);
    logInstancePaths(new Path(logFile));

    int evictedLength = buffer.length();
    if (evictedLength > 0) {
        // drop the trailing ',' appended after each evicted instance
        OUT.get().println("instances=" + buffer.substring(0, evictedLength - 1));
    } else {
        OUT.get().println("instances=NULL");
    }
    return 0;
}
/** Dispatches eviction to the handler matching the storage type. */
private void evict(Storage storage, String retentionLimit, String timeZone)
    throws Exception {
    Storage.TYPE storageType = storage.getType();
    if (Storage.TYPE.TABLE == storageType) {
        evictTable((CatalogStorage) storage, retentionLimit, timeZone);
    } else if (Storage.TYPE.FILESYSTEM == storageType) {
        evictFS((FileSystemStorage) storage, retentionLimit, timeZone);
    }
}
/** Applies filesystem retention to each location (data, stats, ...) of the feed. */
private void evictFS(FileSystemStorage storage, String retentionLimit, String timeZone)
    throws Exception {
    for (Location location : storage.getLocations()) {
        String uriTemplate = storage.getUriTemplate(location.getType());
        fileSystemEvictor(uriTemplate, retentionLimit, timeZone);
    }
}
/**
 * Evicts instances under one storage location: discovers instance paths
 * older than the retention window, deletes each (plus newly-empty parents),
 * and records the deletions in {@link #buffer} and {@link #instancePaths}.
 */
private void fileSystemEvictor(String feedPath, String retentionLimit, String timeZone)
    throws IOException, ELException {
    Path normalizedPath = new Path(feedPath);
    FileSystem fs = normalizedPath.getFileSystem(getConf());
    feedPath = normalizedPath.toUri().getPath();
    LOG.info("Normalized path: {}", feedPath);

    Pair<Date, Date> range = getDateRange(retentionLimit);
    String dateMask = getDateFormatInPath(feedPath);
    List<Path> expiredInstances = discoverInstanceToDelete(feedPath, timeZone, dateMask, range.first, fs);
    if (expiredInstances.isEmpty()) {
        LOG.info("No instances to delete.");
        return;
    }

    DateFormat dateFormat = new SimpleDateFormat(FORMAT);
    dateFormat.setTimeZone(TimeZone.getTimeZone(timeZone));
    Path feedBasePath = getFeedBasePath(feedPath);
    for (Path expired : expiredInstances) {
        deleteInstance(fs, expired, feedBasePath);
        Date instanceDate = getDate(expired, feedPath, dateMask, timeZone);
        buffer.append(dateFormat.format(instanceDate)).append(',');
        instancePaths.append(expired).append(",");
    }
}
/**
 * Returns the static prefix of the feed path, i.e. everything before the
 * first date variable.
 *
 * @throws IOException when the path contains no date variables at all
 */
private Path getFeedBasePath(String feedPath) throws IOException {
    Matcher matcher = FeedDataPath.PATTERN.matcher(feedPath);
    if (!matcher.find()) {
        throw new IOException("Unable to resolve pattern for feedPath: " + feedPath);
    }
    return new Path(feedPath.substring(0, matcher.start()));
}
/**
 * Writes the accumulated {@link #instancePaths} to the given log file and
 * echoes it at debug level.
 *
 * @throws IOException on any filesystem failure
 */
private void logInstancePaths(Path path) throws IOException {
    LOG.info("Writing deleted instances to path {}", path);
    FileSystem logfs = path.getFileSystem(getConf());
    OutputStream out = logfs.create(path);
    try {
        out.write(instancePaths.toString().getBytes());
    } finally {
        // previously the stream leaked if write() threw
        out.close();
    }
    debug(logfs, path);
}
/**
 * Converts a retention period expression (e.g. "days(90)") into the
 * [now - period, now] date range.
 */
private Pair<Date, Date> getDateRange(String period) throws ELException {
    String expression = "${" + period + "}";
    Long durationMillis = (Long) EVALUATOR.evaluate(expression, Long.class, RESOLVER, RESOLVER);
    Date now = new Date();
    Date windowStart = new Date(now.getTime() - durationMillis);
    return Pair.of(windowStart, now);
}
/**
 * Globs all feed instances under {@code inPath} and returns those whose
 * encoded date falls before {@code start} (instances with no parseable
 * date are skipped).
 *
 * @return paths eligible for deletion; empty list when nothing matches
 */
private List<Path> discoverInstanceToDelete(String inPath, String timeZone, String dateMask,
        Date start, FileSystem fs) throws IOException {
    FileStatus[] files = findFilesForFeed(fs, inPath);
    if (files == null || files.length == 0) {
        return Collections.emptyList();
    }
    List<Path> toBeDeleted = new ArrayList<Path>();
    for (FileStatus file : files) {
        // build the scheme-less path once (was recomputed up to three times)
        Path filePath = new Path(file.getPath().toUri().getPath());
        Date date = getDate(filePath, inPath, dateMask, timeZone);
        LOG.debug("Considering {}", filePath);
        LOG.debug("Date: {}", date);
        if (date != null && !isDateInRange(date, start)) {
            toBeDeleted.add(filePath);
        }
    }
    return toBeDeleted;
}
/**
 * Converts the date variables embedded in a feed path into the equivalent
 * SimpleDateFormat mask, e.g. ${YEAR}/${MONTH} becomes yyyy/MM.
 */
private String getDateFormatInPath(String inPath) {
    // strip the static fragments so only the date variables remain
    String mask = extractDatePartFromPathMask(inPath, inPath);
    mask = mask.replaceAll(VARS.YEAR.regex(), "yyyy");
    mask = mask.replaceAll(VARS.MONTH.regex(), "MM");
    mask = mask.replaceAll(VARS.DAY.regex(), "dd");
    mask = mask.replaceAll(VARS.HOUR.regex(), "HH");
    mask = mask.replaceAll(VARS.MINUTE.regex(), "mm");
    return mask;
}
/**
 * Globs the filesystem for all instances of the feed by replacing every
 * date variable (e.g. ${YEAR}) in the base path with a wildcard.
 *
 * @return matching statuses, or null when the glob matches nothing
 */
private FileStatus[] findFilesForFeed(FileSystem fs, String feedBasePath) throws IOException {
    // A single replaceAll substitutes every pattern match with "*", which is
    // exactly the fixed point the old find/replace/re-match loop converged to.
    String globPath = FeedDataPath.PATTERN.matcher(feedBasePath).replaceAll("*");
    LOG.info("Searching for {}", globPath);
    return fs.globStatus(new Path(globPath));
}
/**
 * Removes from {@code inPath} the static fragments of {@code mask}
 * (everything that is not a date variable), leaving only the date portion.
 */
private String extractDatePartFromPathMask(String mask, String inPath) {
    String[] staticFragments = FeedDataPath.PATTERN.split(mask);
    String out = inPath;
    for (String fragment : staticFragments) {
        // Quote the literal fragment: previously it was fed to replaceFirst
        // as a raw regex, so metacharacters in a path (e.g. '.') could make
        // the wrong substring get stripped.
        out = out.replaceFirst(Pattern.quote(fragment), "");
    }
    return out;
}
/**
 * Extracts the instance date encoded in {@code file}, using {@code inMask}
 * (the path template) to locate the date portion and {@code dateMask} as
 * its SimpleDateFormat pattern.
 *
 * @return the parsed date, or null when the path carries no date parts,
 *         the parts are not contiguous from YEAR downwards, or parsing fails
 */
//consider just the first occurrence of the pattern
private Date getDate(Path file, String inMask,
String dateMask, String timeZone) {
// strip the static path fragments, leaving only the date digits
String path = extractDatePartFromPathMask(inMask, file.toString());
populateDatePartMap(path, dateMask);
String errArg = file + "(" + inMask + ")";
if (map.isEmpty()) {
LOG.warn("No date present in {}", errArg);
return null;
}
StringBuilder date = new StringBuilder();
int ordinal = 0;
// map is a TreeMap keyed by VARS, so iteration runs YEAR..MINUTE; require
// the present parts to be contiguous from YEAR (ordinal 0) downwards —
// e.g. MONTH without YEAR is rejected.
for (VARS var : map.keySet()) {
if (ordinal++ == var.ordinal()) {
date.append(map.get(var));
} else {
LOG.warn("Prior element to {} is missing {}", var, errArg);
return null;
}
}
try {
// use only as much of the "yyyyMMddHHmm" mask as there are date digits
DateFormat dateFormat = new SimpleDateFormat(FORMAT.
substring(0, date.length()));
dateFormat.setTimeZone(TimeZone.getTimeZone(timeZone));
return dateFormat.parse(date.toString());
} catch (ParseException e) {
LOG.warn("Unable to parse date: {}, {}", date, errArg);
return null;
}
}
/**
 * Populates {@link #map} with the date-part substrings of {@code path},
 * using {@code mask} to locate each date field. Only the first occurrence
 * of each field (YEAR, MONTH, ...) is kept.
 */
private void populateDatePartMap(String path, String mask) {
map.clear();
Matcher matcher = FeedDataPath.DATE_FIELD_PATTERN.matcher(mask);
int start = 0;
// restart the search one char past each match so overlapping matches
// are also considered
while (matcher.find(start)) {
String subMask = mask.substring(matcher.start(), matcher.end());
// path and mask are positionally aligned: the same offsets in the
// actual path hold this field's value
String subPath = path.substring(matcher.start(), matcher.end());
VARS var = VARS.from(subMask);
if (!map.containsKey(var)) {
map.put(var, subPath);
}
start = matcher.start() + 1;
}
}
/**
 * @return true when {@code date} is on or after {@code start}; the range
 *         end is deliberately ignored (retention is open-ended toward now)
 */
private boolean isDateInRange(Date date, Date start) {
    return !date.before(start);
}
/**
 * Recursively deletes one instance path, then prunes any parent directories
 * left empty (up to, but excluding, the feed base path).
 *
 * @throws IOException when the delete fails
 */
private void deleteInstance(FileSystem fs, Path path, Path feedBasePath) throws IOException {
    if (!fs.delete(path, true)) {
        throw new IOException("Unable to delete instance: " + path);
    }
    LOG.info("Deleted instance: {}", path);
    deleteParentIfEmpty(fs, path.getParent(), feedBasePath);
}
/**
 * Echoes the just-written instance-paths log file at debug level.
 * No-op unless debug logging is enabled.
 */
private void debug(FileSystem fs, Path outPath) throws IOException {
    if (!LOG.isDebugEnabled()) {
        return; // avoid re-reading the file when the output would be discarded
    }
    ByteArrayOutputStream writer = new ByteArrayOutputStream();
    InputStream instance = fs.open(outPath);
    IOUtils.copyBytes(instance, writer, 4096, true); // true => closes both streams
    LOG.debug("Instance Paths copied to {}", outPath);
    LOG.debug("Written {}", writer);
}
/**
 * Defines and parses the eviction command line. All options are mandatory
 * and single-valued.
 *
 * @throws org.apache.commons.cli.ParseException when a required option is missing
 */
private CommandLine getCommand(String[] args) throws org.apache.commons.cli.ParseException {
    Options options = new Options();
    addRequiredOption(options, "feedBasePath",
            "base path for feed, ex /data/feed/${YEAR}-${MONTH}");
    addRequiredOption(options, "falconFeedStorageType", "feed storage type, FileSystem or Table");
    addRequiredOption(options, "retentionType",
            "type of retention policy like delete, archive etc");
    addRequiredOption(options, "retentionLimit",
            "time limit for retention, ex hours(5), months(2), days(90)");
    addRequiredOption(options, "timeZone", "timezone for feed, ex UTC");
    addRequiredOption(options, "frequency",
            "frequency of feed, ex hourly, daily, monthly, minute, weekly, yearly");
    addRequiredOption(options, "logFile", "log file for capturing size of feed");
    return new GnuParser().parse(options, args);
}

/** Registers a single-valued, mandatory command-line option. */
private static void addRequiredOption(Options options, String name, String description) {
    Option opt = new Option(name, true, description);
    opt.setRequired(true);
    options.addOption(opt);
}
/**
 * Applies retention to a catalog (HCatalog) backed feed: computes the dated
 * partition boundary, lists partitions older than it and drops them.
 */
private void evictTable(CatalogStorage storage, String retentionLimit, String timeZone)
    throws Exception {
    LOG.info("Applying retention on {}, Limit: {}, timezone: {}",
            storage.getTable(), retentionLimit, timeZone);

    // dated partition keys/values, ordered coarse-to-fine (year .. minute)
    List<String> datedPartKeys = new ArrayList<String>();
    List<String> datedPartValues = new ArrayList<String>();
    fillSortedDatedPartitionKVs(storage, datedPartKeys, datedPartValues, retentionLimit, timeZone);

    List<CatalogPartition> expiredPartitions =
            discoverPartitionsToDelete(storage, datedPartKeys, datedPartValues);
    if (expiredPartitions.isEmpty()) {
        LOG.info("No partitions to delete.");
        return;
    }

    final boolean isTableExternal = CatalogServiceFactory.getCatalogService().isTableExternal(
            storage.getCatalogUrl(), storage.getDatabase(), storage.getTable());
    dropPartitions(storage, expiredPartitions, datedPartKeys, isTableExternal);
}
/**
 * Lists the catalog partitions that fall before the retention boundary,
 * using a filter query built from the dated partition keys/values.
 */
private List<CatalogPartition> discoverPartitionsToDelete(CatalogStorage storage,
    List<String> datedPartKeys, List<String> datedPartValues) throws FalconException, ELException {
    String filter = createFilter(datedPartKeys, datedPartValues);
    return CatalogServiceFactory.getCatalogService().listPartitionsByFilter(
            storage.getCatalogUrl(), storage.getDatabase(), storage.getTable(), filter);
}
/**
 * Fills {@code sortedPartKeys}/{@code sortedPartValues} with the feed's
 * dated partition keys and the boundary values they take at the start of
 * the retention window, ordered coarse-to-fine via the VARS enum ordering.
 * Partitions whose value pattern contains no date variable are skipped.
 */
private void fillSortedDatedPartitionKVs(CatalogStorage storage, List<String> sortedPartKeys,
List<String> sortedPartValues, String retentionLimit, String timeZone) throws ELException {
Pair<Date, Date> range = getDateRange(retentionLimit);
// sort partition keys and values by the date pattern present in value
// (TreeMap keyed by VARS gives YEAR..MINUTE iteration order)
Map<VARS, String> sortedPartKeyMap = new TreeMap<VARS, String>();
Map<VARS, String> sortedPartValueMap = new TreeMap<VARS, String>();
for (Entry<String, String> entry : storage.getPartitions().entrySet()) {
String datePattern = entry.getValue();
// translate ${YEAR}... variables into a SimpleDateFormat mask
String mask = datePattern.replaceAll(VARS.YEAR.regex(), "yyyy")
.replaceAll(VARS.MONTH.regex(), "MM")
.replaceAll(VARS.DAY.regex(), "dd")
.replaceAll(VARS.HOUR.regex(), "HH")
.replaceAll(VARS.MINUTE.regex(), "mm");
// find the first date pattern present in date mask
VARS vars = VARS.presentIn(mask);
// skip this partition if date mask doesn't contain any date format
if (vars == null) {
continue;
}
// construct dated partition value as per format, rendered at the
// retention boundary (range.first = now - retentionLimit)
DateFormat dateFormat = new SimpleDateFormat(mask);
dateFormat.setTimeZone(TimeZone.getTimeZone(timeZone));
String partitionValue = dateFormat.format(range.first);
// add partition key and value in their sorted maps; first key/value
// seen for a given date granularity wins
if (!sortedPartKeyMap.containsKey(vars)) {
sortedPartKeyMap.put(vars, entry.getKey());
}
if (!sortedPartValueMap.containsKey(vars)) {
sortedPartValueMap.put(vars, partitionValue);
}
}
// add map entries to lists of partition keys and values
sortedPartKeys.addAll(sortedPartKeyMap.values());
sortedPartValues.addAll(sortedPartValueMap.values());
}
/**
 * Builds the HCatalog partition filter selecting every partition strictly
 * before the retention boundary.
 *
 * As an example, for dated partition keys [year, month, day, hour] and
 * values [2014, 02, 24, 10] the generated filter is:
 * "(year &lt; '2014') or (year = '2014' and month &lt; '02') or
 *  (year = '2014' and month = '02' and day &lt; '24') or
 *  (year = '2014' and month = '02' and day = '24' and hour &lt; '10')"
 */
private String createFilter(List<String> datedPartKeys, List<String> datedPartValues)
    throws ELException {
    StringBuilder filter = new StringBuilder();
    for (int level = 0; level < datedPartKeys.size(); level++) {
        if (filter.length() > 0) {
            filter.append(FILTER_OR);
        }
        filter.append(FILTER_ST_BRACKET);
        // equality terms for every coarser granularity than this level
        for (int coarser = 0; coarser < level; coarser++) {
            filter.append(datedPartKeys.get(coarser))
                    .append(FILTER_EQUALS)
                    .append(FILTER_QUOTE)
                    .append(datedPartValues.get(coarser))
                    .append(FILTER_QUOTE)
                    .append(FILTER_AND);
        }
        // strict less-than term at this level
        filter.append(datedPartKeys.get(level))
                .append(FILTER_LESS_THAN)
                .append(FILTER_QUOTE)
                .append(datedPartValues.get(level))
                .append(FILTER_QUOTE)
                .append(FILTER_END_BRACKET);
    }
    return filter.toString();
}
/**
 * Drops the given partitions, grouped by their dated-partition values.
 *
 * When the table's partition columns are a super-set of the dated
 * partitions, several partitions can share one set of date values; each
 * such group is dropped with a single catalog call.
 */
private void dropPartitions(CatalogStorage storage, List<CatalogPartition> partitionsToDelete,
    List<String> datedPartKeys, boolean isTableExternal) throws FalconException, IOException {
    // get table partition columns
    List<String> partColumns = CatalogServiceFactory.getCatalogService().getTablePartitionCols(
            storage.getCatalogUrl(), storage.getDatabase(), storage.getTable());

    Map<Map<String, String>, List<CatalogPartition>> dateToPartitionsMap =
            new HashMap<Map<String, String>, List<CatalogPartition>>();
    for (CatalogPartition candidate : partitionsToDelete) {
        // name -> value for every column of this partition
        Map<String, String> allColumns = new HashMap<String, String>();
        for (int col = 0; col < partColumns.size(); col++) {
            allColumns.put(partColumns.get(col), candidate.getValues().get(col));
        }
        // dated sub-set of those columns, used as the grouping key
        Map<String, String> datedSubset = new HashMap<String, String>();
        for (String datedKey : datedPartKeys) {
            datedSubset.put(datedKey, allColumns.get(datedKey));
        }
        // get-or-create the group for this set of date-partition values
        List<CatalogPartition> group = dateToPartitionsMap.get(datedSubset);
        if (group == null) {
            group = new ArrayList<CatalogPartition>();
            dateToPartitionsMap.put(datedSubset, group);
        }
        group.add(candidate);
    }

    // drop each group with one call
    for (Entry<Map<String, String>, List<CatalogPartition>> entry : dateToPartitionsMap.entrySet()) {
        dropPartitionInstances(storage, entry.getValue(), entry.getKey(), isTableExternal);
    }
}
/**
 * Drops one group of partitions sharing {@code partSpec}; for external
 * tables the partition directories are also deleted from the filesystem.
 * Successful drops are recorded in {@link #buffer} and {@link #instancePaths}.
 */
private void dropPartitionInstances(CatalogStorage storage, List<CatalogPartition> partitionsToDrop,
    Map<String, String> partSpec, boolean isTableExternal) throws FalconException, IOException {
    boolean deleted = CatalogServiceFactory.getCatalogService().dropPartitions(
            storage.getCatalogUrl(), storage.getDatabase(), storage.getTable(), partSpec);
    if (!deleted) {
        return;
    }
    for (CatalogPartition partitionToDrop : partitionsToDrop) {
        if (isTableExternal) { // nuke the dirs if an external table
            final String location = partitionToDrop.getLocation();
            final Path path = new Path(location);
            // Use the tool's configuration (was: new Configuration()) so that
            // settings supplied via ToolRunner/oozie.action.conf.xml are
            // honoured, consistent with the filesystem eviction path.
            deleted = path.getFileSystem(getConf()).delete(path, true);
        }
        if (!isTableExternal || deleted) {
            // replace ',' with ';' since message producer splits instancePaths string by ','
            String partitionInfo = partitionToDrop.getValues().toString().replace("," , ";");
            LOG.info("Deleted partition: {}", partitionInfo);
            buffer.append(partSpec).append(',');
            instancePaths.append(partitionInfo).append(",");
        }
    }
}
/**
 * Walks upward from {@code parent}, deleting each directory that became
 * empty, and stops at (never deletes) the feed base path.
 *
 * @throws IOException when an empty directory cannot be deleted
 */
private void deleteParentIfEmpty(FileSystem fs, Path parent, Path feedBasePath) throws IOException {
    if (feedBasePath.equals(parent)) {
        LOG.info("Not deleting feed base path: {}", parent);
        return;
    }
    FileStatus[] files = fs.listStatus(parent);
    if (files == null || files.length != 0) {
        return; // non-empty (or unlistable) directory: nothing to prune
    }
    LOG.info("Parent path: {} is empty, deleting path", parent);
    if (fs.delete(parent, true)) {
        LOG.info("Deleted empty dir: {}", parent);
    } else {
        throw new IOException("Unable to delete parent path:" + parent);
    }
    deleteParentIfEmpty(fs, parent.getParent(), feedBasePath);
}
}