/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.falcon.regression.prism;

import org.apache.falcon.entity.v0.EntityType;
import org.apache.falcon.entity.v0.process.Property;
import org.apache.falcon.regression.core.bundle.Bundle;
import org.apache.falcon.regression.core.helpers.ColoHelper;
import org.apache.falcon.regression.core.util.BundleUtil;
import org.apache.falcon.regression.core.util.HadoopUtil;
import org.apache.falcon.regression.core.util.InstanceUtil;
import org.apache.falcon.regression.core.util.OSUtil;
import org.apache.falcon.regression.core.util.TimeUtil;
import org.apache.falcon.regression.core.util.Util;
import org.apache.falcon.regression.testHelper.BaseTestClass;
import org.apache.hadoop.fs.FileSystem;
import org.apache.log4j.Logger;
import org.apache.oozie.client.CoordinatorAction;
import org.apache.oozie.client.OozieClient;
import org.joda.time.DateTime;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;
import org.testng.annotations.AfterClass;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

import java.io.IOException;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;

@Test(groups = "embedded")
public class ProcessPartitionExpVariableTest extends BaseTestClass {
    private static final Logger logger = Logger.getLogger(ProcessPartitionExpVariableTest.class);
    private ColoHelper cluster = servers.get(0);
    private FileSystem clusterFS = serverFS.get(0);
    private OozieClient clusterOC = serverOC.get(0);
    private String baseTestDir = baseHDFSDir + "/ProcessPartitionExpVariableTest";
    private String aggregateWorkflowDir = baseTestDir + "/aggregator";

    @BeforeClass(alwaysRun = true)
    public void uploadWorkflow() throws Exception {
        HadoopUtil.uploadDir(clusterFS, aggregateWorkflowDir, OSUtil.RESOURCES_OOZIE);
    }

    @BeforeMethod(alwaysRun = true)
    public void setUp(Method method) throws Exception {
        logger.info("test name: " + method.getName());
        bundles[0] = BundleUtil.readELBundle();
        bundles[0] = new Bundle(bundles[0], cluster);
        bundles[0].generateUniqueBundle();
        bundles[0].setProcessWorkflow(aggregateWorkflowDir);
    }

    @AfterMethod(alwaysRun = true)
    public void tearDown() throws Exception {
        removeBundles();
        HadoopUtil.deleteDirIfExists(baseTestDir, clusterFS);
    }

    /**
     * Test case: set one optional and one compulsory input for the process. Set the partition of
     * each input as an expression language variable linked to a process property. Check that the
     * process runs fine when the partition for the compulsory input is supplied as an EL variable,
     * and that it succeeds in spite of a nonexistent partition supplied for the optional input.
     *
     * @throws Exception
     */
    @Test(enabled = true)
    public void ProcessPartitionExpVariableTest_OptionalCompulsoryPartition() throws Exception {
        String startTime = TimeUtil.getTimeWrtSystemTime(-4);
        String endTime = TimeUtil.getTimeWrtSystemTime(30);
        bundles[0].generateRequiredBundle(1, 2, 1, baseTestDir, 1, startTime, endTime);
        bundles[0].setProcessInputNames("inputData0", "inputData");
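
        // define a process property that the EL partition expression below can resolve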
        Property p = new Property();
        p.setName("var1");
        p.setValue("hardCoded");
        bundles[0].addProcessProperty(p);
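
        // set the partition of each input as an EL variable; ${var1} resolves to the process
        // property defined above, while the other expression deliberately maps to a partition
        // for which no data will exist (the optional input)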
        bundles[0].setProcessInputPartition("${var1}", "${fileTime}");
        for (String dataset : bundles[0].getDataSets()) {
            logger.info(Util.prettyPrintXml(dataset));
        }
        logger.info(Util.prettyPrintXml(bundles[0].getProcessData()));
        bundles[0].submitAndScheduleBundle(prism, false);
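
        // generate input folders from slightly before the process start time to slightly after
        // its end time, then upload sample data into them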
        List<String> dataDates = generateDateAndOneDayAfter(
            TimeUtil.oozieDateToDate(TimeUtil.addMinsToTime(startTime, -25)),
            TimeUtil.oozieDateToDate(TimeUtil.addMinsToTime(endTime, 25)), 5);
        HadoopUtil.flattenAndPutDataInFolder(clusterFS, OSUtil.NORMAL_INPUT, baseTestDir
            + "/input1/", dataDates);
        InstanceUtil.waitTillInstanceReachState(clusterOC,
            Util.getProcessName(bundles[0].getProcessData()), 2,
            CoordinatorAction.Status.SUCCEEDED, EntityType.PROCESS);
    }

    /**
     * Generates directory paths of the form 2014/03/06/21/57/2014-Mar-07 between two supplied
     * dates. Each path embeds two dates: the minute-level date of the folder and, as a suffix,
     * the date one day after it.
     *
     * @param startDate start date
     * @param endDate end date
     * @param minuteSkip interval, in minutes, with which directories are created
     * @return list of generated directory paths
     */
    private static List<String> generateDateAndOneDayAfter(DateTime startDate, DateTime endDate,
                                                           int minuteSkip) {
        final DateTimeFormatter formatter = DateTimeFormat.forPattern("yyyy/MM/dd/HH/mm/");
        final DateTimeFormatter formatter2 = DateTimeFormat.forPattern("yyyy-MMM-dd");
        logger.info("generating data between " + formatter.print(startDate) + " and " +
            formatter.print(endDate));
        List<String> dates = new ArrayList<String>();
        while (!startDate.isAfter(endDate)) {
            final DateTime nextDate = startDate.plusMinutes(minuteSkip);
            dates.add(formatter.print(nextDate) + formatter2.print(nextDate.plusDays(1)));
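            // a minuteSkip of 0 would never advance startDate; bump it to 1 after the first
            // iteration so the loop is guaranteed to terminate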
            if (minuteSkip == 0) {
                minuteSkip = 1;
            }
            startDate = nextDate;
        }
        return dates;
    }

    //TODO: ProcessPartitionExpVariableTest_OptionalPartition()
    //TODO: ProcessPartitionExpVariableTest_CompulsoryPartition()
    //TODO: ProcessPartitionExpVariableTest_moreThanOnceVariable()

    @AfterClass(alwaysRun = true)
    public void tearDownClass() throws IOException {
        cleanTestDirs();
    }
}