/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.falcon.regression;
import org.apache.falcon.entity.v0.EntityType;
import org.apache.falcon.entity.v0.feed.ActionType;
import org.apache.falcon.entity.v0.feed.ClusterType;
import org.apache.falcon.regression.Entities.FeedMerlin;
import org.apache.falcon.regression.core.bundle.Bundle;
import org.apache.falcon.regression.core.helpers.ColoHelper;
import org.apache.falcon.regression.core.util.OozieUtil;
import org.apache.falcon.regression.core.util.AssertUtil;
import org.apache.falcon.regression.core.util.TimeUtil;
import org.apache.falcon.regression.core.util.HadoopUtil;
import org.apache.falcon.regression.core.util.BundleUtil;
import org.apache.falcon.regression.core.util.InstanceUtil;
import org.apache.falcon.regression.core.util.OSUtil;
import org.apache.falcon.regression.core.util.Util;
import org.apache.falcon.regression.testHelper.BaseTestClass;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.authentication.client.AuthenticationException;
import org.apache.log4j.Logger;
import org.apache.oozie.client.CoordinatorAction;
import org.apache.oozie.client.OozieClient;
import org.apache.oozie.client.OozieClientException;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
import javax.xml.bind.JAXBException;
import java.io.IOException;
import java.net.URISyntaxException;
import java.util.List;
/**
 * This test submits and schedules a feed and then checks for replication.
 * After adding late data, it verifies that the data is replicated correctly within the given late cut-off time.
 * It assumes the late frequency set on the server is 3 minutes, although the value can be changed as required.
 */
@Test(groups = { "distributed", "embedded", "sanity", "multiCluster" })
public class FeedLateRerunTest extends BaseTestClass {
private ColoHelper cluster1 = servers.get(0);
private ColoHelper cluster2 = servers.get(1);
private FileSystem cluster1FS = serverFS.get(0);
private FileSystem cluster2FS = serverFS.get(1);
private OozieClient cluster2OC = serverOC.get(1);
private String baseTestDir = cleanAndGetTestDir();
private String feedDataLocation = baseTestDir + "/source" + MINUTE_DATE_PATTERN;
private String targetPath = baseTestDir + "/target";
private String targetDataLocation = targetPath + MINUTE_DATE_PATTERN;
private static final Logger LOGGER = Logger.getLogger(FeedLateRerunTest.class);
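//HDFS locations of the first missing-dependency instance on the source and target clusters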
private String source = null;
private String target = null;
@BeforeMethod(alwaysRun = true)
public void setUp() throws JAXBException, IOException {
Bundle bundle = BundleUtil.readFeedReplicationBundle();
bundles[0] = new Bundle(bundle, cluster1);
bundles[1] = new Bundle(bundle, cluster2);
bundles[0].generateUniqueBundle(this);
bundles[1].generateUniqueBundle(this);
}
@AfterMethod(alwaysRun = true)
public void tearDown() {
removeTestClassEntities();
}
@Test(dataProvider = "dataFlagProvider")
public void testLateRerun(boolean dataFlag)
throws URISyntaxException, AuthenticationException, InterruptedException, IOException,
OozieClientException, JAXBException {
Bundle.submitCluster(bundles[0], bundles[1]);
String startTime = TimeUtil.getTimeWrtSystemTime(0);
String endTime = TimeUtil.addMinsToTime(startTime, 30);
LOGGER.info("Time range between : " + startTime + " and " + endTime);
//configure feed
FeedMerlin feed = new FeedMerlin(bundles[0].getDataSets().get(0));
feed.setFilePath(feedDataLocation);
//erase all clusters from feed definition
feed.clearFeedClusters();
//set cluster1 as source
feed.addFeedCluster(
new FeedMerlin.FeedClusterBuilder(Util.readEntityName(bundles[0].getClusters().get(0)))
.withRetention("days(1000000)", ActionType.DELETE)
.withValidity(startTime, endTime)
.withClusterType(ClusterType.SOURCE)
.build());
//set cluster2 as target
feed.addFeedCluster(
new FeedMerlin.FeedClusterBuilder(Util.readEntityName(bundles[1].getClusters().get(0)))
.withRetention("days(1000000)", ActionType.DELETE)
.withValidity(startTime, endTime)
.withClusterType(ClusterType.TARGET)
.withDataLocation(targetDataLocation)
.build());
String entityName = feed.getName();
//submit and schedule feed
AssertUtil.assertSucceeded(prism.getFeedHelper().submitAndSchedule(feed.toString()));
//check if coordinator exists
InstanceUtil.waitTillInstancesAreCreated(cluster2OC, feed.toString(), 0);
Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster2OC, entityName, "REPLICATION"), 1);
//Finding bundleId of replicated instance on target
String bundleId = OozieUtil.getLatestBundleID(cluster2OC, entityName, EntityType.FEED);
//Finding and creating missing dependencies
List<String> missingDependencies = getAndCreateDependencies(
cluster1FS, cluster1.getPrefix(), cluster2OC, bundleId, dataFlag, entityName);
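//take the first missing dependency as the source location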
source = splitPathFromIp(missingDependencies.get(0), "8020");
LOGGER.info("source : " + source);
target = source.replace("source", "target");
LOGGER.info("target : " + target);
/* Sleep for some time (as defined in the server's runtime properties).
   Let the instance rerun and then succeed. */
int sleepMins = 8;
for (int i = 0; i < sleepMins; i++) {
LOGGER.info("Waiting...");
TimeUtil.sleepSeconds(60);
}
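//the late rerun should have been attempted exactly once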
String bundleID = OozieUtil.getLatestBundleID(cluster2OC, entityName, EntityType.FEED);
OozieUtil.validateRetryAttempts(cluster2OC, bundleID, EntityType.FEED, 1);
//check if data has been replicated correctly
List<Path> cluster1ReplicatedData = HadoopUtil
.getAllFilesRecursivelyHDFS(cluster1FS, new Path(HadoopUtil.cutProtocol(source)));
List<Path> cluster2ReplicatedData = HadoopUtil
.getAllFilesRecursivelyHDFS(cluster2FS, new Path(HadoopUtil.cutProtocol(target)));
AssertUtil.checkForListSizes(cluster1ReplicatedData, cluster2ReplicatedData);
}
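/**
 * Extracts the bare path from a location such as "hdfs://host:8020/path" by
 * dropping everything up to and including the given port. If the port is not
 * present, the input is returned unchanged.
 */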
private String splitPathFromIp(String src, String port) {
String reqSrc, tempSrc = "";
if (src.contains(":")) {
String[] tempPath = src.split(":");
for (String aTempPath : tempPath) {
if (aTempPath.startsWith(port)) {
tempSrc = aTempPath;
}
}
}
if (tempSrc.isEmpty()) {
reqSrc = src;
} else {
reqSrc = tempSrc.replace(port, "");
}
return reqSrc;
}
/**
 * Finds the missing dependencies of the replication coordinator on the target colo,
 * creates the corresponding folders on the source colo, optionally seeds the first
 * folder with data, and adds late data once the first instance has succeeded.
 */
private List<String> getAndCreateDependencies(FileSystem sourceFS, String prefix, OozieClient targetOC,
String bundleId, boolean dataFlag, String entityName) throws OozieClientException, IOException {
List<String> missingDependencies = OozieUtil.getMissingDependencies(targetOC, bundleId);
for (int i = 0; i < 10 && missingDependencies == null; ++i) {
TimeUtil.sleepSeconds(30);
LOGGER.info("sleeping...");
missingDependencies = OozieUtil.getMissingDependencies(targetOC, bundleId);
}
Assert.assertNotNull(missingDependencies, "Missing dependencies not found.");
//print missing dependencies
for (String dependency : missingDependencies) {
LOGGER.info("dependency from job: " + dependency);
}
// Creating missing dependencies
HadoopUtil.createFolders(sourceFS, prefix, missingDependencies);
//seed the first missing-dependency folder with data if dataFlag is set
if (dataFlag) {
String location = missingDependencies.get(0);
LOGGER.info("Transferring data to : " + location);
HadoopUtil.copyDataToFolder(sourceFS, location, OSUtil.concat(OSUtil.NORMAL_INPUT, "dataFile.xml"));
}
//replication should start, wait while it ends
InstanceUtil.waitTillInstanceReachState(targetOC, entityName, 1,
CoordinatorAction.Status.SUCCEEDED, EntityType.FEED);
// Adding late data to the first folder only, to trigger the late rerun
String dependency = missingDependencies.get(0);
LOGGER.info("Transferring late data to : " + dependency);
HadoopUtil.copyDataToFolder(sourceFS, dependency,
OSUtil.concat(OSUtil.NORMAL_INPUT, "dataFile.properties"));
return missingDependencies;
}
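//Provides dataFlag values: true seeds the source folder with data before replication, false leaves it empty.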
@DataProvider(name = "dataFlagProvider")
private Object[][] dataFlagProvider() {
return new Object[][] {
new Object[]{true},
new Object[]{false},
};
}
}