| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.falcon.regression; |
| |
| import org.apache.falcon.regression.Entities.FeedMerlin; |
| import org.apache.falcon.regression.core.bundle.Bundle; |
| import org.apache.falcon.entity.v0.EntityType; |
| import org.apache.falcon.entity.v0.feed.ActionType; |
| import org.apache.falcon.entity.v0.feed.ClusterType; |
| import org.apache.falcon.regression.core.helpers.ColoHelper; |
| import org.apache.falcon.regression.core.response.InstancesResult; |
| import org.apache.falcon.regression.core.util.AssertUtil; |
| import org.apache.falcon.regression.core.util.BundleUtil; |
| import org.apache.falcon.regression.core.util.HadoopUtil; |
| import org.apache.falcon.regression.core.util.InstanceUtil; |
| import org.apache.falcon.regression.core.util.OSUtil; |
| import org.apache.falcon.regression.core.util.TimeUtil; |
| import org.apache.falcon.regression.core.util.Util; |
| import org.apache.falcon.regression.core.util.XmlUtil; |
| import org.apache.falcon.regression.testHelper.BaseTestClass; |
| import org.apache.hadoop.fs.FileSystem; |
| import org.apache.hadoop.fs.Path; |
| import org.apache.hadoop.security.authentication.client.AuthenticationException; |
| import org.apache.log4j.Logger; |
| import org.apache.oozie.client.CoordinatorAction; |
| import org.apache.oozie.client.OozieClient; |
| import org.apache.oozie.client.OozieClientException; |
| import org.joda.time.DateTime; |
| import org.joda.time.DateTimeZone; |
| import org.joda.time.format.DateTimeFormat; |
| import org.joda.time.format.DateTimeFormatter; |
| import org.testng.Assert; |
| import org.testng.annotations.AfterClass; |
| import org.testng.annotations.AfterMethod; |
| import org.testng.annotations.BeforeMethod; |
| import org.testng.annotations.Test; |
| |
| import javax.xml.bind.JAXBException; |
| import java.io.IOException; |
| import java.lang.reflect.Method; |
| import java.net.URISyntaxException; |
| import java.util.List; |
| |
| /** |
| * Tests which verify that feed data stored on a source cluster is replicated |
| * correctly to one or more target clusters. |
| */ |
| @Test(groups = "embedded") |
| public class FeedReplicationTest extends BaseTestClass { |
| |
| private ColoHelper cluster1 = servers.get(0); |
| private ColoHelper cluster2 = servers.get(1); |
| private ColoHelper cluster3 = servers.get(2); |
| private FileSystem cluster1FS = serverFS.get(0); |
| private FileSystem cluster2FS = serverFS.get(1); |
| private FileSystem cluster3FS = serverFS.get(2); |
| private OozieClient cluster2OC = serverOC.get(1); |
| private OozieClient cluster3OC = serverOC.get(2); |
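| //shared HDFS layout: every test writes data under .../source using a minute-level |
| //date pattern and expects it to be replicated to the matching .../target path |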
| private String baseTestDir = baseHDFSDir + "/FeedReplicationTest"; |
| private String sourcePath = baseTestDir + "/source"; |
| private String feedDataLocation = sourcePath + MINUTE_DATE_PATTERN; |
| private String targetPath = baseTestDir + "/target"; |
| private String targetDataLocation = targetPath + MINUTE_DATE_PATTERN; |
| private static final Logger LOGGER = Logger.getLogger(FeedReplicationTest.class); |
| |
| @BeforeMethod(alwaysRun = true) |
| public void setUp(Method method) throws JAXBException, IOException { |
| LOGGER.info("test name: " + method.getName()); |
| Bundle bundle = |
| BundleUtil.readLocalDCBundle(baseAppHDFSDir, this.getClass().getSimpleName()); |
| |
| bundles[0] = new Bundle(bundle, cluster1); |
| bundles[1] = new Bundle(bundle, cluster2); |
| bundles[2] = new Bundle(bundle, cluster3); |
| |
| bundles[0].generateUniqueBundle(); |
| bundles[1].generateUniqueBundle(); |
| bundles[2].generateUniqueBundle(); |
| } |
| |
| @AfterMethod(alwaysRun = true) |
| public void tearDown() { |
| removeBundles(); |
| } |
| |
| /** |
| * Test demonstrates replication of stored data from one source cluster to one target cluster. |
| * It checks the lifecycle of the replication workflow instance, including its creation. When |
| * replication ends, the test checks whether the data was replicated correctly. |
| */ |
| @Test |
| public void replicate1Source1Target() |
| throws AuthenticationException, IOException, URISyntaxException, JAXBException, |
| OozieClientException { |
| Bundle.submitCluster(bundles[0], bundles[1]); |
| String startTime = TimeUtil.getTimeWrtSystemTime(0); |
| String endTime = TimeUtil.addMinsToTime(startTime, 5); |
| LOGGER.info("Time range between : " + startTime + " and " + endTime); |
| |
| //configure feed |
| String feed = bundles[0].getDataSets().get(0); |
| feed = InstanceUtil.setFeedFilePath(feed, feedDataLocation); |
| //erase all clusters from feed definition |
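| //(the reversed validity range below looks like a deliberate dummy value: this first |
| //call presumably only clears the cluster list rather than setting a real validity) |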
| feed = InstanceUtil.setFeedCluster(feed, |
| XmlUtil.createValidity("2012-10-01T12:00Z", "2010-01-01T00:00Z"), |
| XmlUtil.createRtention("days(1000000)", ActionType.DELETE), null, |
| ClusterType.SOURCE, null); |
| //set cluster1 as source |
| feed = InstanceUtil.setFeedCluster(feed, |
| XmlUtil.createValidity(startTime, endTime), |
| XmlUtil.createRtention("days(1000000)", ActionType.DELETE), |
| Util.readEntityName(bundles[0].getClusters().get(0)), |
| ClusterType.SOURCE, null); |
| //set cluster2 as target |
| feed = InstanceUtil.setFeedCluster(feed, |
| XmlUtil.createValidity(startTime, endTime), |
| XmlUtil.createRtention("days(1000000)", ActionType.DELETE), |
| Util.readEntityName(bundles[1].getClusters().get(0)), |
| ClusterType.TARGET, null, targetDataLocation); |
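| //the feed now has cluster1 as its only source and cluster2 as its target |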
| |
| //submit and schedule feed |
| LOGGER.info("Feed : " + Util.prettyPrintXml(feed)); |
| AssertUtil.assertSucceeded(prism.getFeedHelper().submitAndSchedule(feed)); |
| |
| //upload necessary data |
| DateTime date = new DateTime(startTime, DateTimeZone.UTC); |
| DateTimeFormatter fmt = DateTimeFormat.forPattern("yyyy'/'MM'/'dd'/'HH'/'mm"); |
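| //the formatter renders the instance time as a minute-level path (e.g. 2015/01/31/23/59) |
| //matching MINUTE_DATE_PATTERN in the feed locations |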
| String timePattern = fmt.print(date); |
| String sourceLocation = sourcePath + "/" + timePattern + "/"; |
| String targetLocation = targetPath + "/" + timePattern + "/"; |
| HadoopUtil.recreateDir(cluster1FS, sourceLocation); |
| |
| Path toSource = new Path(sourceLocation); |
| Path toTarget = new Path(targetLocation); |
| HadoopUtil.copyDataToFolder(cluster1FS, sourceLocation, |
| OSUtil.RESOURCES + "feed-s4Replication.xml"); |
| HadoopUtil.copyDataToFolder(cluster1FS, sourceLocation, OSUtil.RESOURCES + "log_01.txt"); |
| |
| //check if coordinator exists |
| InstanceUtil.waitTillInstancesAreCreated(cluster2, feed, 0); |
| |
| Assert.assertEquals(InstanceUtil |
| .checkIfFeedCoordExist(cluster2.getFeedHelper(), Util.readEntityName(feed), |
| "REPLICATION"), 1); |
| |
| //replication should start; wait till it ends |
| InstanceUtil.waitTillInstanceReachState(cluster2OC, Util.readEntityName(feed), 1, |
| CoordinatorAction.Status.SUCCEEDED, EntityType.FEED); |
| |
| //check if data has been replicated correctly |
| List<Path> cluster1ReplicatedData = HadoopUtil |
| .getAllFilesRecursivelyHDFS(cluster1FS, toSource); |
| List<Path> cluster2ReplicatedData = HadoopUtil |
| .getAllFilesRecursivelyHDFS(cluster2FS, toTarget); |
| |
| AssertUtil.checkForListSizes(cluster1ReplicatedData, cluster2ReplicatedData); |
| } |
| |
| /** |
| * Test demonstrates replication of stored data from one source cluster to two target clusters. |
| * It checks the lifecycle of the replication workflow instances, including their creation on |
| * both targets. When replication ends, the test checks whether the data was replicated |
| * correctly to each target. |
| */ |
| @Test |
| public void replicate1Source2Targets() throws Exception { |
| Bundle.submitCluster(bundles[0], bundles[1], bundles[2]); |
| String startTime = TimeUtil.getTimeWrtSystemTime(0); |
| String endTime = TimeUtil.addMinsToTime(startTime, 5); |
| LOGGER.info("Time range between : " + startTime + " and " + endTime); |
| |
| //configure feed |
| String feed = bundles[0].getDataSets().get(0); |
| feed = InstanceUtil.setFeedFilePath(feed, feedDataLocation); |
| //erase all clusters from feed definition |
| feed = InstanceUtil.setFeedCluster(feed, |
| XmlUtil.createValidity("2012-10-01T12:00Z", "2010-01-01T00:00Z"), |
| XmlUtil.createRtention("days(1000000)", ActionType.DELETE), null, |
| ClusterType.SOURCE, null); |
| //set cluster1 as source |
| feed = InstanceUtil.setFeedCluster(feed, |
| XmlUtil.createValidity(startTime, endTime), |
| XmlUtil.createRtention("days(1000000)", ActionType.DELETE), |
| Util.readEntityName(bundles[0].getClusters().get(0)), |
| ClusterType.SOURCE, null); |
| //set cluster2 as target |
| feed = InstanceUtil.setFeedCluster(feed, |
| XmlUtil.createValidity(startTime, endTime), |
| XmlUtil.createRtention("days(1000000)", ActionType.DELETE), |
| Util.readEntityName(bundles[1].getClusters().get(0)), |
| ClusterType.TARGET, null, targetDataLocation); |
| //set cluster3 as target |
| feed = InstanceUtil.setFeedCluster(feed, |
| XmlUtil.createValidity(startTime, endTime), |
| XmlUtil.createRtention("days(1000000)", ActionType.DELETE), |
| Util.readEntityName(bundles[2].getClusters().get(0)), |
| ClusterType.TARGET, null, targetDataLocation); |
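| //the feed now has cluster1 as its source and both cluster2 and cluster3 as targets |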
| |
| //submit and schedule feed |
| LOGGER.info("Feed : " + Util.prettyPrintXml(feed)); |
| AssertUtil.assertSucceeded(prism.getFeedHelper().submitAndSchedule(feed)); |
| |
| //upload necessary data |
| DateTime date = new DateTime(startTime, DateTimeZone.UTC); |
| DateTimeFormatter fmt = DateTimeFormat.forPattern("yyyy'/'MM'/'dd'/'HH'/'mm"); |
| String timePattern = fmt.print(date); |
| String sourceLocation = sourcePath + "/" + timePattern + "/"; |
| String targetLocation = targetPath + "/" + timePattern + "/"; |
| HadoopUtil.recreateDir(cluster1FS, sourceLocation); |
| |
| Path toSource = new Path(sourceLocation); |
| Path toTarget = new Path(targetLocation); |
| HadoopUtil.copyDataToFolder(cluster1FS, sourceLocation, |
| OSUtil.RESOURCES + "feed-s4Replication.xml"); |
| HadoopUtil.copyDataToFolder(cluster1FS, sourceLocation, OSUtil.RESOURCES + "log_01.txt"); |
| |
| //check if all coordinators exist |
| InstanceUtil.waitTillInstancesAreCreated(cluster2, feed, 0); |
| |
| InstanceUtil.waitTillInstancesAreCreated(cluster3, feed, 0); |
| |
| Assert.assertEquals(InstanceUtil |
| .checkIfFeedCoordExist(cluster2.getFeedHelper(), Util.readEntityName(feed), |
| "REPLICATION"), 1); |
| Assert.assertEquals(InstanceUtil |
| .checkIfFeedCoordExist(cluster3.getFeedHelper(), Util.readEntityName(feed), |
| "REPLICATION"), 1); |
| //replication on cluster 2 should start; wait till it ends |
| InstanceUtil.waitTillInstanceReachState(cluster2OC, Util.readEntityName(feed), 1, |
| CoordinatorAction.Status.SUCCEEDED, EntityType.FEED); |
| |
| //replication on cluster 3 should start; wait till it ends |
| InstanceUtil.waitTillInstanceReachState(cluster3OC, Util.readEntityName(feed), 1, |
| CoordinatorAction.Status.SUCCEEDED, EntityType.FEED); |
| |
| //check if data has been replicated correctly |
| List<Path> cluster1ReplicatedData = HadoopUtil |
| .getAllFilesRecursivelyHDFS(cluster1FS, toSource); |
| List<Path> cluster2ReplicatedData = HadoopUtil |
| .getAllFilesRecursivelyHDFS(cluster2FS, toTarget); |
| List<Path> cluster3ReplicatedData = HadoopUtil |
| .getAllFilesRecursivelyHDFS(cluster3FS, toTarget); |
| |
| AssertUtil.checkForListSizes(cluster1ReplicatedData, cluster2ReplicatedData); |
| AssertUtil.checkForListSizes(cluster1ReplicatedData, cluster3ReplicatedData); |
| } |
| |
| /** |
| * Test demonstrates how replication depends on the availability flag. The scenario includes |
| * one source and one target cluster. Even after the feed has been submitted and scheduled |
| * and the data is available, the feed waits for the availability flag (a file whose name is |
| * defined as the availability flag in the feed definition). As soon as that file is uploaded |
| * to the data directory, replication starts; when it ends, the test checks whether the data |
| * was replicated correctly. |
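| * |
| * A minimal sketch of the relevant fragment of the feed definition (the availabilityFlag |
| * element name is assumed from the Falcon feed schema; README.md is the flag file this |
| * test uses): |
| * <pre> |
| * &lt;availabilityFlag&gt;README.md&lt;/availabilityFlag&gt; |
| * </pre> |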
| */ |
| @Test |
| public void availabilityFlagTest() throws Exception { |
| //replicate1Source1Target scenario + set availability flag but don't upload required file |
| Bundle.submitCluster(bundles[0], bundles[1]); |
| String startTime = TimeUtil.getTimeWrtSystemTime(0); |
| String endTime = TimeUtil.addMinsToTime(startTime, 5); |
| LOGGER.info("Time range between : " + startTime + " and " + endTime); |
| |
| //configure feed |
| String availabilityFlagName = "README.md"; |
| String feedName = Util.readEntityName(bundles[0].getDataSets().get(0)); |
| FeedMerlin feedElement = bundles[0].getFeedElement(feedName); |
| feedElement.setAvailabilityFlag(availabilityFlagName); |
| bundles[0].writeFeedElement(feedElement, feedName); |
| String feed = bundles[0].getDataSets().get(0); |
| feed = InstanceUtil.setFeedFilePath(feed, feedDataLocation); |
| //erase all clusters from feed definition |
| feed = InstanceUtil.setFeedCluster(feed, |
| XmlUtil.createValidity("2012-10-01T12:00Z", "2010-01-01T00:00Z"), |
| XmlUtil.createRtention("days(1000000)", ActionType.DELETE), null, |
| ClusterType.SOURCE, null); |
| //set cluster1 as source |
| feed = InstanceUtil.setFeedCluster(feed, |
| XmlUtil.createValidity(startTime, endTime), |
| XmlUtil.createRtention("days(1000000)", ActionType.DELETE), |
| Util.readEntityName(bundles[0].getClusters().get(0)), |
| ClusterType.SOURCE, null); |
| //set cluster2 as target |
| feed = InstanceUtil.setFeedCluster(feed, |
| XmlUtil.createValidity(startTime, endTime), |
| XmlUtil.createRtention("days(1000000)", ActionType.DELETE), |
| Util.readEntityName(bundles[1].getClusters().get(0)), |
| ClusterType.TARGET, null, targetDataLocation); |
| |
| //submit and schedule feed |
| LOGGER.info("Feed : " + Util.prettyPrintXml(feed)); |
| AssertUtil.assertSucceeded(prism.getFeedHelper().submitAndSchedule(feed)); |
| |
| //upload necessary data |
| DateTime date = new DateTime(startTime, DateTimeZone.UTC); |
| DateTimeFormatter fmt = DateTimeFormat.forPattern("yyyy'/'MM'/'dd'/'HH'/'mm"); |
| String timePattern = fmt.print(date); |
| String sourceLocation = sourcePath + "/" + timePattern + "/"; |
| String targetLocation = targetPath + "/" + timePattern + "/"; |
| HadoopUtil.recreateDir(cluster1FS, sourceLocation); |
| |
| Path toSource = new Path(sourceLocation); |
| Path toTarget = new Path(targetLocation); |
| HadoopUtil.copyDataToFolder(cluster1FS, sourceLocation, |
| OSUtil.RESOURCES + "feed-s4Replication.xml"); |
| HadoopUtil.copyDataToFolder(cluster1FS, sourceLocation, OSUtil.RESOURCES + "log_01.txt"); |
| |
| //wait till the replication instance is created |
| InstanceUtil.waitTillInstancesAreCreated(cluster2, feed, 0); |
| |
| //check if coordinator exists |
| Assert.assertEquals(InstanceUtil |
| .checkIfFeedCoordExist(cluster2.getFeedHelper(), feedName, "REPLICATION"), 1); |
| |
| //replication should not start even after a while, since the flag file is missing |
| TimeUtil.sleepSeconds(60); |
| InstancesResult r = prism.getFeedHelper().getProcessInstanceStatus(feedName, |
| "?start=" + startTime + "&end=" + endTime); |
| InstanceUtil.validateResponse(r, 1, 0, 0, 1, 0); |
| LOGGER.info("Replication didn't start."); |
| |
| //create availability flag on source |
| HadoopUtil.copyDataToFolder(cluster1FS, sourceLocation, availabilityFlagName); |
| |
| //wait till the instance becomes RUNNING |
| InstanceUtil.waitTillInstanceReachState(cluster2OC, Util.readEntityName(feed), 1, |
| CoordinatorAction.Status.RUNNING, EntityType.FEED); |
| |
| //wait till the instance succeeds |
| InstanceUtil.waitTillInstanceReachState(cluster2OC, Util.readEntityName(feed), 1, |
| CoordinatorAction.Status.SUCCEEDED, EntityType.FEED); |
| |
| //check if data was replicated correctly |
| List<Path> cluster1ReplicatedData = HadoopUtil |
| .getAllFilesRecursivelyHDFS(cluster1FS, toSource); |
| LOGGER.info("Data on source cluster: " + cluster1ReplicatedData); |
| List<Path> cluster2ReplicatedData = HadoopUtil |
| .getAllFilesRecursivelyHDFS(cluster2FS, toTarget); |
| LOGGER.info("Data on target cluster: " + cluster2ReplicatedData); |
| AssertUtil.checkForListSizes(cluster1ReplicatedData, cluster2ReplicatedData); |
| } |
| |
| @AfterClass(alwaysRun = true) |
| public void tearDownClass() throws IOException { |
| cleanTestDirs(); |
| } |
| } |