/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.falcon.regression.prism;
import org.apache.falcon.entity.v0.EntityType;
import org.apache.falcon.regression.Entities.ProcessMerlin;
import org.apache.falcon.regression.core.bundle.Bundle;
import org.apache.falcon.regression.core.helpers.ColoHelper;
import org.apache.falcon.regression.core.util.BundleUtil;
import org.apache.falcon.regression.core.util.HadoopUtil;
import org.apache.falcon.regression.core.util.InstanceUtil;
import org.apache.falcon.regression.core.util.OSUtil;
import org.apache.falcon.regression.core.util.TimeUtil;
import org.apache.falcon.regression.core.util.Util;
import org.apache.falcon.regression.testHelper.BaseTestClass;
import org.apache.hadoop.fs.FileSystem;
import org.apache.log4j.Logger;
import org.apache.oozie.client.CoordinatorAction;
import org.apache.oozie.client.OozieClient;
import org.testng.annotations.AfterClass;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
import java.io.IOException;
import java.lang.reflect.Method;
import java.util.List;
@Test(groups = "embedded")
public class OptionalInputTest extends BaseTestClass {
ColoHelper cluster = servers.get(0);
FileSystem clusterFS = serverFS.get(0);
OozieClient oozieClient = serverOC.get(0);
String baseTestDir = baseHDFSDir + "/OptionalInputTest";
String inputPath = baseTestDir + "/input";
String aggregateWorkflowDir = baseTestDir + "/aggregator";
private static final Logger logger = Logger.getLogger(OptionalInputTest.class);
@BeforeClass(alwaysRun = true)
public void uploadWorkflow() throws Exception {
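//upload the oozie workflow used by the test processes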
HadoopUtil.uploadDir(clusterFS, aggregateWorkflowDir, OSUtil.RESOURCES_OOZIE);
}
@BeforeMethod(alwaysRun = true)
public void setup(Method method) throws Exception {
logger.info("test name: " + method.getName());
bundles[0] = BundleUtil.readELBundle();
bundles[0] = new Bundle(bundles[0], cluster);
bundles[0].generateUniqueBundle();
bundles[0].setProcessWorkflow(aggregateWorkflowDir);
}
@AfterMethod(alwaysRun = true)
public void tearDown() throws Exception {
HadoopUtil.deleteDirIfExists(inputPath + "/", clusterFS);
removeBundles();
}
/**
* Test case: set 1 optional and 1 compulsory input. Provide data only for the compulsory
* one. Check that the process runs without waiting for the optional input and finally
* succeeds.
*
* @throws Exception
*/
@Test(enabled = true, groups = {"singleCluster"})
public void optionalTest_1optional_1compulsory() throws Exception {
bundles[0].generateRequiredBundle(1, 2, 1, inputPath, 1,
"2010-01-02T01:00Z", "2010-01-02T01:12Z");
for (int i = 0; i < bundles[0].getClusters().size(); i++) {
    logger.info(Util.prettyPrintXml(bundles[0].getClusters().get(i)));
}
for (int i = 0; i < bundles[0].getDataSets().size(); i++) {
    logger.info(Util.prettyPrintXml(bundles[0].getDataSets().get(i)));
}
bundles[0].setProcessInputStartEnd("now(0,-10)", "now(0,0)");
bundles[0].setProcessConcurrency(2);
String process = bundles[0].getProcessData();
logger.info(Util.prettyPrintXml(process));
bundles[0].submitAndScheduleBundle(prism, false);
List<String> dataDates = TimeUtil.getMinuteDatesOnEitherSide("2010-01-02T00:50Z",
"2010-01-02T01:10Z", 5);
HadoopUtil.flattenAndPutDataInFolder(clusterFS, OSUtil.SINGLE_FILE,
inputPath + "/input1/", dataDates);
InstanceUtil.waitTillInstanceReachState(oozieClient, Util.getProcessName(process),
2, CoordinatorAction.Status.SUCCEEDED, EntityType.PROCESS);
}
/**
* Test case: set 1 optional and 2 compulsory inputs. Check that while no data has been
* provided the process stays pending. Then provide data for the compulsory inputs and check
* that the process runs and finally succeeds without waiting for optional input data.
*
* @throws Exception
*/
@Test(enabled = true, groups = {"singleCluster"})
public void optionalTest_1optional_2compulsory() throws Exception {
bundles[0].generateRequiredBundle(1, 3, 1, inputPath, 1,
"2010-01-02T01:00Z", "2010-01-02T01:12Z");
for (int i = 0; i < bundles[0].getClusters().size(); i++) {
    logger.info(Util.prettyPrintXml(bundles[0].getClusters().get(i)));
}
for (int i = 0; i < bundles[0].getDataSets().size(); i++) {
    logger.info(Util.prettyPrintXml(bundles[0].getDataSets().get(i)));
}
bundles[0].setProcessInputStartEnd("now(0,-10)", "now(0,0)");
bundles[0].setProcessConcurrency(2);
String processName = Util.readEntityName(bundles[0].getProcessData());
logger.info(Util.prettyPrintXml(bundles[0].getProcessData()));
bundles[0].submitAndScheduleBundle(prism, false);
logger.info("instanceShouldStillBeInWaitingState");
InstanceUtil.waitTillInstanceReachState(oozieClient, processName,
2, CoordinatorAction.Status.WAITING, EntityType.PROCESS);
List<String> dataDates = TimeUtil.getMinuteDatesOnEitherSide(
"2010-01-02T00:50Z", "2010-01-02T01:10Z", 5);
HadoopUtil.flattenAndPutDataInFolder(clusterFS, OSUtil.SINGLE_FILE,
inputPath + "/input1/", dataDates);
HadoopUtil.flattenAndPutDataInFolder(clusterFS, OSUtil.SINGLE_FILE,
inputPath + "/input2/", dataDates);
InstanceUtil.waitTillInstanceReachState(oozieClient, processName,
2, CoordinatorAction.Status.SUCCEEDED, EntityType.PROCESS);
}
/**
* Test case: set 2 optional inputs and 1 compulsory input. Run the process. Check that it
* is pending for lack of data. Provide data only for the compulsory input. Check that the
* process runs and finally succeeds without waiting for the optional inputs.
*
* @throws Exception
*/
@Test(enabled = true, groups = {"singleCluster"})
public void optionalTest_2optional_1compulsory() throws Exception {
bundles[0].generateRequiredBundle(1, 3, 2, inputPath, 1, "2010-01-02T01:00Z",
"2010-01-02T01:12Z");
for (int i = 0; i < bundles[0].getClusters().size(); i++) {
    logger.info(Util.prettyPrintXml(bundles[0].getClusters().get(i)));
}
for (int i = 0; i < bundles[0].getDataSets().size(); i++) {
    logger.info(Util.prettyPrintXml(bundles[0].getDataSets().get(i)));
}
bundles[0].setProcessInputStartEnd("now(0,-10)", "now(0,0)");
bundles[0].setProcessConcurrency(2);
String processName = Util.readEntityName(bundles[0].getProcessData());
logger.info(Util.prettyPrintXml(bundles[0].getProcessData()));
bundles[0].submitAndScheduleBundle(prism, false);
InstanceUtil.waitTillInstanceReachState(oozieClient, processName,
2, CoordinatorAction.Status.WAITING, EntityType.PROCESS);
List<String> dataDates = TimeUtil.getMinuteDatesOnEitherSide(
"2010-01-02T00:50Z", "2010-01-02T01:10Z", 5);
HadoopUtil.flattenAndPutDataInFolder(clusterFS, OSUtil.SINGLE_FILE,
inputPath + "/input2/", dataDates);
InstanceUtil.waitTillInstanceReachState(oozieClient, processName,
2, CoordinatorAction.Status.SUCCEEDED, EntityType.PROCESS);
}
/**
* Test case: set a process with 1 optional and 1 compulsory input. Provide empty
* directories for the optional input and normal data for the compulsory one. Check that
* the process doesn't wait for the optional input, runs and finally succeeds.
*
* @throws Exception
*/
@Test(enabled = true, groups = {"singleCluster"})
public void optionalTest_optionalInputWithEmptyDir() throws Exception {
String startTime = TimeUtil.getTimeWrtSystemTime(-4);
String endTime = TimeUtil.getTimeWrtSystemTime(10);
bundles[0].generateRequiredBundle(1, 2, 1, inputPath, 1, startTime, endTime);
for (int i = 0; i < bundles[0].getClusters().size(); i++) {
    logger.info(Util.prettyPrintXml(bundles[0].getClusters().get(i)));
}
for (int i = 0; i < bundles[0].getDataSets().size(); i++) {
    logger.info(Util.prettyPrintXml(bundles[0].getDataSets().get(i)));
}
bundles[0].setProcessInputStartEnd("now(0,-10)", "now(0,0)");
bundles[0].setProcessConcurrency(2);
String process = bundles[0].getProcessData();
logger.info(Util.prettyPrintXml(process));
List<String> dataDates = TimeUtil.getMinuteDatesOnEitherSide(
TimeUtil.addMinsToTime(startTime, -10), endTime, 5);
HadoopUtil.flattenAndPutDataInFolder(clusterFS, OSUtil.SINGLE_FILE,
inputPath + "/input1/", dataDates);
HadoopUtil.recreateDir(clusterFS, inputPath + "/input0/");
for (String date : dataDates) {
HadoopUtil.recreateDir(clusterFS, inputPath + "/input0/" + date);
}
bundles[0].submitFeedsScheduleProcess(prism);
InstanceUtil.waitTillInstanceReachState(oozieClient, Util.getProcessName(process),
2, CoordinatorAction.Status.SUCCEEDED, EntityType.PROCESS);
}
/**
* Test case: set a process whose inputs are all optional. Run it. Check that the process
* gets killed.
*
* @throws Exception
*/
@Test(enabled = true, groups = {"singleCluster"})
public void optionalTest_allInputOptional() throws Exception {
bundles[0].generateRequiredBundle(1, 2, 2, inputPath, 1,
"2010-01-02T01:00Z", "2010-01-02T01:12Z");
bundles[0].setProcessInputNames("inputData");
for (int i = 0; i < bundles[0].getClusters().size(); i++) {
    logger.info(Util.prettyPrintXml(bundles[0].getClusters().get(i)));
}
for (int i = 0; i < bundles[0].getDataSets().size(); i++) {
    logger.info(Util.prettyPrintXml(bundles[0].getDataSets().get(i)));
}
String process = bundles[0].getProcessData();
logger.info(Util.prettyPrintXml(process));
bundles[0].submitAndScheduleBundle(prism, false);
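//with only optional inputs the process instances are expected to get killed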
InstanceUtil.waitTillInstanceReachState(oozieClient, Util.getProcessName(process),
2, CoordinatorAction.Status.KILLED, EntityType.PROCESS);
}
/**
* Test case: set a process with 1 optional and 1 compulsory input. Run it, providing the
* necessary data. Check that the process succeeds. Then update the optional input to be
* compulsory. Check that after the update the process waits for data of the updated input.
* Provide the necessary data and check that the process finally succeeds.
*
* @throws Exception
*/
@Test(enabled = true, groups = {"singleCluster"})
public void optionalTest_updateProcessMakeOptionalCompulsory() throws Exception {
String startTime = TimeUtil.getTimeWrtSystemTime(-4);
String endTime = TimeUtil.getTimeWrtSystemTime(30);
bundles[0].generateRequiredBundle(1, 2, 1, inputPath, 1, startTime, endTime);
for (int i = 0; i < bundles[0].getClusters().size(); i++) {
    logger.info(Util.prettyPrintXml(bundles[0].getClusters().get(i)));
}
for (int i = 0; i < bundles[0].getDataSets().size(); i++) {
    logger.info(Util.prettyPrintXml(bundles[0].getDataSets().get(i)));
}
bundles[0].setProcessInputStartEnd("now(0,-10)", "now(0,0)");
bundles[0].setProcessConcurrency(2);
String process = bundles[0].getProcessData();
String processName = Util.getProcessName(process);
logger.info(Util.prettyPrintXml(process));
bundles[0].submitAndScheduleBundle(prism, true);
InstanceUtil.waitTillInstanceReachState(oozieClient, processName,
2, CoordinatorAction.Status.WAITING, EntityType.PROCESS);
List<String> dataDates = TimeUtil.getMinuteDatesOnEitherSide(
TimeUtil.addMinsToTime(startTime, -10), endTime, 5);
HadoopUtil.flattenAndPutDataInFolder(clusterFS, OSUtil.SINGLE_FILE,
inputPath + "/input1/", dataDates);
InstanceUtil.waitTillInstanceReachState(oozieClient, processName,
1, CoordinatorAction.Status.SUCCEEDED, EntityType.PROCESS);
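//update the process so that the optional input becomes compulsory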
final ProcessMerlin processMerlin = new ProcessMerlin(process);
processMerlin.setProcessFeeds(bundles[0].getDataSets(), 2, 0, 1);
bundles[0].setProcessData(processMerlin.toString());
bundles[0].setProcessInputStartEnd("now(0,-10)", "now(0,0)");
process = bundles[0].getProcessData();
logger.info("modified process:" + Util.prettyPrintXml(process));
prism.getProcessHelper().update(process, process);
//from now on the process should also wait for data of input0
InstanceUtil.waitTillInstanceReachState(oozieClient, processName,
2, CoordinatorAction.Status.WAITING, EntityType.PROCESS);
HadoopUtil.flattenAndPutDataInFolder(clusterFS, OSUtil.SINGLE_FILE,
inputPath + "/input0/", dataDates);
InstanceUtil.waitTillInstanceReachState(oozieClient, processName,
2, CoordinatorAction.Status.SUCCEEDED, EntityType.PROCESS);
}
/**
* Test case: set a process with 1 optional and 1 compulsory input. Run it, providing the
* necessary data. Check that the process succeeds without waiting for the optional input.
* Then update the process so that both inputs are optional. Check that the process gets
* killed.
*
* @throws Exception
*/
@Test(enabled = true, groups = {"singleCluster"})
public void optionalTest_updateProcessMakeCompulsoryOptional() throws Exception {
String startTime = TimeUtil.getTimeWrtSystemTime(-4);
String endTime = TimeUtil.getTimeWrtSystemTime(30);
bundles[0].generateRequiredBundle(1, 2, 1, inputPath, 1, startTime, endTime);
for (int i = 0; i < bundles[0].getClusters().size(); i++) {
    logger.info(Util.prettyPrintXml(bundles[0].getClusters().get(i)));
}
for (int i = 0; i < bundles[0].getDataSets().size(); i++) {
    logger.info(Util.prettyPrintXml(bundles[0].getDataSets().get(i)));
}
bundles[0].setProcessInputStartEnd("now(0,-10)", "now(0,0)");
bundles[0].setProcessConcurrency(4);
String process = bundles[0].getProcessData();
String processName = Util.getProcessName(process);
logger.info(Util.prettyPrintXml(process));
bundles[0].submitAndScheduleBundle(prism, true);
InstanceUtil.waitTillInstanceReachState(oozieClient, processName,
2, CoordinatorAction.Status.WAITING, EntityType.PROCESS);
List<String> dataDates = TimeUtil.getMinuteDatesOnEitherSide(
TimeUtil.addMinsToTime(startTime, -10), TimeUtil.addMinsToTime(endTime, 10), 5);
HadoopUtil.flattenAndPutDataInFolder(clusterFS, OSUtil.SINGLE_FILE,
inputPath + "/input1/", dataDates);
InstanceUtil.waitTillInstanceReachState(oozieClient, processName,
1, CoordinatorAction.Status.SUCCEEDED, EntityType.PROCESS);
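//update the process so that both inputs become optional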
final ProcessMerlin processMerlin = new ProcessMerlin(process);
processMerlin.setProcessFeeds(bundles[0].getDataSets(), 2, 2, 1);
bundles[0].setProcessData(processMerlin.toString());
process = bundles[0].getProcessData();
//delete all input data
HadoopUtil.deleteDirIfExists(inputPath + "/", clusterFS);
bundles[0].setProcessInputNames("inputData0", "inputData");
logger.info("modified process:" + Util.prettyPrintXml(process));
prism.getProcessHelper().update(process, process);
//with both inputs optional and the input data removed the instances should get killed
InstanceUtil.waitTillInstanceReachState(oozieClient, processName,
2, CoordinatorAction.Status.KILLED, EntityType.PROCESS);
}
@AfterClass(alwaysRun = true)
public void tearDownClass() throws IOException {
cleanTestDirs();
}
}