/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.falcon.regression.hcat;
import org.apache.falcon.regression.Entities.FeedMerlin;
import org.apache.falcon.regression.core.bundle.Bundle;
import org.apache.falcon.entity.v0.EntityType;
import org.apache.falcon.regression.core.enumsAndConstants.FeedType;
import org.apache.falcon.regression.core.enumsAndConstants.RetentionUnit;
import org.apache.falcon.regression.core.helpers.ColoHelper;
import org.apache.falcon.regression.core.util.AssertUtil;
import org.apache.falcon.regression.core.util.BundleUtil;
import org.apache.falcon.regression.core.util.HCatUtil;
import org.apache.falcon.regression.core.util.MathUtil;
import org.apache.falcon.regression.core.util.OSUtil;
import org.apache.falcon.regression.core.util.OozieUtil;
import org.apache.falcon.regression.core.util.HadoopUtil;
import org.apache.falcon.regression.core.util.TimeUtil;
import org.apache.falcon.regression.core.util.Util.URLS;
import org.apache.falcon.regression.testHelper.BaseTestClass;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hive.hcatalog.api.HCatAddPartitionDesc;
import org.apache.hive.hcatalog.api.HCatClient;
import org.apache.hive.hcatalog.api.HCatCreateTableDesc;
import org.apache.hive.hcatalog.api.HCatPartition;
import org.apache.hive.hcatalog.common.HCatException;
import org.apache.hive.hcatalog.data.schema.HCatFieldSchema;
import org.apache.oozie.client.OozieClient;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
import org.testng.annotations.DataProvider;
import org.apache.log4j.Logger;
import java.io.IOException;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
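
/**
 * Tests for Falcon feed retention when the feed data is stored in an HCatalog table.
 */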
public class HCatRetentionTest extends BaseTestClass {

    private static final Logger logger = Logger.getLogger(HCatRetentionTest.class);
    private Bundle bundle;
    public static HCatClient cli;
    final String testDir = "/HCatRetentionTest/";
    final String baseTestHDFSDir = baseHDFSDir + testDir;
    final String dBName = "default";
    final ColoHelper cluster = servers.get(0);
    final FileSystem clusterFS = serverFS.get(0);
    final OozieClient clusterOC = serverOC.get(0);
    String tableName;
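
    // Recreates the HDFS test directory, fetches an HCat client for the cluster and
    // submits a freshly generated HCat bundle before each test method.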
    @BeforeMethod(alwaysRun = true)
    public void setUp() throws Exception {
        HadoopUtil.recreateDir(clusterFS, baseTestHDFSDir);
        cli = cluster.getClusterHelper().getHCatClient();
        bundle = new Bundle(BundleUtil.readHCat2Bundle(), cluster);
        bundle.generateUniqueBundle();
        bundle.submitClusters(prism);
    }
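
    // Deletes the bundle entities and drops the test table after each test method.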
    @AfterMethod(alwaysRun = true)
    public void tearDown() throws HCatException {
        bundle.deleteBundle(prism);
        cli.dropTable(dBName, tableName, true);
    }
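
    /**
     * Verifies feed retention for table (HCatalog) based feeds. A partitioned external
     * table is created and populated with data directories and partitions spanning the
     * feed's validity window; the feed is then submitted and scheduled, and once the
     * retention workflow succeeds the surviving directories and partitions are compared
     * against the expected output. A non-positive retention period is expected to be
     * rejected at submission.
     */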
    @Test(enabled = true, dataProvider = "loopBelow", timeOut = 900000, groups = "embedded")
    public void testHCatRetention(int retentionPeriod, RetentionUnit retentionUnit,
                                  FeedType feedType) throws Exception {
        /* HCatalog lowercases table names on creation, so use a lowercase name here to keep
           the feed's table reference consistent with the created table. */
        tableName = String.format("testhcatretention_%s_%d", retentionUnit.getValue(),
            retentionPeriod);
        createPartitionedTable(cli, dBName, tableName, baseTestHDFSDir, feedType);
        FeedMerlin feedElement = new FeedMerlin(bundle.getInputFeedFromBundle());
        feedElement.setTableValue(dBName, tableName, feedType.getHcatPathValue());
        feedElement
            .setRetentionValue(retentionUnit.getValue() + "(" + retentionPeriod + ")");
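        // A non-positive retention period is invalid, so feed submission should be rejected.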
        if (retentionPeriod <= 0) {
            AssertUtil.assertFailed(prism.getFeedHelper()
                .submitEntity(URLS.SUBMIT_URL, bundle.getInputFeedFromBundle()));
        } else {
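            // Create data directories and matching table partitions covering the feed's
            // validity window, then capture the initial HDFS and partition state.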
            final DateTime dataStartTime = new DateTime(
                feedElement.getClusters().getClusters().get(0).getValidity().getStart(),
                DateTimeZone.UTC).withSecondOfMinute(0);
            final DateTime dataEndTime = new DateTime(
                feedElement.getClusters().getClusters().get(0).getValidity().getEnd(),
                DateTimeZone.UTC).withSecondOfMinute(0);
            final List<DateTime> dataDates =
                TimeUtil.getDatesOnEitherSide(dataStartTime, dataEndTime, feedType);
            final List<String> dataDateStrings = TimeUtil.convertDatesToString(dataDates,
                feedType.getFormatter());
            AssertUtil.checkForListSizes(dataDates, dataDateStrings);
            final List<String> dataFolders = HadoopUtil.flattenAndPutDataInFolder(clusterFS,
                OSUtil.OOZIE_EXAMPLE_INPUT_LATE_INPUT, baseTestHDFSDir, dataDateStrings);
            addPartitionsToExternalTable(cli, dBName, tableName, feedType, dataDates, dataFolders);
            List<String> initialData =
                getHadoopDataFromDir(clusterFS, baseTestHDFSDir, testDir, feedType);
            List<HCatPartition> initialPtnList = cli.getPartitions(dBName, tableName);
            AssertUtil.checkForListSizes(initialData, initialPtnList);
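            // Schedule the feed, wait for its retention workflow to succeed, then suspend
            // the feed so no further retention runs change the state being checked.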
            AssertUtil.assertSucceeded(prism.getFeedHelper()
                .submitAndSchedule(URLS.SUBMIT_AND_SCHEDULE_URL, feedElement.toString()));
            final String bundleId = OozieUtil.getBundles(clusterOC, feedElement.getName(),
                EntityType.FEED).get(0);
            OozieUtil.waitForRetentionWorkflowToSucceed(bundleId, clusterOC);
            AssertUtil.assertSucceeded(prism.getFeedHelper().suspend(URLS.SUSPEND_URL,
                feedElement.toString()));
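            // Compare the surviving directories and partitions against the output expected
            // for this retention period and unit.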
            List<String> expectedOutput = getExpectedOutput(retentionPeriod, retentionUnit,
                feedType, new DateTime(DateTimeZone.UTC), initialData);
            List<String> finalData = getHadoopDataFromDir(clusterFS, baseTestHDFSDir, testDir,
                feedType);
            List<HCatPartition> finalPtnList = cli.getPartitions(dBName, tableName);
            logger.info("checking expectedOutput and finalPtnList");
            AssertUtil.checkForListSizes(expectedOutput, finalPtnList);
            logger.info("checking expectedOutput and finalData");
            AssertUtil.checkForListSizes(expectedOutput, finalData);
            logger.info("finalData = " + finalData);
            logger.info("expectedOutput = " + expectedOutput);
            Assert.assertTrue(Arrays.deepEquals(finalData.toArray(new String[finalData.size()]),
                expectedOutput.toArray(new String[expectedOutput.size()])),
                "expectedOutput and finalData don't match");
        }
    }
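
    /**
     * Returns the data directories present under the given HDFS path, relative to
     * {@code dir}, keeping only those at the directory depth expected for the feed type.
     */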
    private static List<String> getHadoopDataFromDir(FileSystem fs, String hadoopPath,
                                                     String dir, FeedType feedType)
        throws IOException {
        List<String> finalResult = new ArrayList<String>();
        final int dirDepth = feedType.getDirDepth();
        List<Path> results = HadoopUtil.getAllDirsRecursivelyHDFS(fs,
            new Path(hadoopPath), dirDepth);
        for (Path result : results) {
            int pathDepth = result.toString().split(dir)[1].split("/").length - 1;
            if (pathDepth == dirDepth) {
                finalResult.add(result.toString().split(dir)[1]);
            }
        }
        return finalResult;
    }

    /**
     * Get the expected output after retention is applied.
     *
     * @param retentionPeriod retention period
     * @param retentionUnit   retention unit
     * @param feedType        feed type
     * @param endDateUTC      end date of retention
     * @param inputData       input data on which retention was applied
     * @return expected output of the retention
     */
    private static List<String> getExpectedOutput(int retentionPeriod,
                                                  RetentionUnit retentionUnit,
                                                  FeedType feedType,
                                                  DateTime endDateUTC,
                                                  List<String> inputData) {
        List<String> finalData = new ArrayList<String>();
        //the oldest date that survives retention, in the feed's date format
        final String endLimit =
            feedType.getFormatter().print(retentionUnit.minusTime(endDateUTC, retentionPeriod));
        //keep only the dates that fall on or after the retention limit
        for (String testDate : inputData) {
            if (testDate.compareTo(endLimit) >= 0) {
                finalData.add(testDate);
            }
        }
        return finalData;
    }
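
    /**
     * Creates the partitioned external RCFile table used by the test. The switch below
     * falls through intentionally, so a finer-grained feed type gets its own partition
     * column plus all coarser ones (e.g. MINUTELY gets minute, hour, day, month and year).
     */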
    private static void createPartitionedTable(HCatClient client, String dbName, String tableName,
                                               String tableLoc, FeedType dataType)
        throws HCatException {
        ArrayList<HCatFieldSchema> cols = new ArrayList<HCatFieldSchema>();
        ArrayList<HCatFieldSchema> ptnCols = new ArrayList<HCatFieldSchema>();
        //client.dropDatabase("sample_db", true, HCatClient.DropDBMode.CASCADE);
        cols.add(HCatUtil.getStringSchema("id", "id comment"));
        cols.add(HCatUtil.getStringSchema("value", "value comment"));
        switch (dataType) {
            case MINUTELY:
                ptnCols.add(
                    HCatUtil.getStringSchema("minute", "min prt"));
            case HOURLY:
                ptnCols.add(
                    HCatUtil.getStringSchema("hour", "hour prt"));
            case DAILY:
                ptnCols.add(HCatUtil.getStringSchema("day", "day prt"));
            case MONTHLY:
                ptnCols.add(
                    HCatUtil.getStringSchema("month", "month prt"));
            case YEARLY:
                ptnCols.add(
                    HCatUtil.getStringSchema("year", "year prt"));
            default:
                break;
        }
        HCatCreateTableDesc tableDesc = HCatCreateTableDesc
            .create(dbName, tableName, cols)
            .fileFormat("rcfile")
            .ifNotExists(true)
            .partCols(ptnCols)
            .isTableExternal(true)
            .location(tableLoc)
            .build();
        client.dropTable(dbName, tableName, true);
        client.createTable(tableDesc);
    }
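
    /**
     * Adds one partition per data date, each pointing at the corresponding external data
     * folder. The switch falls through intentionally so that every partition key applicable
     * to the feed type (minute, hour, day, month, year) is populated before the partition
     * is registered.
     */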
    private static void addPartitionsToExternalTable(HCatClient client, String dbName,
                                                     String tableName, FeedType feedType,
                                                     List<DateTime> dataDates,
                                                     List<String> dataFolders)
        throws HCatException {
        //Adding specific partitions that map to an external location
        Map<String, String> ptn = new HashMap<String, String>();
        for (int i = 0; i < dataDates.size(); ++i) {
            final String dataFolder = dataFolders.get(i);
            final DateTime dataDate = dataDates.get(i);
            switch (feedType) {
                case MINUTELY:
                    ptn.put("minute", "" + dataDate.getMinuteOfHour());
                case HOURLY:
                    ptn.put("hour", "" + dataDate.getHourOfDay());
                case DAILY:
                    ptn.put("day", "" + dataDate.getDayOfMonth());
                case MONTHLY:
                    ptn.put("month", "" + dataDate.getMonthOfYear());
                case YEARLY:
                    ptn.put("year", "" + dataDate.getYear());
                    break;
                default:
                    Assert.fail("Unexpected feedType = " + feedType);
            }
            //Each HCat partition maps to a directory, not to a file
            HCatAddPartitionDesc addPtn = HCatAddPartitionDesc.create(dbName,
                tableName, dataFolder, ptn).build();
            client.addPartition(addPtn);
            ptn.clear();
        }
    }
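
    /**
     * Supplies the cross product of retention periods, retention units and feed types that
     * the retention test is run against.
     */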
    @DataProvider(name = "loopBelow")
    public Object[][] getTestData(Method m) {
        RetentionUnit[] retentionUnits = new RetentionUnit[]{RetentionUnit.HOURS,
            RetentionUnit.DAYS, RetentionUnit.MONTHS}; // MINUTES and YEARS are not covered here
        Integer[] periods = new Integer[]{7, 824, 43}; // a negative value like -4 should be
        // covered in validation scenarios
        FeedType[] dataTypes =
            new FeedType[]{
                //disabled because falcon supports only a single hcat partition
                //FeedType.DAILY, FeedType.MINUTELY, FeedType.HOURLY, FeedType.MONTHLY,
                FeedType.YEARLY};
        return MathUtil.crossProduct(periods, retentionUnits, dataTypes);
    }
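
    // Removes the test directories created by this class once all tests have run.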
    @AfterClass(alwaysRun = true)
    public void tearDownClass() throws IOException {
        cleanTestDirs();
    }
}