/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.falcon.regression.prism;

import com.jcraft.jsch.JSchException;
import org.apache.falcon.regression.core.bundle.Bundle;
import org.apache.falcon.entity.v0.EntityType;
import org.apache.falcon.entity.v0.Frequency;
import org.apache.falcon.entity.v0.feed.ActionType;
import org.apache.falcon.entity.v0.feed.ClusterType;
import org.apache.falcon.regression.core.helpers.ColoHelper;
import org.apache.falcon.regression.core.response.ServiceResponse;
import org.apache.falcon.regression.core.util.AssertUtil;
import org.apache.falcon.regression.core.util.BundleUtil;
import org.apache.falcon.regression.core.util.HadoopUtil;
import org.apache.falcon.regression.core.util.InstanceUtil;
import org.apache.falcon.regression.core.util.OSUtil;
import org.apache.falcon.regression.core.util.OozieUtil;
import org.apache.falcon.regression.core.util.TimeUtil;
import org.apache.falcon.regression.core.util.Util;
import org.apache.falcon.regression.core.util.XmlUtil;
import org.apache.falcon.regression.testHelper.BaseTestClass;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.security.authentication.client.AuthenticationException;
import org.apache.oozie.client.CoordinatorAction;
import org.apache.oozie.client.OozieClientException;
import org.custommonkey.xmlunit.Diff;
import org.custommonkey.xmlunit.XMLUnit;
import org.joda.time.DateTime;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
import org.xml.sax.SAXException;
import org.apache.log4j.Logger;

import javax.xml.bind.JAXBException;
import java.io.IOException;
import java.lang.reflect.Method;
import java.net.URISyntaxException;
import java.util.List;


public class UpdateAtSpecificTimeTest extends BaseTestClass {

    private static final Logger logger = Logger.getLogger(UpdateAtSpecificTimeTest.class);

    Bundle processBundle;
    ColoHelper cluster1 = servers.get(0);
    ColoHelper cluster2 = servers.get(1);
    ColoHelper cluster3 = servers.get(2);
    FileSystem cluster2FS = serverFS.get(1);
    private final String baseTestDir = baseHDFSDir + "/UpdateAtSpecificTimeTest-data";
    String aggregateWorkflowDir = baseHDFSDir + "/aggregator";

    @BeforeClass(alwaysRun = true)
    public void uploadWorkflow() throws Exception {
        uploadDirToClusters(aggregateWorkflowDir, OSUtil.RESOURCES_OOZIE);
    }

    @BeforeMethod(alwaysRun = true)
    public void setup(Method method) throws IOException {
        logger.info("test name: " + method.getName());
        Bundle bundle = BundleUtil.readLocalDCBundle();
        bundles[0] = new Bundle(bundle, cluster1);
        bundles[1] = new Bundle(bundle, cluster2);
        bundles[2] = new Bundle(bundle, cluster3);

        bundles[0].generateUniqueBundle();
        bundles[1].generateUniqueBundle();
        bundles[2].generateUniqueBundle();

        processBundle = BundleUtil.readELBundle();
        processBundle = new Bundle(processBundle, cluster1);
        processBundle.generateUniqueBundle();
        processBundle.setProcessWorkflow(aggregateWorkflowDir);
    }

    @AfterMethod(alwaysRun = true)
    public void tearDown() {
        removeBundles();
        removeBundles(processBundle);
    }

    @Test(groups = {"singleCluster", "0.3.1", "embedded"}, timeOut = 1200000, enabled = true)
    public void invalidChar_Process()
        throws JAXBException, IOException, URISyntaxException,
        AuthenticationException, OozieClientException {
        processBundle.setProcessValidity(TimeUtil.getTimeWrtSystemTime(0),
            TimeUtil.getTimeWrtSystemTime(20));
        processBundle.submitFeedsScheduleProcess(prism);
        InstanceUtil.waitTillInstancesAreCreated(cluster1, processBundle.getProcessData(), 0);
        String oldProcess =
            processBundle.getProcessData();
        processBundle.setProcessValidity(TimeUtil.getTimeWrtSystemTime(5),
            TimeUtil.getTimeWrtSystemTime(100));
        ServiceResponse r = prism.getProcessHelper().update(oldProcess,
            processBundle.getProcessData(), "abc", null);
        Assert.assertTrue(r.getMessage()
            .contains("java.lang.IllegalArgumentException: abc is not a valid UTC string"));
    }

    @Test(groups = {"singleCluster", "0.3.1", "embedded"}, timeOut = 1200000, enabled = true)
    public void invalidChar_Feed()
        throws JAXBException, IOException, URISyntaxException, AuthenticationException,
        OozieClientException {

        String feed = submitAndScheduleFeed(processBundle);
        InstanceUtil.waitTillInstancesAreCreated(cluster1, feed, 0);

        //update frequency
        Frequency f = new Frequency("" + 21, Frequency.TimeUnit.minutes);
        String updatedFeed = InstanceUtil.setFeedFrequency(feed, f);
        ServiceResponse r = prism.getFeedHelper().update(feed, updatedFeed, "abc", null);
        Assert.assertTrue(r.getMessage()
            .contains("java.lang.IllegalArgumentException: abc is not a valid UTC string"));
    }

    @Test(groups = {"singleCluster", "0.3.1", "embedded"}, timeOut = 1200000, enabled = true)
    public void updateTimeInPast_Process()
        throws JAXBException, IOException, URISyntaxException,
        OozieClientException, AuthenticationException {

        processBundle.setProcessValidity(TimeUtil.getTimeWrtSystemTime(0),
                TimeUtil.getTimeWrtSystemTime(20));
        processBundle.submitFeedsScheduleProcess(prism);

        //get old process details
        String oldProcess = processBundle.getProcessData();
        String oldBundleId = InstanceUtil.getLatestBundleID(cluster1,
            Util.readEntityName(processBundle.getProcessData()), EntityType.PROCESS);

        InstanceUtil.waitTillInstancesAreCreated(cluster1, oldProcess, 0);
        List<String> initialNominalTimes = OozieUtil.getActionsNominalTime(cluster1,
            oldBundleId, EntityType.PROCESS);

        // update process by adding property
        processBundle.setProcessProperty("someProp", "someValue");
        ServiceResponse r = prism.getProcessHelper().update(oldProcess,
            processBundle.getProcessData(), TimeUtil.getTimeWrtSystemTime(-10000), null);
        AssertUtil.assertSucceeded(r);

        //check new coord created with current time
        OozieUtil.verifyNewBundleCreation(cluster1, oldBundleId, initialNominalTimes,
            processBundle.getProcessData(), true, false);
        InstanceUtil.waitTillInstancesAreCreated(cluster1, oldProcess, 1);
        OozieUtil.verifyNewBundleCreation(cluster1, oldBundleId, initialNominalTimes,
            processBundle.getProcessData(), true, true);
    }

    @Test(groups = {"MultiCluster", "0.3.1", "embedded"}, timeOut = 1200000, enabled = true)
    public void updateTimeInPast_Feed()
        throws JAXBException, IOException, OozieClientException,
        URISyntaxException, AuthenticationException {

        String startTimeCluster_source = TimeUtil.getTimeWrtSystemTime(-10);
        String startTimeCluster_target = TimeUtil.getTimeWrtSystemTime(10);
        String feed = getMultiClusterFeed(startTimeCluster_source, startTimeCluster_target);
        logger.info("feed: " + Util.prettyPrintXml(feed));

        //submit and schedule feed
        ServiceResponse r = prism.getFeedHelper().submitEntity(feed);
        TimeUtil.sleepSeconds(10);
        AssertUtil.assertSucceeded(r);
        InstanceUtil.waitTillInstancesAreCreated(cluster1, feed, 0);

        //update frequency
        Frequency f = new Frequency("" + 7, Frequency.TimeUnit.minutes);
        String updatedFeed = InstanceUtil.setFeedFrequency(feed, f);
        r = prism.getFeedHelper().update(feed, updatedFeed,
            TimeUtil.getTimeWrtSystemTime(-10000), null);
        AssertUtil.assertSucceeded(r);
        InstanceUtil.waitTillInstancesAreCreated(cluster1, feed, 1);

        //check correct number of coord exists or not
        Assert.assertEquals(InstanceUtil
            .checkIfFeedCoordExist(cluster1.getFeedHelper(),
                Util.readEntityName(feed),
                "REPLICATION"), 2);
        Assert.assertEquals(InstanceUtil
            .checkIfFeedCoordExist(cluster2.getFeedHelper(), Util.readEntityName(feed),
                "RETENTION"), 2);
        Assert.assertEquals(InstanceUtil
            .checkIfFeedCoordExist(cluster1.getFeedHelper(), Util.readEntityName(feed),
                "RETENTION"), 2);
        Assert.assertEquals(InstanceUtil
            .checkIfFeedCoordExist(cluster3.getFeedHelper(), Util.readEntityName(feed),
                "RETENTION"), 2);
    }

    @Test(groups = {"MultiCluster", "0.3.1", "distributed"}, timeOut = 1200000, enabled = true)
    public void inNextFewMinutesUpdate_RollForward_Process()
        throws JAXBException, IOException, URISyntaxException, JSchException,
        OozieClientException, SAXException, AuthenticationException {
        /*
        submit process on 3 clusters. Schedule on 2 clusters. Bring down one of
        the scheduled cluster. Update with time 5 minutes from now. On running
        cluster new coord should be created with start time +5 and no instance
        should be missing. On 3rd cluster where process was only submit,
        definition should be updated. Bring the down cluster up. Update with same
        definition again, now the recently up cluster should also have new
        coords.
        */
        try {
            Util.startService(cluster2.getProcessHelper());
            String startTime = TimeUtil.getTimeWrtSystemTime(-15);
            processBundle.setProcessValidity(startTime,
                TimeUtil.getTimeWrtSystemTime(60));
            processBundle.addClusterToBundle(bundles[1].getClusters().get(0),
                ClusterType.SOURCE, null, null);
            processBundle.addClusterToBundle(bundles[2].getClusters().get(0),
                ClusterType.SOURCE, null, null);
            processBundle.submitBundle(prism);

            //schedule of 2 cluster
            cluster1.getProcessHelper().schedule(processBundle.getProcessData());
            cluster2.getProcessHelper().schedule(processBundle.getProcessData());
            InstanceUtil.waitTillInstancesAreCreated(cluster2, processBundle.getProcessData(), 0);

            //shut down cluster2
            Util.shutDownService(cluster2.getProcessHelper());

            // save old data before update
            String oldProcess = processBundle.getProcessData();
            String oldBundleID_cluster1 = InstanceUtil
                .getLatestBundleID(cluster1,
                    Util.readEntityName(oldProcess), EntityType.PROCESS);
            String oldBundleID_cluster2 = InstanceUtil
                .getLatestBundleID(cluster2,
                    Util.readEntityName(oldProcess), EntityType.PROCESS);
            List<String> oldNominalTimes_cluster1 = OozieUtil.getActionsNominalTime
                (cluster1,
                    oldBundleID_cluster1, EntityType.PROCESS);
            List<String> oldNominalTimes_cluster2 = OozieUtil.getActionsNominalTime
                (cluster2,
                    oldBundleID_cluster2, EntityType.PROCESS);

            //update process validity
            processBundle.setProcessProperty("someProp", "someValue");

            //send update request
            String updateTime = TimeUtil.getTimeWrtSystemTime(5);
            ServiceResponse r = prism.getProcessHelper()
                .update(oldProcess, processBundle.getProcessData(), updateTime);
            AssertUtil.assertPartial(r);
            InstanceUtil.waitTillInstancesAreCreated(cluster1, processBundle.getProcessData(), 1);

            //verify new bundle on cluster1 and definition on cluster3
            OozieUtil
                .verifyNewBundleCreation(cluster1, oldBundleID_cluster1, oldNominalTimes_cluster1,
                    oldProcess, true, false);
            OozieUtil.verifyNewBundleCreation(cluster2, oldBundleID_cluster2,
                oldNominalTimes_cluster2,
                oldProcess, false, false);
            String definition_cluster_3 = Util.getEntityDefinition(cluster3,
                processBundle.getProcessData(), true);
            Assert.assertTrue(XmlUtil.isIdentical(definition_cluster_3,
                processBundle.getProcessData()), "Process definitions should be equal");

            //start the stopped cluster2
            Util.startService(cluster2.getProcessHelper());
            TimeUtil.sleepSeconds(40);
            String newBundleID_cluster1 = InstanceUtil.getLatestBundleID(cluster1,
                Util.readEntityName(oldProcess), EntityType.PROCESS);

            //send second update request
            r = prism.getProcessHelper().update(oldProcess, processBundle.getProcessData(),
                updateTime, null);
            AssertUtil.assertSucceeded(r);
            String def_cluster_2 = Util.getEntityDefinition(cluster2,
                processBundle.getProcessData(), true);
            logger.info("def_cluster_2 : " + Util.prettyPrintXml(def_cluster_2));

            // verify new bundle in cluster2 and no new bundle in cluster1  and
            OozieUtil.verifyNewBundleCreation(cluster1, newBundleID_cluster1,
                oldNominalTimes_cluster1, oldProcess, false, false);
            OozieUtil.verifyNewBundleCreation(cluster2, oldBundleID_cluster2,
                oldNominalTimes_cluster2, oldProcess, true, false);

            //wait till update time is reached
            TimeUtil.sleepTill(updateTime);
            OozieUtil.verifyNewBundleCreation(cluster2, oldBundleID_cluster2,
                oldNominalTimes_cluster2, oldProcess, true, true);
            OozieUtil.verifyNewBundleCreation(cluster1, oldBundleID_cluster1,
                oldNominalTimes_cluster1, oldProcess, true, true);
        } finally {
            Util.restartService(cluster2.getProcessHelper());
        }
    }

    @Test(groups = {"MultiCluster", "0.3.1", "distributed"}, timeOut = 1200000, enabled = true)
    public void inNextFewMinutesUpdate_RollForward_Feed()
        throws JAXBException, IOException, URISyntaxException, JSchException, 
        OozieClientException, SAXException, AuthenticationException {
        try {
            String startTimeCluster_source = TimeUtil.getTimeWrtSystemTime(-18);
            String feed = getMultiClusterFeed(startTimeCluster_source, startTimeCluster_source);
            logger.info("feed: " + Util.prettyPrintXml(feed));

            //submit feed on all 3 clusters
            ServiceResponse r = prism.getFeedHelper().submitEntity(feed);
            AssertUtil.assertSucceeded(r);

            //schedule feed of cluster1 and cluster2
            r = cluster1.getFeedHelper().schedule(feed);
            AssertUtil.assertSucceeded(r);
            r = cluster2.getFeedHelper().schedule(feed);
            AssertUtil.assertSucceeded(r);
            InstanceUtil.waitTillInstancesAreCreated(cluster1, feed, 0);

            //shutdown cluster2
            Util.shutDownService(cluster2.getProcessHelper());

            //add some property to feed so that new bundle is created
            String updatedFeed = Util.setFeedProperty(feed, "someProp", "someVal");

            //save old data
            String oldBundle_cluster1 = InstanceUtil.getLatestBundleID(cluster1,
                Util.readEntityName(feed), EntityType.FEED);
            List<String> oldNominalTimes_cluster1 = OozieUtil.getActionsNominalTime
                (cluster1, oldBundle_cluster1, EntityType.FEED);

            //send update command with +5 mins in future
            String updateTime = TimeUtil.getTimeWrtSystemTime(5);
            r = prism.getFeedHelper().update(feed, updatedFeed, updateTime, null);
            AssertUtil.assertPartial(r);

            //verify new bundle creation on cluster1 and new definition on cluster3
            OozieUtil.verifyNewBundleCreation(cluster1, oldBundle_cluster1,
                oldNominalTimes_cluster1, feed, true, false);
            String definition = Util.getEntityDefinition(cluster3, feed, true);
            Diff diff = XMLUnit.compareXML(definition, processBundle.getProcessData());
            logger.info(diff);

            //start stopped cluster2
            Util.startService(cluster2.getProcessHelper());
            String newBundle_cluster1 = InstanceUtil.getLatestBundleID(cluster1,
                Util.readEntityName(feed), EntityType.FEED);

            //send update again
            r = prism.getFeedHelper().update(feed, updatedFeed, updateTime, null);
            AssertUtil.assertSucceeded(r);

            //verify new bundle creation on cluster2 and no new bundle on cluster1
            Assert.assertEquals(InstanceUtil
                .checkIfFeedCoordExist(cluster2.getFeedHelper(), Util.readEntityName(feed),
                    "RETENTION"), 2);
            OozieUtil.verifyNewBundleCreation(cluster1, newBundle_cluster1,
                oldNominalTimes_cluster1, feed, false, false);

            //wait till update time is reached
            TimeUtil.sleepTill(TimeUtil.getTimeWrtSystemTime(5));

            //verify new bundle creation with instance matching
            OozieUtil.verifyNewBundleCreation(cluster1, oldBundle_cluster1,
                oldNominalTimes_cluster1, feed, true, true);
        } finally {
            Util.restartService(cluster2.getProcessHelper());
        }
    }

    @Test(groups = {"multiCluster", "0.3.1", "embedded"}, timeOut = 1200000, enabled = true)
    public void updateTimeAfterEndTime_Process()
        throws JAXBException, InterruptedException, IOException, URISyntaxException,
        OozieClientException, AuthenticationException {
        /* submit and schedule process with end time after 60 mins. Set update time
           as with +60 from start mins */
        logger.info("Running test updateTimeAfterEndTime_Process");
        String startTime = TimeUtil.getTimeWrtSystemTime(-15);
        String endTime = TimeUtil.getTimeWrtSystemTime(60);
        processBundle.setProcessValidity(startTime, endTime);
        processBundle.submitFeedsScheduleProcess(prism);
        TimeUtil.sleepSeconds(10);
        InstanceUtil.waitTillInstanceReachState(serverOC.get(0),
            Util.readEntityName(processBundle.getProcessData()), 0,
            CoordinatorAction.Status.WAITING, EntityType.PROCESS);

        //save old data
        String oldProcess = processBundle.getProcessData();
        String oldBundleID = InstanceUtil
            .getLatestBundleID(cluster1,
                Util.readEntityName(oldProcess), EntityType.PROCESS);
        List<String> oldNominalTimes = OozieUtil.getActionsNominalTime(cluster1, oldBundleID,
            EntityType.PROCESS);

        //update
        processBundle.setProcessProperty("someProp", "someVal");
        String updateTime = TimeUtil.addMinsToTime(endTime, 60);
        logger.info("Original Feed : " + Util.prettyPrintXml(oldProcess));
        logger.info("Updated Feed :" + Util.prettyPrintXml(processBundle.getProcessData()));
        logger.info("Update Time : " + updateTime);
        ServiceResponse r = prism.getProcessHelper().update(oldProcess,
            processBundle.getProcessData(), updateTime, null);
        AssertUtil.assertSucceeded(r);

        //verify new bundle creation with instances matching
        OozieUtil.verifyNewBundleCreation(cluster1, oldBundleID, oldNominalTimes,
            oldProcess, true, false);
        InstanceUtil.waitTillInstancesAreCreated(cluster1, processBundle.getProcessData(), 1);
        OozieUtil.verifyNewBundleCreation(cluster1, oldBundleID, oldNominalTimes,
            oldProcess, true, true);
    }

    @Test(groups = {"multiCluster", "0.3.1", "embedded"}, timeOut = 1200000, enabled = true)
    public void updateTimeAfterEndTime_Feed()
        throws JAXBException, IOException, OozieClientException,
        URISyntaxException, AuthenticationException {

        /* submit and schedule feed with end time 60 mins in future and update with +60 in future*/
        String startTime = TimeUtil.getTimeWrtSystemTime(-15);
        String endTime = TimeUtil.getTimeWrtSystemTime(60);

        String feed = processBundle.getDataSets().get(0);
        feed = InstanceUtil.setFeedCluster(feed,
            XmlUtil.createValidity("2012-10-01T12:00Z", "2010-01-01T00:00Z"),
            XmlUtil.createRtention("days(100000)", ActionType.DELETE), null,
            ClusterType.SOURCE, null);
        feed = InstanceUtil.setFeedCluster(feed, XmlUtil.createValidity(startTime, endTime),
            XmlUtil.createRtention("days(100000)", ActionType.DELETE),
            Util.readEntityName(processBundle.getClusters().get(0)), ClusterType.SOURCE,
            null, baseTestDir + "/replication" + MINUTE_DATE_PATTERN);

        ServiceResponse r = prism.getClusterHelper().submitEntity(
            processBundle.getClusters().get(0));
        AssertUtil.assertSucceeded(r);
        r = prism.getFeedHelper().submitAndSchedule(feed);
        AssertUtil.assertSucceeded(r);
        InstanceUtil.waitTillInstancesAreCreated(cluster1, feed, 0);

        //save old data
        String oldBundleID = InstanceUtil.getLatestBundleID(cluster1,
            Util.readEntityName(feed), EntityType.FEED);
        String updateTime = TimeUtil.addMinsToTime(endTime, 60);
        String updatedFeed = Util.setFeedProperty(feed, "someProp", "someVal");
        logger.info("Original Feed : " + Util.prettyPrintXml(feed));
        logger.info("Updated Feed :" + Util.prettyPrintXml(updatedFeed));
        logger.info("Update Time : " + updateTime);
        r = prism.getFeedHelper().update(feed, updatedFeed, updateTime, null);
        AssertUtil.assertSucceeded(r);
        InstanceUtil.waitTillInstancesAreCreated(cluster1, feed, 1);

        //verify new bundle creation
        OozieUtil.verifyNewBundleCreation(cluster1, oldBundleID, null, feed, true, false);
    }

    @Test(groups = {"multiCluster", "0.3.1", "embedded"}, timeOut = 1200000, enabled = true)
    public void updateTimeBeforeStartTime_Process() throws JAXBException, IOException,
        URISyntaxException, OozieClientException, AuthenticationException {

        /* submit and schedule process with start time +10 mins from now. Update with start time
        -4 and update time +2 mins */
        String startTime = TimeUtil.getTimeWrtSystemTime(10);
        String endTime = TimeUtil.getTimeWrtSystemTime(20);
        processBundle.setProcessValidity(startTime, endTime);
        processBundle.submitFeedsScheduleProcess(prism);

        //save old data
        String oldProcess = processBundle.getProcessData();
        String oldBundleID = InstanceUtil.getLatestBundleID(cluster1,
            Util.readEntityName(oldProcess), EntityType.PROCESS);
        List<String> oldNominalTimes = OozieUtil.getActionsNominalTime(cluster1, oldBundleID,
            EntityType.PROCESS);
        processBundle.setProcessValidity(TimeUtil.addMinsToTime(startTime, -4), endTime);
        String updateTime = TimeUtil.getTimeWrtSystemTime(2);
        ServiceResponse r = prism.getProcessHelper().update(oldProcess,
            processBundle.getProcessData(), updateTime, null);
        AssertUtil.assertSucceeded(r);
        TimeUtil.sleepSeconds(10);

        //verify new bundle creation
        OozieUtil.verifyNewBundleCreation(cluster1, oldBundleID, oldNominalTimes,
            oldProcess, true, false);
    }

    @Test(groups = {"MultiCluster", "0.3.1"}, timeOut = 1200000, enabled = true)
    public void updateDiffClusterDiffValidity_Process()
        throws JAXBException, IOException, URISyntaxException, OozieClientException,
        AuthenticationException {

        //set start end process time for 3 clusters
        String startTime_cluster1 = TimeUtil.getTimeWrtSystemTime(-40);
        String endTime_cluster1 = TimeUtil.getTimeWrtSystemTime(3);
        String startTime_cluster2 = TimeUtil.getTimeWrtSystemTime(120);
        String endTime_cluster2 = TimeUtil.getTimeWrtSystemTime(240);
        String startTime_cluster3 = TimeUtil.getTimeWrtSystemTime(-30);
        String endTime_cluster3 = TimeUtil.getTimeWrtSystemTime(180);

        //create multi cluster bundle
        processBundle.setProcessValidity(startTime_cluster1, endTime_cluster1);
        processBundle.addClusterToBundle(bundles[1].getClusters().get(0),
            ClusterType.SOURCE, startTime_cluster2, endTime_cluster2);
        processBundle.addClusterToBundle(bundles[2].getClusters().get(0),
            ClusterType.SOURCE, startTime_cluster3, endTime_cluster3);

        //submit and schedule
        processBundle.submitFeedsScheduleProcess(prism);

        //wait for coord to be in running state
        InstanceUtil.waitTillInstancesAreCreated(cluster1, processBundle.getProcessData(), 0);
        InstanceUtil.waitTillInstancesAreCreated(cluster3, processBundle.getProcessData(), 0);

        //save old info
        String oldBundleID_cluster1 = InstanceUtil.getLatestBundleID(cluster1,
            Util.readEntityName(processBundle.getProcessData()), EntityType.PROCESS);
        List<String> nominalTimes_cluster1 = OozieUtil.getActionsNominalTime(cluster1,
            oldBundleID_cluster1, EntityType.PROCESS);
        String oldBundleID_cluster2 = InstanceUtil.getLatestBundleID(cluster2,
            Util.readEntityName(processBundle.getProcessData()), EntityType.PROCESS);
        String oldBundleID_cluster3 = InstanceUtil.getLatestBundleID(cluster3,
            Util.readEntityName(processBundle.getProcessData()), EntityType.PROCESS);
        List<String> nominalTimes_cluster3 = OozieUtil.getActionsNominalTime(cluster3,
            oldBundleID_cluster3, EntityType.PROCESS);

        //update process
        String updateTime = TimeUtil.addMinsToTime(endTime_cluster1, 3);
        processBundle.setProcessProperty("someProp", "someVal");
        ServiceResponse r = prism.getProcessHelper().update(processBundle.getProcessData(),
            processBundle.getProcessData(), updateTime, null);
        AssertUtil.assertSucceeded(r);

        //check for new bundle to be created
        OozieUtil.verifyNewBundleCreation(cluster1, oldBundleID_cluster1,
            nominalTimes_cluster1, processBundle.getProcessData(), true, false);
        OozieUtil.verifyNewBundleCreation(cluster3, oldBundleID_cluster3,
            nominalTimes_cluster3, processBundle.getProcessData(), true, false);
        OozieUtil.verifyNewBundleCreation(cluster2, oldBundleID_cluster2,
            nominalTimes_cluster3, processBundle.getProcessData(), true, false);

        //wait till new coord are running on cluster1
        InstanceUtil.waitTillInstancesAreCreated(cluster1, processBundle.getProcessData(), 1);
        OozieUtil.verifyNewBundleCreation(cluster1, oldBundleID_cluster1,
            nominalTimes_cluster1, processBundle.getProcessData(), true, true);

        //verify
        String coordStartTime_cluster3 = OozieUtil.getCoordStartTime(cluster3,
            processBundle.getProcessData(), 1);
        String coordStartTime_cluster2 = OozieUtil.getCoordStartTime(cluster2,
            processBundle.getProcessData(), 1);

        DateTime updateTimeOozie = TimeUtil.oozieDateToDate(updateTime);
        Assert.assertTrue(TimeUtil.oozieDateToDate(coordStartTime_cluster3).isAfter(updateTimeOozie)
            || TimeUtil.oozieDateToDate(coordStartTime_cluster3).isEqual(updateTimeOozie),
            "new coord start time is not correct");
        Assert.assertFalse(
            TimeUtil.oozieDateToDate(coordStartTime_cluster2).isEqual(updateTimeOozie),
            "new coord start time is not correct");
        TimeUtil.sleepTill(updateTime);
        InstanceUtil.waitTillInstancesAreCreated(cluster3, processBundle.getProcessData(), 1);

        //verify that no instance are missing
        OozieUtil.verifyNewBundleCreation(cluster3, oldBundleID_cluster3,
            nominalTimes_cluster3, processBundle.getProcessData(), true, true);
    }

    private String submitAndScheduleFeed(Bundle b)
        throws JAXBException, IOException, URISyntaxException, AuthenticationException {
        String feed = b.getDataSets().get(0);
        feed = InstanceUtil.setFeedCluster(feed,
            XmlUtil.createValidity("2012-10-01T12:00Z", "2010-01-01T00:00Z"),
            XmlUtil.createRtention("days(1000000)", ActionType.DELETE), null,
            ClusterType.SOURCE, null);
        feed = InstanceUtil.setFeedCluster(feed, XmlUtil.createValidity
                ("2012-10-01T12:10Z", "2099-10-01T12:10Z"),
            XmlUtil.createRtention("days(1000000)", ActionType.DELETE),
            Util.readEntityName(b.getClusters().get(0)), ClusterType.SOURCE, "",
            "/someTestPath" + MINUTE_DATE_PATTERN);
        ServiceResponse r = prism.getClusterHelper().submitEntity(
            b.getClusters().get(0));
        AssertUtil.assertSucceeded(r);
        r = prism.getFeedHelper().submitAndSchedule(feed);
        AssertUtil.assertSucceeded(r);
        return feed;
    }

    private String getMultiClusterFeed(String startTimeCluster_source,
                                       String startTimeCluster_target)
        throws IOException, URISyntaxException, AuthenticationException {
        String testDataDir = baseTestDir + "/replication";

        //create desired feed
        String feed = bundles[0].getDataSets().get(0);

        //cluster1 is target, cluster2 is source and cluster3 is neutral
        feed = InstanceUtil.setFeedCluster(feed,
            XmlUtil.createValidity("2012-10-01T12:00Z", "2010-01-01T00:00Z"),
            XmlUtil.createRtention("days(100000)", ActionType.DELETE), null,
            ClusterType.SOURCE, null);
        feed = InstanceUtil.setFeedCluster(feed,
            XmlUtil.createValidity(startTimeCluster_source, "2099-10-01T12:10Z"),
            XmlUtil.createRtention("days(100000)", ActionType.DELETE),
            Util.readEntityName(bundles[2].getClusters().get(0)), null, null);
        feed = InstanceUtil.setFeedCluster(feed,
            XmlUtil.createValidity(startTimeCluster_target, "2099-10-01T12:25Z"),
            XmlUtil.createRtention("days(100000)", ActionType.DELETE),
            Util.readEntityName(bundles[0].getClusters().get(0)), ClusterType.TARGET,
            null, testDataDir + MINUTE_DATE_PATTERN);
        feed = InstanceUtil.setFeedCluster(feed,
            XmlUtil.createValidity(startTimeCluster_source, "2099-01-01T00:00Z"),
            XmlUtil.createRtention("days(100000)", ActionType.DELETE),
            Util.readEntityName(bundles[1].getClusters().get(0)), ClusterType.SOURCE,
            null, testDataDir + MINUTE_DATE_PATTERN);

        //submit clusters
        Bundle.submitCluster(bundles[0], bundles[1], bundles[2]);

        //create test data on cluster2
        List<String> dataDates = TimeUtil.getMinuteDatesOnEitherSide(startTimeCluster_source,
            TimeUtil.getTimeWrtSystemTime(60), 1);
        HadoopUtil.flattenAndPutDataInFolder(cluster2FS, OSUtil.SINGLE_FILE,
            testDataDir + "/", dataDates);
        return feed;
    }

    @AfterMethod(alwaysRun = true)
    public void tearDown(Method method) {
        logger.info("tearDown " + method.getName());
        processBundle.deleteBundle(prism);
        bundles[0].deleteBundle(prism);
        processBundle.deleteBundle(prism);
    }

    @AfterClass(alwaysRun = true)
    public void tearDownClass() throws IOException {
        cleanTestDirs();
    }
}
