blob: db29b1967959d57d649219fcbf085cf18845efbe [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.falcon.regression.prism;
import org.apache.falcon.entity.v0.EntityType;
import org.apache.falcon.entity.v0.Frequency;
import org.apache.falcon.entity.v0.Frequency.TimeUnit;
import org.apache.falcon.entity.v0.feed.ClusterType;
import org.apache.falcon.entity.v0.process.ExecutionType;
import org.apache.falcon.regression.Entities.ProcessMerlin;
import org.apache.falcon.regression.core.bundle.Bundle;
import org.apache.falcon.regression.core.helpers.ColoHelper;
import org.apache.falcon.regression.core.response.APIResult;
import org.apache.falcon.regression.core.response.ServiceResponse;
import org.apache.falcon.regression.core.supportClasses.HadoopFileEditor;
import org.apache.falcon.regression.core.util.AssertUtil;
import org.apache.falcon.regression.core.util.BundleUtil;
import org.apache.falcon.regression.core.util.HadoopUtil;
import org.apache.falcon.regression.core.util.InstanceUtil;
import org.apache.falcon.regression.core.util.OSUtil;
import org.apache.falcon.regression.core.util.OozieUtil;
import org.apache.falcon.regression.core.util.TimeUtil;
import org.apache.falcon.regression.core.util.Util;
import org.apache.falcon.regression.core.util.XmlUtil;
import org.apache.falcon.regression.testHelper.BaseTestClass;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.security.authentication.client.AuthenticationException;
import org.apache.log4j.Logger;
import org.apache.oozie.client.BundleJob;
import org.apache.oozie.client.CoordinatorAction;
import org.apache.oozie.client.CoordinatorJob;
import org.apache.oozie.client.Job;
import org.apache.oozie.client.OozieClient;
import org.apache.oozie.client.OozieClientException;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.joda.time.Minutes;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
import javax.xml.bind.JAXBException;
import java.io.IOException;
import java.lang.reflect.Method;
import java.net.URISyntaxException;
import java.util.Date;
import java.util.List;
import java.util.Random;
/**
* test for process update.
*/
@Test(groups = "distributed")
public class NewPrismProcessUpdateTest extends BaseTestClass {
private String baseTestDir = baseHDFSDir + "/NewPrismProcessUpdateTest";
private String inputFeedPath = baseTestDir + MINUTE_DATE_PATTERN;
private String workflowPath = baseTestDir + "/falcon-oozie-wf";
private String workflowPath2 = baseTestDir + "/falcon-oozie-wf2";
private String aggregatorPath = baseTestDir + "/aggregator";
private String aggregator1Path = baseTestDir + "/aggregator1";
private ColoHelper cluster1 = servers.get(0);
private ColoHelper cluster2 = servers.get(1);
private ColoHelper cluster3 = servers.get(2);
private FileSystem cluster1FS = serverFS.get(0);
private OozieClient cluster2OC = serverOC.get(1);
private OozieClient cluster3OC = serverOC.get(2);
private static final Logger LOGGER = Logger.getLogger(NewPrismProcessUpdateTest.class);
@BeforeMethod(alwaysRun = true)
public void testSetup(Method method) throws Exception {
LOGGER.info("test name: " + method.getName());
Bundle b = BundleUtil.readUpdateBundle();
bundles[0] = new Bundle(b, cluster1);
bundles[0].generateUniqueBundle();
bundles[1] = new Bundle(b, cluster2);
bundles[1].generateUniqueBundle();
bundles[2] = new Bundle(b, cluster3);
bundles[2].generateUniqueBundle();
setBundleWFPath(bundles[0], bundles[1], bundles[2]);
bundles[1].addClusterToBundle(bundles[2].getClusters().get(0),
ClusterType.TARGET, null, null);
usualGrind(bundles[1]);
Util.restartService(cluster3.getClusterHelper());
}
@BeforeClass(alwaysRun = true)
public void setup() throws Exception {
for (String wfPath : new String[] { workflowPath, workflowPath2, aggregatorPath, aggregator1Path }) {
uploadDirToClusters(wfPath, OSUtil.RESOURCES_OOZIE);
}
Util.restartService(cluster3.getClusterHelper());
Util.restartService(cluster1.getClusterHelper());
Util.restartService(cluster2.getClusterHelper());
}
@AfterMethod(alwaysRun = true)
public void tearDown() {
removeBundles();
}
@Test(groups = { "multiCluster" }, timeOut = 1200000)
public void updateProcessFrequencyInEachColoWithOneProcessRunningMonthly()
throws Exception {
final String startTIme = TimeUtil.getTimeWrtSystemTime(-20);
String endTime = TimeUtil.getTimeWrtSystemTime(4000 * 60);
bundles[1].setProcessPeriodicity(1, TimeUnit.months);
bundles[1].setOutputFeedPeriodicity(1, TimeUnit.months);
bundles[1].setProcessValidity(startTIme, endTime);
bundles[1].submitBundle(prism);
//now to schedule in 1 colo and let it remain in another
AssertUtil.assertSucceeded(
cluster3.getProcessHelper().schedule(bundles[1].getProcessData()));
String oldBundleId = InstanceUtil
.getLatestBundleID(cluster3,
Util.readEntityName(bundles[1].getProcessData()), EntityType.PROCESS);
InstanceUtil.waitTillInstancesAreCreated(cluster3, bundles[1].getProcessData(), 0, 10);
waitForProcessToReachACertainState(cluster3, bundles[1], Job.Status.RUNNING);
List<String> oldNominalTimes = OozieUtil.getActionsNominalTime(cluster3, oldBundleId,
EntityType.PROCESS);
String updatedProcess = InstanceUtil
.setProcessFrequency(bundles[1].getProcessData(),
new Frequency("" + 5, TimeUnit.minutes));
LOGGER.info("updated process: " + Util.prettyPrintXml(updatedProcess));
//now to update
while (Util
.parseResponse(prism.getProcessHelper()
.update((bundles[1].getProcessData()), updatedProcess))
.getStatus() != APIResult.Status.SUCCEEDED) {
LOGGER.info("update didn't SUCCEED in last attempt");
TimeUtil.sleepSeconds(10);
}
String prismString = getResponse(prism, bundles[1].getProcessData(), true);
Assert.assertEquals(Util.getProcessObject(prismString).getFrequency(),
Util.getProcessObject(updatedProcess).getFrequency());
TimeUtil.sleepSeconds(60);
dualComparison(prism, cluster3, bundles[1].getProcessData());
//ensure that the running process has new coordinators created; while the submitted
// one is updated correctly.
OozieUtil.verifyNewBundleCreation(cluster3, oldBundleId, oldNominalTimes,
bundles[1].getProcessData(), true, false);
waitingForBundleFinish(cluster3, oldBundleId, 5);
InstanceUtil.waitTillInstancesAreCreated(cluster3, bundles[1].getProcessData(), 1, 10);
OozieUtil.verifyNewBundleCreation(cluster3, oldBundleId, oldNominalTimes,
bundles[1].getProcessData(), true, true);
}
@Test(groups = { "multiCluster" }, timeOut = 1200000)
//failing due to falcon bug : https://issues.apache.org/jira/browse/FALCON-458
public void updateProcessRollStartTimeForwardInEachColoWithOneProcessRunning()
throws Exception {
String startTime = TimeUtil.getTimeWrtSystemTime(3);
String endTime = TimeUtil.getTimeWrtSystemTime(7);
bundles[1].setProcessValidity(startTime, endTime);
bundles[1].submitBundle(prism);
//now to schedule in 1 colo and let it remain in another
AssertUtil.assertSucceeded(
cluster3.getProcessHelper().schedule(bundles[1].getProcessData()));
TimeUtil.sleepSeconds(30);
InstanceUtil.waitTillInstancesAreCreated(cluster3, bundles[1].getProcessData(), 0, 10);
String oldBundleId = InstanceUtil
.getLatestBundleID(cluster3,
Util.readEntityName(bundles[1].getProcessData()), EntityType.PROCESS);
List<String> oldNominalTimes =
OozieUtil.getActionsNominalTime(cluster3, oldBundleId, EntityType.PROCESS);
String newStartTime = TimeUtil.addMinsToTime(TimeUtil.dateToOozieDate(
bundles[1].getProcessObject().getClusters().getClusters().get(0).getValidity()
.getStart()
), 20);
String newEndTime = TimeUtil.addMinsToTime(TimeUtil.dateToOozieDate(
bundles[1].getProcessObject().getClusters().getClusters().get(0).getValidity()
.getStart()
), 25);
bundles[1].setProcessValidity(newStartTime, newEndTime);
bundles[1].setProcessConcurrency(10);
waitForProcessToReachACertainState(cluster3, bundles[1], Job.Status.RUNNING);
LOGGER.info("updated process: " + Util.prettyPrintXml(bundles[1].getProcessData()));
while (Util.parseResponse(
prism.getProcessHelper()
.update(bundles[1].getProcessData(), Util.prettyPrintXml(bundles[1]
.getProcessData())))
.getStatus() != APIResult.Status.SUCCEEDED) {
LOGGER.info("update didn't SUCCEED in last attempt");
TimeUtil.sleepSeconds(10);
}
OozieUtil.verifyNewBundleCreation(cluster3, oldBundleId, oldNominalTimes,
bundles[1].getProcessData(), true, false);
dualComparison(prism, cluster3, bundles[1].getProcessData());
waitingForBundleFinish(cluster3, oldBundleId, 15);
//ensure that the running process has new coordinators created; while the submitted
// one is updated correctly.
int finalNumberOfInstances =
InstanceUtil.getProcessInstanceListFromAllBundles(cluster3,
Util.getProcessName(bundles[1].getProcessData()), EntityType.PROCESS).size();
Assert.assertEquals(finalNumberOfInstances,
getExpectedNumberOfWorkflowInstances(TimeUtil
.dateToOozieDate(
bundles[1].getProcessObject().getClusters().getClusters().get(0)
.getValidity().getStart()),
TimeUtil
.dateToOozieDate(
bundles[1].getProcessObject().getClusters().getClusters()
.get(0).getValidity()
.getEnd())));
AssertUtil.checkNotStatus(cluster2OC, EntityType.PROCESS, bundles[1], Job.Status.RUNNING);
int expectedNumberOfWorkflows =
getExpectedNumberOfWorkflowInstances(newStartTime, TimeUtil
.dateToOozieDate(
bundles[1].getProcessObject().getClusters().getClusters().get(0)
.getValidity().getEnd()));
Assert.assertEquals(OozieUtil.getNumberOfWorkflowInstances(cluster3, oldBundleId),
expectedNumberOfWorkflows);
}
@Test(groups = { "multiCluster" }, timeOut = 1800000)
public void updateProcessConcurrencyWorkflowExecutionInEachColoWithOneColoDown()
throws Exception {
//bundles[1].generateUniqueBundle();
bundles[1].submitBundle(prism);
//now to schedule in 1 colo and let it remain in another
AssertUtil.assertSucceeded(
cluster3.getProcessHelper().schedule(bundles[1].getProcessData()));
String oldBundleId = InstanceUtil
.getLatestBundleID(cluster3,
Util.readEntityName(bundles[1].getProcessData()), EntityType.PROCESS);
TimeUtil.sleepSeconds(25);
int initialConcurrency = bundles[1].getProcessObject().getParallel();
bundles[1].setProcessConcurrency(bundles[1].getProcessObject().getParallel() + 3);
bundles[1].setProcessWorkflow(workflowPath2);
bundles[1].getProcessObject().setOrder(getRandomExecutionType(bundles[1]));
//stop cluster 3 where process is scheduled
Util.shutDownService(cluster3.getProcessHelper());
//now to update
AssertUtil.assertPartial(
prism.getProcessHelper()
.update(bundles[1].getProcessData(), bundles[1].getProcessData()));
String prismString = getResponse(prism, bundles[1].getProcessData(), true);
Assert.assertEquals(Util.getProcessObject(prismString).getParallel(),
initialConcurrency);
Assert.assertEquals(Util.getProcessObject(prismString).getWorkflow().getPath(),
workflowPath);
Assert.assertEquals(Util.getProcessObject(prismString).getOrder(),
bundles[1].getProcessObject().getOrder());
String coloString = getResponse(cluster2, bundles[1].getProcessData(), true);
Assert.assertEquals(Util.getProcessObject(coloString).getWorkflow().getPath(),
workflowPath2);
Util.startService(cluster3.getProcessHelper());
dualComparisonFailure(prism, cluster2, bundles[1].getProcessData());
dualComparison(prism, cluster3, bundles[1].getProcessData());
//ensure that the running process has new coordinators created; while the submitted
// one is updated correctly.
AssertUtil
.checkNotStatus(cluster2OC, EntityType.PROCESS, bundles[1], Job.Status.RUNNING);
waitForProcessToReachACertainState(cluster3, bundles[1], Job.Status.RUNNING);
while (Util.parseResponse(
prism.getProcessHelper()
.update(bundles[1].getProcessData(), bundles[1].getProcessData()))
.getStatus() != APIResult.Status.SUCCEEDED) {
LOGGER.info("WARNING: update did not succeed, retrying ");
TimeUtil.sleepSeconds(20);
}
prismString = getResponse(prism, bundles[1].getProcessData(), true);
Assert.assertEquals(Util.getProcessObject(prismString).getParallel(),
initialConcurrency + 3);
Assert.assertEquals(Util.getProcessObject(prismString).getWorkflow().getPath(),
workflowPath2);
Assert.assertEquals(Util.getProcessObject(prismString).getOrder(),
bundles[1].getProcessObject().getOrder());
dualComparison(prism, cluster3, bundles[1].getProcessData());
AssertUtil
.checkNotStatus(cluster2OC, EntityType.PROCESS, bundles[1], Job.Status.RUNNING);
waitingForBundleFinish(cluster3, oldBundleId);
int finalNumberOfInstances =
InstanceUtil.getProcessInstanceListFromAllBundles(cluster3,
Util.getProcessName(bundles[1].getProcessData()), EntityType.PROCESS).size();
int expectedInstances =
getExpectedNumberOfWorkflowInstances(TimeUtil.dateToOozieDate(
bundles[1].getProcessObject().getClusters().getClusters().get(0)
.getValidity().getStart()),
TimeUtil
.dateToOozieDate(
bundles[1].getProcessObject().getClusters().getClusters()
.get(0).getValidity()
.getEnd()));
Assert.assertEquals(finalNumberOfInstances, expectedInstances,
"number of instances doesnt match :(");
}
@Test(groups = { "multiCluster" }, timeOut = 1200000)
public void updateProcessFrequencyInEachColoWithOneProcessRunning() throws Exception {
String startTime = TimeUtil.getTimeWrtSystemTime(-2);
String endTime = TimeUtil.getTimeWrtSystemTime(20);
bundles[1].setProcessValidity(startTime, endTime);
bundles[1].submitBundle(prism);
//now to schedule in 1 colo and let it remain in another
AssertUtil.assertSucceeded(
cluster3.getProcessHelper().schedule(bundles[1].getProcessData()));
String oldBundleId = InstanceUtil
.getLatestBundleID(cluster3,
Util.readEntityName(bundles[1].getProcessData()), EntityType.PROCESS);
InstanceUtil.waitTillInstancesAreCreated(cluster3, bundles[1].getProcessData(), 0);
waitForProcessToReachACertainState(cluster3, bundles[1], Job.Status.RUNNING);
List<String> oldNominalTimes = OozieUtil.getActionsNominalTime(cluster3, oldBundleId,
EntityType.PROCESS);
LOGGER.info("original process: " + Util.prettyPrintXml(bundles[1].getProcessData()));
String updatedProcess = InstanceUtil
.setProcessFrequency(bundles[1].getProcessData(),
new Frequency("" + 7, TimeUnit.minutes));
LOGGER.info("updated process: " + updatedProcess);
//now to update
ServiceResponse response =
prism.getProcessHelper().update(updatedProcess, updatedProcess);
AssertUtil.assertSucceeded(response);
OozieUtil.verifyNewBundleCreation(cluster3, oldBundleId, oldNominalTimes,
bundles[1].getProcessData(), true, false);
InstanceUtil.waitTillInstancesAreCreated(cluster3, bundles[1].getProcessData(), 1, 10);
String prismString = getResponse(prism, bundles[1].getProcessData(), true);
Assert.assertEquals(Util.getProcessObject(prismString).getFrequency(),
Util.getProcessObject(updatedProcess).getFrequency());
dualComparison(prism, cluster3, bundles[1].getProcessData());
AssertUtil.checkNotStatus(cluster2OC, EntityType.PROCESS, bundles[1], Job.Status.RUNNING);
}
@Test(groups = { "multiCluster" }, timeOut = 1200000)
public void updateProcessNameInEachColoWithOneProcessRunning() throws Exception {
//bundles[1].generateUniqueBundle();
bundles[1].submitBundle(prism);
//now to schedule in 1 colo and let it remain in another
AssertUtil.assertSucceeded(
cluster3.getProcessHelper().schedule(bundles[1].getProcessData()));
String originalProcessData = bundles[1].getProcessData();
String oldBundleId = InstanceUtil
.getLatestBundleID(cluster3,
Util.readEntityName(bundles[1].getProcessData()), EntityType.PROCESS);
InstanceUtil.waitTillInstancesAreCreated(cluster3, bundles[1].getProcessData(), 0);
waitForProcessToReachACertainState(cluster3, bundles[1], Job.Status.RUNNING);
TimeUtil.sleepSeconds(20);
List<String> oldNominalTimes =
OozieUtil.getActionsNominalTime(cluster3, oldBundleId, EntityType.PROCESS);
bundles[1].setProcessName("myNewProcessName");
//now to update
ServiceResponse response =
prism.getProcessHelper()
.update((bundles[1].getProcessData()), bundles[1].getProcessData());
AssertUtil.assertFailed(response);
OozieUtil.verifyNewBundleCreation(cluster3, oldBundleId, oldNominalTimes,
originalProcessData, false, false);
AssertUtil.checkNotStatus(cluster2OC, EntityType.PROCESS, bundles[1], Job.Status.RUNNING);
}
@Test(groups = { "multiCluster" }, timeOut = 1200000)
public void updateProcessConcurrencyInEachColoWithOneProcessRunning()
throws Exception {
String startTime = TimeUtil.getTimeWrtSystemTime(-2);
String endTime = TimeUtil.getTimeWrtSystemTime(10);
bundles[1].setProcessValidity(startTime, endTime);
//bundles[1].generateUniqueBundle();
bundles[1].submitBundle(prism);
//now to schedule in 1 colo and let it remain in another
AssertUtil.assertSucceeded(
cluster3.getProcessHelper().schedule(bundles[1].getProcessData()));
String oldBundleId = InstanceUtil
.getLatestBundleID(cluster3,
Util.readEntityName(bundles[1].getProcessData()), EntityType.PROCESS);
InstanceUtil.waitTillInstancesAreCreated(cluster3, bundles[1].getProcessData(), 0);
waitForProcessToReachACertainState(cluster3, bundles[1], Job.Status.RUNNING);
//now to update
DateTime updateTime = new DateTime(DateTimeZone.UTC);
TimeUtil.sleepSeconds(60);
List<String> oldNominalTimes =
OozieUtil.getActionsNominalTime(cluster3, oldBundleId, EntityType.PROCESS);
LOGGER.info("updating at " + updateTime);
while (Util
.parseResponse(updateProcessConcurrency(bundles[1],
bundles[1].getProcessObject().getParallel() + 3))
.getStatus() != APIResult.Status.SUCCEEDED) {
LOGGER.info("WARNING: update did not scceed, retyring ");
TimeUtil.sleepSeconds(20);
}
String prismString = getResponse(prism, bundles[1].getProcessData(), true);
Assert.assertEquals(Util.getProcessObject(prismString).getParallel(),
bundles[1].getProcessObject().getParallel() + 3);
dualComparison(prism, cluster3, bundles[1].getProcessData());
//ensure that the running process has new coordinators created; while the submitted
// one is updated
// correctly.
OozieUtil.verifyNewBundleCreation(cluster3, oldBundleId, oldNominalTimes,
bundles[1].getProcessData(),
false, true);
AssertUtil.checkNotStatus(cluster2OC, EntityType.PROCESS, bundles[1], Job.Status.RUNNING);
// future : should be verified using cord xml
Job.Status status = OozieUtil.getOozieJobStatus(cluster3.getFeedHelper().getOozieClient(),
Util.readEntityName(bundles[1].getProcessData()), EntityType.PROCESS);
boolean doesExist = false;
OozieUtil.createMissingDependencies(cluster3, EntityType.PROCESS, bundles[1].getProcessName(), 0);
while (status != Job.Status.SUCCEEDED && status != Job.Status.FAILED
&&
status != Job.Status.DONEWITHERROR) {
int statusCount = InstanceUtil
.getInstanceCountWithStatus(cluster3,
Util.readEntityName(bundles[1].getProcessData()),
org.apache.oozie.client.CoordinatorAction.Status.RUNNING,
EntityType.PROCESS);
if (statusCount == bundles[1].getProcessObject().getParallel() + 3) {
doesExist = true;
break;
}
status = OozieUtil.getOozieJobStatus(cluster3.getFeedHelper().getOozieClient(),
Util.readEntityName(bundles[1].getProcessData()), EntityType.PROCESS);
Assert.assertNotNull(status,
"status must not be null!");
TimeUtil.sleepSeconds(30);
}
Assert.assertTrue(doesExist, "Er! The desired concurrency levels are never reached!!!");
int expectedNumberOfInstances =
getExpectedNumberOfWorkflowInstances(TimeUtil.dateToOozieDate(
bundles[1].getProcessObject().getClusters().getClusters().get(0)
.getValidity().getStart()),
TimeUtil
.dateToOozieDate(
bundles[1].getProcessObject().getClusters().getClusters()
.get(0).getValidity()
.getEnd()));
Assert.assertEquals(OozieUtil.getNumberOfWorkflowInstances(cluster3, oldBundleId),
expectedNumberOfInstances);
}
@Test(groups = { "multiCluster" }, timeOut = 1200000)
public void updateProcessIncreaseValidityInEachColoWithOneProcessRunning() throws Exception {
String startTime = TimeUtil.getTimeWrtSystemTime(3);
String endTime = TimeUtil.getTimeWrtSystemTime(8);
bundles[1].setProcessValidity(startTime, endTime);
bundles[1].submitBundle(prism);
//now to schedule in 1 colo and let it remain in another
AssertUtil.assertSucceeded(
cluster3.getProcessHelper().schedule(bundles[1].getProcessData()));
TimeUtil.sleepSeconds(30);
InstanceUtil.waitTillInstancesAreCreated(cluster3, bundles[1].getProcessData(), 0, 10);
String oldBundleId = InstanceUtil
.getLatestBundleID(cluster3,
Util.readEntityName(bundles[1].getProcessData()), EntityType.PROCESS);
List<String> oldNominalTimes = OozieUtil.getActionsNominalTime(cluster3, oldBundleId,
EntityType.PROCESS);
String newEndTime = TimeUtil.addMinsToTime(TimeUtil.dateToOozieDate(
bundles[1].getProcessObject().getClusters().getClusters().get(0).getValidity()
.getEnd()
), 4);
bundles[1].setProcessValidity(TimeUtil.dateToOozieDate(
bundles[1].getProcessObject().getClusters().getClusters().get(0).getValidity()
.getStart()),
newEndTime);
waitForProcessToReachACertainState(cluster3, bundles[1], Job.Status.RUNNING);
ServiceResponse response = prism.getProcessHelper()
.update(bundles[1].getProcessData(), bundles[1].getProcessData());
for (int i = 0; i < 10
&&
Util.parseResponse(response).getStatus() != APIResult.Status.SUCCEEDED; ++i) {
response = prism.getProcessHelper()
.update(bundles[1].getProcessData(), bundles[1].getProcessData());
TimeUtil.sleepSeconds(6);
}
Assert.assertEquals(Util.parseResponse(response).getStatus(),
APIResult.Status.SUCCEEDED, "Process update did not succeed.");
OozieUtil.verifyNewBundleCreation(cluster3, oldBundleId, oldNominalTimes,
bundles[1].getProcessData(), false, true);
int i = 0;
while (OozieUtil.getNumberOfWorkflowInstances(cluster3, oldBundleId)
!= getExpectedNumberOfWorkflowInstances(TimeUtil.dateToOozieDate(
bundles[1].getProcessObject().getClusters().getClusters().get(0).getValidity()
.getStart()
), TimeUtil.dateToOozieDate(
bundles[1].getProcessObject().getClusters().getClusters().get(0)
.getValidity()
.getEnd()))
&& i < 10) {
TimeUtil.sleepSeconds(1);
i++;
}
bundles[1].verifyDependencyListing(cluster2);
dualComparison(prism, cluster3, bundles[1].getProcessData());
waitingForBundleFinish(cluster3, oldBundleId);
//ensure that the running process has new coordinators created; while the submitted
// one is updated
// correctly.
int finalNumberOfInstances = InstanceUtil
.getProcessInstanceList(cluster3,
Util.getProcessName(bundles[1].getProcessData()), EntityType.PROCESS)
.size();
Assert.assertEquals(finalNumberOfInstances,
getExpectedNumberOfWorkflowInstances(TimeUtil
.dateToOozieDate(
bundles[1].getProcessObject().getClusters().getClusters().get(0)
.getValidity().getStart()),
TimeUtil
.dateToOozieDate(
bundles[1].getProcessObject().getClusters().getClusters()
.get(0).getValidity()
.getEnd())));
AssertUtil.checkNotStatus(cluster2OC, EntityType.PROCESS, bundles[1], Job.Status.RUNNING);
}
@Test(groups = { "multiCluster" }, timeOut = 1200000)
public void updateProcessConcurrencyInEachColoWithOneProcessSuspended()
throws Exception {
String startTime = TimeUtil.getTimeWrtSystemTime(3);
String endTime = TimeUtil.getTimeWrtSystemTime(7);
bundles[1].setProcessValidity(startTime, endTime);
bundles[1].submitBundle(prism);
//now to schedule in 1 colo and let it remain in another
AssertUtil.assertSucceeded(
cluster3.getProcessHelper().schedule(bundles[1].getProcessData()));
String oldBundleId = InstanceUtil
.getLatestBundleID(cluster3,
Util.readEntityName(bundles[1].getProcessData()), EntityType.PROCESS);
AssertUtil.assertSucceeded(
cluster3.getProcessHelper().schedule(bundles[1].getProcessData()));
InstanceUtil.waitTillInstancesAreCreated(cluster3, bundles[1].getProcessData(), 0, 10);
waitForProcessToReachACertainState(cluster3, bundles[1], Job.Status.RUNNING);
List<String> oldNominalTimes = OozieUtil.getActionsNominalTime(cluster3, oldBundleId,
EntityType.PROCESS);
AssertUtil.assertSucceeded(
cluster3.getProcessHelper().suspend(bundles[1].getProcessData()));
//now to update
while (Util
.parseResponse(updateProcessConcurrency(bundles[1],
bundles[1].getProcessObject().getParallel() + 3))
.getStatus() != APIResult.Status.SUCCEEDED) {
LOGGER.info("WARNING: update did not scceed, retyring ");
TimeUtil.sleepSeconds(20);
}
String prismString = getResponse(prism, bundles[1].getProcessData(), true);
Assert.assertEquals(Util.getProcessObject(prismString).getParallel(),
bundles[1].getProcessObject().getParallel() + 3);
dualComparison(prism, cluster3, bundles[1].getProcessData());
//ensure that the running process has new coordinators created; while the submitted
// one is updated correctly.
OozieUtil.verifyNewBundleCreation(cluster3, oldBundleId, oldNominalTimes,
bundles[1].getProcessData(), false, true);
AssertUtil.checkNotStatus(cluster2OC, EntityType.PROCESS, bundles[1], Job.Status.RUNNING);
AssertUtil.assertSucceeded(cluster3.getProcessHelper().resume(bundles[1].getProcessData()));
AssertUtil.checkStatus(cluster3OC, EntityType.PROCESS, bundles[1], Job.Status.RUNNING);
Job.Status status = OozieUtil.getOozieJobStatus(cluster3.getFeedHelper().getOozieClient(),
Util.readEntityName(bundles[1].getProcessData()), EntityType.PROCESS);
boolean doesExist = false;
OozieUtil.createMissingDependencies(cluster3, EntityType.PROCESS, bundles[1].getProcessName(), 0);
while (status != Job.Status.SUCCEEDED && status != Job.Status.FAILED
&&
status != Job.Status.DONEWITHERROR) {
if (InstanceUtil
.getInstanceCountWithStatus(cluster3,
Util.readEntityName(bundles[1].getProcessData()),
org.apache.oozie.client.CoordinatorAction.Status.RUNNING,
EntityType.PROCESS)
==
bundles[1].getProcessObject().getParallel()) {
doesExist = true;
break;
}
status = OozieUtil.getOozieJobStatus(cluster3.getFeedHelper().getOozieClient(),
Util.readEntityName(bundles[1].getProcessData()), EntityType.PROCESS);
}
Assert.assertTrue(doesExist, "Er! The desired concurrency levels are never reached!!!");
waitingForBundleFinish(cluster3, oldBundleId);
int finalNumberOfInstances =
InstanceUtil.getProcessInstanceListFromAllBundles(cluster3,
Util.getProcessName(bundles[1].getProcessData()), EntityType.PROCESS).size();
int expectedInstances =
getExpectedNumberOfWorkflowInstances(TimeUtil.dateToOozieDate(
bundles[1].getProcessObject().getClusters().getClusters().get(0)
.getValidity().getStart()),
TimeUtil
.dateToOozieDate(
bundles[1].getProcessObject().getClusters().getClusters()
.get(0).getValidity()
.getEnd()));
Assert.assertEquals(finalNumberOfInstances, expectedInstances,
"number of instances doesnt match :(");
}
@Test(groups = { "multiCluster" }, timeOut = 1200000)
public void updateProcessConcurrencyInEachColoWithOneColoDown() throws Exception {
String startTime = TimeUtil.getTimeWrtSystemTime(-1);
String endTime = TimeUtil.getTimeWrtSystemTime(5);
bundles[1].setProcessValidity(startTime, endTime);
bundles[1].submitBundle(prism);
//now to schedule in 1 colo and let it remain in another
LOGGER.info("process to be scheduled: " + Util.prettyPrintXml(bundles[1].getProcessData()));
AssertUtil.assertSucceeded(
cluster3.getProcessHelper().schedule(bundles[1].getProcessData()));
InstanceUtil.waitTillInstancesAreCreated(cluster3, bundles[1].getProcessData(), 0, 10);
String oldBundleId = InstanceUtil
.getLatestBundleID(cluster3,
Util.readEntityName(bundles[1].getProcessData()), EntityType.PROCESS);
List<String> oldNominalTimes = OozieUtil.getActionsNominalTime(cluster3, oldBundleId,
EntityType.PROCESS);
//now to update
Util.shutDownService(cluster3.getClusterHelper());
ServiceResponse response =
updateProcessConcurrency(bundles[1],
bundles[1].getProcessObject().getParallel() + 3);
AssertUtil.assertPartial(response);
Util.startService(cluster3.getClusterHelper());
String prismString = getResponse(prism, bundles[1].getProcessData(), true);
dualComparison(prism, cluster3, bundles[1].getProcessData());
Assert.assertEquals(Util.getProcessObject(prismString).getParallel(),
bundles[1].getProcessObject().getParallel());
//ensure that the running process has new coordinators created; while the submitted
// one is updated correctly.
AssertUtil.checkNotStatus(cluster2OC, EntityType.PROCESS, bundles[1],
Job.Status.RUNNING);
while (Util
.parseResponse(updateProcessConcurrency(bundles[1],
bundles[1].getProcessObject().getParallel() + 3))
.getStatus() != APIResult.Status.SUCCEEDED) {
LOGGER.info("WARNING: update did not scceed, retyring ");
TimeUtil.sleepSeconds(20);
}
dualComparison(prism, cluster3, bundles[1].getProcessData());
dualComparison(prism, cluster2, bundles[1].getProcessData());
Job.Status status =
OozieUtil.getOozieJobStatus(cluster3.getFeedHelper().getOozieClient(),
Util.readEntityName(bundles[1].getProcessData()), EntityType.PROCESS);
boolean doesExist = false;
OozieUtil.createMissingDependencies(cluster3, EntityType.PROCESS, bundles[1].getProcessName(), 0);
while (status != Job.Status.SUCCEEDED && status != Job.Status.FAILED
&&
status != Job.Status.DONEWITHERROR) {
if (InstanceUtil
.getInstanceCountWithStatus(cluster3,
Util.readEntityName(bundles[1].getProcessData()),
org.apache.oozie.client.CoordinatorAction.Status.RUNNING,
EntityType.PROCESS)
==
bundles[1].getProcessObject().getParallel() + 3) {
doesExist = true;
break;
}
status = OozieUtil.getOozieJobStatus(cluster3.getFeedHelper().getOozieClient(),
Util.readEntityName(bundles[1].getProcessData()), EntityType.PROCESS);
TimeUtil.sleepSeconds(30);
}
Assert.assertTrue(doesExist, "Er! The desired concurrency levels are never reached!!!");
OozieUtil.verifyNewBundleCreation(cluster3, InstanceUtil
.getLatestBundleID(cluster3,
Util.readEntityName(bundles[1].getProcessData()), EntityType.PROCESS),
oldNominalTimes, bundles[1].getProcessData(), false,
true
);
waitingForBundleFinish(cluster3, oldBundleId);
int finalNumberOfInstances =
InstanceUtil.getProcessInstanceListFromAllBundles(cluster3,
Util.getProcessName(bundles[1].getProcessData()), EntityType.PROCESS).size();
int expectedInstances =
getExpectedNumberOfWorkflowInstances(TimeUtil.dateToOozieDate(
bundles[1].getProcessObject().getClusters().getClusters().get(0)
.getValidity().getStart()),
TimeUtil
.dateToOozieDate(
bundles[1].getProcessObject().getClusters().getClusters()
.get(0).getValidity()
.getEnd()));
Assert.assertEquals(finalNumberOfInstances, expectedInstances,
"number of instances doesnt match :(");
}
@Test(groups = { "multiCluster" }, timeOut = 1200000)
public void updateProcessConcurrencyExecutionWorkflowInEachColoWithOneProcessRunning()
throws Exception {
String startTime = TimeUtil.getTimeWrtSystemTime(-1);
String endTime = TimeUtil.getTimeWrtSystemTime(7);
bundles[1].setProcessValidity(startTime, endTime);
bundles[1].submitBundle(prism);
//now to schedule in 1 colo and let it remain in another
AssertUtil.assertSucceeded(
cluster3.getProcessHelper().schedule(bundles[1].getProcessData()));
TimeUtil.sleepSeconds(30);
InstanceUtil.waitTillInstancesAreCreated(cluster3, bundles[1].getProcessData(), 0, 10);
String oldBundleId = InstanceUtil
.getLatestBundleID(cluster3,
Util.readEntityName(bundles[1].getProcessData()), EntityType.PROCESS);
waitForProcessToReachACertainState(cluster3, bundles[1], Job.Status.RUNNING);
List<String> oldNominalTimes = OozieUtil.getActionsNominalTime(cluster3, oldBundleId,
EntityType.PROCESS);
int initialConcurrency = bundles[1].getProcessObject().getParallel();
bundles[1].setProcessConcurrency(bundles[1].getProcessObject().getParallel() + 3);
bundles[1].setProcessWorkflow(aggregator1Path);
bundles[1].getProcessObject().setOrder(getRandomExecutionType(bundles[1]));
//now to update
String updateTime = new DateTime(DateTimeZone.UTC).plusMinutes(2).toString();
LOGGER.info("updating @ " + updateTime);
while (Util.parseResponse(
prism.getProcessHelper().update((bundles[1].getProcessData()), bundles[1]
.getProcessData())).getStatus() != APIResult.Status.SUCCEEDED) {
TimeUtil.sleepSeconds(10);
}
String prismString = getResponse(prism, bundles[1].getProcessData(), true);
Assert.assertEquals(Util.getProcessObject(prismString).getParallel(),
initialConcurrency + 3);
Assert.assertEquals(Util.getProcessObject(prismString).getWorkflow().getPath(),
aggregator1Path);
Assert.assertEquals(Util.getProcessObject(prismString).getOrder(),
bundles[1].getProcessObject().getOrder());
dualComparison(prism, cluster3, bundles[1].getProcessData());
//ensure that the running process has new coordinators created; while the submitted
// one is updated correctly.
waitingForBundleFinish(cluster3, oldBundleId);
OozieUtil.verifyNewBundleCreation(cluster3, oldBundleId, oldNominalTimes,
bundles[1].getProcessData(), true, true);
AssertUtil.checkNotStatus(cluster2OC, EntityType.PROCESS, bundles[1], Job.Status.RUNNING);
int finalNumberOfInstances =
InstanceUtil.getProcessInstanceListFromAllBundles(cluster3,
Util.getProcessName(bundles[1].getProcessData()), EntityType.PROCESS).size();
int expectedInstances =
getExpectedNumberOfWorkflowInstances(TimeUtil.dateToOozieDate(
bundles[1].getProcessObject().getClusters().getClusters().get(0)
.getValidity().getStart()),
TimeUtil
.dateToOozieDate(
bundles[1].getProcessObject().getClusters().getClusters()
.get(0).getValidity()
.getEnd()));
Assert.assertEquals(finalNumberOfInstances, expectedInstances,
"number of instances doesnt match :(");
}
@Test(groups = { "multiCluster" }, timeOut = 1200000)
public void updateProcessConcurrencyExecutionWorkflowInEachColoWithOneProcessSuspended()
throws Exception {
String startTime = TimeUtil.getTimeWrtSystemTime(2);
String endTime = TimeUtil.getTimeWrtSystemTime(6);
bundles[1].setProcessValidity(startTime, endTime);
bundles[1].submitBundle(prism);
//now to schedule in 1 colo and let it remain in another
AssertUtil.assertSucceeded(
cluster3.getProcessHelper().schedule(bundles[1].getProcessData()));
TimeUtil.sleepSeconds(30);
InstanceUtil.waitTillInstancesAreCreated(cluster3, bundles[1].getProcessData(), 0, 10);
String oldBundleId = InstanceUtil
.getLatestBundleID(cluster3,
Util.readEntityName(bundles[1].getProcessData()), EntityType.PROCESS);
waitForProcessToReachACertainState(cluster3, bundles[1], Job.Status.RUNNING);
List<String> oldNominalTimes = OozieUtil.getActionsNominalTime(cluster3, oldBundleId,
EntityType.PROCESS);
int initialConcurrency = bundles[1].getProcessObject().getParallel();
bundles[1].setProcessConcurrency(bundles[1].getProcessObject().getParallel() + 3);
bundles[1].setProcessWorkflow(aggregator1Path);
bundles[1].getProcessObject().setOrder(getRandomExecutionType(bundles[1]));
//suspend
AssertUtil.assertSucceeded(
cluster3.getProcessHelper().suspend(bundles[1].getProcessData()));
//now to update
String updateTime = new DateTime(DateTimeZone.UTC).plusMinutes(2).toString();
LOGGER.info("updating @ " + updateTime);
while (Util.parseResponse(
prism.getProcessHelper()
.update((bundles[1].getProcessData()), bundles[1].getProcessData()))
.getStatus() != APIResult.Status.SUCCEEDED) {
TimeUtil.sleepSeconds(10);
}
AssertUtil.assertSucceeded(cluster3.getProcessHelper().resume(bundles[1].getProcessData()));
String prismString = getResponse(prism, bundles[1].getProcessData(), true);
Assert.assertEquals(Util.getProcessObject(prismString).getParallel(),
initialConcurrency + 3);
Assert.assertEquals(Util.getProcessObject(prismString).getWorkflow().getPath(),
aggregator1Path);
Assert.assertEquals(Util.getProcessObject(prismString).getOrder(),
bundles[1].getProcessObject().getOrder());
dualComparison(prism, cluster3, bundles[1].getProcessData());
//ensure that the running process has new coordinators created; while the submitted
// one is updated correctly.
waitingForBundleFinish(cluster3, oldBundleId);
OozieUtil.verifyNewBundleCreation(cluster3, oldBundleId, oldNominalTimes,
bundles[1].getProcessData(), true, true);
AssertUtil.checkNotStatus(cluster3OC, EntityType.PROCESS, bundles[1], Job.Status.RUNNING);
int finalNumberOfInstances =
InstanceUtil.getProcessInstanceListFromAllBundles(cluster3,
Util.getProcessName(bundles[1].getProcessData()), EntityType.PROCESS).size();
int expectedInstances =
getExpectedNumberOfWorkflowInstances(TimeUtil.dateToOozieDate(
bundles[1].getProcessObject().getClusters().getClusters().get(0)
.getValidity().getStart()),
TimeUtil
.dateToOozieDate(
bundles[1].getProcessObject().getClusters().getClusters()
.get(0).getValidity()
.getEnd()));
Assert.assertEquals(finalNumberOfInstances, expectedInstances,
"number of instances doesnt match :(");
}
@Test(groups = { "multiCluster" }, timeOut = 1200000)
public void updateProcessAddNewInputInEachColoWithOneProcessRunning() throws Exception {
String startTime = TimeUtil.getTimeWrtSystemTime(-1);
String endTime = TimeUtil.getTimeWrtSystemTime(6);
bundles[1].setProcessValidity(startTime, endTime);
bundles[1].submitBundle(prism);
//now to schedule in 1 colo and let it remain in another
AssertUtil.assertSucceeded(
cluster3.getProcessHelper().schedule(bundles[1].getProcessData()));
TimeUtil.sleepSeconds(30);
InstanceUtil.waitTillInstancesAreCreated(cluster3, bundles[1].getProcessData(), 0, 10);
String oldBundleId = InstanceUtil
.getLatestBundleID(cluster3,
Util.readEntityName(bundles[1].getProcessData()), EntityType.PROCESS);
TimeUtil.sleepSeconds(20);
waitForProcessToReachACertainState(cluster3, bundles[1], Job.Status.RUNNING);
List<String> oldNominalTimes = OozieUtil.getActionsNominalTime(cluster3, oldBundleId,
EntityType.PROCESS);
String newFeedName = bundles[1].getInputFeedNameFromBundle() + "2";
String inputFeed = bundles[1].getInputFeedFromBundle();
bundles[1].addProcessInput(newFeedName, "inputData2");
inputFeed = Util.setFeedName(inputFeed, newFeedName);
LOGGER.info(inputFeed);
AssertUtil.assertSucceeded(prism.getFeedHelper().submitEntity(inputFeed));
while (Util.parseResponse(
prism.getProcessHelper()
.update((bundles[1].getProcessData()), bundles[1].getProcessData()))
.getStatus() != APIResult.Status.SUCCEEDED) {
TimeUtil.sleepSeconds(20);
}
OozieUtil.verifyNewBundleCreation(cluster3, oldBundleId, oldNominalTimes,
bundles[1].getProcessData(), true, false);
InstanceUtil.waitTillInstancesAreCreated(cluster3, bundles[1].getProcessData(), 1, 10);
bundles[1].verifyDependencyListing(cluster2);
dualComparison(prism, cluster3, bundles[1].getProcessData());
//ensure that the running process has new coordinators created; while the submitted
// one is updated correctly.
waitingForBundleFinish(cluster3, oldBundleId);
OozieUtil.verifyNewBundleCreation(cluster3, oldBundleId, oldNominalTimes,
bundles[1].getProcessData(), true, true);
AssertUtil.checkNotStatus(cluster2OC, EntityType.PROCESS, bundles[1], Job.Status.RUNNING);
}
@Test(groups = { "multiCluster" }, timeOut = 1200000)
public void updateProcessAddNewInputInEachColoWithOneProcessSuspended() throws Exception {
String startTime = TimeUtil.getTimeWrtSystemTime(1);
String endTime = TimeUtil.getTimeWrtSystemTime(6);
bundles[1].setProcessValidity(startTime, endTime);
bundles[1].submitBundle(prism);
//now to schedule in 1 colo and let it remain in another
AssertUtil.assertSucceeded(
cluster3.getProcessHelper().schedule(bundles[1].getProcessData()));
TimeUtil.sleepSeconds(30);
InstanceUtil.waitTillInstancesAreCreated(cluster3, bundles[1].getProcessData(), 0);
String oldBundleId = InstanceUtil
.getLatestBundleID(cluster3,
Util.readEntityName(bundles[1].getProcessData()), EntityType.PROCESS);
waitForProcessToReachACertainState(cluster3, bundles[1], Job.Status.RUNNING);
List<String> oldNominalTimes = OozieUtil.getActionsNominalTime(cluster3, oldBundleId,
EntityType.PROCESS);
String newFeedName = bundles[1].getInputFeedNameFromBundle() + "2";
String inputFeed = bundles[1].getInputFeedFromBundle();
bundles[1].addProcessInput(newFeedName, "inputData2");
inputFeed = Util.setFeedName(inputFeed, newFeedName);
AssertUtil.assertSucceeded(
cluster3.getProcessHelper().suspend(bundles[1].getProcessData()));
AssertUtil.assertSucceeded(prism.getFeedHelper().submitEntity(inputFeed));
while (Util.parseResponse(
prism.getProcessHelper()
.update((bundles[1].getProcessData()), bundles[1].getProcessData()))
.getStatus() != APIResult.Status.SUCCEEDED) {
TimeUtil.sleepSeconds(10);
}
OozieUtil.verifyNewBundleCreation(cluster3, oldBundleId, oldNominalTimes,
bundles[1].getProcessData(), true, false);
InstanceUtil.waitTillInstancesAreCreated(cluster3, bundles[1].getProcessData(), 1, 10);
AssertUtil.assertSucceeded(cluster3.getProcessHelper().resume(bundles[1].getProcessData()));
bundles[1].verifyDependencyListing(cluster2);
dualComparison(prism, cluster3, bundles[1].getProcessData());
//ensure that the running process has new coordinators created; while the submitted
// one is updated correctly.
waitingForBundleFinish(cluster3, oldBundleId);
OozieUtil.verifyNewBundleCreation(cluster3, oldBundleId, oldNominalTimes,
bundles[1].getProcessData(), true, true);
AssertUtil.checkNotStatus(cluster3OC, EntityType.PROCESS, bundles[1], Job.Status.RUNNING);
}
@Test(groups = { "multiCluster" }, timeOut = 1200000)
public void updateProcessAddNewInputInEachColoWithOneColoDown() throws Exception {
String startTime = TimeUtil.getTimeWrtSystemTime(3);
String endTime = TimeUtil.getTimeWrtSystemTime(10);
bundles[1].setProcessValidity(startTime, endTime);
bundles[1].submitBundle(prism);
String originalProcess = bundles[1].getProcessData();
String newFeedName = bundles[1].getInputFeedNameFromBundle() + "2";
String inputFeed = bundles[1].getInputFeedFromBundle();
bundles[1].addProcessInput(newFeedName, "inputData2");
inputFeed = Util.setFeedName(inputFeed, newFeedName);
String updatedProcess = bundles[1].getProcessData();
//now to schedule in 1 colo and let it remain in another
AssertUtil.assertSucceeded(
cluster3.getProcessHelper().schedule(originalProcess));
TimeUtil.sleepSeconds(30);
InstanceUtil.waitTillInstancesAreCreated(cluster3, originalProcess, 0, 10);
String oldBundleId = InstanceUtil
.getLatestBundleID(cluster3,
Util.readEntityName(originalProcess), EntityType.PROCESS);
InstanceUtil.waitTillInstancesAreCreated(cluster3, originalProcess, 0, 10);
List<String> oldNominalTimes = OozieUtil.getActionsNominalTime(cluster3, oldBundleId,
EntityType.PROCESS);
//submit new feed
AssertUtil.assertSucceeded(prism.getFeedHelper().submitEntity(inputFeed));
Util.shutDownService(cluster3.getProcessHelper());
AssertUtil.assertPartial(
prism.getProcessHelper()
.update(updatedProcess, updatedProcess));
Util.startService(cluster3.getProcessHelper());
bundles[1].verifyDependencyListing(cluster2);
dualComparison(prism, cluster3, bundles[1].getProcessData());
Assert.assertFalse(Util.isDefinitionSame(cluster2, prism, originalProcess));
//ensure that the running process has new coordinators created; while the submitted
// one is updated correctly.
OozieUtil.verifyNewBundleCreation(cluster3, oldBundleId, oldNominalTimes,
bundles[1].getProcessData(), false, false);
AssertUtil.checkNotStatus(cluster2OC, EntityType.PROCESS, bundles[1],
Job.Status.RUNNING);
waitForProcessToReachACertainState(cluster3, bundles[1], Job.Status.RUNNING);
while (Util.parseResponse(
prism.getProcessHelper().update(updatedProcess, updatedProcess)).getStatus()
!= APIResult.Status.SUCCEEDED) {
LOGGER.info("update didnt SUCCEED in last attempt");
TimeUtil.sleepSeconds(10);
}
dualComparison(prism, cluster3, bundles[1].getProcessData());
Assert.assertTrue(Util.isDefinitionSame(cluster2, prism, originalProcess));
bundles[1].verifyDependencyListing(cluster2);
OozieUtil.verifyNewBundleCreation(cluster3, oldBundleId, oldNominalTimes,
updatedProcess, true, false);
waitingForBundleFinish(cluster3, oldBundleId);
InstanceUtil.waitTillInstancesAreCreated(cluster3, bundles[1].getProcessData(), 1, 10);
OozieUtil.verifyNewBundleCreation(cluster3, oldBundleId, oldNominalTimes,
bundles[1].getProcessData(), true, true);
AssertUtil.checkNotStatus(cluster2OC, EntityType.PROCESS, bundles[1],
Job.Status.RUNNING);
}
@Test(groups = { "multiCluster" }, timeOut = 1200000)
public void updateProcessDecreaseValidityInEachColoWithOneProcessRunning() throws Exception {
bundles[1].submitBundle(prism);
//now to schedule in 1 colo and let it remain in another
AssertUtil.assertSucceeded(
cluster3.getProcessHelper().schedule(bundles[1].getProcessData()));
TimeUtil.sleepSeconds(30);
InstanceUtil.waitTillInstancesAreCreated(cluster3, bundles[1].getProcessData(), 0, 10);
String oldBundleId = InstanceUtil
.getLatestBundleID(cluster3,
Util.readEntityName(bundles[1].getProcessData()), EntityType.PROCESS);
waitForProcessToReachACertainState(cluster3, bundles[1], Job.Status.RUNNING);
List<String> oldNominalTimes = OozieUtil.getActionsNominalTime(cluster3, oldBundleId,
EntityType.PROCESS);
String newEndTime = TimeUtil.addMinsToTime(TimeUtil.dateToOozieDate(
bundles[1].getProcessObject().getClusters().getClusters().get(0).getValidity()
.getEnd()
), -2);
bundles[1].setProcessValidity(TimeUtil.dateToOozieDate(
bundles[1].getProcessObject().getClusters().getClusters().get(0).getValidity()
.getStart()),
newEndTime);
while (Util.parseResponse(
(prism.getProcessHelper()
.update(bundles[1].getProcessData(), bundles[1].getProcessData())))
.getStatus() != APIResult.Status.SUCCEEDED) {
LOGGER.info("update didnt SUCCEED in last attempt");
TimeUtil.sleepSeconds(10);
}
OozieUtil.verifyNewBundleCreation(cluster3, oldBundleId, oldNominalTimes,
bundles[1].getProcessData(), false, true);
bundles[1].verifyDependencyListing(cluster2);
dualComparison(prism, cluster3, bundles[1].getProcessData());
waitingForBundleFinish(cluster3, oldBundleId);
//ensure that the running process has new coordinators created; while the submitted
// one is updated correctly.
int finalNumberOfInstances = InstanceUtil
.getProcessInstanceList(cluster3,
Util.getProcessName(bundles[1].getProcessData()), EntityType.PROCESS)
.size();
Assert.assertEquals(finalNumberOfInstances,
getExpectedNumberOfWorkflowInstances(bundles[1]
.getProcessObject().getClusters().getClusters().get(0).getValidity()
.getStart(),
bundles[1].getProcessObject().getClusters().getClusters().get(0)
.getValidity().getEnd()));
AssertUtil.checkNotStatus(cluster2OC, EntityType.PROCESS, bundles[1], Job.Status.RUNNING);
int expectedNumberOfWorkflows =
getExpectedNumberOfWorkflowInstances(TimeUtil.dateToOozieDate(
bundles[1].getProcessObject().getClusters().getClusters().get(0)
.getValidity().getStart()),
newEndTime);
Assert.assertEquals(OozieUtil.getNumberOfWorkflowInstances(cluster3, oldBundleId),
expectedNumberOfWorkflows);
}
@Test(groups = { "multiCluster" }, timeOut = 1200000)
public void updateProcessIncreaseValidityInEachColoWithOneProcessSuspended() throws Exception {
String startTime = TimeUtil.getTimeWrtSystemTime(-1);
String endTime = TimeUtil.getTimeWrtSystemTime(3);
bundles[1].setProcessValidity(startTime, endTime);
bundles[1].submitBundle(prism);
//now to schedule in 1 colo and let it remain in another
AssertUtil.assertSucceeded(
cluster3.getProcessHelper().schedule(bundles[1].getProcessData()));
String oldBundleId = InstanceUtil
.getLatestBundleID(cluster3,
Util.readEntityName(bundles[1].getProcessData()), EntityType.PROCESS);
TimeUtil.sleepSeconds(30);
waitForProcessToReachACertainState(cluster3, bundles[1], Job.Status.RUNNING);
String newEndTime = TimeUtil.addMinsToTime(TimeUtil.dateToOozieDate(
bundles[1].getProcessObject().getClusters().getClusters().get(0).getValidity()
.getEnd()
), 4);
bundles[1].setProcessValidity(TimeUtil.dateToOozieDate(
bundles[1].getProcessObject().getClusters().getClusters().get(0).getValidity()
.getStart()),
newEndTime);
AssertUtil.assertSucceeded(
cluster3.getProcessHelper().suspend(bundles[1].getProcessData()));
while (Util.parseResponse(
prism.getProcessHelper()
.update((bundles[1].getProcessData()), bundles[1].getProcessData()))
.getStatus() != APIResult.Status.SUCCEEDED) {
LOGGER.info("update didnt SUCCEED in last attempt");
TimeUtil.sleepSeconds(10);
}
AssertUtil.assertSucceeded(cluster3.getProcessHelper().resume(bundles[1].getProcessData()));
dualComparison(prism, cluster2, bundles[1].getProcessData());
dualComparison(prism, cluster3, bundles[1].getProcessData());
waitingForBundleFinish(cluster3, oldBundleId);
//ensure that the running process has new coordinators created; while the submitted
// one is updated correctly.
int finalNumberOfInstances = InstanceUtil
.getProcessInstanceList(cluster3,
Util.getProcessName(bundles[1].getProcessData()), EntityType.PROCESS)
.size();
Assert.assertEquals(finalNumberOfInstances,
getExpectedNumberOfWorkflowInstances(bundles[1]
.getProcessObject().getClusters().getClusters().get(0).getValidity()
.getStart(),
bundles[1].getProcessObject().getClusters().getClusters().get(0)
.getValidity().getEnd()));
AssertUtil.checkNotStatus(cluster2OC, EntityType.PROCESS, bundles[1], Job.Status.RUNNING);
}
private void setBundleWFPath(Bundle... bundles) {
for (Bundle bundle : bundles) {
bundle.setProcessWorkflow(workflowPath);
}
}
@Test(groups = { "multiCluster" }, timeOut = 1200000)
public void updateProcessFrequencyInEachColoWithOneProcessRunningDaily() throws Exception {
//set daily process
final String startTime = TimeUtil.getTimeWrtSystemTime(-20);
String endTime = TimeUtil.getTimeWrtSystemTime(4000);
bundles[1].setProcessPeriodicity(1, TimeUnit.days);
bundles[1].setOutputFeedPeriodicity(1, TimeUnit.days);
bundles[1].setProcessValidity(startTime, endTime);
bundles[1].submitBundle(prism);
//now to schedule in 1 colo and let it remain in another
AssertUtil.assertSucceeded(
cluster3.getProcessHelper().schedule(bundles[1].getProcessData()));
InstanceUtil.waitTillInstancesAreCreated(cluster3, bundles[1].getProcessData(), 0, 10);
String oldBundleId = InstanceUtil
.getLatestBundleID(cluster3,
Util.readEntityName(bundles[1].getProcessData()), EntityType.PROCESS);
waitForProcessToReachACertainState(cluster3, bundles[1], Job.Status.RUNNING);
List<String> oldNominalTimes =
OozieUtil.getActionsNominalTime(cluster3, oldBundleId, EntityType.PROCESS);
LOGGER.info("original process: " + Util.prettyPrintXml(bundles[1].getProcessData()));
String updatedProcess = InstanceUtil
.setProcessFrequency(bundles[1].getProcessData(),
new Frequency("" + 5, TimeUnit.minutes));
LOGGER.info("updated process: " + updatedProcess);
//now to update
ServiceResponse response =
prism.getProcessHelper().update(updatedProcess, updatedProcess);
AssertUtil.assertSucceeded(response);
InstanceUtil.waitTillInstancesAreCreated(cluster3, bundles[1].getProcessData(), 1, 10);
String prismString = dualComparison(prism, cluster2, bundles[1].getProcessData());
Assert.assertEquals(Util.getProcessObject(prismString).getFrequency(),
new Frequency("" + 5, TimeUnit.minutes));
dualComparison(prism, cluster3, bundles[1].getProcessData());
//ensure that the running process has new coordinators created; while the submitted
// one is updated
// correctly.
OozieUtil.verifyNewBundleCreation(cluster3, oldBundleId, oldNominalTimes,
bundles[1].getProcessData(), true, true);
AssertUtil.checkNotStatus(cluster2OC, EntityType.PROCESS, bundles[1], Job.Status.RUNNING);
}
@Test(groups = { "multiCluster" }, timeOut = 1200000)
public void
updateProcessFrequencyInEachColoWithOneProcessRunningDailyToMonthlyWithStartChange()
throws Exception {
//set daily process
final String startTime = TimeUtil.getTimeWrtSystemTime(-20);
String endTime = TimeUtil.getTimeWrtSystemTime(4000 * 60);
bundles[1].setProcessPeriodicity(1, TimeUnit.days);
bundles[1].setOutputFeedPeriodicity(1, TimeUnit.days);
bundles[1].setProcessValidity(startTime, endTime);
bundles[1].submitBundle(prism);
//now to schedule in 1 colo and let it remain in another
AssertUtil.assertSucceeded(
cluster3.getProcessHelper().schedule(bundles[1].getProcessData()));
TimeUtil.sleepSeconds(30);
InstanceUtil.waitTillInstancesAreCreated(cluster3, bundles[1].getProcessData(), 0, 10);
String oldBundleId = InstanceUtil
.getLatestBundleID(cluster3,
Util.readEntityName(bundles[1].getProcessData()), EntityType.PROCESS);
waitForProcessToReachACertainState(cluster3, bundles[1], Job.Status.RUNNING);
List<String> oldNominalTimes = OozieUtil.getActionsNominalTime(cluster3, oldBundleId,
EntityType.PROCESS);
LOGGER.info("original process: " + Util.prettyPrintXml(bundles[1].getProcessData()));
String updatedProcess = InstanceUtil
.setProcessFrequency(bundles[1].getProcessData(),
new Frequency("" + 1, TimeUnit.months));
updatedProcess = InstanceUtil
.setProcessValidity(updatedProcess, TimeUtil.getTimeWrtSystemTime(10),
endTime);
LOGGER.info("updated process: " + updatedProcess);
//now to update
ServiceResponse response =
prism.getProcessHelper().update(updatedProcess, updatedProcess);
AssertUtil.assertSucceeded(response);
String prismString = dualComparison(prism, cluster3, bundles[1].getProcessData());
Assert.assertEquals(Util.getProcessObject(prismString).getFrequency(),
new Frequency("" + 1, TimeUnit.months));
dualComparison(prism, cluster3, bundles[1].getProcessData());
//ensure that the running process has new coordinators created; while the submitted
// one is updated correctly.
OozieUtil.verifyNewBundleCreation(cluster3, oldBundleId, oldNominalTimes,
bundles[1].getProcessData(), true, true);
AssertUtil.checkNotStatus(cluster2OC, EntityType.PROCESS, bundles[1], Job.Status.RUNNING);
}
@Test(groups = { "multiCluster" }, timeOut = 1200000)
public void updateProcessRollStartTimeBackwardsToPastInEachColoWithOneProcessRunning()
throws Exception {
bundles[1].submitBundle(prism);
//now to schedule in 1 colo and let it remain in another
AssertUtil.assertSucceeded(
cluster3.getProcessHelper().schedule(bundles[1].getProcessData()));
TimeUtil.sleepSeconds(30);
InstanceUtil.waitTillInstancesAreCreated(cluster3, bundles[1].getProcessData(), 0, 10);
String oldBundleId = InstanceUtil
.getLatestBundleID(cluster3,
Util.readEntityName(bundles[1].getProcessData()), EntityType.PROCESS);
List<String> oldNominalTimes = OozieUtil.getActionsNominalTime(cluster3, oldBundleId,
EntityType.PROCESS);
String newStartTime = TimeUtil.addMinsToTime(TimeUtil.dateToOozieDate(
bundles[1].getProcessObject().getClusters().getClusters().get(0).getValidity()
.getStart()
), -3);
bundles[1].setProcessValidity(newStartTime, TimeUtil.dateToOozieDate(
bundles[1].getProcessObject().getClusters().getClusters().get(0).getValidity()
.getEnd()
));
waitForProcessToReachACertainState(cluster3, bundles[1], Job.Status.RUNNING);
OozieUtil.createMissingDependencies(cluster3, EntityType.PROCESS, bundles[1].getProcessName(), 0);
AssertUtil.assertSucceeded(
prism.getProcessHelper()
.update(bundles[1].getProcessData(), bundles[1].getProcessData()));
OozieUtil.verifyNewBundleCreation(cluster3, oldBundleId, oldNominalTimes,
bundles[1].getProcessData(), true, true);
bundles[1].verifyDependencyListing(cluster2);
dualComparison(prism, cluster3, bundles[1].getProcessData());
}
@Test(groups = { "multiCluster" }, timeOut = 1200000)
public void updateProcessRollStartTimeForwardInEachColoWithOneProcessSuspended()
throws Exception {
bundles[1].submitBundle(prism);
//now to schedule in 1 colo and let it remain in another
AssertUtil.assertSucceeded(
cluster3.getProcessHelper().schedule(bundles[1].getProcessData()));
String oldBundleId = InstanceUtil
.getLatestBundleID(cluster3,
Util.readEntityName(bundles[1].getProcessData()), EntityType.PROCESS);
TimeUtil.sleepSeconds(30);
OozieUtil.getNumberOfWorkflowInstances(cluster3, oldBundleId);
String oldStartTime = TimeUtil.dateToOozieDate(
bundles[1].getProcessObject().getClusters().getClusters().get(0).getValidity()
.getStart()
);
String newStartTime = TimeUtil.addMinsToTime(TimeUtil.dateToOozieDate(
bundles[1].getProcessObject().getClusters().getClusters().get(0).getValidity()
.getStart()
), 3);
bundles[1].setProcessValidity(newStartTime, TimeUtil.dateToOozieDate(
bundles[1].getProcessObject().getClusters().getClusters().get(0).getValidity()
.getEnd()
));
waitForProcessToReachACertainState(cluster3, bundles[1], Job.Status.RUNNING);
AssertUtil.assertSucceeded(
cluster3.getProcessHelper().suspend(bundles[1].getProcessData()));
AssertUtil.assertSucceeded(
prism.getProcessHelper()
.update(bundles[1].getProcessData(), bundles[1].getProcessData()));
dualComparison(prism, cluster2, bundles[1].getProcessData());
bundles[1].verifyDependencyListing(cluster2);
dualComparison(prism, cluster3, bundles[1].getProcessData());
//ensure that the running process has new coordinators created; while the submitted
// one is updated correctly.
int finalNumberOfInstances =
InstanceUtil.getProcessInstanceListFromAllBundles(cluster3,
Util.getProcessName(bundles[1].getProcessData()), EntityType.PROCESS).size();
Assert.assertEquals(finalNumberOfInstances,
getExpectedNumberOfWorkflowInstances(oldStartTime,
bundles[1].getProcessObject().getClusters().getClusters().get(0)
.getValidity().getEnd()));
Assert.assertEquals(InstanceUtil
.getProcessInstanceList(cluster3,
Util.readEntityName(bundles[1].getProcessData()), EntityType.PROCESS)
.size(), getExpectedNumberOfWorkflowInstances(newStartTime,
bundles[1].getProcessObject().getClusters().getClusters().get(0).getValidity().getEnd()));
AssertUtil.checkNotStatus(cluster2OC, EntityType.PROCESS, bundles[1], Job.Status.RUNNING);
}
@Test(groups = { "multiCluster" }, timeOut = 1200000)
public void updateProcessRollStartTimeBackwardsInEachColoWithOneProcessSuspended()
throws Exception {
bundles[1].submitBundle(prism);
//now to schedule in 1 colo and let it remain in another
AssertUtil.assertSucceeded(
cluster3.getProcessHelper().schedule(bundles[1].getProcessData()));
String oldBundleId = InstanceUtil
.getLatestBundleID(cluster3,
Util.readEntityName(bundles[1].getProcessData()), EntityType.PROCESS);
TimeUtil.sleepSeconds(30);
String newStartTime = TimeUtil.addMinsToTime(TimeUtil.dateToOozieDate(
bundles[1].getProcessObject().getClusters().getClusters().get(0).getValidity()
.getStart()
), -3);
bundles[1].setProcessValidity(newStartTime, TimeUtil.dateToOozieDate(
bundles[1].getProcessObject().getClusters().getClusters().get(0).getValidity()
.getEnd()
));
InstanceUtil.waitTillInstancesAreCreated(cluster3, bundles[1].getProcessData(), 0, 10);
waitForProcessToReachACertainState(cluster3, bundles[1], Job.Status.RUNNING);
AssertUtil.assertSucceeded(
cluster3.getProcessHelper().suspend(bundles[1].getProcessData()));
AssertUtil.assertSucceeded(
prism.getProcessHelper()
.update(bundles[1].getProcessData(), bundles[1].getProcessData()));
AssertUtil.assertSucceeded(cluster3.getProcessHelper().resume(bundles[1].getProcessData()));
List<String> oldNominalTimes =
OozieUtil.getActionsNominalTime(cluster3, oldBundleId, EntityType.PROCESS);
OozieUtil.verifyNewBundleCreation(cluster3, oldBundleId, oldNominalTimes,
bundles[1].getProcessData(), true, false);
bundles[1].verifyDependencyListing(cluster2);
dualComparison(prism, cluster3, bundles[1].getProcessData());
AssertUtil.checkNotStatus(cluster2OC, EntityType.PROCESS, bundles[1], Job.Status.RUNNING);
}
@Test(timeOut = 1200000)
public void
updateProcessWorkflowXml() throws URISyntaxException, JAXBException,
IOException, OozieClientException, AuthenticationException {
Bundle b = BundleUtil.readELBundle();
HadoopFileEditor hadoopFileEditor = null;
try {
b = new Bundle(b, cluster1);
b.setProcessWorkflow(workflowPath);
b.generateUniqueBundle();
b.setProcessValidity(TimeUtil.getTimeWrtSystemTime(-10),
TimeUtil.getTimeWrtSystemTime(15));
b.submitFeedsScheduleProcess(prism);
InstanceUtil.waitTillInstancesAreCreated(cluster1, b.getProcessData(), 0, 10);
OozieUtil.createMissingDependencies(cluster1, EntityType.PROCESS,
b.getProcessName(), 0);
InstanceUtil.waitTillInstanceReachState(serverOC.get(0),
Util.readEntityName(b.getProcessData()), 0, CoordinatorAction.Status.RUNNING,
EntityType.PROCESS);
//save old data
String oldBundleID = InstanceUtil
.getLatestBundleID(cluster1,
Util.readEntityName(b.getProcessData()), EntityType.PROCESS);
List<String> oldNominalTimes = OozieUtil.getActionsNominalTime(cluster1,
oldBundleID,
EntityType.PROCESS);
//update workflow.xml
hadoopFileEditor = new HadoopFileEditor(cluster1FS);
hadoopFileEditor.edit(new ProcessMerlin(b
.getProcessData()).getWorkflow().getPath() + "/workflow.xml", "</workflow-app>",
"<!-- some comment -->");
//update
prism.getProcessHelper().update(b.getProcessData(),
b.getProcessData());
TimeUtil.sleepSeconds(20);
//verify new bundle creation
OozieUtil.verifyNewBundleCreation(cluster1, oldBundleID, oldNominalTimes,
b.getProcessData(), true, true);
} finally {
b.deleteBundle(prism);
if (hadoopFileEditor != null) {
hadoopFileEditor.restore();
}
}
}
public ServiceResponse updateProcessConcurrency(Bundle bundle, int concurrency)
throws Exception {
String oldData = bundle.getProcessData();
ProcessMerlin updatedProcess = new ProcessMerlin(bundle.getProcessObject());
updatedProcess.setParallel(concurrency);
return prism.getProcessHelper()
.update(oldData, updatedProcess.toString());
}
/**
* this method compares process xml definition from 2 falcon servers / prism and expects them to
* be identical. If the definitions are identical then the definition from @param coloHelper1
* is @return are response.
*/
private String dualComparison(ColoHelper coloHelper1, ColoHelper coloHelper2,
String processData) throws Exception {
String colo1Response = getResponse(coloHelper1, processData, true);
String colo2Response = getResponse(coloHelper2, processData, true);
Assert.assertTrue(XmlUtil.isIdentical(colo1Response, colo2Response),
"Process definition should have been identical");
return getResponse(coloHelper1, processData, true);
}
/**
* this method compares process xml definition from 2 falcon servers / prism and expects them to
* be different.
*/
private void dualComparisonFailure(ColoHelper coloHelper1, ColoHelper coloHelper2,
String processData) throws Exception {
Assert.assertFalse(XmlUtil.isIdentical(getResponse(coloHelper1, processData, true),
getResponse(coloHelper2, processData, true)), "Process definition should not have been "
+ "identical");
}
private String getResponse(ColoHelper prism, String processData, boolean bool)
throws Exception {
ServiceResponse response = prism.getProcessHelper().getEntityDefinition(processData);
if (bool) {
AssertUtil.assertSucceeded(response);
} else {
AssertUtil.assertFailed(response);
}
String result = response.getMessage();
Assert.assertNotNull(result);
return result;
}
private void waitForProcessToReachACertainState(ColoHelper coloHelper, Bundle bundle,
Job.Status state)
throws Exception {
while (OozieUtil.getOozieJobStatus(coloHelper.getFeedHelper().getOozieClient(),
Util.readEntityName(bundle.getProcessData()), EntityType.PROCESS) != state) {
//keep waiting
TimeUtil.sleepSeconds(10);
}
//now check if the coordinator is in desired state
CoordinatorJob coord = getDefaultOozieCoord(coloHelper, InstanceUtil
.getLatestBundleID(coloHelper, Util.readEntityName(bundle.getProcessData()),
EntityType.PROCESS));
while (coord.getStatus() != state) {
TimeUtil.sleepSeconds(10);
coord = getDefaultOozieCoord(coloHelper, InstanceUtil
.getLatestBundleID(coloHelper, Util.readEntityName(bundle.getProcessData()),
EntityType.PROCESS));
}
}
private Bundle usualGrind(Bundle b) throws Exception {
b.setInputFeedDataPath(inputFeedPath);
String prefix = b.getFeedDataPathPrefix();
HadoopUtil.deleteDirIfExists(prefix.substring(1), cluster1FS);
final String starTime = TimeUtil.getTimeWrtSystemTime(3);
String endTime = TimeUtil.getTimeWrtSystemTime(7);
b.setProcessPeriodicity(1, TimeUnit.minutes);
b.setOutputFeedPeriodicity(1, TimeUnit.minutes);
b.setProcessValidity(starTime, endTime);
return b;
}
private ExecutionType getRandomExecutionType(Bundle bundle) throws Exception {
ExecutionType current = bundle.getProcessObject().getOrder();
Random r = new Random();
ExecutionType[] values = ExecutionType.values();
int i;
do {
i = r.nextInt(values.length);
} while (current == values[i]);
return values[i];
}
public ServiceResponse updateProcessFrequency(Bundle bundle,
org.apache.falcon.entity.v0.Frequency frequency)
throws Exception {
String oldData = bundle.getProcessData();
ProcessMerlin updatedProcess = new ProcessMerlin(bundle.getProcessObject());
updatedProcess.setFrequency(frequency);
return prism.getProcessHelper()
.update(oldData, updatedProcess.toString());
}
//need to expand this function more later
private int getExpectedNumberOfWorkflowInstances(String start, String end) {
DateTime startDate = new DateTime(start);
DateTime endDate = new DateTime(end);
Minutes minutes = Minutes.minutesBetween((startDate), (endDate));
return minutes.getMinutes();
}
private int getExpectedNumberOfWorkflowInstances(Date start, Date end) {
DateTime startDate = new DateTime(start);
DateTime endDate = new DateTime(end);
Minutes minutes = Minutes.minutesBetween((startDate), (endDate));
return minutes.getMinutes();
}
private int getExpectedNumberOfWorkflowInstances(String start, Date end) {
DateTime startDate = new DateTime(start);
DateTime endDate = new DateTime(end);
Minutes minutes = Minutes.minutesBetween((startDate), (endDate));
return minutes.getMinutes();
}
private void waitingForBundleFinish(ColoHelper coloHelper, String bundleId, int minutes)
throws Exception {
int wait = 0;
while (!OozieUtil.isBundleOver(coloHelper, bundleId)) {
//create missing dependencies if new instance have come up
OozieUtil.createMissingDependenciesForBundle(coloHelper, bundleId);
//keep waiting
LOGGER.info("bundle " + bundleId + " not over .. waiting");
TimeUtil.sleepSeconds(60);
wait++;
if (wait == minutes) {
Assert.assertTrue(false);
break;
}
}
}
private void waitingForBundleFinish(ColoHelper coloHelper, String bundleId) throws Exception {
waitingForBundleFinish(coloHelper, bundleId, 15);
}
private CoordinatorJob getDefaultOozieCoord(ColoHelper coloHelper, String bundleId)
throws Exception {
OozieClient client = coloHelper.getFeedHelper().getOozieClient();
BundleJob bundlejob = client.getBundleJobInfo(bundleId);
for (CoordinatorJob coord : bundlejob.getCoordinators()) {
if (coord.getAppName().contains("DEFAULT")) {
return client.getCoordJobInfo(coord.getId());
}
}
return null;
}
@AfterClass(alwaysRun = true)
public void tearDownClass() throws IOException {
cleanTestDirs();
}
}