| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.falcon.regression.prism; |
| |
| import org.apache.falcon.entity.v0.EntityType; |
| import org.apache.falcon.entity.v0.Frequency; |
| import org.apache.falcon.entity.v0.Frequency.TimeUnit; |
| import org.apache.falcon.entity.v0.feed.ClusterType; |
| import org.apache.falcon.entity.v0.process.ExecutionType; |
| import org.apache.falcon.regression.Entities.ProcessMerlin; |
| import org.apache.falcon.regression.core.bundle.Bundle; |
| import org.apache.falcon.regression.core.helpers.ColoHelper; |
| import org.apache.falcon.regression.core.response.APIResult; |
| import org.apache.falcon.regression.core.response.ServiceResponse; |
| import org.apache.falcon.regression.core.supportClasses.HadoopFileEditor; |
| import org.apache.falcon.regression.core.util.AssertUtil; |
| import org.apache.falcon.regression.core.util.BundleUtil; |
| import org.apache.falcon.regression.core.util.HadoopUtil; |
| import org.apache.falcon.regression.core.util.InstanceUtil; |
| import org.apache.falcon.regression.core.util.OSUtil; |
| import org.apache.falcon.regression.core.util.OozieUtil; |
| import org.apache.falcon.regression.core.util.TimeUtil; |
| import org.apache.falcon.regression.core.util.Util; |
| import org.apache.falcon.regression.core.util.XmlUtil; |
| import org.apache.falcon.regression.testHelper.BaseTestClass; |
| import org.apache.hadoop.fs.FileSystem; |
| import org.apache.hadoop.security.authentication.client.AuthenticationException; |
| import org.apache.log4j.Logger; |
| import org.apache.oozie.client.BundleJob; |
| import org.apache.oozie.client.CoordinatorAction; |
| import org.apache.oozie.client.CoordinatorJob; |
| import org.apache.oozie.client.Job; |
| import org.apache.oozie.client.OozieClient; |
| import org.apache.oozie.client.OozieClientException; |
| import org.joda.time.DateTime; |
| import org.joda.time.DateTimeZone; |
| import org.joda.time.Minutes; |
| import org.testng.Assert; |
| import org.testng.annotations.AfterClass; |
| import org.testng.annotations.AfterMethod; |
| import org.testng.annotations.BeforeClass; |
| import org.testng.annotations.BeforeMethod; |
| import org.testng.annotations.Test; |
| |
| import javax.xml.bind.JAXBException; |
| import java.io.IOException; |
| import java.lang.reflect.Method; |
| import java.net.URISyntaxException; |
| import java.util.Date; |
| import java.util.List; |
| import java.util.Random; |
| |
| /** |
| * test for process update. |
| */ |
| @Test(groups = "distributed") |
| public class NewPrismProcessUpdateTest extends BaseTestClass { |
| |
| private String baseTestDir = baseHDFSDir + "/NewPrismProcessUpdateTest"; |
| private String inputFeedPath = baseTestDir + MINUTE_DATE_PATTERN; |
| private String workflowPath = baseTestDir + "/falcon-oozie-wf"; |
| private String workflowPath2 = baseTestDir + "/falcon-oozie-wf2"; |
| private String aggregatorPath = baseTestDir + "/aggregator"; |
| private String aggregator1Path = baseTestDir + "/aggregator1"; |
| private ColoHelper cluster1 = servers.get(0); |
| private ColoHelper cluster2 = servers.get(1); |
| private ColoHelper cluster3 = servers.get(2); |
| private FileSystem cluster1FS = serverFS.get(0); |
| private OozieClient cluster2OC = serverOC.get(1); |
| private OozieClient cluster3OC = serverOC.get(2); |
| private static final Logger LOGGER = Logger.getLogger(NewPrismProcessUpdateTest.class); |
| |
| @BeforeMethod(alwaysRun = true) |
| public void testSetup(Method method) throws Exception { |
| LOGGER.info("test name: " + method.getName()); |
| Bundle b = BundleUtil.readUpdateBundle(baseAppHDFSDir, this.getClass().getSimpleName()); |
| bundles[0] = new Bundle(b, cluster1); |
| bundles[0].generateUniqueBundle(); |
| bundles[1] = new Bundle(b, cluster2); |
| bundles[1].generateUniqueBundle(); |
| bundles[2] = new Bundle(b, cluster3); |
| bundles[2].generateUniqueBundle(); |
| setBundleWFPath(bundles[0], bundles[1], bundles[2]); |
| bundles[1].addClusterToBundle(bundles[2].getClusters().get(0), |
| ClusterType.TARGET, null, null); |
| usualGrind(bundles[1]); |
| Util.restartService(cluster3.getClusterHelper()); |
| } |
| |
| @BeforeClass(alwaysRun = true) |
| public void setup() throws Exception { |
| for (String wfPath : new String[] { workflowPath, workflowPath2, aggregatorPath, aggregator1Path }) { |
| uploadDirToClusters(wfPath, OSUtil.RESOURCES_OOZIE); |
| } |
| Util.restartService(cluster3.getClusterHelper()); |
| Util.restartService(cluster1.getClusterHelper()); |
| Util.restartService(cluster2.getClusterHelper()); |
| } |
| |
| @AfterMethod(alwaysRun = true) |
| public void tearDown() { |
| removeBundles(); |
| } |
| |
| @Test(groups = { "multiCluster" }, timeOut = 1200000) |
| public void updateProcessFrequencyInEachColoWithOneProcessRunningMonthly() |
| throws Exception { |
| final String startTIme = TimeUtil.getTimeWrtSystemTime(-20); |
| String endTime = TimeUtil.getTimeWrtSystemTime(4000 * 60); |
| bundles[1].setProcessPeriodicity(1, TimeUnit.months); |
| bundles[1].setOutputFeedPeriodicity(1, TimeUnit.months); |
| bundles[1].setProcessValidity(startTIme, endTime); |
| |
| bundles[1].submitBundle(prism); |
| //now to schedule in 1 colo and let it remain in another |
| AssertUtil.assertSucceeded( |
| cluster3.getProcessHelper().schedule(bundles[1].getProcessData())); |
| String oldBundleId = InstanceUtil |
| .getLatestBundleID(cluster3, |
| Util.readEntityName(bundles[1].getProcessData()), EntityType.PROCESS); |
| |
| InstanceUtil.waitTillInstancesAreCreated(cluster3, bundles[1].getProcessData(), 0, 10); |
| waitForProcessToReachACertainState(cluster3, bundles[1], Job.Status.RUNNING); |
| |
| List<String> oldNominalTimes = OozieUtil.getActionsNominalTime(cluster3, oldBundleId, |
| EntityType.PROCESS); |
| |
| String updatedProcess = InstanceUtil |
| .setProcessFrequency(bundles[1].getProcessData(), |
| new Frequency("" + 5, TimeUnit.minutes)); |
| |
| LOGGER.info("updated process: " + Util.prettyPrintXml(updatedProcess)); |
| |
| //now to update |
| while (Util |
| .parseResponse(prism.getProcessHelper() |
| .update((bundles[1].getProcessData()), updatedProcess)) |
| .getStatus() != APIResult.Status.SUCCEEDED) { |
| LOGGER.info("update didn't SUCCEED in last attempt"); |
| TimeUtil.sleepSeconds(10); |
| } |
| |
| String prismString = getResponse(prism, bundles[1].getProcessData(), true); |
| Assert.assertEquals(Util.getProcessObject(prismString).getFrequency(), |
| Util.getProcessObject(updatedProcess).getFrequency()); |
| TimeUtil.sleepSeconds(60); |
| dualComparison(prism, cluster3, bundles[1].getProcessData()); |
| //ensure that the running process has new coordinators created; while the submitted |
| // one is updated correctly. |
| OozieUtil.verifyNewBundleCreation(cluster3, oldBundleId, oldNominalTimes, |
| bundles[1].getProcessData(), true, false); |
| waitingForBundleFinish(cluster3, oldBundleId, 5); |
| InstanceUtil.waitTillInstancesAreCreated(cluster3, bundles[1].getProcessData(), 1, 10); |
| OozieUtil.verifyNewBundleCreation(cluster3, oldBundleId, oldNominalTimes, |
| bundles[1].getProcessData(), true, true); |
| |
| } |
| |
| @Test(groups = { "multiCluster" }, timeOut = 1200000) |
| //failing due to falcon bug : https://issues.apache.org/jira/browse/FALCON-458 |
| public void updateProcessRollStartTimeForwardInEachColoWithOneProcessRunning() |
| throws Exception { |
| String startTime = TimeUtil.getTimeWrtSystemTime(3); |
| String endTime = TimeUtil.getTimeWrtSystemTime(7); |
| bundles[1].setProcessValidity(startTime, endTime); |
| bundles[1].submitBundle(prism); |
| //now to schedule in 1 colo and let it remain in another |
| AssertUtil.assertSucceeded( |
| cluster3.getProcessHelper().schedule(bundles[1].getProcessData())); |
| TimeUtil.sleepSeconds(30); |
| InstanceUtil.waitTillInstancesAreCreated(cluster3, bundles[1].getProcessData(), 0, 10); |
| |
| String oldBundleId = InstanceUtil |
| .getLatestBundleID(cluster3, |
| Util.readEntityName(bundles[1].getProcessData()), EntityType.PROCESS); |
| |
| List<String> oldNominalTimes = |
| OozieUtil.getActionsNominalTime(cluster3, oldBundleId, EntityType.PROCESS); |
| |
| String newStartTime = TimeUtil.addMinsToTime(TimeUtil.dateToOozieDate( |
| bundles[1].getProcessObject().getClusters().getClusters().get(0).getValidity() |
| .getStart() |
| ), 20); |
| String newEndTime = TimeUtil.addMinsToTime(TimeUtil.dateToOozieDate( |
| bundles[1].getProcessObject().getClusters().getClusters().get(0).getValidity() |
| .getStart() |
| ), 25); |
| |
| bundles[1].setProcessValidity(newStartTime, newEndTime); |
| bundles[1].setProcessConcurrency(10); |
| |
| waitForProcessToReachACertainState(cluster3, bundles[1], Job.Status.RUNNING); |
| |
| LOGGER.info("updated process: " + Util.prettyPrintXml(bundles[1].getProcessData())); |
| while (Util.parseResponse( |
| prism.getProcessHelper() |
| .update(bundles[1].getProcessData(), Util.prettyPrintXml(bundles[1] |
| .getProcessData()))) |
| .getStatus() != APIResult.Status.SUCCEEDED) { |
| LOGGER.info("update didn't SUCCEED in last attempt"); |
| TimeUtil.sleepSeconds(10); |
| } |
| |
| OozieUtil.verifyNewBundleCreation(cluster3, oldBundleId, oldNominalTimes, |
| bundles[1].getProcessData(), true, false); |
| |
| dualComparison(prism, cluster3, bundles[1].getProcessData()); |
| |
| waitingForBundleFinish(cluster3, oldBundleId, 15); |
| //ensure that the running process has new coordinators created; while the submitted |
| // one is updated correctly. |
| int finalNumberOfInstances = |
| InstanceUtil.getProcessInstanceListFromAllBundles(cluster3, |
| Util.getProcessName(bundles[1].getProcessData()), EntityType.PROCESS).size(); |
| Assert.assertEquals(finalNumberOfInstances, |
| getExpectedNumberOfWorkflowInstances(TimeUtil |
| .dateToOozieDate( |
| bundles[1].getProcessObject().getClusters().getClusters().get(0) |
| .getValidity().getStart()), |
| TimeUtil |
| .dateToOozieDate( |
| bundles[1].getProcessObject().getClusters().getClusters() |
| .get(0).getValidity() |
| .getEnd()))); |
| AssertUtil.checkNotStatus(cluster2OC, EntityType.PROCESS, bundles[1], Job.Status.RUNNING); |
| int expectedNumberOfWorkflows = |
| getExpectedNumberOfWorkflowInstances(newStartTime, TimeUtil |
| .dateToOozieDate( |
| bundles[1].getProcessObject().getClusters().getClusters().get(0) |
| .getValidity().getEnd())); |
| Assert.assertEquals(OozieUtil.getNumberOfWorkflowInstances(cluster3, oldBundleId), |
| expectedNumberOfWorkflows); |
| } |
| |
| @Test(groups = { "multiCluster" }, timeOut = 1800000) |
| public void updateProcessConcurrencyWorkflowExecutionInEachColoWithOneColoDown() |
| throws Exception { |
| //bundles[1].generateUniqueBundle(); |
| bundles[1].submitBundle(prism); |
| //now to schedule in 1 colo and let it remain in another |
| AssertUtil.assertSucceeded( |
| cluster3.getProcessHelper().schedule(bundles[1].getProcessData())); |
| String oldBundleId = InstanceUtil |
| .getLatestBundleID(cluster3, |
| Util.readEntityName(bundles[1].getProcessData()), EntityType.PROCESS); |
| TimeUtil.sleepSeconds(25); |
| |
| int initialConcurrency = bundles[1].getProcessObject().getParallel(); |
| |
| bundles[1].setProcessConcurrency(bundles[1].getProcessObject().getParallel() + 3); |
| bundles[1].setProcessWorkflow(workflowPath2); |
| bundles[1].getProcessObject().setOrder(getRandomExecutionType(bundles[1])); |
| |
| //stop cluster 3 where process is scheduled |
| Util.shutDownService(cluster3.getProcessHelper()); |
| |
| //now to update |
| AssertUtil.assertPartial( |
| prism.getProcessHelper() |
| .update(bundles[1].getProcessData(), bundles[1].getProcessData())); |
| String prismString = getResponse(prism, bundles[1].getProcessData(), true); |
| Assert.assertEquals(Util.getProcessObject(prismString).getParallel(), |
| initialConcurrency); |
| Assert.assertEquals(Util.getProcessObject(prismString).getWorkflow().getPath(), |
| workflowPath); |
| Assert.assertEquals(Util.getProcessObject(prismString).getOrder(), |
| bundles[1].getProcessObject().getOrder()); |
| |
| String coloString = getResponse(cluster2, bundles[1].getProcessData(), true); |
| Assert.assertEquals(Util.getProcessObject(coloString).getWorkflow().getPath(), |
| workflowPath2); |
| |
| Util.startService(cluster3.getProcessHelper()); |
| dualComparisonFailure(prism, cluster2, bundles[1].getProcessData()); |
| dualComparison(prism, cluster3, bundles[1].getProcessData()); |
| //ensure that the running process has new coordinators created; while the submitted |
| // one is updated correctly. |
| AssertUtil |
| .checkNotStatus(cluster2OC, EntityType.PROCESS, bundles[1], Job.Status.RUNNING); |
| |
| waitForProcessToReachACertainState(cluster3, bundles[1], Job.Status.RUNNING); |
| while (Util.parseResponse( |
| prism.getProcessHelper() |
| .update(bundles[1].getProcessData(), bundles[1].getProcessData())) |
| .getStatus() != APIResult.Status.SUCCEEDED) { |
| LOGGER.info("WARNING: update did not succeed, retrying "); |
| TimeUtil.sleepSeconds(20); |
| } |
| prismString = getResponse(prism, bundles[1].getProcessData(), true); |
| Assert.assertEquals(Util.getProcessObject(prismString).getParallel(), |
| initialConcurrency + 3); |
| Assert.assertEquals(Util.getProcessObject(prismString).getWorkflow().getPath(), |
| workflowPath2); |
| Assert.assertEquals(Util.getProcessObject(prismString).getOrder(), |
| bundles[1].getProcessObject().getOrder()); |
| dualComparison(prism, cluster3, bundles[1].getProcessData()); |
| AssertUtil |
| .checkNotStatus(cluster2OC, EntityType.PROCESS, bundles[1], Job.Status.RUNNING); |
| waitingForBundleFinish(cluster3, oldBundleId); |
| int finalNumberOfInstances = |
| InstanceUtil.getProcessInstanceListFromAllBundles(cluster3, |
| Util.getProcessName(bundles[1].getProcessData()), EntityType.PROCESS).size(); |
| |
| int expectedInstances = |
| getExpectedNumberOfWorkflowInstances(TimeUtil.dateToOozieDate( |
| bundles[1].getProcessObject().getClusters().getClusters().get(0) |
| .getValidity().getStart()), |
| TimeUtil |
| .dateToOozieDate( |
| bundles[1].getProcessObject().getClusters().getClusters() |
| .get(0).getValidity() |
| .getEnd())); |
| Assert.assertEquals(finalNumberOfInstances, expectedInstances, |
| "number of instances doesnt match :("); |
| |
| } |
| |
| @Test(groups = { "multiCluster" }, timeOut = 1200000) |
| public void updateProcessFrequencyInEachColoWithOneProcessRunning() throws Exception { |
| String startTime = TimeUtil.getTimeWrtSystemTime(-2); |
| String endTime = TimeUtil.getTimeWrtSystemTime(20); |
| bundles[1].setProcessValidity(startTime, endTime); |
| bundles[1].submitBundle(prism); |
| //now to schedule in 1 colo and let it remain in another |
| AssertUtil.assertSucceeded( |
| cluster3.getProcessHelper().schedule(bundles[1].getProcessData())); |
| String oldBundleId = InstanceUtil |
| .getLatestBundleID(cluster3, |
| Util.readEntityName(bundles[1].getProcessData()), EntityType.PROCESS); |
| InstanceUtil.waitTillInstancesAreCreated(cluster3, bundles[1].getProcessData(), 0); |
| |
| waitForProcessToReachACertainState(cluster3, bundles[1], Job.Status.RUNNING); |
| List<String> oldNominalTimes = OozieUtil.getActionsNominalTime(cluster3, oldBundleId, |
| EntityType.PROCESS); |
| LOGGER.info("original process: " + Util.prettyPrintXml(bundles[1].getProcessData())); |
| |
| String updatedProcess = InstanceUtil |
| .setProcessFrequency(bundles[1].getProcessData(), |
| new Frequency("" + 7, TimeUnit.minutes)); |
| |
| LOGGER.info("updated process: " + updatedProcess); |
| |
| //now to update |
| |
| ServiceResponse response = |
| prism.getProcessHelper().update(updatedProcess, updatedProcess); |
| AssertUtil.assertSucceeded(response); |
| OozieUtil.verifyNewBundleCreation(cluster3, oldBundleId, oldNominalTimes, |
| bundles[1].getProcessData(), true, false); |
| InstanceUtil.waitTillInstancesAreCreated(cluster3, bundles[1].getProcessData(), 1, 10); |
| |
| String prismString = getResponse(prism, bundles[1].getProcessData(), true); |
| Assert.assertEquals(Util.getProcessObject(prismString).getFrequency(), |
| Util.getProcessObject(updatedProcess).getFrequency()); |
| dualComparison(prism, cluster3, bundles[1].getProcessData()); |
| AssertUtil.checkNotStatus(cluster2OC, EntityType.PROCESS, bundles[1], Job.Status.RUNNING); |
| } |
| |
| @Test(groups = { "multiCluster" }, timeOut = 1200000) |
| public void updateProcessNameInEachColoWithOneProcessRunning() throws Exception { |
| //bundles[1].generateUniqueBundle(); |
| bundles[1].submitBundle(prism); |
| //now to schedule in 1 colo and let it remain in another |
| AssertUtil.assertSucceeded( |
| cluster3.getProcessHelper().schedule(bundles[1].getProcessData())); |
| String originalProcessData = bundles[1].getProcessData(); |
| String oldBundleId = InstanceUtil |
| .getLatestBundleID(cluster3, |
| Util.readEntityName(bundles[1].getProcessData()), EntityType.PROCESS); |
| InstanceUtil.waitTillInstancesAreCreated(cluster3, bundles[1].getProcessData(), 0); |
| |
| waitForProcessToReachACertainState(cluster3, bundles[1], Job.Status.RUNNING); |
| TimeUtil.sleepSeconds(20); |
| List<String> oldNominalTimes = |
| OozieUtil.getActionsNominalTime(cluster3, oldBundleId, EntityType.PROCESS); |
| bundles[1].setProcessName("myNewProcessName"); |
| |
| //now to update |
| ServiceResponse response = |
| prism.getProcessHelper() |
| .update((bundles[1].getProcessData()), bundles[1].getProcessData()); |
| AssertUtil.assertFailed(response); |
| OozieUtil.verifyNewBundleCreation(cluster3, oldBundleId, oldNominalTimes, |
| originalProcessData, false, false); |
| AssertUtil.checkNotStatus(cluster2OC, EntityType.PROCESS, bundles[1], Job.Status.RUNNING); |
| } |
| |
| @Test(groups = { "multiCluster" }, timeOut = 1200000) |
| public void updateProcessConcurrencyInEachColoWithOneProcessRunning() |
| throws Exception { |
| String startTime = TimeUtil.getTimeWrtSystemTime(-2); |
| String endTime = TimeUtil.getTimeWrtSystemTime(10); |
| bundles[1].setProcessValidity(startTime, endTime); |
| |
| //bundles[1].generateUniqueBundle(); |
| bundles[1].submitBundle(prism); |
| //now to schedule in 1 colo and let it remain in another |
| AssertUtil.assertSucceeded( |
| cluster3.getProcessHelper().schedule(bundles[1].getProcessData())); |
| |
| String oldBundleId = InstanceUtil |
| .getLatestBundleID(cluster3, |
| Util.readEntityName(bundles[1].getProcessData()), EntityType.PROCESS); |
| InstanceUtil.waitTillInstancesAreCreated(cluster3, bundles[1].getProcessData(), 0); |
| |
| waitForProcessToReachACertainState(cluster3, bundles[1], Job.Status.RUNNING); |
| |
| //now to update |
| DateTime updateTime = new DateTime(DateTimeZone.UTC); |
| TimeUtil.sleepSeconds(60); |
| List<String> oldNominalTimes = |
| OozieUtil.getActionsNominalTime(cluster3, oldBundleId, EntityType.PROCESS); |
| LOGGER.info("updating at " + updateTime); |
| while (Util |
| .parseResponse(updateProcessConcurrency(bundles[1], |
| bundles[1].getProcessObject().getParallel() + 3)) |
| .getStatus() != APIResult.Status.SUCCEEDED) { |
| LOGGER.info("WARNING: update did not scceed, retyring "); |
| TimeUtil.sleepSeconds(20); |
| } |
| |
| String prismString = getResponse(prism, bundles[1].getProcessData(), true); |
| Assert.assertEquals(Util.getProcessObject(prismString).getParallel(), |
| bundles[1].getProcessObject().getParallel() + 3); |
| dualComparison(prism, cluster3, bundles[1].getProcessData()); |
| //ensure that the running process has new coordinators created; while the submitted |
| // one is updated |
| // correctly. |
| OozieUtil.verifyNewBundleCreation(cluster3, oldBundleId, oldNominalTimes, |
| bundles[1].getProcessData(), |
| false, true); |
| AssertUtil.checkNotStatus(cluster2OC, EntityType.PROCESS, bundles[1], Job.Status.RUNNING); |
| |
| // future : should be verified using cord xml |
| Job.Status status = OozieUtil.getOozieJobStatus(cluster3.getFeedHelper().getOozieClient(), |
| Util.readEntityName(bundles[1].getProcessData()), EntityType.PROCESS); |
| |
| boolean doesExist = false; |
| OozieUtil.createMissingDependencies(cluster3, EntityType.PROCESS, bundles[1].getProcessName(), 0); |
| while (status != Job.Status.SUCCEEDED && status != Job.Status.FAILED |
| && |
| status != Job.Status.DONEWITHERROR) { |
| int statusCount = InstanceUtil |
| .getInstanceCountWithStatus(cluster3, |
| Util.readEntityName(bundles[1].getProcessData()), |
| org.apache.oozie.client.CoordinatorAction.Status.RUNNING, |
| EntityType.PROCESS); |
| if (statusCount == bundles[1].getProcessObject().getParallel() + 3) { |
| doesExist = true; |
| break; |
| } |
| status = OozieUtil.getOozieJobStatus(cluster3.getFeedHelper().getOozieClient(), |
| Util.readEntityName(bundles[1].getProcessData()), EntityType.PROCESS); |
| Assert.assertNotNull(status, |
| "status must not be null!"); |
| TimeUtil.sleepSeconds(30); |
| } |
| |
| Assert.assertTrue(doesExist, "Er! The desired concurrency levels are never reached!!!"); |
| int expectedNumberOfInstances = |
| getExpectedNumberOfWorkflowInstances(TimeUtil.dateToOozieDate( |
| bundles[1].getProcessObject().getClusters().getClusters().get(0) |
| .getValidity().getStart()), |
| TimeUtil |
| .dateToOozieDate( |
| bundles[1].getProcessObject().getClusters().getClusters() |
| .get(0).getValidity() |
| .getEnd())); |
| Assert.assertEquals(OozieUtil.getNumberOfWorkflowInstances(cluster3, oldBundleId), |
| expectedNumberOfInstances); |
| } |
| |
| @Test(groups = { "multiCluster" }, timeOut = 1200000) |
| public void updateProcessIncreaseValidityInEachColoWithOneProcessRunning() throws Exception { |
| String startTime = TimeUtil.getTimeWrtSystemTime(3); |
| String endTime = TimeUtil.getTimeWrtSystemTime(8); |
| bundles[1].setProcessValidity(startTime, endTime); |
| bundles[1].submitBundle(prism); |
| //now to schedule in 1 colo and let it remain in another |
| AssertUtil.assertSucceeded( |
| cluster3.getProcessHelper().schedule(bundles[1].getProcessData())); |
| TimeUtil.sleepSeconds(30); |
| InstanceUtil.waitTillInstancesAreCreated(cluster3, bundles[1].getProcessData(), 0, 10); |
| |
| String oldBundleId = InstanceUtil |
| .getLatestBundleID(cluster3, |
| Util.readEntityName(bundles[1].getProcessData()), EntityType.PROCESS); |
| |
| List<String> oldNominalTimes = OozieUtil.getActionsNominalTime(cluster3, oldBundleId, |
| EntityType.PROCESS); |
| |
| String newEndTime = TimeUtil.addMinsToTime(TimeUtil.dateToOozieDate( |
| bundles[1].getProcessObject().getClusters().getClusters().get(0).getValidity() |
| .getEnd() |
| ), 4); |
| bundles[1].setProcessValidity(TimeUtil.dateToOozieDate( |
| bundles[1].getProcessObject().getClusters().getClusters().get(0).getValidity() |
| .getStart()), |
| newEndTime); |
| |
| waitForProcessToReachACertainState(cluster3, bundles[1], Job.Status.RUNNING); |
| |
| ServiceResponse response = prism.getProcessHelper() |
| .update(bundles[1].getProcessData(), bundles[1].getProcessData()); |
| for (int i = 0; i < 10 |
| && |
| Util.parseResponse(response).getStatus() != APIResult.Status.SUCCEEDED; ++i) { |
| response = prism.getProcessHelper() |
| .update(bundles[1].getProcessData(), bundles[1].getProcessData()); |
| TimeUtil.sleepSeconds(6); |
| } |
| Assert.assertEquals(Util.parseResponse(response).getStatus(), |
| APIResult.Status.SUCCEEDED, "Process update did not succeed."); |
| |
| OozieUtil.verifyNewBundleCreation(cluster3, oldBundleId, oldNominalTimes, |
| bundles[1].getProcessData(), false, true); |
| |
| int i = 0; |
| |
| while (OozieUtil.getNumberOfWorkflowInstances(cluster3, oldBundleId) |
| != getExpectedNumberOfWorkflowInstances(TimeUtil.dateToOozieDate( |
| bundles[1].getProcessObject().getClusters().getClusters().get(0).getValidity() |
| .getStart() |
| ), TimeUtil.dateToOozieDate( |
| bundles[1].getProcessObject().getClusters().getClusters().get(0) |
| .getValidity() |
| .getEnd())) |
| && i < 10) { |
| TimeUtil.sleepSeconds(1); |
| i++; |
| } |
| |
| bundles[1].verifyDependencyListing(cluster2); |
| |
| dualComparison(prism, cluster3, bundles[1].getProcessData()); |
| waitingForBundleFinish(cluster3, oldBundleId); |
| //ensure that the running process has new coordinators created; while the submitted |
| // one is updated |
| // correctly. |
| int finalNumberOfInstances = InstanceUtil |
| .getProcessInstanceList(cluster3, |
| Util.getProcessName(bundles[1].getProcessData()), EntityType.PROCESS) |
| .size(); |
| Assert.assertEquals(finalNumberOfInstances, |
| getExpectedNumberOfWorkflowInstances(TimeUtil |
| .dateToOozieDate( |
| bundles[1].getProcessObject().getClusters().getClusters().get(0) |
| .getValidity().getStart()), |
| TimeUtil |
| .dateToOozieDate( |
| bundles[1].getProcessObject().getClusters().getClusters() |
| .get(0).getValidity() |
| .getEnd()))); |
| AssertUtil.checkNotStatus(cluster2OC, EntityType.PROCESS, bundles[1], Job.Status.RUNNING); |
| } |
| |
| @Test(groups = { "multiCluster" }, timeOut = 1200000) |
| public void updateProcessConcurrencyInEachColoWithOneProcessSuspended() |
| throws Exception { |
| String startTime = TimeUtil.getTimeWrtSystemTime(3); |
| String endTime = TimeUtil.getTimeWrtSystemTime(7); |
| bundles[1].setProcessValidity(startTime, endTime); |
| |
| bundles[1].submitBundle(prism); |
| //now to schedule in 1 colo and let it remain in another |
| AssertUtil.assertSucceeded( |
| cluster3.getProcessHelper().schedule(bundles[1].getProcessData())); |
| String oldBundleId = InstanceUtil |
| .getLatestBundleID(cluster3, |
| Util.readEntityName(bundles[1].getProcessData()), EntityType.PROCESS); |
| AssertUtil.assertSucceeded( |
| cluster3.getProcessHelper().schedule(bundles[1].getProcessData())); |
| InstanceUtil.waitTillInstancesAreCreated(cluster3, bundles[1].getProcessData(), 0, 10); |
| |
| waitForProcessToReachACertainState(cluster3, bundles[1], Job.Status.RUNNING); |
| List<String> oldNominalTimes = OozieUtil.getActionsNominalTime(cluster3, oldBundleId, |
| EntityType.PROCESS); |
| |
| AssertUtil.assertSucceeded( |
| cluster3.getProcessHelper().suspend(bundles[1].getProcessData())); |
| //now to update |
| while (Util |
| .parseResponse(updateProcessConcurrency(bundles[1], |
| bundles[1].getProcessObject().getParallel() + 3)) |
| .getStatus() != APIResult.Status.SUCCEEDED) { |
| LOGGER.info("WARNING: update did not scceed, retyring "); |
| TimeUtil.sleepSeconds(20); |
| } |
| |
| String prismString = getResponse(prism, bundles[1].getProcessData(), true); |
| Assert.assertEquals(Util.getProcessObject(prismString).getParallel(), |
| bundles[1].getProcessObject().getParallel() + 3); |
| dualComparison(prism, cluster3, bundles[1].getProcessData()); |
| //ensure that the running process has new coordinators created; while the submitted |
| // one is updated correctly. |
| OozieUtil.verifyNewBundleCreation(cluster3, oldBundleId, oldNominalTimes, |
| bundles[1].getProcessData(), false, true); |
| AssertUtil.checkNotStatus(cluster2OC, EntityType.PROCESS, bundles[1], Job.Status.RUNNING); |
| AssertUtil.assertSucceeded(cluster3.getProcessHelper().resume(bundles[1].getProcessData())); |
| AssertUtil.checkStatus(cluster3OC, EntityType.PROCESS, bundles[1], Job.Status.RUNNING); |
| |
| Job.Status status = OozieUtil.getOozieJobStatus(cluster3.getFeedHelper().getOozieClient(), |
| Util.readEntityName(bundles[1].getProcessData()), EntityType.PROCESS); |
| |
| boolean doesExist = false; |
| OozieUtil.createMissingDependencies(cluster3, EntityType.PROCESS, bundles[1].getProcessName(), 0); |
| while (status != Job.Status.SUCCEEDED && status != Job.Status.FAILED |
| && |
| status != Job.Status.DONEWITHERROR) { |
| if (InstanceUtil |
| .getInstanceCountWithStatus(cluster3, |
| Util.readEntityName(bundles[1].getProcessData()), |
| org.apache.oozie.client.CoordinatorAction.Status.RUNNING, |
| EntityType.PROCESS) |
| == |
| bundles[1].getProcessObject().getParallel()) { |
| doesExist = true; |
| break; |
| } |
| status = OozieUtil.getOozieJobStatus(cluster3.getFeedHelper().getOozieClient(), |
| Util.readEntityName(bundles[1].getProcessData()), EntityType.PROCESS); |
| } |
| |
| Assert.assertTrue(doesExist, "Er! The desired concurrency levels are never reached!!!"); |
| waitingForBundleFinish(cluster3, oldBundleId); |
| |
| int finalNumberOfInstances = |
| InstanceUtil.getProcessInstanceListFromAllBundles(cluster3, |
| Util.getProcessName(bundles[1].getProcessData()), EntityType.PROCESS).size(); |
| |
| int expectedInstances = |
| getExpectedNumberOfWorkflowInstances(TimeUtil.dateToOozieDate( |
| bundles[1].getProcessObject().getClusters().getClusters().get(0) |
| .getValidity().getStart()), |
| TimeUtil |
| .dateToOozieDate( |
| bundles[1].getProcessObject().getClusters().getClusters() |
| .get(0).getValidity() |
| .getEnd())); |
| |
| Assert.assertEquals(finalNumberOfInstances, expectedInstances, |
| "number of instances doesnt match :("); |
| } |
| |
| @Test(groups = { "multiCluster" }, timeOut = 1200000) |
| public void updateProcessConcurrencyInEachColoWithOneColoDown() throws Exception { |
| |
| String startTime = TimeUtil.getTimeWrtSystemTime(-1); |
| String endTime = TimeUtil.getTimeWrtSystemTime(5); |
| bundles[1].setProcessValidity(startTime, endTime); |
| |
| bundles[1].submitBundle(prism); |
| //now to schedule in 1 colo and let it remain in another |
| |
| LOGGER.info("process to be scheduled: " + Util.prettyPrintXml(bundles[1].getProcessData())); |
| |
| AssertUtil.assertSucceeded( |
| cluster3.getProcessHelper().schedule(bundles[1].getProcessData())); |
| InstanceUtil.waitTillInstancesAreCreated(cluster3, bundles[1].getProcessData(), 0, 10); |
| |
| String oldBundleId = InstanceUtil |
| .getLatestBundleID(cluster3, |
| Util.readEntityName(bundles[1].getProcessData()), EntityType.PROCESS); |
| |
| List<String> oldNominalTimes = OozieUtil.getActionsNominalTime(cluster3, oldBundleId, |
| EntityType.PROCESS); |
| |
| //now to update |
| Util.shutDownService(cluster3.getClusterHelper()); |
| |
| ServiceResponse response = |
| updateProcessConcurrency(bundles[1], |
| bundles[1].getProcessObject().getParallel() + 3); |
| AssertUtil.assertPartial(response); |
| |
| Util.startService(cluster3.getClusterHelper()); |
| |
| String prismString = getResponse(prism, bundles[1].getProcessData(), true); |
| dualComparison(prism, cluster3, bundles[1].getProcessData()); |
| Assert.assertEquals(Util.getProcessObject(prismString).getParallel(), |
| bundles[1].getProcessObject().getParallel()); |
| |
| //ensure that the running process has new coordinators created; while the submitted |
| // one is updated correctly. |
| AssertUtil.checkNotStatus(cluster2OC, EntityType.PROCESS, bundles[1], |
| Job.Status.RUNNING); |
| |
| while (Util |
| .parseResponse(updateProcessConcurrency(bundles[1], |
| bundles[1].getProcessObject().getParallel() + 3)) |
| .getStatus() != APIResult.Status.SUCCEEDED) { |
| LOGGER.info("WARNING: update did not scceed, retyring "); |
| TimeUtil.sleepSeconds(20); |
| } |
| dualComparison(prism, cluster3, bundles[1].getProcessData()); |
| dualComparison(prism, cluster2, bundles[1].getProcessData()); |
| |
| Job.Status status = |
| OozieUtil.getOozieJobStatus(cluster3.getFeedHelper().getOozieClient(), |
| Util.readEntityName(bundles[1].getProcessData()), EntityType.PROCESS); |
| |
| boolean doesExist = false; |
| OozieUtil.createMissingDependencies(cluster3, EntityType.PROCESS, bundles[1].getProcessName(), 0); |
| while (status != Job.Status.SUCCEEDED && status != Job.Status.FAILED |
| && |
| status != Job.Status.DONEWITHERROR) { |
| if (InstanceUtil |
| .getInstanceCountWithStatus(cluster3, |
| Util.readEntityName(bundles[1].getProcessData()), |
| org.apache.oozie.client.CoordinatorAction.Status.RUNNING, |
| EntityType.PROCESS) |
| == |
| bundles[1].getProcessObject().getParallel() + 3) { |
| doesExist = true; |
| break; |
| } |
| status = OozieUtil.getOozieJobStatus(cluster3.getFeedHelper().getOozieClient(), |
| Util.readEntityName(bundles[1].getProcessData()), EntityType.PROCESS); |
| TimeUtil.sleepSeconds(30); |
| } |
| Assert.assertTrue(doesExist, "Er! The desired concurrency levels are never reached!!!"); |
| OozieUtil.verifyNewBundleCreation(cluster3, InstanceUtil |
| .getLatestBundleID(cluster3, |
| Util.readEntityName(bundles[1].getProcessData()), EntityType.PROCESS), |
| oldNominalTimes, bundles[1].getProcessData(), false, |
| true |
| ); |
| |
| waitingForBundleFinish(cluster3, oldBundleId); |
| |
| int finalNumberOfInstances = |
| InstanceUtil.getProcessInstanceListFromAllBundles(cluster3, |
| Util.getProcessName(bundles[1].getProcessData()), EntityType.PROCESS).size(); |
| |
| int expectedInstances = |
| getExpectedNumberOfWorkflowInstances(TimeUtil.dateToOozieDate( |
| bundles[1].getProcessObject().getClusters().getClusters().get(0) |
| .getValidity().getStart()), |
| TimeUtil |
| .dateToOozieDate( |
| bundles[1].getProcessObject().getClusters().getClusters() |
| .get(0).getValidity() |
| .getEnd())); |
| Assert.assertEquals(finalNumberOfInstances, expectedInstances, |
| "number of instances doesnt match :("); |
| } |
| |
| @Test(groups = { "multiCluster" }, timeOut = 1200000) |
| public void updateProcessConcurrencyExecutionWorkflowInEachColoWithOneProcessRunning() |
| throws Exception { |
| String startTime = TimeUtil.getTimeWrtSystemTime(-1); |
| String endTime = TimeUtil.getTimeWrtSystemTime(7); |
| bundles[1].setProcessValidity(startTime, endTime); |
| |
| bundles[1].submitBundle(prism); |
| //now to schedule in 1 colo and let it remain in another |
| AssertUtil.assertSucceeded( |
| cluster3.getProcessHelper().schedule(bundles[1].getProcessData())); |
| TimeUtil.sleepSeconds(30); |
| InstanceUtil.waitTillInstancesAreCreated(cluster3, bundles[1].getProcessData(), 0, 10); |
| |
| String oldBundleId = InstanceUtil |
| .getLatestBundleID(cluster3, |
| Util.readEntityName(bundles[1].getProcessData()), EntityType.PROCESS); |
| waitForProcessToReachACertainState(cluster3, bundles[1], Job.Status.RUNNING); |
| List<String> oldNominalTimes = OozieUtil.getActionsNominalTime(cluster3, oldBundleId, |
| EntityType.PROCESS); |
| |
| int initialConcurrency = bundles[1].getProcessObject().getParallel(); |
| |
| bundles[1].setProcessConcurrency(bundles[1].getProcessObject().getParallel() + 3); |
| bundles[1].setProcessWorkflow(aggregator1Path); |
| bundles[1].getProcessObject().setOrder(getRandomExecutionType(bundles[1])); |
| |
| //now to update |
| |
| String updateTime = new DateTime(DateTimeZone.UTC).plusMinutes(2).toString(); |
| |
| LOGGER.info("updating @ " + updateTime); |
| |
| while (Util.parseResponse( |
| prism.getProcessHelper().update((bundles[1].getProcessData()), bundles[1] |
| .getProcessData())).getStatus() != APIResult.Status.SUCCEEDED) { |
| TimeUtil.sleepSeconds(10); |
| } |
| String prismString = getResponse(prism, bundles[1].getProcessData(), true); |
| Assert.assertEquals(Util.getProcessObject(prismString).getParallel(), |
| initialConcurrency + 3); |
| Assert.assertEquals(Util.getProcessObject(prismString).getWorkflow().getPath(), |
| aggregator1Path); |
| Assert.assertEquals(Util.getProcessObject(prismString).getOrder(), |
| bundles[1].getProcessObject().getOrder()); |
| dualComparison(prism, cluster3, bundles[1].getProcessData()); |
| //ensure that the running process has new coordinators created; while the submitted |
| // one is updated correctly. |
| waitingForBundleFinish(cluster3, oldBundleId); |
| OozieUtil.verifyNewBundleCreation(cluster3, oldBundleId, oldNominalTimes, |
| bundles[1].getProcessData(), true, true); |
| AssertUtil.checkNotStatus(cluster2OC, EntityType.PROCESS, bundles[1], Job.Status.RUNNING); |
| int finalNumberOfInstances = |
| InstanceUtil.getProcessInstanceListFromAllBundles(cluster3, |
| Util.getProcessName(bundles[1].getProcessData()), EntityType.PROCESS).size(); |
| int expectedInstances = |
| getExpectedNumberOfWorkflowInstances(TimeUtil.dateToOozieDate( |
| bundles[1].getProcessObject().getClusters().getClusters().get(0) |
| .getValidity().getStart()), |
| TimeUtil |
| .dateToOozieDate( |
| bundles[1].getProcessObject().getClusters().getClusters() |
| .get(0).getValidity() |
| .getEnd())); |
| Assert.assertEquals(finalNumberOfInstances, expectedInstances, |
| "number of instances doesnt match :("); |
| } |
| |
| @Test(groups = { "multiCluster" }, timeOut = 1200000) |
| public void updateProcessConcurrencyExecutionWorkflowInEachColoWithOneProcessSuspended() |
| throws Exception { |
| String startTime = TimeUtil.getTimeWrtSystemTime(2); |
| String endTime = TimeUtil.getTimeWrtSystemTime(6); |
| bundles[1].setProcessValidity(startTime, endTime); |
| |
| bundles[1].submitBundle(prism); |
| //now to schedule in 1 colo and let it remain in another |
| AssertUtil.assertSucceeded( |
| cluster3.getProcessHelper().schedule(bundles[1].getProcessData())); |
| TimeUtil.sleepSeconds(30); |
| InstanceUtil.waitTillInstancesAreCreated(cluster3, bundles[1].getProcessData(), 0, 10); |
| |
| String oldBundleId = InstanceUtil |
| .getLatestBundleID(cluster3, |
| Util.readEntityName(bundles[1].getProcessData()), EntityType.PROCESS); |
| |
| waitForProcessToReachACertainState(cluster3, bundles[1], Job.Status.RUNNING); |
| |
| List<String> oldNominalTimes = OozieUtil.getActionsNominalTime(cluster3, oldBundleId, |
| EntityType.PROCESS); |
| |
| int initialConcurrency = bundles[1].getProcessObject().getParallel(); |
| |
| bundles[1].setProcessConcurrency(bundles[1].getProcessObject().getParallel() + 3); |
| bundles[1].setProcessWorkflow(aggregator1Path); |
| bundles[1].getProcessObject().setOrder(getRandomExecutionType(bundles[1])); |
| //suspend |
| AssertUtil.assertSucceeded( |
| cluster3.getProcessHelper().suspend(bundles[1].getProcessData())); |
| |
| //now to update |
| String updateTime = new DateTime(DateTimeZone.UTC).plusMinutes(2).toString(); |
| LOGGER.info("updating @ " + updateTime); |
| while (Util.parseResponse( |
| prism.getProcessHelper() |
| .update((bundles[1].getProcessData()), bundles[1].getProcessData())) |
| .getStatus() != APIResult.Status.SUCCEEDED) { |
| TimeUtil.sleepSeconds(10); |
| } |
| |
| AssertUtil.assertSucceeded(cluster3.getProcessHelper().resume(bundles[1].getProcessData())); |
| |
| String prismString = getResponse(prism, bundles[1].getProcessData(), true); |
| Assert.assertEquals(Util.getProcessObject(prismString).getParallel(), |
| initialConcurrency + 3); |
| Assert.assertEquals(Util.getProcessObject(prismString).getWorkflow().getPath(), |
| aggregator1Path); |
| Assert.assertEquals(Util.getProcessObject(prismString).getOrder(), |
| bundles[1].getProcessObject().getOrder()); |
| dualComparison(prism, cluster3, bundles[1].getProcessData()); |
| //ensure that the running process has new coordinators created; while the submitted |
| // one is updated correctly. |
| waitingForBundleFinish(cluster3, oldBundleId); |
| OozieUtil.verifyNewBundleCreation(cluster3, oldBundleId, oldNominalTimes, |
| bundles[1].getProcessData(), true, true); |
| AssertUtil.checkNotStatus(cluster3OC, EntityType.PROCESS, bundles[1], Job.Status.RUNNING); |
| int finalNumberOfInstances = |
| InstanceUtil.getProcessInstanceListFromAllBundles(cluster3, |
| Util.getProcessName(bundles[1].getProcessData()), EntityType.PROCESS).size(); |
| |
| int expectedInstances = |
| getExpectedNumberOfWorkflowInstances(TimeUtil.dateToOozieDate( |
| bundles[1].getProcessObject().getClusters().getClusters().get(0) |
| .getValidity().getStart()), |
| TimeUtil |
| .dateToOozieDate( |
| bundles[1].getProcessObject().getClusters().getClusters() |
| .get(0).getValidity() |
| .getEnd())); |
| Assert.assertEquals(finalNumberOfInstances, expectedInstances, |
| "number of instances doesnt match :("); |
| } |
| |
| @Test(groups = { "multiCluster" }, timeOut = 1200000) |
| public void updateProcessAddNewInputInEachColoWithOneProcessRunning() throws Exception { |
| String startTime = TimeUtil.getTimeWrtSystemTime(-1); |
| String endTime = TimeUtil.getTimeWrtSystemTime(6); |
| bundles[1].setProcessValidity(startTime, endTime); |
| |
| bundles[1].submitBundle(prism); |
| //now to schedule in 1 colo and let it remain in another |
| AssertUtil.assertSucceeded( |
| cluster3.getProcessHelper().schedule(bundles[1].getProcessData())); |
| TimeUtil.sleepSeconds(30); |
| InstanceUtil.waitTillInstancesAreCreated(cluster3, bundles[1].getProcessData(), 0, 10); |
| |
| String oldBundleId = InstanceUtil |
| .getLatestBundleID(cluster3, |
| Util.readEntityName(bundles[1].getProcessData()), EntityType.PROCESS); |
| |
| TimeUtil.sleepSeconds(20); |
| waitForProcessToReachACertainState(cluster3, bundles[1], Job.Status.RUNNING); |
| List<String> oldNominalTimes = OozieUtil.getActionsNominalTime(cluster3, oldBundleId, |
| EntityType.PROCESS); |
| |
| String newFeedName = bundles[1].getInputFeedNameFromBundle() + "2"; |
| String inputFeed = bundles[1].getInputFeedFromBundle(); |
| |
| bundles[1].addProcessInput(newFeedName, "inputData2"); |
| inputFeed = Util.setFeedName(inputFeed, newFeedName); |
| |
| LOGGER.info(inputFeed); |
| AssertUtil.assertSucceeded(prism.getFeedHelper().submitEntity(inputFeed)); |
| |
| while (Util.parseResponse( |
| prism.getProcessHelper() |
| .update((bundles[1].getProcessData()), bundles[1].getProcessData())) |
| .getStatus() != APIResult.Status.SUCCEEDED) { |
| TimeUtil.sleepSeconds(20); |
| } |
| OozieUtil.verifyNewBundleCreation(cluster3, oldBundleId, oldNominalTimes, |
| bundles[1].getProcessData(), true, false); |
| InstanceUtil.waitTillInstancesAreCreated(cluster3, bundles[1].getProcessData(), 1, 10); |
| |
| bundles[1].verifyDependencyListing(cluster2); |
| |
| dualComparison(prism, cluster3, bundles[1].getProcessData()); |
| //ensure that the running process has new coordinators created; while the submitted |
| // one is updated correctly. |
| waitingForBundleFinish(cluster3, oldBundleId); |
| OozieUtil.verifyNewBundleCreation(cluster3, oldBundleId, oldNominalTimes, |
| bundles[1].getProcessData(), true, true); |
| AssertUtil.checkNotStatus(cluster2OC, EntityType.PROCESS, bundles[1], Job.Status.RUNNING); |
| } |
| |
| @Test(groups = { "multiCluster" }, timeOut = 1200000) |
| public void updateProcessAddNewInputInEachColoWithOneProcessSuspended() throws Exception { |
| String startTime = TimeUtil.getTimeWrtSystemTime(1); |
| String endTime = TimeUtil.getTimeWrtSystemTime(6); |
| bundles[1].setProcessValidity(startTime, endTime); |
| |
| bundles[1].submitBundle(prism); |
| //now to schedule in 1 colo and let it remain in another |
| AssertUtil.assertSucceeded( |
| cluster3.getProcessHelper().schedule(bundles[1].getProcessData())); |
| TimeUtil.sleepSeconds(30); |
| InstanceUtil.waitTillInstancesAreCreated(cluster3, bundles[1].getProcessData(), 0); |
| |
| String oldBundleId = InstanceUtil |
| .getLatestBundleID(cluster3, |
| Util.readEntityName(bundles[1].getProcessData()), EntityType.PROCESS); |
| |
| waitForProcessToReachACertainState(cluster3, bundles[1], Job.Status.RUNNING); |
| List<String> oldNominalTimes = OozieUtil.getActionsNominalTime(cluster3, oldBundleId, |
| EntityType.PROCESS); |
| |
| String newFeedName = bundles[1].getInputFeedNameFromBundle() + "2"; |
| String inputFeed = bundles[1].getInputFeedFromBundle(); |
| |
| bundles[1].addProcessInput(newFeedName, "inputData2"); |
| inputFeed = Util.setFeedName(inputFeed, newFeedName); |
| |
| AssertUtil.assertSucceeded( |
| cluster3.getProcessHelper().suspend(bundles[1].getProcessData())); |
| AssertUtil.assertSucceeded(prism.getFeedHelper().submitEntity(inputFeed)); |
| |
| while (Util.parseResponse( |
| prism.getProcessHelper() |
| .update((bundles[1].getProcessData()), bundles[1].getProcessData())) |
| .getStatus() != APIResult.Status.SUCCEEDED) { |
| TimeUtil.sleepSeconds(10); |
| } |
| |
| OozieUtil.verifyNewBundleCreation(cluster3, oldBundleId, oldNominalTimes, |
| bundles[1].getProcessData(), true, false); |
| InstanceUtil.waitTillInstancesAreCreated(cluster3, bundles[1].getProcessData(), 1, 10); |
| |
| AssertUtil.assertSucceeded(cluster3.getProcessHelper().resume(bundles[1].getProcessData())); |
| |
| bundles[1].verifyDependencyListing(cluster2); |
| |
| dualComparison(prism, cluster3, bundles[1].getProcessData()); |
| //ensure that the running process has new coordinators created; while the submitted |
| // one is updated correctly. |
| waitingForBundleFinish(cluster3, oldBundleId); |
| OozieUtil.verifyNewBundleCreation(cluster3, oldBundleId, oldNominalTimes, |
| bundles[1].getProcessData(), true, true); |
| AssertUtil.checkNotStatus(cluster3OC, EntityType.PROCESS, bundles[1], Job.Status.RUNNING); |
| |
| } |
| |
| @Test(groups = { "multiCluster" }, timeOut = 1200000) |
| public void updateProcessAddNewInputInEachColoWithOneColoDown() throws Exception { |
| String startTime = TimeUtil.getTimeWrtSystemTime(3); |
| String endTime = TimeUtil.getTimeWrtSystemTime(10); |
| bundles[1].setProcessValidity(startTime, endTime); |
| |
| bundles[1].submitBundle(prism); |
| String originalProcess = bundles[1].getProcessData(); |
| String newFeedName = bundles[1].getInputFeedNameFromBundle() + "2"; |
| String inputFeed = bundles[1].getInputFeedFromBundle(); |
| bundles[1].addProcessInput(newFeedName, "inputData2"); |
| inputFeed = Util.setFeedName(inputFeed, newFeedName); |
| String updatedProcess = bundles[1].getProcessData(); |
| |
| //now to schedule in 1 colo and let it remain in another |
| AssertUtil.assertSucceeded( |
| cluster3.getProcessHelper().schedule(originalProcess)); |
| TimeUtil.sleepSeconds(30); |
| InstanceUtil.waitTillInstancesAreCreated(cluster3, originalProcess, 0, 10); |
| |
| String oldBundleId = InstanceUtil |
| .getLatestBundleID(cluster3, |
| Util.readEntityName(originalProcess), EntityType.PROCESS); |
| |
| InstanceUtil.waitTillInstancesAreCreated(cluster3, originalProcess, 0, 10); |
| List<String> oldNominalTimes = OozieUtil.getActionsNominalTime(cluster3, oldBundleId, |
| EntityType.PROCESS); |
| |
| //submit new feed |
| AssertUtil.assertSucceeded(prism.getFeedHelper().submitEntity(inputFeed)); |
| |
| Util.shutDownService(cluster3.getProcessHelper()); |
| |
| AssertUtil.assertPartial( |
| prism.getProcessHelper() |
| .update(updatedProcess, updatedProcess)); |
| |
| Util.startService(cluster3.getProcessHelper()); |
| bundles[1].verifyDependencyListing(cluster2); |
| |
| dualComparison(prism, cluster3, bundles[1].getProcessData()); |
| Assert.assertFalse(Util.isDefinitionSame(cluster2, prism, originalProcess)); |
| |
| //ensure that the running process has new coordinators created; while the submitted |
| // one is updated correctly. |
| OozieUtil.verifyNewBundleCreation(cluster3, oldBundleId, oldNominalTimes, |
| bundles[1].getProcessData(), false, false); |
| AssertUtil.checkNotStatus(cluster2OC, EntityType.PROCESS, bundles[1], |
| Job.Status.RUNNING); |
| |
| waitForProcessToReachACertainState(cluster3, bundles[1], Job.Status.RUNNING); |
| |
| while (Util.parseResponse( |
| prism.getProcessHelper().update(updatedProcess, updatedProcess)).getStatus() |
| != APIResult.Status.SUCCEEDED) { |
| LOGGER.info("update didnt SUCCEED in last attempt"); |
| TimeUtil.sleepSeconds(10); |
| } |
| dualComparison(prism, cluster3, bundles[1].getProcessData()); |
| Assert.assertTrue(Util.isDefinitionSame(cluster2, prism, originalProcess)); |
| bundles[1].verifyDependencyListing(cluster2); |
| OozieUtil.verifyNewBundleCreation(cluster3, oldBundleId, oldNominalTimes, |
| updatedProcess, true, false); |
| waitingForBundleFinish(cluster3, oldBundleId); |
| |
| InstanceUtil.waitTillInstancesAreCreated(cluster3, bundles[1].getProcessData(), 1, 10); |
| |
| OozieUtil.verifyNewBundleCreation(cluster3, oldBundleId, oldNominalTimes, |
| bundles[1].getProcessData(), true, true); |
| AssertUtil.checkNotStatus(cluster2OC, EntityType.PROCESS, bundles[1], |
| Job.Status.RUNNING); |
| } |
| |
| @Test(groups = { "multiCluster" }, timeOut = 1200000) |
| public void updateProcessDecreaseValidityInEachColoWithOneProcessRunning() throws Exception { |
| bundles[1].submitBundle(prism); |
| //now to schedule in 1 colo and let it remain in another |
| AssertUtil.assertSucceeded( |
| cluster3.getProcessHelper().schedule(bundles[1].getProcessData())); |
| TimeUtil.sleepSeconds(30); |
| InstanceUtil.waitTillInstancesAreCreated(cluster3, bundles[1].getProcessData(), 0, 10); |
| |
| String oldBundleId = InstanceUtil |
| .getLatestBundleID(cluster3, |
| Util.readEntityName(bundles[1].getProcessData()), EntityType.PROCESS); |
| |
| waitForProcessToReachACertainState(cluster3, bundles[1], Job.Status.RUNNING); |
| |
| List<String> oldNominalTimes = OozieUtil.getActionsNominalTime(cluster3, oldBundleId, |
| EntityType.PROCESS); |
| |
| String newEndTime = TimeUtil.addMinsToTime(TimeUtil.dateToOozieDate( |
| bundles[1].getProcessObject().getClusters().getClusters().get(0).getValidity() |
| .getEnd() |
| ), -2); |
| bundles[1].setProcessValidity(TimeUtil.dateToOozieDate( |
| bundles[1].getProcessObject().getClusters().getClusters().get(0).getValidity() |
| .getStart()), |
| newEndTime); |
| while (Util.parseResponse( |
| (prism.getProcessHelper() |
| .update(bundles[1].getProcessData(), bundles[1].getProcessData()))) |
| .getStatus() != APIResult.Status.SUCCEEDED) { |
| LOGGER.info("update didnt SUCCEED in last attempt"); |
| TimeUtil.sleepSeconds(10); |
| } |
| OozieUtil.verifyNewBundleCreation(cluster3, oldBundleId, oldNominalTimes, |
| bundles[1].getProcessData(), false, true); |
| |
| bundles[1].verifyDependencyListing(cluster2); |
| |
| dualComparison(prism, cluster3, bundles[1].getProcessData()); |
| waitingForBundleFinish(cluster3, oldBundleId); |
| //ensure that the running process has new coordinators created; while the submitted |
| // one is updated correctly. |
| int finalNumberOfInstances = InstanceUtil |
| .getProcessInstanceList(cluster3, |
| Util.getProcessName(bundles[1].getProcessData()), EntityType.PROCESS) |
| .size(); |
| Assert.assertEquals(finalNumberOfInstances, |
| getExpectedNumberOfWorkflowInstances(bundles[1] |
| .getProcessObject().getClusters().getClusters().get(0).getValidity() |
| .getStart(), |
| bundles[1].getProcessObject().getClusters().getClusters().get(0) |
| .getValidity().getEnd())); |
| AssertUtil.checkNotStatus(cluster2OC, EntityType.PROCESS, bundles[1], Job.Status.RUNNING); |
| int expectedNumberOfWorkflows = |
| getExpectedNumberOfWorkflowInstances(TimeUtil.dateToOozieDate( |
| bundles[1].getProcessObject().getClusters().getClusters().get(0) |
| .getValidity().getStart()), |
| newEndTime); |
| Assert.assertEquals(OozieUtil.getNumberOfWorkflowInstances(cluster3, oldBundleId), |
| expectedNumberOfWorkflows); |
| } |
| |
| @Test(groups = { "multiCluster" }, timeOut = 1200000) |
| public void updateProcessIncreaseValidityInEachColoWithOneProcessSuspended() throws Exception { |
| String startTime = TimeUtil.getTimeWrtSystemTime(-1); |
| String endTime = TimeUtil.getTimeWrtSystemTime(3); |
| bundles[1].setProcessValidity(startTime, endTime); |
| |
| bundles[1].submitBundle(prism); |
| //now to schedule in 1 colo and let it remain in another |
| AssertUtil.assertSucceeded( |
| cluster3.getProcessHelper().schedule(bundles[1].getProcessData())); |
| String oldBundleId = InstanceUtil |
| .getLatestBundleID(cluster3, |
| Util.readEntityName(bundles[1].getProcessData()), EntityType.PROCESS); |
| |
| TimeUtil.sleepSeconds(30); |
| waitForProcessToReachACertainState(cluster3, bundles[1], Job.Status.RUNNING); |
| |
| String newEndTime = TimeUtil.addMinsToTime(TimeUtil.dateToOozieDate( |
| bundles[1].getProcessObject().getClusters().getClusters().get(0).getValidity() |
| .getEnd() |
| ), 4); |
| bundles[1].setProcessValidity(TimeUtil.dateToOozieDate( |
| bundles[1].getProcessObject().getClusters().getClusters().get(0).getValidity() |
| .getStart()), |
| newEndTime); |
| |
| AssertUtil.assertSucceeded( |
| cluster3.getProcessHelper().suspend(bundles[1].getProcessData())); |
| while (Util.parseResponse( |
| prism.getProcessHelper() |
| .update((bundles[1].getProcessData()), bundles[1].getProcessData())) |
| .getStatus() != APIResult.Status.SUCCEEDED) { |
| LOGGER.info("update didnt SUCCEED in last attempt"); |
| TimeUtil.sleepSeconds(10); |
| } |
| AssertUtil.assertSucceeded(cluster3.getProcessHelper().resume(bundles[1].getProcessData())); |
| |
| dualComparison(prism, cluster2, bundles[1].getProcessData()); |
| |
| dualComparison(prism, cluster3, bundles[1].getProcessData()); |
| waitingForBundleFinish(cluster3, oldBundleId); |
| //ensure that the running process has new coordinators created; while the submitted |
| // one is updated correctly. |
| int finalNumberOfInstances = InstanceUtil |
| .getProcessInstanceList(cluster3, |
| Util.getProcessName(bundles[1].getProcessData()), EntityType.PROCESS) |
| .size(); |
| Assert.assertEquals(finalNumberOfInstances, |
| getExpectedNumberOfWorkflowInstances(bundles[1] |
| .getProcessObject().getClusters().getClusters().get(0).getValidity() |
| .getStart(), |
| bundles[1].getProcessObject().getClusters().getClusters().get(0) |
| .getValidity().getEnd())); |
| AssertUtil.checkNotStatus(cluster2OC, EntityType.PROCESS, bundles[1], Job.Status.RUNNING); |
| } |
| |
| private void setBundleWFPath(Bundle... bundles) { |
| for (Bundle bundle : bundles) { |
| bundle.setProcessWorkflow(workflowPath); |
| } |
| } |
| |
| @Test(groups = { "multiCluster" }, timeOut = 1200000) |
| public void updateProcessFrequencyInEachColoWithOneProcessRunningDaily() throws Exception { |
| //set daily process |
| final String startTime = TimeUtil.getTimeWrtSystemTime(-20); |
| String endTime = TimeUtil.getTimeWrtSystemTime(4000); |
| bundles[1].setProcessPeriodicity(1, TimeUnit.days); |
| bundles[1].setOutputFeedPeriodicity(1, TimeUnit.days); |
| bundles[1].setProcessValidity(startTime, endTime); |
| |
| bundles[1].submitBundle(prism); |
| //now to schedule in 1 colo and let it remain in another |
| AssertUtil.assertSucceeded( |
| cluster3.getProcessHelper().schedule(bundles[1].getProcessData())); |
| InstanceUtil.waitTillInstancesAreCreated(cluster3, bundles[1].getProcessData(), 0, 10); |
| |
| String oldBundleId = InstanceUtil |
| .getLatestBundleID(cluster3, |
| Util.readEntityName(bundles[1].getProcessData()), EntityType.PROCESS); |
| |
| waitForProcessToReachACertainState(cluster3, bundles[1], Job.Status.RUNNING); |
| List<String> oldNominalTimes = |
| OozieUtil.getActionsNominalTime(cluster3, oldBundleId, EntityType.PROCESS); |
| |
| LOGGER.info("original process: " + Util.prettyPrintXml(bundles[1].getProcessData())); |
| |
| String updatedProcess = InstanceUtil |
| .setProcessFrequency(bundles[1].getProcessData(), |
| new Frequency("" + 5, TimeUnit.minutes)); |
| |
| LOGGER.info("updated process: " + updatedProcess); |
| |
| //now to update |
| ServiceResponse response = |
| prism.getProcessHelper().update(updatedProcess, updatedProcess); |
| AssertUtil.assertSucceeded(response); |
| InstanceUtil.waitTillInstancesAreCreated(cluster3, bundles[1].getProcessData(), 1, 10); |
| |
| String prismString = dualComparison(prism, cluster2, bundles[1].getProcessData()); |
| Assert.assertEquals(Util.getProcessObject(prismString).getFrequency(), |
| new Frequency("" + 5, TimeUnit.minutes)); |
| dualComparison(prism, cluster3, bundles[1].getProcessData()); |
| //ensure that the running process has new coordinators created; while the submitted |
| // one is updated |
| // correctly. |
| OozieUtil.verifyNewBundleCreation(cluster3, oldBundleId, oldNominalTimes, |
| bundles[1].getProcessData(), true, true); |
| AssertUtil.checkNotStatus(cluster2OC, EntityType.PROCESS, bundles[1], Job.Status.RUNNING); |
| } |
| |
| @Test(groups = { "multiCluster" }, timeOut = 1200000) |
| public void |
| updateProcessFrequencyInEachColoWithOneProcessRunningDailyToMonthlyWithStartChange() |
| throws Exception { |
| //set daily process |
| final String startTime = TimeUtil.getTimeWrtSystemTime(-20); |
| String endTime = TimeUtil.getTimeWrtSystemTime(4000 * 60); |
| bundles[1].setProcessPeriodicity(1, TimeUnit.days); |
| bundles[1].setOutputFeedPeriodicity(1, TimeUnit.days); |
| bundles[1].setProcessValidity(startTime, endTime); |
| |
| bundles[1].submitBundle(prism); |
| //now to schedule in 1 colo and let it remain in another |
| AssertUtil.assertSucceeded( |
| cluster3.getProcessHelper().schedule(bundles[1].getProcessData())); |
| TimeUtil.sleepSeconds(30); |
| InstanceUtil.waitTillInstancesAreCreated(cluster3, bundles[1].getProcessData(), 0, 10); |
| |
| String oldBundleId = InstanceUtil |
| .getLatestBundleID(cluster3, |
| Util.readEntityName(bundles[1].getProcessData()), EntityType.PROCESS); |
| |
| waitForProcessToReachACertainState(cluster3, bundles[1], Job.Status.RUNNING); |
| List<String> oldNominalTimes = OozieUtil.getActionsNominalTime(cluster3, oldBundleId, |
| EntityType.PROCESS); |
| |
| LOGGER.info("original process: " + Util.prettyPrintXml(bundles[1].getProcessData())); |
| |
| String updatedProcess = InstanceUtil |
| .setProcessFrequency(bundles[1].getProcessData(), |
| new Frequency("" + 1, TimeUnit.months)); |
| updatedProcess = InstanceUtil |
| .setProcessValidity(updatedProcess, TimeUtil.getTimeWrtSystemTime(10), |
| endTime); |
| |
| LOGGER.info("updated process: " + updatedProcess); |
| |
| //now to update |
| ServiceResponse response = |
| prism.getProcessHelper().update(updatedProcess, updatedProcess); |
| AssertUtil.assertSucceeded(response); |
| String prismString = dualComparison(prism, cluster3, bundles[1].getProcessData()); |
| Assert.assertEquals(Util.getProcessObject(prismString).getFrequency(), |
| new Frequency("" + 1, TimeUnit.months)); |
| dualComparison(prism, cluster3, bundles[1].getProcessData()); |
| //ensure that the running process has new coordinators created; while the submitted |
| // one is updated correctly. |
| OozieUtil.verifyNewBundleCreation(cluster3, oldBundleId, oldNominalTimes, |
| bundles[1].getProcessData(), true, true); |
| AssertUtil.checkNotStatus(cluster2OC, EntityType.PROCESS, bundles[1], Job.Status.RUNNING); |
| } |
| |
| @Test(groups = { "multiCluster" }, timeOut = 1200000) |
| public void updateProcessRollStartTimeBackwardsToPastInEachColoWithOneProcessRunning() |
| throws Exception { |
| bundles[1].submitBundle(prism); |
| //now to schedule in 1 colo and let it remain in another |
| AssertUtil.assertSucceeded( |
| cluster3.getProcessHelper().schedule(bundles[1].getProcessData())); |
| TimeUtil.sleepSeconds(30); |
| InstanceUtil.waitTillInstancesAreCreated(cluster3, bundles[1].getProcessData(), 0, 10); |
| |
| String oldBundleId = InstanceUtil |
| .getLatestBundleID(cluster3, |
| Util.readEntityName(bundles[1].getProcessData()), EntityType.PROCESS); |
| |
| List<String> oldNominalTimes = OozieUtil.getActionsNominalTime(cluster3, oldBundleId, |
| EntityType.PROCESS); |
| |
| String newStartTime = TimeUtil.addMinsToTime(TimeUtil.dateToOozieDate( |
| bundles[1].getProcessObject().getClusters().getClusters().get(0).getValidity() |
| .getStart() |
| ), -3); |
| bundles[1].setProcessValidity(newStartTime, TimeUtil.dateToOozieDate( |
| bundles[1].getProcessObject().getClusters().getClusters().get(0).getValidity() |
| .getEnd() |
| )); |
| |
| waitForProcessToReachACertainState(cluster3, bundles[1], Job.Status.RUNNING); |
| OozieUtil.createMissingDependencies(cluster3, EntityType.PROCESS, bundles[1].getProcessName(), 0); |
| AssertUtil.assertSucceeded( |
| prism.getProcessHelper() |
| .update(bundles[1].getProcessData(), bundles[1].getProcessData())); |
| |
| OozieUtil.verifyNewBundleCreation(cluster3, oldBundleId, oldNominalTimes, |
| bundles[1].getProcessData(), true, true); |
| bundles[1].verifyDependencyListing(cluster2); |
| dualComparison(prism, cluster3, bundles[1].getProcessData()); |
| } |
| |
| @Test(groups = { "multiCluster" }, timeOut = 1200000) |
| public void updateProcessRollStartTimeForwardInEachColoWithOneProcessSuspended() |
| throws Exception { |
| bundles[1].submitBundle(prism); |
| //now to schedule in 1 colo and let it remain in another |
| AssertUtil.assertSucceeded( |
| cluster3.getProcessHelper().schedule(bundles[1].getProcessData())); |
| String oldBundleId = InstanceUtil |
| .getLatestBundleID(cluster3, |
| Util.readEntityName(bundles[1].getProcessData()), EntityType.PROCESS); |
| TimeUtil.sleepSeconds(30); |
| |
| OozieUtil.getNumberOfWorkflowInstances(cluster3, oldBundleId); |
| String oldStartTime = TimeUtil.dateToOozieDate( |
| bundles[1].getProcessObject().getClusters().getClusters().get(0).getValidity() |
| .getStart() |
| ); |
| String newStartTime = TimeUtil.addMinsToTime(TimeUtil.dateToOozieDate( |
| bundles[1].getProcessObject().getClusters().getClusters().get(0).getValidity() |
| .getStart() |
| ), 3); |
| bundles[1].setProcessValidity(newStartTime, TimeUtil.dateToOozieDate( |
| bundles[1].getProcessObject().getClusters().getClusters().get(0).getValidity() |
| .getEnd() |
| )); |
| |
| waitForProcessToReachACertainState(cluster3, bundles[1], Job.Status.RUNNING); |
| |
| AssertUtil.assertSucceeded( |
| cluster3.getProcessHelper().suspend(bundles[1].getProcessData())); |
| |
| AssertUtil.assertSucceeded( |
| prism.getProcessHelper() |
| .update(bundles[1].getProcessData(), bundles[1].getProcessData())); |
| |
| dualComparison(prism, cluster2, bundles[1].getProcessData()); |
| |
| bundles[1].verifyDependencyListing(cluster2); |
| |
| dualComparison(prism, cluster3, bundles[1].getProcessData()); |
| //ensure that the running process has new coordinators created; while the submitted |
| // one is updated correctly. |
| int finalNumberOfInstances = |
| InstanceUtil.getProcessInstanceListFromAllBundles(cluster3, |
| Util.getProcessName(bundles[1].getProcessData()), EntityType.PROCESS).size(); |
| Assert.assertEquals(finalNumberOfInstances, |
| getExpectedNumberOfWorkflowInstances(oldStartTime, |
| bundles[1].getProcessObject().getClusters().getClusters().get(0) |
| .getValidity().getEnd())); |
| Assert.assertEquals(InstanceUtil |
| .getProcessInstanceList(cluster3, |
| Util.readEntityName(bundles[1].getProcessData()), EntityType.PROCESS) |
| .size(), getExpectedNumberOfWorkflowInstances(newStartTime, |
| bundles[1].getProcessObject().getClusters().getClusters().get(0).getValidity().getEnd())); |
| |
| AssertUtil.checkNotStatus(cluster2OC, EntityType.PROCESS, bundles[1], Job.Status.RUNNING); |
| } |
| |
| @Test(groups = { "multiCluster" }, timeOut = 1200000) |
| public void updateProcessRollStartTimeBackwardsInEachColoWithOneProcessSuspended() |
| throws Exception { |
| bundles[1].submitBundle(prism); |
| //now to schedule in 1 colo and let it remain in another |
| AssertUtil.assertSucceeded( |
| cluster3.getProcessHelper().schedule(bundles[1].getProcessData())); |
| String oldBundleId = InstanceUtil |
| .getLatestBundleID(cluster3, |
| Util.readEntityName(bundles[1].getProcessData()), EntityType.PROCESS); |
| TimeUtil.sleepSeconds(30); |
| |
| String newStartTime = TimeUtil.addMinsToTime(TimeUtil.dateToOozieDate( |
| bundles[1].getProcessObject().getClusters().getClusters().get(0).getValidity() |
| .getStart() |
| ), -3); |
| bundles[1].setProcessValidity(newStartTime, TimeUtil.dateToOozieDate( |
| bundles[1].getProcessObject().getClusters().getClusters().get(0).getValidity() |
| .getEnd() |
| )); |
| InstanceUtil.waitTillInstancesAreCreated(cluster3, bundles[1].getProcessData(), 0, 10); |
| |
| waitForProcessToReachACertainState(cluster3, bundles[1], Job.Status.RUNNING); |
| |
| AssertUtil.assertSucceeded( |
| cluster3.getProcessHelper().suspend(bundles[1].getProcessData())); |
| AssertUtil.assertSucceeded( |
| prism.getProcessHelper() |
| .update(bundles[1].getProcessData(), bundles[1].getProcessData())); |
| AssertUtil.assertSucceeded(cluster3.getProcessHelper().resume(bundles[1].getProcessData())); |
| List<String> oldNominalTimes = |
| OozieUtil.getActionsNominalTime(cluster3, oldBundleId, EntityType.PROCESS); |
| |
| OozieUtil.verifyNewBundleCreation(cluster3, oldBundleId, oldNominalTimes, |
| bundles[1].getProcessData(), true, false); |
| |
| bundles[1].verifyDependencyListing(cluster2); |
| |
| dualComparison(prism, cluster3, bundles[1].getProcessData()); |
| AssertUtil.checkNotStatus(cluster2OC, EntityType.PROCESS, bundles[1], Job.Status.RUNNING); |
| } |
| |
| @Test(timeOut = 1200000) |
| public void |
| updateProcessWorkflowXml() throws URISyntaxException, JAXBException, |
| IOException, OozieClientException, AuthenticationException { |
| Bundle b = BundleUtil.readELBundle(baseAppHDFSDir, this.getClass().getSimpleName()); |
| HadoopFileEditor hadoopFileEditor = null; |
| try { |
| |
| b = new Bundle(b, cluster1); |
| b.setProcessWorkflow(workflowPath); |
| b.generateUniqueBundle(); |
| |
| b.setProcessValidity(TimeUtil.getTimeWrtSystemTime(-10), |
| TimeUtil.getTimeWrtSystemTime(15)); |
| b.submitFeedsScheduleProcess(prism); |
| |
| InstanceUtil.waitTillInstancesAreCreated(cluster1, b.getProcessData(), 0, 10); |
| OozieUtil.createMissingDependencies(cluster1, EntityType.PROCESS, |
| b.getProcessName(), 0); |
| InstanceUtil.waitTillInstanceReachState(serverOC.get(0), |
| Util.readEntityName(b.getProcessData()), 0, CoordinatorAction.Status.RUNNING, |
| EntityType.PROCESS); |
| |
| //save old data |
| String oldBundleID = InstanceUtil |
| .getLatestBundleID(cluster1, |
| Util.readEntityName(b.getProcessData()), EntityType.PROCESS); |
| |
| List<String> oldNominalTimes = OozieUtil.getActionsNominalTime(cluster1, |
| oldBundleID, |
| EntityType.PROCESS); |
| |
| //update workflow.xml |
| hadoopFileEditor = new HadoopFileEditor(cluster1FS); |
| hadoopFileEditor.edit(new ProcessMerlin(b |
| .getProcessData()).getWorkflow().getPath() + "/workflow.xml", "</workflow-app>", |
| "<!-- some comment -->"); |
| |
| //update |
| prism.getProcessHelper().update(b.getProcessData(), |
| b.getProcessData()); |
| |
| TimeUtil.sleepSeconds(20); |
| //verify new bundle creation |
| OozieUtil.verifyNewBundleCreation(cluster1, oldBundleID, oldNominalTimes, |
| b.getProcessData(), true, true); |
| |
| } finally { |
| b.deleteBundle(prism); |
| if (hadoopFileEditor != null) { |
| hadoopFileEditor.restore(); |
| } |
| } |
| |
| } |
| |
| public ServiceResponse updateProcessConcurrency(Bundle bundle, int concurrency) |
| throws Exception { |
| String oldData = bundle.getProcessData(); |
| ProcessMerlin updatedProcess = new ProcessMerlin(bundle.getProcessObject()); |
| updatedProcess.setParallel(concurrency); |
| |
| return prism.getProcessHelper() |
| .update(oldData, updatedProcess.toString()); |
| } |
| |
| /** |
| * this method compares process xml definition from 2 falcon servers / prism and expects them to |
| * be identical. If the definitions are identical then the definition from @param coloHelper1 |
| * is @return are response. |
| */ |
| private String dualComparison(ColoHelper coloHelper1, ColoHelper coloHelper2, |
| String processData) throws Exception { |
| String colo1Response = getResponse(coloHelper1, processData, true); |
| String colo2Response = getResponse(coloHelper2, processData, true); |
| Assert.assertTrue(XmlUtil.isIdentical(colo1Response, colo2Response), |
| "Process definition should have been identical"); |
| return getResponse(coloHelper1, processData, true); |
| } |
| |
| /** |
| * this method compares process xml definition from 2 falcon servers / prism and expects them to |
| * be different. |
| */ |
| private void dualComparisonFailure(ColoHelper coloHelper1, ColoHelper coloHelper2, |
| String processData) throws Exception { |
| Assert.assertFalse(XmlUtil.isIdentical(getResponse(coloHelper1, processData, true), |
| getResponse(coloHelper2, processData, true)), "Process definition should not have been " |
| + "identical"); |
| } |
| |
| private String getResponse(ColoHelper prism, String processData, boolean bool) |
| throws Exception { |
| ServiceResponse response = prism.getProcessHelper().getEntityDefinition(processData); |
| if (bool) { |
| AssertUtil.assertSucceeded(response); |
| } else { |
| AssertUtil.assertFailed(response); |
| } |
| String result = response.getMessage(); |
| Assert.assertNotNull(result); |
| |
| return result; |
| |
| } |
| |
| private void waitForProcessToReachACertainState(ColoHelper coloHelper, Bundle bundle, |
| Job.Status state) |
| throws Exception { |
| |
| while (OozieUtil.getOozieJobStatus(coloHelper.getFeedHelper().getOozieClient(), |
| Util.readEntityName(bundle.getProcessData()), EntityType.PROCESS) != state) { |
| //keep waiting |
| TimeUtil.sleepSeconds(10); |
| } |
| |
| //now check if the coordinator is in desired state |
| CoordinatorJob coord = getDefaultOozieCoord(coloHelper, InstanceUtil |
| .getLatestBundleID(coloHelper, Util.readEntityName(bundle.getProcessData()), |
| EntityType.PROCESS)); |
| |
| while (coord.getStatus() != state) { |
| TimeUtil.sleepSeconds(10); |
| coord = getDefaultOozieCoord(coloHelper, InstanceUtil |
| .getLatestBundleID(coloHelper, Util.readEntityName(bundle.getProcessData()), |
| EntityType.PROCESS)); |
| } |
| } |
| |
| private Bundle usualGrind(Bundle b) throws Exception { |
| b.setInputFeedDataPath(inputFeedPath); |
| String prefix = b.getFeedDataPathPrefix(); |
| HadoopUtil.deleteDirIfExists(prefix.substring(1), cluster1FS); |
| final String starTime = TimeUtil.getTimeWrtSystemTime(3); |
| String endTime = TimeUtil.getTimeWrtSystemTime(7); |
| b.setProcessPeriodicity(1, TimeUnit.minutes); |
| b.setOutputFeedPeriodicity(1, TimeUnit.minutes); |
| b.setProcessValidity(starTime, endTime); |
| return b; |
| } |
| |
| private ExecutionType getRandomExecutionType(Bundle bundle) throws Exception { |
| ExecutionType current = bundle.getProcessObject().getOrder(); |
| Random r = new Random(); |
| ExecutionType[] values = ExecutionType.values(); |
| int i; |
| do { |
| |
| i = r.nextInt(values.length); |
| } while (current == values[i]); |
| return values[i]; |
| } |
| |
| public ServiceResponse updateProcessFrequency(Bundle bundle, |
| org.apache.falcon.entity.v0.Frequency frequency) |
| throws Exception { |
| String oldData = bundle.getProcessData(); |
| ProcessMerlin updatedProcess = new ProcessMerlin(bundle.getProcessObject()); |
| updatedProcess.setFrequency(frequency); |
| return prism.getProcessHelper() |
| .update(oldData, updatedProcess.toString()); |
| } |
| |
| //need to expand this function more later |
| private int getExpectedNumberOfWorkflowInstances(String start, String end) { |
| DateTime startDate = new DateTime(start); |
| DateTime endDate = new DateTime(end); |
| Minutes minutes = Minutes.minutesBetween((startDate), (endDate)); |
| return minutes.getMinutes(); |
| } |
| |
| private int getExpectedNumberOfWorkflowInstances(Date start, Date end) { |
| DateTime startDate = new DateTime(start); |
| DateTime endDate = new DateTime(end); |
| Minutes minutes = Minutes.minutesBetween((startDate), (endDate)); |
| return minutes.getMinutes(); |
| } |
| |
| private int getExpectedNumberOfWorkflowInstances(String start, Date end) { |
| DateTime startDate = new DateTime(start); |
| DateTime endDate = new DateTime(end); |
| Minutes minutes = Minutes.minutesBetween((startDate), (endDate)); |
| return minutes.getMinutes(); |
| } |
| |
| private void waitingForBundleFinish(ColoHelper coloHelper, String bundleId, int minutes) |
| throws Exception { |
| int wait = 0; |
| while (!OozieUtil.isBundleOver(coloHelper, bundleId)) { |
| //create missing dependencies if new instance have come up |
| OozieUtil.createMissingDependenciesForBundle(coloHelper, bundleId); |
| |
| //keep waiting |
| LOGGER.info("bundle " + bundleId + " not over .. waiting"); |
| TimeUtil.sleepSeconds(60); |
| wait++; |
| if (wait == minutes) { |
| Assert.assertTrue(false); |
| break; |
| } |
| } |
| } |
| |
| private void waitingForBundleFinish(ColoHelper coloHelper, String bundleId) throws Exception { |
| waitingForBundleFinish(coloHelper, bundleId, 15); |
| } |
| |
| private CoordinatorJob getDefaultOozieCoord(ColoHelper coloHelper, String bundleId) |
| throws Exception { |
| OozieClient client = coloHelper.getFeedHelper().getOozieClient(); |
| BundleJob bundlejob = client.getBundleJobInfo(bundleId); |
| |
| for (CoordinatorJob coord : bundlejob.getCoordinators()) { |
| if (coord.getAppName().contains("DEFAULT")) { |
| return client.getCoordJobInfo(coord.getId()); |
| } |
| } |
| return null; |
| } |
| |
| @AfterClass(alwaysRun = true) |
| public void tearDownClass() throws IOException { |
| cleanTestDirs(); |
| } |
| } |