/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.falcon.retention;
import org.apache.falcon.Pair;
import org.apache.falcon.cluster.util.EmbeddedCluster;
import org.apache.falcon.entity.Storage;
import org.apache.falcon.entity.v0.feed.LocationType;
import org.apache.falcon.hadoop.HadoopClientFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.PrintStream;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TimeZone;
import java.util.concurrent.TimeUnit;
/**
 * Tests for FeedEvictor: runs instance-based retention against an embedded HDFS
 * cluster and verifies the evicted instances, the eviction log file, and the
 * handling of empty directories and non-matching paths.
 */
public class FeedEvictorTest {
private EmbeddedCluster cluster;
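// 'stream' captures FeedEvictor's console output; 'map' records the expected
// evicted instances per feed, populated by generateInstances().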
private final InMemoryWriter stream = new InMemoryWriter(System.out);
private final Map<String, String> map = new HashMap<String, String>();
private String hdfsUrl;
@BeforeClass
public void start() throws Exception {
cluster = EmbeddedCluster.newCluster("test");
hdfsUrl = cluster.getConf().get(HadoopClientFactory.FS_DEFAULT_NAME_KEY);
FeedEvictor.OUT.set(stream);
}
@AfterClass
public void close() throws Exception {
cluster.shutdown();
}
@Test
public void testBadArgs() throws Exception {
try {
FeedEvictor.main(null);
Assert.fail("Expected an exception to be thrown");
} catch (Exception ignore) {
// ignore
}
try {
FeedEvictor.main(new String[]{"1", "2"});
Assert.fail("Expected an exception to be thrown");
} catch (Exception ignore) {
// ignore
}
}
@Test
public void testEviction1() throws Exception {
try {
FeedEvictor.main(new String[]{"1", "2", "3", "4", "5", "6", "7"});
} catch (Exception ignore) {
// ignore
}
}
@Test
public void testEviction2() throws Exception {
try {
Configuration conf = cluster.getConf();
FileSystem fs = FileSystem.get(conf);
fs.delete(new Path("/"), true);
stream.clear();
Pair<List<String>, List<String>> pair =
createTestData("feed1", "yyyy-MM-dd/'more'/yyyy", 10, TimeUnit.DAYS, "/data");
final String storageUrl = cluster.getConf().get(HadoopClientFactory.FS_DEFAULT_NAME_KEY);
String dataPath = LocationType.DATA.name() + "="
+ storageUrl + "/data/YYYY/feed1/mmHH/dd/MM/?{YEAR}-?{MONTH}-?{DAY}/more/?{YEAR}";
String logFile = hdfsUrl + "/falcon/staging/feed/instancePaths-2012-01-01-01-00.csv";
FeedEvictor.main(new String[]{
"-feedBasePath", dataPath,
"-retentionType", "instance",
"-retentionLimit", "days(10)",
"-timeZone", "UTC",
"-frequency", "daily",
"-logFile", logFile,
"-falconFeedStorageType", Storage.TYPE.FILESYSTEM.name(),
});
assertFailures(fs, pair);
compare(map.get("feed1"), stream.getBuffer());
String expectedInstancePaths = getExpectedInstancePaths(dataPath);
Assert.assertEquals(readLogFile(new Path(logFile)), expectedInstancePaths);
String deletedPath = expectedInstancePaths.split(",")[0].split("=")[1];
Assert.assertFalse(fs.exists(new Path(deletedPath)));
//empty parents
Assert.assertFalse(fs.exists(new Path(deletedPath).getParent()));
Assert.assertFalse(fs.exists(new Path(deletedPath).getParent().getParent()));
//base path not deleted
Assert.assertTrue(fs.exists(new Path("/data/YYYY/feed1/mmHH/dd/MM/")));
} catch (Exception e) {
Assert.fail("Unknown exception", e);
}
}
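/**
 * Rebuilds the expected "instancePaths=" line of the eviction log from the
 * "instances=" list that FeedEvictor printed, substituting each timestamp into
 * the '#'-separated location templates of the given feed base path. Returns
 * just "instancePaths=" when nothing was evicted.
 */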
private String getExpectedInstancePaths(String dataPath) {
StringBuilder newBuffer = new StringBuilder("instancePaths=");
DateFormat format = new SimpleDateFormat("yyyyMMddHHmm");
format.setTimeZone(TimeZone.getTimeZone("UTC"));
String[] locs = dataPath.split("#");
String[] instances = stream.getBuffer().split("instances=")[1].split(",");
if (instances[0].equals("NULL")) {
return "instancePaths=";
}
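// The printed instances are split evenly across the '#'-separated locations;
// rebuild each location's paths by substituting year/month/day/hour/minute.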
for (int i = 0; i < locs.length; i++) {
for (int j = 0, k = i * instances.length / locs.length; j < instances.length / locs.length; j++) {
String[] paths = locs[i].split("=");
String path = paths[1];
String instancePath = path.replaceAll("\\?\\{YEAR\\}", instances[j + k].substring(0, 4));
instancePath = instancePath.replaceAll("\\?\\{MONTH\\}", instances[j + k].substring(4, 6));
instancePath = instancePath.replaceAll("\\?\\{DAY\\}", instances[j + k].substring(6, 8));
instancePath = instancePath.replaceAll("\\?\\{HOUR\\}", instances[j + k].substring(8, 10));
instancePath = instancePath.replaceAll("\\?\\{MINUTE\\}", instances[j + k].substring(10, 12));
newBuffer.append(instancePath).append(',');
}
}
return newBuffer.toString();
}
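/** Reads the eviction log file from HDFS into a String. */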
private String readLogFile(Path logFile) throws IOException {
Configuration conf = cluster.getConf();
FileSystem fs = FileSystem.get(conf);
ByteArrayOutputStream writer = new ByteArrayOutputStream();
InputStream data = fs.open(logFile);
IOUtils.copyBytes(data, writer, 4096, true);
return writer.toString();
}
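/** Compares expected and actual evicted instances, ignoring their order. */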
private void compare(String str1, String str2) {
String[] instances1 = str1.split("=")[1].split(",");
String[] instances2 = str2.split("instances=")[1].split(",");
Arrays.sort(instances1);
Arrays.sort(instances2);
Assert.assertEquals(instances1, instances2);
}
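/**
 * Asserts that every path in pair.second is still present (must be retained) and
 * every path in pair.first has been deleted (must be evicted).
 */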
private void assertFailures(FileSystem fs, Pair<List<String>, List<String>> pair) throws IOException {
for (String path : pair.second) {
if (!fs.exists(new Path(path))) {
Assert.fail("Expecting " + path + " to be present");
}
}
for (String path : pair.first) {
if (fs.exists(new Path(path))) {
Assert.fail("Expecting " + path + " to be deleted");
}
}
}
@Test
public void testEviction3() throws Exception {
try {
Configuration conf = cluster.getConf();
FileSystem fs = FileSystem.get(conf);
fs.delete(new Path("/"), true);
stream.clear();
Pair<List<String>, List<String>> pair =
createTestData("feed2", "yyyyMMddHH/'more'/yyyy", 5, TimeUnit.HOURS, "/data");
final String storageUrl = cluster.getConf().get(HadoopClientFactory.FS_DEFAULT_NAME_KEY);
String dataPath = LocationType.DATA.name() + "="
+ storageUrl + "/data/YYYY/feed2/mmHH/dd/MM/?{YEAR}?{MONTH}?{DAY}?{HOUR}/more/?{YEAR}";
String logFile = hdfsUrl + "/falcon/staging/feed/instancePaths-2012-01-01-02-00.csv";
FeedEvictor.main(new String[]{
"-feedBasePath", dataPath,
"-retentionType", "instance",
"-retentionLimit", "hours(5)",
"-timeZone", "UTC",
"-frequency", "hourly",
"-logFile", logFile,
"-falconFeedStorageType", Storage.TYPE.FILESYSTEM.name(),
});
assertFailures(fs, pair);
compare(map.get("feed2"), stream.getBuffer());
Assert.assertEquals(readLogFile(new Path(logFile)),
getExpectedInstancePaths(dataPath));
} catch (Exception e) {
Assert.fail("Unknown exception", e);
}
}
@Test
public void testEviction4() throws Exception {
try {
Configuration conf = cluster.getConf();
FileSystem fs = FileSystem.get(conf);
fs.delete(new Path("/"), true);
stream.clear();
Pair<List<String>, List<String>> pair = createTestData("/data");
FeedEvictor.main(new String[] {
"-feedBasePath", LocationType.DATA.name() + "="
+ cluster.getConf().get(HadoopClientFactory.FS_DEFAULT_NAME_KEY)
+ "/data/YYYY/feed3/dd/MM/?{MONTH}/more/?{HOUR}",
"-retentionType", "instance",
"-retentionLimit", "months(5)",
"-timeZone", "UTC",
"-frequency", "hourly",
"-logFile", conf.get(HadoopClientFactory.FS_DEFAULT_NAME_KEY)
+ "/falcon/staging/feed/2012-01-01-04-00",
"-falconFeedStorageType", Storage.TYPE.FILESYSTEM.name(),
});
Assert.assertEquals("instances=NULL", stream.getBuffer());
stream.clear();
String dataPath = "/data/YYYY/feed4/dd/MM/more/hello";
String logFile = hdfsUrl + "/falcon/staging/feed/instancePaths-2012-01-01-02-00.csv";
FeedEvictor.main(new String[] {
"-feedBasePath", LocationType.DATA.name() + "="
+ cluster.getConf().get(HadoopClientFactory.FS_DEFAULT_NAME_KEY) + dataPath,
"-retentionType", "instance",
"-retentionLimit", "hours(5)",
"-timeZone", "UTC",
"-frequency", "hourly",
"-logFile", logFile,
"-falconFeedStorageType", Storage.TYPE.FILESYSTEM.name(),
});
Assert.assertEquals("instances=NULL", stream.getBuffer());
Assert.assertEquals(readLogFile(new Path(logFile)), getExpectedInstancePaths(dataPath));
assertFailures(fs, pair);
} catch (Exception e) {
e.printStackTrace();
Assert.fail("Unknown exception", e);
}
}
@Test
public void testEviction5() throws Exception {
try {
Configuration conf = cluster.getConf();
FileSystem fs = FileSystem.get(conf);
fs.delete(new Path("/"), true);
stream.clear();
Pair<List<String>, List<String>> pair = createTestData("/data");
createTestData("/stats");
createTestData("/meta");
createTestData("/tmp");
final String storageUrl = cluster.getConf().get(HadoopClientFactory.FS_DEFAULT_NAME_KEY);
FeedEvictor.main(new String[] {
"-feedBasePath", getFeedBasePath(LocationType.DATA, storageUrl)
+ "#" + getFeedBasePath(LocationType.STATS, storageUrl)
+ "#" + getFeedBasePath(LocationType.META, storageUrl)
+ "#" + getFeedBasePath(LocationType.TMP, storageUrl),
"-retentionType", "instance",
"-retentionLimit", "months(5)",
"-timeZone", "UTC",
"-frequency", "hourly",
"-logFile", conf.get(HadoopClientFactory.FS_DEFAULT_NAME_KEY)
+ "/falcon/staging/feed/2012-01-01-04-00", "-falconFeedStorageType",
Storage.TYPE.FILESYSTEM.name(),
});
Assert.assertEquals("instances=NULL", stream.getBuffer());
stream.clear();
String dataPath = LocationType.DATA.name() + "="
+ cluster.getConf().get(HadoopClientFactory.FS_DEFAULT_NAME_KEY)
+ "/data/YYYY/feed4/dd/MM/more/hello";
String logFile = hdfsUrl + "/falcon/staging/feed/instancePaths-2012-01-01-02-00.csv";
FeedEvictor.main(new String[]{
"-feedBasePath", dataPath,
"-retentionType", "instance",
"-retentionLimit", "hours(5)",
"-timeZone", "UTC",
"-frequency", "hourly",
"-logFile", logFile,
"-falconFeedStorageType", Storage.TYPE.FILESYSTEM.name(),
});
Assert.assertEquals("instances=NULL", stream.getBuffer());
Assert.assertEquals(readLogFile(new Path(logFile)), getExpectedInstancePaths(dataPath));
assertFailures(fs, pair);
} catch (Exception e) {
Assert.fail("Unknown exception", e);
}
}
@Test
public void testEviction6() throws Exception {
try {
Configuration conf = cluster.getConf();
FileSystem fs = FileSystem.get(conf);
fs.delete(new Path("/"), true);
stream.clear();
Pair<List<String>, List<String>> pair =
createTestData("feed1", "yyyy-MM-dd/'more'/yyyy", 10, TimeUnit.DAYS, "/data");
createTestData("feed1", "yyyy-MM-dd/'more'/yyyy", 10, TimeUnit.DAYS, "/stats");
createTestData("feed1", "yyyy-MM-dd/'more'/yyyy", 10, TimeUnit.DAYS, "/meta");
final String storageUrl = cluster.getConf().get(HadoopClientFactory.FS_DEFAULT_NAME_KEY);
String dataPath =
"DATA=" + storageUrl + "/data/YYYY/feed1/mmHH/dd/MM/?{YEAR}-?{MONTH}-?{DAY}/more/?{YEAR}"
+ "#STATS=" + storageUrl + "/stats/YYYY/feed1/mmHH/dd/MM/?{YEAR}-?{MONTH}-?{DAY}/more/?{YEAR}"
+ "#META=" + storageUrl + "/meta/YYYY/feed1/mmHH/dd/MM/?{YEAR}-?{MONTH}-?{DAY}/more/?{YEAR}";
String logFile = hdfsUrl + "/falcon/staging/feed/instancePaths-2012-01-01-01-00.csv";
FeedEvictor.main(new String[] {
"-feedBasePath", dataPath,
"-retentionType", "instance",
"-retentionLimit", "days(10)",
"-timeZone", "UTC",
"-frequency", "daily",
"-logFile", logFile,
"-falconFeedStorageType", Storage.TYPE.FILESYSTEM.name(),
});
assertFailures(fs, pair);
Assert.assertEquals(readLogFile(new Path(logFile)),
getExpectedInstancePaths(dataPath));
} catch (Exception e) {
Assert.fail("Unknown exception", e);
}
}
@Test
public void testEvictionWithEmptyDirs() throws Exception {
try {
Configuration conf = cluster.getConf();
FileSystem fs = FileSystem.get(conf);
fs.delete(new Path("/"), true);
stream.clear();
String feedBasePathString = "/data/YYYY/feed1/mmHH/dd/MM/";
SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd");
dateFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
generateInstances(fs, "feed1", dateFormat.toPattern(), 12, TimeUnit.DAYS, "/data", false);
final String storageUrl = cluster.getConf().get(HadoopClientFactory.FS_DEFAULT_NAME_KEY);
String dataPath = LocationType.DATA.name() + "=" + storageUrl + feedBasePathString
+ "?{YEAR}-?{MONTH}-?{DAY}";
String logFile = hdfsUrl + "/falcon/staging/feed/instancePaths-2012-01-01-01-00.csv";
FeedEvictor.main(new String[]{
"-feedBasePath", dataPath,
"-retentionType", "instance",
"-retentionLimit", "days(10)",
"-timeZone", "UTC",
"-frequency", "daily",
"-logFile", logFile,
"-falconFeedStorageType", Storage.TYPE.FILESYSTEM.name(),
});
Date dateToday = new Date();
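// days(10) retention keeps today's instance plus the previous nine, so the
// oldest surviving directory is nine days old and ten directories remain.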
Date dateTenDaysAgo = new Date(dateToday.getTime() - 9 * 24 * 3600 * 1000L);
String maxExpectedDataPath = feedBasePathString + dateFormat.format(dateToday);
String minExpectedDataPath = feedBasePathString + dateFormat.format(dateTenDaysAgo);
String expectedInstancePaths = getExpectedInstancePaths(dataPath);
Assert.assertEquals(readLogFile(new Path(logFile)), expectedInstancePaths);
// The base directory has to exist
Assert.assertTrue(fs.exists(new Path(feedBasePathString)));
// Directory with today's date has to exist
Assert.assertTrue(fs.exists(new Path(maxExpectedDataPath)));
// Directory with ten days ago date has to exist
Assert.assertTrue(fs.exists(new Path(minExpectedDataPath)));
// 10 directories have to exist as the feed retention is for 10 days
Assert.assertEquals(fs.listStatus(new Path(feedBasePathString)).length, 10);
} catch (Exception e) {
Assert.fail("Unknown exception", e);
}
}
@Test
public void testFeedBasePathExists() throws Exception {
try {
Configuration conf = cluster.getConf();
FileSystem fs = FileSystem.get(conf);
fs.delete(new Path("/"), true);
stream.clear();
String feedBasePathString = "/data/YYYY/feed1/mmHH/dd/MM/";
Pair<List<String>, List<String>> pair =
generateInstances(fs, "feed1", "yyyy-MM-dd/", 10, TimeUnit.DAYS, "/data", true);
final String storageUrl = cluster.getConf().get(HadoopClientFactory.FS_DEFAULT_NAME_KEY);
String dataPath = LocationType.DATA.name() + "=" + storageUrl + feedBasePathString
+ "?{YEAR}-?{MONTH}-?{DAY}";
String logFile = hdfsUrl + "/falcon/staging/feed/instancePaths-2012-01-01-01-00.csv";
FeedEvictor.main(new String[]{
"-feedBasePath", dataPath,
"-retentionType", "instance",
"-retentionLimit", "days(0)",
"-timeZone", "UTC",
"-frequency", "daily",
"-logFile", logFile,
"-falconFeedStorageType", Storage.TYPE.FILESYSTEM.name(),
});
String expectedInstancePaths = getExpectedInstancePaths(dataPath);
Assert.assertEquals(readLogFile(new Path(logFile)), expectedInstancePaths);
// The base directory has to exist
Assert.assertTrue(fs.exists(new Path(feedBasePathString)));
// There should not be any sub directories under the base path
Assert.assertEquals(fs.listStatus(new Path(feedBasePathString)).length, 0);
} catch (Exception e) {
Assert.fail("Unknown exception", e);
}
}
@Test
public void testEvictionStatsMetaWithNoPattern() throws Exception {
try {
Configuration conf = cluster.getConf();
FileSystem fs = FileSystem.get(conf);
fs.delete(new Path("/"), true);
stream.clear();
Pair<List<String>, List<String>> pair = createTestData("/data");
createDir("/stats");
createDir("/meta");
createTestData("/tmp");
final String storageUrl = cluster.getConf().get(HadoopClientFactory.FS_DEFAULT_NAME_KEY);
FeedEvictor.main(new String[] {
"-feedBasePath",
getFeedBasePath(LocationType.DATA, storageUrl) + "#"
+ getStatsOrMetaPath(LocationType.STATS, storageUrl)
+ "#" + getStatsOrMetaPath(LocationType.META, storageUrl)
+ "#" + getFeedBasePath(LocationType.TMP, storageUrl),
"-retentionType", "instance",
"-retentionLimit", "months(5)",
"-timeZone", "UTC",
"-frequency", "hourly",
"-logFile", conf.get(HadoopClientFactory.FS_DEFAULT_NAME_KEY)
+ "/falcon/staging/feed/2012-01-01-04-00", "-falconFeedStorageType",
Storage.TYPE.FILESYSTEM.name(),
});
// should not throw exception
// stats and meta dir should not be deleted
Assert.assertTrue(isDirPresent("/stats"));
Assert.assertTrue(isDirPresent("/meta"));
} catch (Exception e) {
Assert.fail("Unknown exception", e);
}
}
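/**
 * Creates files under the given base directory whose paths do not match any date
 * pattern ("02", "bad-va-lue"); all of them go into the retained half of the pair
 * because the evictor must leave them untouched.
 */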
private Pair<List<String>, List<String>> createTestData(String locationType) throws Exception {
Configuration conf = cluster.getConf();
FileSystem fs = FileSystem.get(conf);
List<String> outOfRange = new ArrayList<String>();
List<String> inRange = new ArrayList<String>();
touch(fs, locationType + "/YYYY/feed3/dd/MM/02/more/hello", true);
touch(fs, locationType + "/YYYY/feed4/dd/MM/02/more/hello", true);
touch(fs, locationType + "/YYYY/feed1/mmHH/dd/MM/bad-va-lue/more/hello", true);
touch(fs, locationType + "/somedir/feed1/mmHH/dd/MM/bad-va-lue/more/hello", true);
outOfRange.add(locationType + "/YYYY/feed3/dd/MM/02/more/hello");
outOfRange.add(locationType + "/YYYY/feed4/dd/MM/02/more/hello");
outOfRange.add(locationType + "/YYYY/feed1/mmHH/dd/MM/bad-va-lue/more/hello");
outOfRange.add(locationType + "/somedir/feed1/mmHH/dd/MM/bad-va-lue/more/hello");
return Pair.of(inRange, outOfRange);
}
private void createDir(String locationType) throws Exception {
Configuration conf = cluster.getConf();
FileSystem fs = FileSystem.get(conf);
touch(fs, locationType, false);
}
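/**
 * Creates both non-matching paths and real dated instances for the feed; returns
 * the combined (expected evicted, expected retained) lists.
 */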
private Pair<List<String>, List<String>> createTestData(String feed, String mask,
int period, TimeUnit timeUnit,
String locationType) throws Exception {
Configuration conf = cluster.getConf();
FileSystem fs = FileSystem.get(conf);
List<String> outOfRange = new ArrayList<String>();
List<String> inRange = new ArrayList<String>();
Pair<List<String>, List<String>> pair = createTestData(locationType);
outOfRange.addAll(pair.second);
inRange.addAll(pair.first);
pair = generateInstances(fs, feed, mask, period, timeUnit, locationType, true);
outOfRange.addAll(pair.second);
inRange.addAll(pair.first);
return Pair.of(inRange, outOfRange);
}
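/**
 * Creates one instance per time unit going back (range + 6) units. Instances
 * newer than the retention range are expected to survive (pair.second); older
 * ones are expected to be evicted (pair.first) and are recorded in 'map' as the
 * instances FeedEvictor should report.
 */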
private Pair<List<String>, List<String>> generateInstances(
FileSystem fs, String feed, String formatString,
int range, TimeUnit timeUnit, String locationType, boolean generateFiles) throws Exception {
List<String> outOfRange = new ArrayList<String>();
List<String> inRange = new ArrayList<String>();
DateFormat format = new SimpleDateFormat(formatString);
format.setTimeZone(TimeZone.getTimeZone("UTC"));
long now = System.currentTimeMillis();
DateFormat displayFormat = new
SimpleDateFormat(timeUnit == TimeUnit.HOURS ? "yyyyMMddHH" : "yyyyMMdd");
displayFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
StringBuilder buffer = new StringBuilder();
for (long date = now;
date > now - timeUnit.toMillis(range + 6);
date -= timeUnit.toMillis(1)) {
String path = locationType + "/YYYY/" + feed + "/mmHH/dd/MM/" + format.format(date);
touch(fs, path, generateFiles);
if (date <= now && date > now - timeUnit.toMillis(range)) {
outOfRange.add(path);
} else {
inRange.add(path);
buffer.append((displayFormat.format(date) + "0000").substring(0, 12)).append(',');
}
}
map.put(feed, "instances=" + buffer.substring(0, buffer.length() - 1));
return Pair.of(inRange, outOfRange);
}
private void touch(FileSystem fs, String path, boolean generateFiles) throws Exception {
if (generateFiles) {
fs.create(new Path(path)).close();
} else {
fs.mkdirs(new Path(path));
}
}
private boolean isDirPresent(String path) throws Exception {
FileSystem fs = FileSystem.get(cluster.getConf());
return fs.exists(new Path(path));
}
private String getFeedBasePath(LocationType locationType, String storageUrl) {
return locationType.name() + "=" + storageUrl
+ "/" + locationType.name().toLowerCase() + "/data/YYYY/feed3/dd/MM/?{MONTH}/more/?{HOUR}";
}
private String getStatsOrMetaPath(LocationType locationType, String storageUrl) {
return locationType.name() + "=" + storageUrl
+ "/" + locationType.name().toLowerCase();
}
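/**
 * PrintStream that buffers everything written through println(String) so the
 * tests can inspect FeedEvictor's output.
 */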
private static class InMemoryWriter extends PrintStream {
private final StringBuffer buffer = new StringBuffer();
public InMemoryWriter(OutputStream out) {
super(out);
}
@Override
public void println(String x) {
buffer.append(x);
super.println(x);
}
public String getBuffer() {
return buffer.toString();
}
public void clear() {
buffer.delete(0, buffer.length());
}
}
}