| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.falcon.regression.prism; |
| |
| import com.jcraft.jsch.JSchException; |
| import org.apache.falcon.regression.core.bundle.Bundle; |
| import org.apache.falcon.entity.v0.EntityType; |
| import org.apache.falcon.entity.v0.Frequency; |
| import org.apache.falcon.entity.v0.feed.ActionType; |
| import org.apache.falcon.entity.v0.feed.ClusterType; |
| import org.apache.falcon.regression.core.helpers.ColoHelper; |
| import org.apache.falcon.regression.core.response.ServiceResponse; |
| import org.apache.falcon.regression.core.util.AssertUtil; |
| import org.apache.falcon.regression.core.util.BundleUtil; |
| import org.apache.falcon.regression.core.util.HadoopUtil; |
| import org.apache.falcon.regression.core.util.InstanceUtil; |
| import org.apache.falcon.regression.core.util.OSUtil; |
| import org.apache.falcon.regression.core.util.OozieUtil; |
| import org.apache.falcon.regression.core.util.TimeUtil; |
| import org.apache.falcon.regression.core.util.Util; |
| import org.apache.falcon.regression.core.util.XmlUtil; |
| import org.apache.falcon.regression.testHelper.BaseTestClass; |
| import org.apache.hadoop.fs.FileSystem; |
| import org.apache.hadoop.security.authentication.client.AuthenticationException; |
| import org.apache.oozie.client.CoordinatorAction; |
| import org.apache.oozie.client.OozieClientException; |
| import org.custommonkey.xmlunit.Diff; |
| import org.custommonkey.xmlunit.XMLUnit; |
| import org.joda.time.DateTime; |
| import org.testng.Assert; |
| import org.testng.annotations.AfterClass; |
| import org.testng.annotations.AfterMethod; |
| import org.testng.annotations.BeforeClass; |
| import org.testng.annotations.BeforeMethod; |
| import org.testng.annotations.Test; |
| import org.xml.sax.SAXException; |
| import org.apache.log4j.Logger; |
| |
| import javax.xml.bind.JAXBException; |
| import java.io.IOException; |
| import java.lang.reflect.Method; |
| import java.net.URISyntaxException; |
| import java.util.List; |
| |
| |
| public class UpdateAtSpecificTimeTest extends BaseTestClass { |
| |
| private static final Logger logger = Logger.getLogger(UpdateAtSpecificTimeTest.class); |
| |
| Bundle processBundle; |
| ColoHelper cluster1 = servers.get(0); |
| ColoHelper cluster2 = servers.get(1); |
| ColoHelper cluster3 = servers.get(2); |
| FileSystem cluster2FS = serverFS.get(1); |
| private final String baseTestDir = baseHDFSDir + "/UpdateAtSpecificTimeTest-data"; |
| String aggregateWorkflowDir = baseHDFSDir + "/aggregator"; |
| |
| @BeforeClass(alwaysRun = true) |
| public void uploadWorkflow() throws Exception { |
| uploadDirToClusters(aggregateWorkflowDir, OSUtil.RESOURCES_OOZIE); |
| } |
| |
| @BeforeMethod(alwaysRun = true) |
| public void setup(Method method) throws IOException { |
| logger.info("test name: " + method.getName()); |
| Bundle bundle = BundleUtil.readLocalDCBundle(); |
| bundles[0] = new Bundle(bundle, cluster1); |
| bundles[1] = new Bundle(bundle, cluster2); |
| bundles[2] = new Bundle(bundle, cluster3); |
| |
| bundles[0].generateUniqueBundle(); |
| bundles[1].generateUniqueBundle(); |
| bundles[2].generateUniqueBundle(); |
| |
| processBundle = BundleUtil.readELBundle(); |
| processBundle = new Bundle(processBundle, cluster1); |
| processBundle.generateUniqueBundle(); |
| processBundle.setProcessWorkflow(aggregateWorkflowDir); |
| } |
| |
| @AfterMethod(alwaysRun = true) |
| public void tearDown() { |
| removeBundles(); |
| removeBundles(processBundle); |
| } |
| |
| @Test(groups = {"singleCluster", "0.3.1", "embedded"}, timeOut = 1200000, enabled = true) |
| public void invalidChar_Process() |
| throws JAXBException, IOException, URISyntaxException, |
| AuthenticationException, OozieClientException { |
| processBundle.setProcessValidity(TimeUtil.getTimeWrtSystemTime(0), |
| TimeUtil.getTimeWrtSystemTime(20)); |
| processBundle.submitFeedsScheduleProcess(prism); |
| InstanceUtil.waitTillInstancesAreCreated(cluster1, processBundle.getProcessData(), 0); |
| String oldProcess = |
| processBundle.getProcessData(); |
| processBundle.setProcessValidity(TimeUtil.getTimeWrtSystemTime(5), |
| TimeUtil.getTimeWrtSystemTime(100)); |
| ServiceResponse r = prism.getProcessHelper().update(oldProcess, |
| processBundle.getProcessData(), "abc", null); |
| Assert.assertTrue(r.getMessage() |
| .contains("java.lang.IllegalArgumentException: abc is not a valid UTC string")); |
| } |
| |
| @Test(groups = {"singleCluster", "0.3.1", "embedded"}, timeOut = 1200000, enabled = true) |
| public void invalidChar_Feed() |
| throws JAXBException, IOException, URISyntaxException, AuthenticationException, |
| OozieClientException { |
| |
| String feed = submitAndScheduleFeed(processBundle); |
| InstanceUtil.waitTillInstancesAreCreated(cluster1, feed, 0); |
| |
| //update frequency |
| Frequency f = new Frequency("" + 21, Frequency.TimeUnit.minutes); |
| String updatedFeed = InstanceUtil.setFeedFrequency(feed, f); |
| ServiceResponse r = prism.getFeedHelper().update(feed, updatedFeed, "abc", null); |
| Assert.assertTrue(r.getMessage() |
| .contains("java.lang.IllegalArgumentException: abc is not a valid UTC string")); |
| } |
| |
| @Test(groups = {"singleCluster", "0.3.1", "embedded"}, timeOut = 1200000, enabled = true) |
| public void updateTimeInPast_Process() |
| throws JAXBException, IOException, URISyntaxException, |
| OozieClientException, AuthenticationException { |
| |
| processBundle.setProcessValidity(TimeUtil.getTimeWrtSystemTime(0), |
| TimeUtil.getTimeWrtSystemTime(20)); |
| processBundle.submitFeedsScheduleProcess(prism); |
| |
| //get old process details |
| String oldProcess = processBundle.getProcessData(); |
| String oldBundleId = InstanceUtil.getLatestBundleID(cluster1, |
| Util.readEntityName(processBundle.getProcessData()), EntityType.PROCESS); |
| |
| InstanceUtil.waitTillInstancesAreCreated(cluster1, oldProcess, 0); |
| List<String> initialNominalTimes = OozieUtil.getActionsNominalTime(cluster1, |
| oldBundleId, EntityType.PROCESS); |
| |
| // update process by adding property |
| processBundle.setProcessProperty("someProp", "someValue"); |
| ServiceResponse r = prism.getProcessHelper().update(oldProcess, |
| processBundle.getProcessData(), TimeUtil.getTimeWrtSystemTime(-10000), null); |
| AssertUtil.assertSucceeded(r); |
| |
| //check new coord created with current time |
| OozieUtil.verifyNewBundleCreation(cluster1, oldBundleId, initialNominalTimes, |
| processBundle.getProcessData(), true, false); |
| InstanceUtil.waitTillInstancesAreCreated(cluster1, oldProcess, 1); |
| OozieUtil.verifyNewBundleCreation(cluster1, oldBundleId, initialNominalTimes, |
| processBundle.getProcessData(), true, true); |
| } |
| |
| @Test(groups = {"MultiCluster", "0.3.1", "embedded"}, timeOut = 1200000, enabled = true) |
| public void updateTimeInPast_Feed() |
| throws JAXBException, IOException, OozieClientException, |
| URISyntaxException, AuthenticationException { |
| |
| String startTimeCluster_source = TimeUtil.getTimeWrtSystemTime(-10); |
| String startTimeCluster_target = TimeUtil.getTimeWrtSystemTime(10); |
| String feed = getMultiClusterFeed(startTimeCluster_source, startTimeCluster_target); |
| logger.info("feed: " + Util.prettyPrintXml(feed)); |
| |
| //submit and schedule feed |
| ServiceResponse r = prism.getFeedHelper().submitEntity(feed); |
| TimeUtil.sleepSeconds(10); |
| AssertUtil.assertSucceeded(r); |
| InstanceUtil.waitTillInstancesAreCreated(cluster1, feed, 0); |
| |
| //update frequency |
| Frequency f = new Frequency("" + 7, Frequency.TimeUnit.minutes); |
| String updatedFeed = InstanceUtil.setFeedFrequency(feed, f); |
| r = prism.getFeedHelper().update(feed, updatedFeed, |
| TimeUtil.getTimeWrtSystemTime(-10000), null); |
| AssertUtil.assertSucceeded(r); |
| InstanceUtil.waitTillInstancesAreCreated(cluster1, feed, 1); |
| |
| //check correct number of coord exists or not |
| Assert.assertEquals(InstanceUtil |
| .checkIfFeedCoordExist(cluster1.getFeedHelper(), |
| Util.readEntityName(feed), |
| "REPLICATION"), 2); |
| Assert.assertEquals(InstanceUtil |
| .checkIfFeedCoordExist(cluster2.getFeedHelper(), Util.readEntityName(feed), |
| "RETENTION"), 2); |
| Assert.assertEquals(InstanceUtil |
| .checkIfFeedCoordExist(cluster1.getFeedHelper(), Util.readEntityName(feed), |
| "RETENTION"), 2); |
| Assert.assertEquals(InstanceUtil |
| .checkIfFeedCoordExist(cluster3.getFeedHelper(), Util.readEntityName(feed), |
| "RETENTION"), 2); |
| } |
| |
| @Test(groups = {"MultiCluster", "0.3.1", "distributed"}, timeOut = 1200000, enabled = true) |
| public void inNextFewMinutesUpdate_RollForward_Process() |
| throws JAXBException, IOException, URISyntaxException, JSchException, |
| OozieClientException, SAXException, AuthenticationException { |
| /* |
| submit process on 3 clusters. Schedule on 2 clusters. Bring down one of |
| the scheduled cluster. Update with time 5 minutes from now. On running |
| cluster new coord should be created with start time +5 and no instance |
| should be missing. On 3rd cluster where process was only submit, |
| definition should be updated. Bring the down cluster up. Update with same |
| definition again, now the recently up cluster should also have new |
| coords. |
| */ |
| try { |
| Util.startService(cluster2.getProcessHelper()); |
| String startTime = TimeUtil.getTimeWrtSystemTime(-15); |
| processBundle.setProcessValidity(startTime, |
| TimeUtil.getTimeWrtSystemTime(60)); |
| processBundle.addClusterToBundle(bundles[1].getClusters().get(0), |
| ClusterType.SOURCE, null, null); |
| processBundle.addClusterToBundle(bundles[2].getClusters().get(0), |
| ClusterType.SOURCE, null, null); |
| processBundle.submitBundle(prism); |
| |
| //schedule of 2 cluster |
| cluster1.getProcessHelper().schedule(processBundle.getProcessData()); |
| cluster2.getProcessHelper().schedule(processBundle.getProcessData()); |
| InstanceUtil.waitTillInstancesAreCreated(cluster2, processBundle.getProcessData(), 0); |
| |
| //shut down cluster2 |
| Util.shutDownService(cluster2.getProcessHelper()); |
| |
| // save old data before update |
| String oldProcess = processBundle.getProcessData(); |
| String oldBundleID_cluster1 = InstanceUtil |
| .getLatestBundleID(cluster1, |
| Util.readEntityName(oldProcess), EntityType.PROCESS); |
| String oldBundleID_cluster2 = InstanceUtil |
| .getLatestBundleID(cluster2, |
| Util.readEntityName(oldProcess), EntityType.PROCESS); |
| List<String> oldNominalTimes_cluster1 = OozieUtil.getActionsNominalTime |
| (cluster1, |
| oldBundleID_cluster1, EntityType.PROCESS); |
| List<String> oldNominalTimes_cluster2 = OozieUtil.getActionsNominalTime |
| (cluster2, |
| oldBundleID_cluster2, EntityType.PROCESS); |
| |
| //update process validity |
| processBundle.setProcessProperty("someProp", "someValue"); |
| |
| //send update request |
| String updateTime = TimeUtil.getTimeWrtSystemTime(5); |
| ServiceResponse r = prism.getProcessHelper() |
| .update(oldProcess, processBundle.getProcessData(), updateTime); |
| AssertUtil.assertPartial(r); |
| InstanceUtil.waitTillInstancesAreCreated(cluster1, processBundle.getProcessData(), 1); |
| |
| //verify new bundle on cluster1 and definition on cluster3 |
| OozieUtil |
| .verifyNewBundleCreation(cluster1, oldBundleID_cluster1, oldNominalTimes_cluster1, |
| oldProcess, true, false); |
| OozieUtil.verifyNewBundleCreation(cluster2, oldBundleID_cluster2, |
| oldNominalTimes_cluster2, |
| oldProcess, false, false); |
| String definition_cluster_3 = Util.getEntityDefinition(cluster3, |
| processBundle.getProcessData(), true); |
| Assert.assertTrue(XmlUtil.isIdentical(definition_cluster_3, |
| processBundle.getProcessData()), "Process definitions should be equal"); |
| |
| //start the stopped cluster2 |
| Util.startService(cluster2.getProcessHelper()); |
| TimeUtil.sleepSeconds(40); |
| String newBundleID_cluster1 = InstanceUtil.getLatestBundleID(cluster1, |
| Util.readEntityName(oldProcess), EntityType.PROCESS); |
| |
| //send second update request |
| r = prism.getProcessHelper().update(oldProcess, processBundle.getProcessData(), |
| updateTime, null); |
| AssertUtil.assertSucceeded(r); |
| String def_cluster_2 = Util.getEntityDefinition(cluster2, |
| processBundle.getProcessData(), true); |
| logger.info("def_cluster_2 : " + Util.prettyPrintXml(def_cluster_2)); |
| |
| // verify new bundle in cluster2 and no new bundle in cluster1 and |
| OozieUtil.verifyNewBundleCreation(cluster1, newBundleID_cluster1, |
| oldNominalTimes_cluster1, oldProcess, false, false); |
| OozieUtil.verifyNewBundleCreation(cluster2, oldBundleID_cluster2, |
| oldNominalTimes_cluster2, oldProcess, true, false); |
| |
| //wait till update time is reached |
| TimeUtil.sleepTill(updateTime); |
| OozieUtil.verifyNewBundleCreation(cluster2, oldBundleID_cluster2, |
| oldNominalTimes_cluster2, oldProcess, true, true); |
| OozieUtil.verifyNewBundleCreation(cluster1, oldBundleID_cluster1, |
| oldNominalTimes_cluster1, oldProcess, true, true); |
| } finally { |
| Util.restartService(cluster2.getProcessHelper()); |
| } |
| } |
| |
| @Test(groups = {"MultiCluster", "0.3.1", "distributed"}, timeOut = 1200000, enabled = true) |
| public void inNextFewMinutesUpdate_RollForward_Feed() |
| throws JAXBException, IOException, URISyntaxException, JSchException, |
| OozieClientException, SAXException, AuthenticationException { |
| try { |
| String startTimeCluster_source = TimeUtil.getTimeWrtSystemTime(-18); |
| String feed = getMultiClusterFeed(startTimeCluster_source, startTimeCluster_source); |
| logger.info("feed: " + Util.prettyPrintXml(feed)); |
| |
| //submit feed on all 3 clusters |
| ServiceResponse r = prism.getFeedHelper().submitEntity(feed); |
| AssertUtil.assertSucceeded(r); |
| |
| //schedule feed of cluster1 and cluster2 |
| r = cluster1.getFeedHelper().schedule(feed); |
| AssertUtil.assertSucceeded(r); |
| r = cluster2.getFeedHelper().schedule(feed); |
| AssertUtil.assertSucceeded(r); |
| InstanceUtil.waitTillInstancesAreCreated(cluster1, feed, 0); |
| |
| //shutdown cluster2 |
| Util.shutDownService(cluster2.getProcessHelper()); |
| |
| //add some property to feed so that new bundle is created |
| String updatedFeed = Util.setFeedProperty(feed, "someProp", "someVal"); |
| |
| //save old data |
| String oldBundle_cluster1 = InstanceUtil.getLatestBundleID(cluster1, |
| Util.readEntityName(feed), EntityType.FEED); |
| List<String> oldNominalTimes_cluster1 = OozieUtil.getActionsNominalTime |
| (cluster1, oldBundle_cluster1, EntityType.FEED); |
| |
| //send update command with +5 mins in future |
| String updateTime = TimeUtil.getTimeWrtSystemTime(5); |
| r = prism.getFeedHelper().update(feed, updatedFeed, updateTime, null); |
| AssertUtil.assertPartial(r); |
| |
| //verify new bundle creation on cluster1 and new definition on cluster3 |
| OozieUtil.verifyNewBundleCreation(cluster1, oldBundle_cluster1, |
| oldNominalTimes_cluster1, feed, true, false); |
| String definition = Util.getEntityDefinition(cluster3, feed, true); |
| Diff diff = XMLUnit.compareXML(definition, processBundle.getProcessData()); |
| logger.info(diff); |
| |
| //start stopped cluster2 |
| Util.startService(cluster2.getProcessHelper()); |
| String newBundle_cluster1 = InstanceUtil.getLatestBundleID(cluster1, |
| Util.readEntityName(feed), EntityType.FEED); |
| |
| //send update again |
| r = prism.getFeedHelper().update(feed, updatedFeed, updateTime, null); |
| AssertUtil.assertSucceeded(r); |
| |
| //verify new bundle creation on cluster2 and no new bundle on cluster1 |
| Assert.assertEquals(InstanceUtil |
| .checkIfFeedCoordExist(cluster2.getFeedHelper(), Util.readEntityName(feed), |
| "RETENTION"), 2); |
| OozieUtil.verifyNewBundleCreation(cluster1, newBundle_cluster1, |
| oldNominalTimes_cluster1, feed, false, false); |
| |
| //wait till update time is reached |
| TimeUtil.sleepTill(TimeUtil.getTimeWrtSystemTime(5)); |
| |
| //verify new bundle creation with instance matching |
| OozieUtil.verifyNewBundleCreation(cluster1, oldBundle_cluster1, |
| oldNominalTimes_cluster1, feed, true, true); |
| } finally { |
| Util.restartService(cluster2.getProcessHelper()); |
| } |
| } |
| |
| @Test(groups = {"multiCluster", "0.3.1", "embedded"}, timeOut = 1200000, enabled = true) |
| public void updateTimeAfterEndTime_Process() |
| throws JAXBException, InterruptedException, IOException, URISyntaxException, |
| OozieClientException, AuthenticationException { |
| /* submit and schedule process with end time after 60 mins. Set update time |
| as with +60 from start mins */ |
| logger.info("Running test updateTimeAfterEndTime_Process"); |
| String startTime = TimeUtil.getTimeWrtSystemTime(-15); |
| String endTime = TimeUtil.getTimeWrtSystemTime(60); |
| processBundle.setProcessValidity(startTime, endTime); |
| processBundle.submitFeedsScheduleProcess(prism); |
| TimeUtil.sleepSeconds(10); |
| InstanceUtil.waitTillInstanceReachState(serverOC.get(0), |
| Util.readEntityName(processBundle.getProcessData()), 0, |
| CoordinatorAction.Status.WAITING, EntityType.PROCESS); |
| |
| //save old data |
| String oldProcess = processBundle.getProcessData(); |
| String oldBundleID = InstanceUtil |
| .getLatestBundleID(cluster1, |
| Util.readEntityName(oldProcess), EntityType.PROCESS); |
| List<String> oldNominalTimes = OozieUtil.getActionsNominalTime(cluster1, oldBundleID, |
| EntityType.PROCESS); |
| |
| //update |
| processBundle.setProcessProperty("someProp", "someVal"); |
| String updateTime = TimeUtil.addMinsToTime(endTime, 60); |
| logger.info("Original Feed : " + Util.prettyPrintXml(oldProcess)); |
| logger.info("Updated Feed :" + Util.prettyPrintXml(processBundle.getProcessData())); |
| logger.info("Update Time : " + updateTime); |
| ServiceResponse r = prism.getProcessHelper().update(oldProcess, |
| processBundle.getProcessData(), updateTime, null); |
| AssertUtil.assertSucceeded(r); |
| |
| //verify new bundle creation with instances matching |
| OozieUtil.verifyNewBundleCreation(cluster1, oldBundleID, oldNominalTimes, |
| oldProcess, true, false); |
| InstanceUtil.waitTillInstancesAreCreated(cluster1, processBundle.getProcessData(), 1); |
| OozieUtil.verifyNewBundleCreation(cluster1, oldBundleID, oldNominalTimes, |
| oldProcess, true, true); |
| } |
| |
| @Test(groups = {"multiCluster", "0.3.1", "embedded"}, timeOut = 1200000, enabled = true) |
| public void updateTimeAfterEndTime_Feed() |
| throws JAXBException, IOException, OozieClientException, |
| URISyntaxException, AuthenticationException { |
| |
| /* submit and schedule feed with end time 60 mins in future and update with +60 in future*/ |
| String startTime = TimeUtil.getTimeWrtSystemTime(-15); |
| String endTime = TimeUtil.getTimeWrtSystemTime(60); |
| |
| String feed = processBundle.getDataSets().get(0); |
| feed = InstanceUtil.setFeedCluster(feed, |
| XmlUtil.createValidity("2012-10-01T12:00Z", "2010-01-01T00:00Z"), |
| XmlUtil.createRtention("days(100000)", ActionType.DELETE), null, |
| ClusterType.SOURCE, null); |
| feed = InstanceUtil.setFeedCluster(feed, XmlUtil.createValidity(startTime, endTime), |
| XmlUtil.createRtention("days(100000)", ActionType.DELETE), |
| Util.readEntityName(processBundle.getClusters().get(0)), ClusterType.SOURCE, |
| null, baseTestDir + "/replication" + MINUTE_DATE_PATTERN); |
| |
| ServiceResponse r = prism.getClusterHelper().submitEntity( |
| processBundle.getClusters().get(0)); |
| AssertUtil.assertSucceeded(r); |
| r = prism.getFeedHelper().submitAndSchedule(feed); |
| AssertUtil.assertSucceeded(r); |
| InstanceUtil.waitTillInstancesAreCreated(cluster1, feed, 0); |
| |
| //save old data |
| String oldBundleID = InstanceUtil.getLatestBundleID(cluster1, |
| Util.readEntityName(feed), EntityType.FEED); |
| String updateTime = TimeUtil.addMinsToTime(endTime, 60); |
| String updatedFeed = Util.setFeedProperty(feed, "someProp", "someVal"); |
| logger.info("Original Feed : " + Util.prettyPrintXml(feed)); |
| logger.info("Updated Feed :" + Util.prettyPrintXml(updatedFeed)); |
| logger.info("Update Time : " + updateTime); |
| r = prism.getFeedHelper().update(feed, updatedFeed, updateTime, null); |
| AssertUtil.assertSucceeded(r); |
| InstanceUtil.waitTillInstancesAreCreated(cluster1, feed, 1); |
| |
| //verify new bundle creation |
| OozieUtil.verifyNewBundleCreation(cluster1, oldBundleID, null, feed, true, false); |
| } |
| |
| @Test(groups = {"multiCluster", "0.3.1", "embedded"}, timeOut = 1200000, enabled = true) |
| public void updateTimeBeforeStartTime_Process() throws JAXBException, IOException, |
| URISyntaxException, OozieClientException, AuthenticationException { |
| |
| /* submit and schedule process with start time +10 mins from now. Update with start time |
| -4 and update time +2 mins */ |
| String startTime = TimeUtil.getTimeWrtSystemTime(10); |
| String endTime = TimeUtil.getTimeWrtSystemTime(20); |
| processBundle.setProcessValidity(startTime, endTime); |
| processBundle.submitFeedsScheduleProcess(prism); |
| |
| //save old data |
| String oldProcess = processBundle.getProcessData(); |
| String oldBundleID = InstanceUtil.getLatestBundleID(cluster1, |
| Util.readEntityName(oldProcess), EntityType.PROCESS); |
| List<String> oldNominalTimes = OozieUtil.getActionsNominalTime(cluster1, oldBundleID, |
| EntityType.PROCESS); |
| processBundle.setProcessValidity(TimeUtil.addMinsToTime(startTime, -4), endTime); |
| String updateTime = TimeUtil.getTimeWrtSystemTime(2); |
| ServiceResponse r = prism.getProcessHelper().update(oldProcess, |
| processBundle.getProcessData(), updateTime, null); |
| AssertUtil.assertSucceeded(r); |
| TimeUtil.sleepSeconds(10); |
| |
| //verify new bundle creation |
| OozieUtil.verifyNewBundleCreation(cluster1, oldBundleID, oldNominalTimes, |
| oldProcess, true, false); |
| } |
| |
| @Test(groups = {"MultiCluster", "0.3.1"}, timeOut = 1200000, enabled = true) |
| public void updateDiffClusterDiffValidity_Process() |
| throws JAXBException, IOException, URISyntaxException, OozieClientException, |
| AuthenticationException { |
| |
| //set start end process time for 3 clusters |
| String startTime_cluster1 = TimeUtil.getTimeWrtSystemTime(-40); |
| String endTime_cluster1 = TimeUtil.getTimeWrtSystemTime(3); |
| String startTime_cluster2 = TimeUtil.getTimeWrtSystemTime(120); |
| String endTime_cluster2 = TimeUtil.getTimeWrtSystemTime(240); |
| String startTime_cluster3 = TimeUtil.getTimeWrtSystemTime(-30); |
| String endTime_cluster3 = TimeUtil.getTimeWrtSystemTime(180); |
| |
| //create multi cluster bundle |
| processBundle.setProcessValidity(startTime_cluster1, endTime_cluster1); |
| processBundle.addClusterToBundle(bundles[1].getClusters().get(0), |
| ClusterType.SOURCE, startTime_cluster2, endTime_cluster2); |
| processBundle.addClusterToBundle(bundles[2].getClusters().get(0), |
| ClusterType.SOURCE, startTime_cluster3, endTime_cluster3); |
| |
| //submit and schedule |
| processBundle.submitFeedsScheduleProcess(prism); |
| |
| //wait for coord to be in running state |
| InstanceUtil.waitTillInstancesAreCreated(cluster1, processBundle.getProcessData(), 0); |
| InstanceUtil.waitTillInstancesAreCreated(cluster3, processBundle.getProcessData(), 0); |
| |
| //save old info |
| String oldBundleID_cluster1 = InstanceUtil.getLatestBundleID(cluster1, |
| Util.readEntityName(processBundle.getProcessData()), EntityType.PROCESS); |
| List<String> nominalTimes_cluster1 = OozieUtil.getActionsNominalTime(cluster1, |
| oldBundleID_cluster1, EntityType.PROCESS); |
| String oldBundleID_cluster2 = InstanceUtil.getLatestBundleID(cluster2, |
| Util.readEntityName(processBundle.getProcessData()), EntityType.PROCESS); |
| String oldBundleID_cluster3 = InstanceUtil.getLatestBundleID(cluster3, |
| Util.readEntityName(processBundle.getProcessData()), EntityType.PROCESS); |
| List<String> nominalTimes_cluster3 = OozieUtil.getActionsNominalTime(cluster3, |
| oldBundleID_cluster3, EntityType.PROCESS); |
| |
| //update process |
| String updateTime = TimeUtil.addMinsToTime(endTime_cluster1, 3); |
| processBundle.setProcessProperty("someProp", "someVal"); |
| ServiceResponse r = prism.getProcessHelper().update(processBundle.getProcessData(), |
| processBundle.getProcessData(), updateTime, null); |
| AssertUtil.assertSucceeded(r); |
| |
| //check for new bundle to be created |
| OozieUtil.verifyNewBundleCreation(cluster1, oldBundleID_cluster1, |
| nominalTimes_cluster1, processBundle.getProcessData(), true, false); |
| OozieUtil.verifyNewBundleCreation(cluster3, oldBundleID_cluster3, |
| nominalTimes_cluster3, processBundle.getProcessData(), true, false); |
| OozieUtil.verifyNewBundleCreation(cluster2, oldBundleID_cluster2, |
| nominalTimes_cluster3, processBundle.getProcessData(), true, false); |
| |
| //wait till new coord are running on cluster1 |
| InstanceUtil.waitTillInstancesAreCreated(cluster1, processBundle.getProcessData(), 1); |
| OozieUtil.verifyNewBundleCreation(cluster1, oldBundleID_cluster1, |
| nominalTimes_cluster1, processBundle.getProcessData(), true, true); |
| |
| //verify |
| String coordStartTime_cluster3 = OozieUtil.getCoordStartTime(cluster3, |
| processBundle.getProcessData(), 1); |
| String coordStartTime_cluster2 = OozieUtil.getCoordStartTime(cluster2, |
| processBundle.getProcessData(), 1); |
| |
| DateTime updateTimeOozie = TimeUtil.oozieDateToDate(updateTime); |
| Assert.assertTrue(TimeUtil.oozieDateToDate(coordStartTime_cluster3).isAfter(updateTimeOozie) |
| || TimeUtil.oozieDateToDate(coordStartTime_cluster3).isEqual(updateTimeOozie), |
| "new coord start time is not correct"); |
| Assert.assertFalse( |
| TimeUtil.oozieDateToDate(coordStartTime_cluster2).isEqual(updateTimeOozie), |
| "new coord start time is not correct"); |
| TimeUtil.sleepTill(updateTime); |
| InstanceUtil.waitTillInstancesAreCreated(cluster3, processBundle.getProcessData(), 1); |
| |
| //verify that no instance are missing |
| OozieUtil.verifyNewBundleCreation(cluster3, oldBundleID_cluster3, |
| nominalTimes_cluster3, processBundle.getProcessData(), true, true); |
| } |
| |
| private String submitAndScheduleFeed(Bundle b) |
| throws JAXBException, IOException, URISyntaxException, AuthenticationException { |
| String feed = b.getDataSets().get(0); |
| feed = InstanceUtil.setFeedCluster(feed, |
| XmlUtil.createValidity("2012-10-01T12:00Z", "2010-01-01T00:00Z"), |
| XmlUtil.createRtention("days(1000000)", ActionType.DELETE), null, |
| ClusterType.SOURCE, null); |
| feed = InstanceUtil.setFeedCluster(feed, XmlUtil.createValidity |
| ("2012-10-01T12:10Z", "2099-10-01T12:10Z"), |
| XmlUtil.createRtention("days(1000000)", ActionType.DELETE), |
| Util.readEntityName(b.getClusters().get(0)), ClusterType.SOURCE, "", |
| "/someTestPath" + MINUTE_DATE_PATTERN); |
| ServiceResponse r = prism.getClusterHelper().submitEntity( |
| b.getClusters().get(0)); |
| AssertUtil.assertSucceeded(r); |
| r = prism.getFeedHelper().submitAndSchedule(feed); |
| AssertUtil.assertSucceeded(r); |
| return feed; |
| } |
| |
| private String getMultiClusterFeed(String startTimeCluster_source, |
| String startTimeCluster_target) |
| throws IOException, URISyntaxException, AuthenticationException { |
| String testDataDir = baseTestDir + "/replication"; |
| |
| //create desired feed |
| String feed = bundles[0].getDataSets().get(0); |
| |
| //cluster1 is target, cluster2 is source and cluster3 is neutral |
| feed = InstanceUtil.setFeedCluster(feed, |
| XmlUtil.createValidity("2012-10-01T12:00Z", "2010-01-01T00:00Z"), |
| XmlUtil.createRtention("days(100000)", ActionType.DELETE), null, |
| ClusterType.SOURCE, null); |
| feed = InstanceUtil.setFeedCluster(feed, |
| XmlUtil.createValidity(startTimeCluster_source, "2099-10-01T12:10Z"), |
| XmlUtil.createRtention("days(100000)", ActionType.DELETE), |
| Util.readEntityName(bundles[2].getClusters().get(0)), null, null); |
| feed = InstanceUtil.setFeedCluster(feed, |
| XmlUtil.createValidity(startTimeCluster_target, "2099-10-01T12:25Z"), |
| XmlUtil.createRtention("days(100000)", ActionType.DELETE), |
| Util.readEntityName(bundles[0].getClusters().get(0)), ClusterType.TARGET, |
| null, testDataDir + MINUTE_DATE_PATTERN); |
| feed = InstanceUtil.setFeedCluster(feed, |
| XmlUtil.createValidity(startTimeCluster_source, "2099-01-01T00:00Z"), |
| XmlUtil.createRtention("days(100000)", ActionType.DELETE), |
| Util.readEntityName(bundles[1].getClusters().get(0)), ClusterType.SOURCE, |
| null, testDataDir + MINUTE_DATE_PATTERN); |
| |
| //submit clusters |
| Bundle.submitCluster(bundles[0], bundles[1], bundles[2]); |
| |
| //create test data on cluster2 |
| List<String> dataDates = TimeUtil.getMinuteDatesOnEitherSide(startTimeCluster_source, |
| TimeUtil.getTimeWrtSystemTime(60), 1); |
| HadoopUtil.flattenAndPutDataInFolder(cluster2FS, OSUtil.SINGLE_FILE, |
| testDataDir + "/", dataDates); |
| return feed; |
| } |
| |
| @AfterMethod(alwaysRun = true) |
| public void tearDown(Method method) { |
| logger.info("tearDown " + method.getName()); |
| processBundle.deleteBundle(prism); |
| bundles[0].deleteBundle(prism); |
| processBundle.deleteBundle(prism); |
| } |
| |
| @AfterClass(alwaysRun = true) |
| public void tearDownClass() throws IOException { |
| cleanTestDirs(); |
| } |
| } |