blob: 0f0b8590e4d7075aefb6a38e438d1a595dfbe3a2 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.oozie.command;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.oozie.BundleActionBean;
import org.apache.oozie.BundleJobBean;
import org.apache.oozie.CoordinatorActionBean;
import org.apache.oozie.CoordinatorJobBean;
import org.apache.oozie.ErrorCode;
import org.apache.oozie.WorkflowActionBean;
import org.apache.oozie.WorkflowJobBean;
import org.apache.oozie.client.CoordinatorAction;
import org.apache.oozie.client.CoordinatorJob;
import org.apache.oozie.client.Job;
import org.apache.oozie.client.OozieClient;
import org.apache.oozie.client.WorkflowAction;
import org.apache.oozie.client.WorkflowJob;
import org.apache.oozie.client.rest.JsonBean;
import org.apache.oozie.executor.jpa.BundleActionGetJPAExecutor;
import org.apache.oozie.executor.jpa.BundleJobGetJPAExecutor;
import org.apache.oozie.executor.jpa.BundleJobInsertJPAExecutor;
import org.apache.oozie.executor.jpa.CoordActionGetJPAExecutor;
import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor;
import org.apache.oozie.executor.jpa.CoordJobInsertJPAExecutor;
import org.apache.oozie.executor.jpa.CoordJobQueryExecutor;
import org.apache.oozie.executor.jpa.CoordJobQueryExecutor.CoordJobQuery;
import org.apache.oozie.executor.jpa.JPAExecutorException;
import org.apache.oozie.executor.jpa.QueryExecutor;
import org.apache.oozie.executor.jpa.WorkflowActionGetJPAExecutor;
import org.apache.oozie.executor.jpa.WorkflowJobGetJPAExecutor;
import org.apache.oozie.executor.jpa.WorkflowJobInsertJPAExecutor;
import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor;
import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor.WorkflowJobQuery;
import org.apache.oozie.service.JPAService;
import org.apache.oozie.service.LiteWorkflowStoreService;
import org.apache.oozie.service.Services;
import org.apache.oozie.service.UUIDService;
import org.apache.oozie.service.UUIDService.ApplicationType;
import org.apache.oozie.service.WorkflowAppService;
import org.apache.oozie.service.WorkflowStoreService;
import org.apache.oozie.test.XDataTestCase;
import org.apache.oozie.util.DateUtils;
import org.apache.oozie.util.XmlUtils;
import org.apache.oozie.workflow.WorkflowApp;
import org.apache.oozie.workflow.WorkflowInstance;
import org.apache.oozie.workflow.WorkflowLib;
import org.apache.oozie.workflow.lite.EndNodeDef;
import org.apache.oozie.workflow.lite.LiteWorkflowApp;
import org.apache.oozie.workflow.lite.LiteWorkflowInstance;
import org.apache.oozie.workflow.lite.StartNodeDef;
import java.util.Date;
public class TestPurgeXCommand extends XDataTestCase {
private JPAService jpaService;
private Services services;
private String[] excludedServices = { "org.apache.oozie.service.StatusTransitService",
"org.apache.oozie.service.PauseTransitService", "org.apache.oozie.service.PurgeService",
"org.apache.oozie.service.CoordMaterializeTriggerService", "org.apache.oozie.service.RecoveryService" };
private static final int TEST_CHILD_NUM = 5;
private static final int LIMIT_3_ITEMS = 3;
@Override
protected void setUp() throws Exception {
super.setUp();
services = new Services();
setClassesToBeExcluded(services.getConf(), excludedServices);
services.init();
jpaService = Services.get().get(JPAService.class);
}
@Override
protected void tearDown() throws Exception {
services.destroy();
super.tearDown();
}
/**
* Test : purge succeeded wf job and action successfully. Creates and runs a
* new job to completion. Attempts to purge jobs older than a day. Verifies
* the presence of the job in the system. </p> Sets the end date for the
* same job to make it qualify for the purge criteria. Calls the purge
* command, and ensure the job does not exist in the system.
*
* @throws Exception if cannot insert records to the database
*/
public void testSucceededWorkflow() throws Exception {
WorkflowJobBean job = addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED, WorkflowInstance.Status.SUCCEEDED);
WorkflowActionBean action = addRecordToWfActionTable(job.getId(), "1", WorkflowAction.Status.OK);
purgeWithDefaultParameters();
assertWorkflowJobPurged(job);
assertWorkflowActionPurged(action);
}
/**
* Test : purge failed wf job and action successfully
*
* @throws Exception if cannot insert records to the database
*/
public void testFailedWorkflow() throws Exception {
WorkflowJobBean job = addRecordToWfJobTable(WorkflowJob.Status.FAILED, WorkflowInstance.Status.FAILED);
WorkflowActionBean action = addRecordToWfActionTable(job.getId(), "1", WorkflowAction.Status.FAILED);
purgeWithDefaultParameters();
assertWorkflowJobPurged(job);
assertWorkflowActionPurged(action);
}
/**
* Test : purge killed wf job and action successfully
*
* @throws Exception if cannot insert records to the database
*/
public void testKilledWorkflow() throws Exception {
WorkflowJobBean job = addRecordToWfJobTable(WorkflowJob.Status.KILLED, WorkflowInstance.Status.KILLED);
WorkflowActionBean action = addRecordToWfActionTable(job.getId(), "1", WorkflowAction.Status.KILLED);
purgeWithDefaultParameters();
assertWorkflowJobPurged(job);
assertWorkflowActionPurged(action);
}
/**
* Test : purge wf job and action still running
*
* @throws Exception if cannot insert records to the database
*/
public void testRunningWorkflow() throws Exception {
WorkflowJobBean job = addRecordToWfJobTableForNegCase(WorkflowJob.Status.RUNNING, WorkflowInstance.Status.RUNNING);
WorkflowActionBean action = addRecordToWfActionTable(job.getId(), "1", WorkflowAction.Status.RUNNING);
purgeWithDefaultParameters();
assertWorkflowJobNotPurged(job);
assertWorkflowActionNotPurged(action);
}
/**
* Test : purge succeeded coord job and action successfully
*
* @throws Exception if cannot insert records to the database
*/
public void testSucceededCoordinator() throws Exception {
CoordinatorJobBean job = addRecordToCoordJobTable(CoordinatorJob.Status.SUCCEEDED, false, false);
CoordinatorActionBean action = addRecordToCoordActionTable(job.getId(), 1, CoordinatorAction.Status.SUCCEEDED,
"coord-action-get.xml", 0);
purgeWithDefaultParameters();
assertCoordinatorJobPurged(job);
assertCoordinatorActionPurged(action);
}
/**
* Test : purge failed coord job and action successfully
*
* @throws Exception if cannot insert records to the database
*/
public void testFailedCoordinator() throws Exception {
CoordinatorJobBean job = addRecordToCoordJobTable(CoordinatorJob.Status.FAILED, false, false);
CoordinatorActionBean action = addRecordToCoordActionTable(job.getId(), 1, CoordinatorAction.Status.FAILED,
"coord-action-get.xml", 0);
purgeWithDefaultParameters();
assertCoordinatorJobPurged(job);
assertCoordinatorActionPurged(action);
}
/**
* Test : purge killed coord job and action successfully
*
* @throws Exception if cannot insert records to the database
*/
public void testKilledCoordinator() throws Exception {
CoordinatorJobBean job = addRecordToCoordJobTable(CoordinatorJob.Status.KILLED, false, false);
CoordinatorActionBean action = addRecordToCoordActionTable(job.getId(), 1, CoordinatorAction.Status.KILLED,
"coord-action-get.xml", 0);
purgeWithDefaultParameters();
assertCoordinatorJobPurged(job);
assertCoordinatorActionPurged(action);
}
/**
* Test : purge coord job and action still running
*
* @throws Exception if cannot insert records to the database
*/
public void testRunningCoordinator() throws Exception {
CoordinatorJobBean job = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, false, false);
CoordinatorActionBean action = addRecordToCoordActionTable(job.getId(), 1, CoordinatorAction.Status.RUNNING,
"coord-action-get.xml", 0);
purgeWithDefaultParameters();
assertCoordinatorJobNotPurged(job);
assertCoordinatorActionNotPurged(action);
}
/**
* Test : purge succeeded bundle job and action successfully
*
* @throws Exception if cannot insert records to the database
*/
public void testSucceededBundle() throws Exception {
BundleJobBean job = addRecordToBundleJobTable(Job.Status.SUCCEEDED, DateUtils.parseDateOozieTZ(
"2011-01-01T01:00Z"));
CoordinatorJobBean coordJob1 = addRecordToCoordJobTable(CoordinatorJob.Status.SUCCEEDED, false, false);
coordJob1.setAppName("action1");
CoordinatorJobBean coordJob2 = addRecordToCoordJobTable(CoordinatorJob.Status.SUCCEEDED, false, false);
coordJob2.setAppName("action2");
BundleActionBean bundleAction1 = addRecordToBundleActionTable(job.getId(), coordJob1.getAppName(), 0,
Job.Status.SUCCEEDED);
BundleActionBean bundleAction2 = addRecordToBundleActionTable(job.getId(), coordJob2.getAppName(), 0,
Job.Status.SUCCEEDED);
purgeWithDefaultParameters();
assertBundleJobPurged(job);
assertBundleActionPurged(bundleAction1);
assertBundleActionPurged(bundleAction2);
assertCoordinatorJobPurged(coordJob1);
assertCoordinatorJobPurged(coordJob2);
}
/**
* Test : purge failed bundle job and action successfully
*
* @throws Exception if cannot insert records to the database
*/
public void testFailedBundle() throws Exception {
BundleJobBean job = addRecordToBundleJobTable(Job.Status.DONEWITHERROR, DateUtils.parseDateOozieTZ(
"2011-01-01T01:00Z"));
CoordinatorJobBean coordJob1 = addRecordToCoordJobTable(CoordinatorJob.Status.DONEWITHERROR, false, false);
coordJob1.setAppName("action1");
CoordinatorJobBean coordJob2 = addRecordToCoordJobTable(CoordinatorJob.Status.SUCCEEDED, false, false);
coordJob2.setAppName("action2");
BundleActionBean bundleAction1 = addRecordToBundleActionTable(job.getId(), coordJob1.getAppName(), 0, Job.Status.FAILED);
BundleActionBean bundleAction2 = addRecordToBundleActionTable(job.getId(), coordJob2.getAppName(), 0,
Job.Status.SUCCEEDED);
purgeWithDefaultParameters();
assertBundleJobPurged(job);
assertBundleActionPurged(bundleAction1);
assertBundleActionPurged(bundleAction2);
assertCoordinatorJobPurged(coordJob1);
assertCoordinatorJobPurged(coordJob2);
}
/**
* Test : purge killed bundle job and action successfully
*
* @throws Exception if cannot insert records to the database
*/
public void testKilledBundle() throws Exception {
BundleJobBean job = addRecordToBundleJobTable(Job.Status.KILLED, DateUtils.parseDateOozieTZ(
"2011-01-01T01:00Z"));
CoordinatorJobBean coordJob1 = addRecordToCoordJobTable(CoordinatorJob.Status.KILLED, false, false);
coordJob1.setAppName("action1");
CoordinatorJobBean coordJob2 = addRecordToCoordJobTable(CoordinatorJob.Status.KILLED, false, false);
coordJob2.setAppName("action2");
BundleActionBean bundleAction1 = addRecordToBundleActionTable(job.getId(), "action1", 0, Job.Status.KILLED);
BundleActionBean bundleAction2 = addRecordToBundleActionTable(job.getId(), "action2", 0, Job.Status.KILLED);
purgeWithDefaultParameters();
assertBundleJobPurged(job);
assertBundleActionPurged(bundleAction1);
assertBundleActionPurged(bundleAction2);
assertCoordinatorJobPurged(coordJob1);
assertCoordinatorJobPurged(coordJob2);
}
/**
* Test : purge bundle job and action still running
*
* @throws Exception if cannot insert records to the database
*/
public void testRunningBundle() throws Exception {
BundleJobBean job = addRecordToBundleJobTable(Job.Status.RUNNING, DateUtils.parseDateOozieTZ(
"2011-01-01T01:00Z"));
CoordinatorJobBean coordJob1 = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, false, false);
coordJob1.setAppName("action1");
CoordinatorJobBean coordJob2 = addRecordToCoordJobTable(CoordinatorJob.Status.SUCCEEDED, false, false);
coordJob2.setAppName("action2");
BundleActionBean bundleActionBean1 = addRecordToBundleActionTable(job.getId(), "action1", 0, Job.Status.RUNNING);
BundleActionBean bundleActionBean2 = addRecordToBundleActionTable(job.getId(), "action2", 0, Job.Status.SUCCEEDED);
purgeWithDefaultParameters();
assertBundleJobNotPurged(job);
assertBundleActionNotPurged(bundleActionBean1);
assertBundleActionNotPurged(bundleActionBean2);
}
/**
* Test : The workflow should get purged, but the coordinator parent shouldn't get purged --> neither will get purged
*
* @throws Exception if cannot insert records to the database
*/
public void testUnpurgeableCoordinatorPurgeableWorkflow() throws Exception {
CoordinatorJobBean coordJob = addRecordToCoordJobTable(CoordinatorJob.Status.SUCCEEDED, false, false);
WorkflowJobBean wfJob = addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED, WorkflowInstance.Status.SUCCEEDED);
WorkflowActionBean wfAction = addRecordToWfActionTable(wfJob.getId(), "1", WorkflowAction.Status.OK);
CoordinatorActionBean coordAction = addRecordToCoordActionTable(coordJob.getId(), 1, CoordinatorAction.Status.SUCCEEDED,
"coord-action-get.xml", wfJob.getId(), "SUCCEEDED", 0);
purgeWithSpecialParameters(coordJob);
assertCoordinatorJobNotPurged(coordJob);
assertCoordinatorActionNotPurged(coordAction);
assertWorkflowJobNotPurged(wfJob);
assertWorkflowActionNotPurged(wfAction);
}
/**
* Test : The workflow should get purged, but the coordinator parent shouldn't get purged -->
* the workflow and corresponding coord actions will get purged after we turn the purge.old.coord.action on
* Coordinator itself will not be purged
*
* @throws Exception if cannot insert records to the database
*/
public void testUnpurgeableCoordinatorPurgeableWorkflowWithPurgeOldCoordActionTurnedOn() throws Exception {
CoordinatorJobBean coordJob = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, false, false);
WorkflowJobBean wfJob = addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED, WorkflowInstance.Status.SUCCEEDED);
WorkflowActionBean wfAction = addRecordToWfActionTable(wfJob.getId(), "1", WorkflowAction.Status.OK);
CoordinatorActionBean coordAction = addRecordToCoordActionTable(coordJob.getId(), 1, CoordinatorAction.Status.SUCCEEDED,
"coord-action-get.xml", wfJob.getId(), "SUCCEEDED", 0);
purgeWithSpecialParametersAndAlsoPurgeCoordActions(coordJob);
assertCoordinatorJobNotPurged(coordJob);
assertCoordinatorActionPurged(coordAction);
assertWorkflowJobPurged(wfJob);
assertWorkflowActionPurged(wfAction);
}
/**
* Test : The workflow should get purged, but the coordinator parent shouldn't get purged -->
* the workflow and corresponding coord actions will NOT get purged after we turn the purge.old.coord.action off
* Neither will be purged
*
* @throws Exception if cannot insert records to the database
*/
public void testUnpurgeableCoordinatorPurgeableWorkflowWithPurgeOldCoordActionTurnedOff() throws Exception {
CoordinatorJobBean coordJob = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, false, false);
WorkflowJobBean wfJob = addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED, WorkflowInstance.Status.SUCCEEDED);
WorkflowActionBean wfAction = addRecordToWfActionTable(wfJob.getId(), "1", WorkflowAction.Status.OK);
CoordinatorActionBean coordAction = addRecordToCoordActionTable(coordJob.getId(), 1, CoordinatorAction.Status.SUCCEEDED,
"coord-action-get.xml", wfJob.getId(), "SUCCEEDED", 0);
purgeWithSpecialParameters(coordJob);
assertCoordinatorJobNotPurged(coordJob);
assertCoordinatorActionNotPurged(coordAction);
assertWorkflowJobNotPurged(wfJob);
assertWorkflowActionNotPurged(wfAction);
}
/**
* Test : The workflow should not get purged, but the coordinator parent should get purged --> neither will get purged
*
* @throws Exception if cannot insert records to the database
*/
public void testPurgeableCoordinatorUnpurgeableWorkflow() throws Exception {
CoordinatorJobBean coordJob = addRecordToCoordJobTable(CoordinatorJob.Status.SUCCEEDED, false, false);
WorkflowJobBean wfJob = addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED, WorkflowInstance.Status.SUCCEEDED);
WorkflowActionBean wfAction = addRecordToWfActionTable(wfJob.getId(), "1", WorkflowAction.Status.OK);
CoordinatorActionBean coordAction = addRecordToCoordActionTable(coordJob.getId(), 1, CoordinatorAction.Status.SUCCEEDED,
"coord-action-get.xml", wfJob.getId(), "SUCCEEDED", 0);
purgeWithSpecialParameters(wfJob);
assertCoordinatorJobNotPurged(coordJob);
assertCoordinatorActionNotPurged(coordAction);
assertWorkflowJobNotPurged(wfJob);
assertWorkflowActionNotPurged(wfAction);
}
/**
* Test : The workflows should not get purged, but the coordinator parent should get purged --> none will get purged
* There are more workflow children than the limit
*
* @throws Exception if cannot insert records to the database
*/
public void testPurgeableCoordinatorOverTheLimitUnpurgeableWorkflows() throws Exception {
CoordinatorJobBean coordJob = addRecordToCoordJobTable(CoordinatorJob.Status.SUCCEEDED, false, false);
WorkflowJobBean[] wfJobs = new WorkflowJobBean[TEST_CHILD_NUM];
WorkflowActionBean[] wfActions = new WorkflowActionBean[TEST_CHILD_NUM];
CoordinatorActionBean[] coordActions = new CoordinatorActionBean[TEST_CHILD_NUM];
for (int i=0; i<TEST_CHILD_NUM; ++i) {
wfJobs[i] = addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED, WorkflowInstance.Status.SUCCEEDED);
wfActions[i] = addRecordToWfActionTable(wfJobs[i].getId(), "1", WorkflowAction.Status.OK);
coordActions[i] = addRecordToCoordActionTable(coordJob.getId(), i, CoordinatorAction.Status.SUCCEEDED,
"coord-action-get.xml", wfJobs[i].getId(), "SUCCEEDED", 0);
}
purgeWithSpecialParameters(LIMIT_3_ITEMS, wfJobs[0]);
assertCoordinatorJobNotPurged(coordJob);
assertCoordinatorActionsNotPurged(coordActions);
assertWorkflowJobsNotPurged(wfJobs);
assertWorkflowActionsNotPurged(wfActions);
}
/**
* Test : The workflow should get purged, and the coordinator parent should get purged --> both will get purged
*
* @throws Exception if cannot insert records to the database
*/
public void testPurgeableCoordinatorPurgeableWorkflow() throws Exception {
CoordinatorJobBean coordJob = addRecordToCoordJobTable(CoordinatorJob.Status.SUCCEEDED, false, false);
WorkflowJobBean wfJob = addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED, WorkflowInstance.Status.SUCCEEDED);
WorkflowActionBean wfAction = addRecordToWfActionTable(wfJob.getId(), "1", WorkflowAction.Status.OK);
CoordinatorActionBean coordAction = addRecordToCoordActionTable(coordJob.getId(), 1, CoordinatorAction.Status.SUCCEEDED,
"coord-action-get.xml", wfJob.getId(), "SUCCEEDED", 0);
purgeWithDefaultParameters();
assertCoordinatorJobPurged(coordJob);
assertCoordinatorActionPurged(coordAction);
assertWorkflowJobPurged(wfJob);
assertWorkflowActionPurged(wfAction);
}
/**
* Test : The workflow should get purged, and the coordinator parent should get purged --> both will get purged
*
* @throws Exception if cannot insert records to the database
*/
public void testPurgeableCoordinatorMultiplePurgeableWorkflows() throws Exception {
CoordinatorJobBean coordJob = addRecordToCoordJobTable(CoordinatorJob.Status.SUCCEEDED, false, false);
WorkflowJobBean[] wfJobs = new WorkflowJobBean[TEST_CHILD_NUM];
WorkflowActionBean[] wfActions = new WorkflowActionBean[TEST_CHILD_NUM];
CoordinatorActionBean[] coordActions = new CoordinatorActionBean[TEST_CHILD_NUM];
for (int i=0; i<TEST_CHILD_NUM; ++i) {
wfJobs[i] = addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED, WorkflowInstance.Status.SUCCEEDED);
wfActions[i] = addRecordToWfActionTable(wfJobs[i].getId(), "1", WorkflowAction.Status.OK);
coordActions[i] = addRecordToCoordActionTable(coordJob.getId(), i, CoordinatorAction.Status.SUCCEEDED,
"coord-action-get.xml", wfJobs[i].getId(), "SUCCEEDED", 0);
}
purgeWithDefaultParameters();
assertCoordinatorJobPurged(coordJob);
assertCoordinatorActionsPurged(coordActions);
assertWorkflowJobsPurged(wfJobs);
assertWorkflowActionsPurged(wfActions);
}
/**
* Test : The workflow and coordinator should get purged, but the bundle parent shouldn't get purged --> none will get purged
*
* @throws Exception if cannot insert records to the database
*/
public void testUnpurgeableBundlePurgeableCoordinatorPurgeableWorkflow() throws Exception {
BundleJobBean bundleJob = addRecordToBundleJobTable(Job.Status.SUCCEEDED, DateUtils.parseDateOozieTZ("2011-01-01T01:00Z"));
CoordinatorJobBean coordJob = addRecordToCoordJobTable(CoordinatorJob.Status.SUCCEEDED, false, false);
WorkflowJobBean wfJob = addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED, WorkflowInstance.Status.SUCCEEDED);
WorkflowActionBean wfAction = addRecordToWfActionTable(wfJob.getId(), "1", WorkflowAction.Status.OK);
CoordinatorActionBean coordAction = addRecordToCoordActionTable(coordJob.getId(), 1, CoordinatorAction.Status.SUCCEEDED,
"coord-action-get.xml", wfJob.getId(), "SUCCEEDED", 0);
BundleActionBean bundleAction = addRecordToBundleActionTable(bundleJob.getId(), coordJob.getId(), coordJob.getAppName(), 0,
Job.Status.SUCCEEDED);
purgeWithSpecialParameters(bundleJob);
assertBundleJobNotPurged(bundleJob);
assertBundleActionNotPurged(bundleAction);
assertCoordinatorJobNotPurged(coordJob);
assertCoordinatorActionNotPurged(coordAction);
assertWorkflowJobNotPurged(wfJob);
assertWorkflowActionNotPurged(wfAction);
}
/**
* Test : The workflow and coordinator should not get purged, but the bundle parent should get purged --> none will get purged
*
* @throws Exception if cannot insert records to the database
*/
public void testPurgeableBundleUnpurgeableCoordinatorUnpurgeableWorkflow() throws Exception {
BundleJobBean bundleJob = addRecordToBundleJobTable(Job.Status.SUCCEEDED, DateUtils.parseDateOozieTZ("2011-01-01T01:00Z"));
CoordinatorJobBean coordJob = addRecordToCoordJobTable(CoordinatorJob.Status.SUCCEEDED, false, false);
WorkflowJobBean wfJob = addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED, WorkflowInstance.Status.SUCCEEDED);
WorkflowActionBean wfAction = addRecordToWfActionTable(wfJob.getId(), "1", WorkflowAction.Status.OK);
CoordinatorActionBean coordAction = addRecordToCoordActionTable(coordJob.getId(), 1, CoordinatorAction.Status.SUCCEEDED,
"coord-action-get.xml", wfJob.getId(), "SUCCEEDED", 0);
BundleActionBean bundleAction = addRecordToBundleActionTable(bundleJob.getId(), coordJob.getId(), coordJob.getAppName(), 0,
Job.Status.SUCCEEDED);
purgeWithSpecialParameters(wfJob, coordJob);
assertBundleJobNotPurged(bundleJob);
assertBundleActionNotPurged(bundleAction);
assertCoordinatorJobNotPurged(coordJob);
assertCoordinatorActionNotPurged(coordAction);
assertWorkflowJobNotPurged(wfJob);
assertWorkflowActionNotPurged(wfAction);
}
/**
* Test : The workflow and coordinator should not get purged, but the bundle parent should get purged --> none will get purged
* There are more coordinator children than the limit
*
* @throws Exception if cannot insert records to the database
*/
public void testPurgeableBundleAndOverTheLimitUnpurgeableCoordinatorsAndWorkflows() throws Exception {
BundleJobBean bundleJob = addRecordToBundleJobTable(Job.Status.SUCCEEDED, DateUtils.parseDateOozieTZ("2011-01-01T01:00Z"));
CoordinatorJobBean[] coordJobs = new CoordinatorJobBean[TEST_CHILD_NUM];
WorkflowJobBean[] wfJobs = new WorkflowJobBean[TEST_CHILD_NUM];
WorkflowActionBean[] wfActions = new WorkflowActionBean[TEST_CHILD_NUM];
CoordinatorActionBean[] coordActions = new CoordinatorActionBean[TEST_CHILD_NUM];
BundleActionBean[] bundleActions = new BundleActionBean[TEST_CHILD_NUM];
for (int i=0; i<TEST_CHILD_NUM; ++i) {
coordJobs[i] = addRecordToCoordJobTable(CoordinatorJob.Status.SUCCEEDED, false, false);
coordJobs[i].setAppName("coord" + i);
CoordJobQueryExecutor.getInstance().executeUpdate(CoordJobQuery.UPDATE_COORD_JOB, coordJobs[i]);
wfJobs[i] = addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED, WorkflowInstance.Status.SUCCEEDED);
wfActions[i] = addRecordToWfActionTable(wfJobs[i].getId(), "1", WorkflowAction.Status.OK);
coordActions[i] = addRecordToCoordActionTable(coordJobs[i].getId(), 1, CoordinatorAction.Status.SUCCEEDED,
"coord-action-get.xml", wfJobs[i].getId(), "SUCCEEDED", 0);
bundleActions[i] = addRecordToBundleActionTable(bundleJob.getId(), coordJobs[i].getId(), coordJobs[i].getAppName(),
0, Job.Status.SUCCEEDED);
}
purgeWithSpecialParameters(LIMIT_3_ITEMS, wfJobs[0], coordJobs[0]);
assertBundleJobNotPurged(bundleJob);
assertBundleActionsNotPurged(bundleActions);
assertCoordinatorJobsNotPurged(coordJobs);
assertCoordinatorActionsNotPurged(coordActions);
assertWorkflowJobsNotPurged(wfJobs);
assertWorkflowActionsNotPurged(wfActions);
}
/**
* Test : The workflow and coordinator should get purged, and the bundle parent should get purged --> all will get purged
*
* @throws Exception if cannot insert records to the database
*/
public void testPurgeableBundlePurgeableCoordinatorPurgeableWorkflow() throws Exception {
BundleJobBean bundleJob = addRecordToBundleJobTable(Job.Status.SUCCEEDED, DateUtils.parseDateOozieTZ("2011-01-01T01:00Z"));
CoordinatorJobBean coordJob = addRecordToCoordJobTable(CoordinatorJob.Status.SUCCEEDED, false, false);
WorkflowJobBean wfJob = addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED, WorkflowInstance.Status.SUCCEEDED);
WorkflowActionBean wfAction = addRecordToWfActionTable(wfJob.getId(), "1", WorkflowAction.Status.OK);
CoordinatorActionBean coordAction = addRecordToCoordActionTable(coordJob.getId(), 1, CoordinatorAction.Status.SUCCEEDED,
"coord-action-get.xml", wfJob.getId(), "SUCCEEDED", 0);
BundleActionBean bundleAction = addRecordToBundleActionTable(bundleJob.getId(), coordJob.getId(), coordJob.getAppName(), 0,
Job.Status.SUCCEEDED);
purgeWithDefaultParameters();
assertBundleJobPurged(bundleJob);
assertBundleActionPurged(bundleAction);
assertCoordinatorJobPurged(coordJob);
assertCoordinatorActionPurged(coordAction);
assertWorkflowJobPurged(wfJob);
assertWorkflowActionPurged(wfAction);
}
/**
* Test : The workflow and coordinator should get purged, and the bundle parent should get purged --> all will get purged
* There are more coordinator children than the limit
*
* @throws Exception if cannot insert records to the database
*/
public void testPurgeableBundleOverTheLimitPurgeableCoordinatorsOverTheLimitPurgeableWorkflows() throws Exception {
BundleJobBean bundleJob = addRecordToBundleJobTable(Job.Status.SUCCEEDED, DateUtils.parseDateOozieTZ("2011-01-01T01:00Z"));
CoordinatorJobBean[] coordJobs = new CoordinatorJobBean[TEST_CHILD_NUM];
WorkflowJobBean[] wfJobs = new WorkflowJobBean[TEST_CHILD_NUM];
WorkflowActionBean[] wfActions = new WorkflowActionBean[TEST_CHILD_NUM];
CoordinatorActionBean[] coordActions = new CoordinatorActionBean[TEST_CHILD_NUM];
BundleActionBean[] bundleActions = new BundleActionBean[TEST_CHILD_NUM];
for (int i=0; i<TEST_CHILD_NUM; ++i) {
coordJobs[i] = addRecordToCoordJobTable(CoordinatorJob.Status.SUCCEEDED, false, false);
coordJobs[i].setAppName("coord" + i);
CoordJobQueryExecutor.getInstance().executeUpdate(CoordJobQuery.UPDATE_COORD_JOB, coordJobs[i]);
wfJobs[i] = addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED, WorkflowInstance.Status.SUCCEEDED);
wfActions[i] = addRecordToWfActionTable(wfJobs[i].getId(), "1", WorkflowAction.Status.OK);
coordActions[i] = addRecordToCoordActionTable(coordJobs[i].getId(), 1, CoordinatorAction.Status.SUCCEEDED,
"coord-action-get.xml", wfJobs[i].getId(), "SUCCEEDED", 0);
bundleActions[i] = addRecordToBundleActionTable(bundleJob.getId(), coordJobs[i].getId(), coordJobs[i].getAppName(),
0, Job.Status.SUCCEEDED);
}
purgeWithDefaultParameters();
assertBundleJobPurged(bundleJob);
assertBundleActionsPurged(bundleActions);
assertCoordinatorJobsPurged(coordJobs);
assertCoordinatorActionsPurged(coordActions);
assertWorkflowJobsPurged(wfJobs);
assertWorkflowActionsPurged(wfActions);
}
/**
* Test : The subworkflow should get purged, but the workflow parent shouldn't get purged --> neither will get purged
*
* @throws Exception if cannot insert records to the database
*/
public void testSucceededWorkflowRunningSubWorkflow() throws Exception {
WorkflowJobBean wfJob = addRecordToWfJobTableForNegCase(WorkflowJob.Status.RUNNING, WorkflowInstance.Status.RUNNING);
WorkflowActionBean wfAction = addRecordToWfActionTable(wfJob.getId(), "1", WorkflowAction.Status.OK);
WorkflowJobBean subwfJob = addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED, WorkflowInstance.Status.SUCCEEDED,
wfJob.getId());
WorkflowActionBean subwfAction = addRecordToWfActionTable(subwfJob.getId(), "1", WorkflowAction.Status.OK);
purgeWithDefaultParameters();
assertWorkflowJobNotPurged(wfJob);
assertWorkflowActionNotPurged(wfAction);
assertWorkflowJobNotPurged(subwfJob);
assertWorkflowActionNotPurged(subwfAction);
}
/**
* Test : The subworkflow shouldn't get purged, but the workflow parent should get purged --> neither will get purged
*
* @throws Exception if cannot insert records to the database
*/
public void testRunningWorkflowSucceededSubWorkflow() throws Exception {
WorkflowJobBean wfJob = addRecordToWfJobTableForNegCase(WorkflowJob.Status.SUCCEEDED, WorkflowInstance.Status.SUCCEEDED);
WorkflowActionBean wfAction = addRecordToWfActionTable(wfJob.getId(), "1", WorkflowAction.Status.OK);
WorkflowJobBean subwfJob = addRecordToWfJobTable(WorkflowJob.Status.RUNNING, WorkflowInstance.Status.RUNNING,
wfJob.getId());
WorkflowActionBean subwfAction = addRecordToWfActionTable(subwfJob.getId(), "1", WorkflowAction.Status.RUNNING);
purgeWithDefaultParameters();
assertWorkflowJobNotPurged(wfJob);
assertWorkflowActionNotPurged(wfAction);
assertWorkflowJobNotPurged(subwfJob);
assertWorkflowActionNotPurged(subwfAction);
}
/**
* Test : The subworkflows shouldn't get purged, but the workflow parent should get purged --> none will get purged
* There are more subworkflow children than the limit
*
* @throws Exception if cannot insert records to the database
*/
public void testSucceededWorkflowOverTheLimitRunningSubWorkflows() throws Exception {
WorkflowJobBean wfJob = addRecordToWfJobTableForNegCase(WorkflowJob.Status.SUCCEEDED, WorkflowInstance.Status.SUCCEEDED);
WorkflowActionBean[] wfActions = new WorkflowActionBean[TEST_CHILD_NUM];
WorkflowJobBean[] subwfJobs = new WorkflowJobBean[TEST_CHILD_NUM];
WorkflowActionBean[] subwfActions = new WorkflowActionBean[TEST_CHILD_NUM];
for (int i=0; i<TEST_CHILD_NUM; ++i) {
wfActions[i] = addRecordToWfActionTable(wfJob.getId(), String.format("action%d",i), WorkflowAction.Status.OK);
subwfJobs[i] = addRecordToWfJobTable(WorkflowJob.Status.RUNNING, WorkflowInstance.Status.RUNNING,
wfJob.getId());
subwfActions[i] = addRecordToWfActionTable(subwfJobs[i].getId(), String.format("action%d",i),
WorkflowAction.Status.RUNNING);
}
purgeWithDefaultParameters();
assertWorkflowJobNotPurged(wfJob);
assertWorkflowActionsNotPurged(wfActions);
assertWorkflowJobsNotPurged(subwfJobs);
assertWorkflowActionsNotPurged(subwfActions);
}
/**
* Test : The subworkflow should get purged, and the workflow parent should get purged --> both will get purged
*
* @throws Exception if cannot insert records to the database
*/
public void testSucceededWorkflowSucceededSubWorkflow() throws Exception {
WorkflowJobBean wfJob = addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED, WorkflowInstance.Status.SUCCEEDED);
WorkflowActionBean wfAction = addRecordToWfActionTable(wfJob.getId(), "1", WorkflowAction.Status.OK);
WorkflowJobBean subwfJob = addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED, WorkflowInstance.Status.SUCCEEDED,
wfJob.getId());
WorkflowActionBean subwfAction = addRecordToWfActionTable(subwfJob.getId(), "1", WorkflowAction.Status.OK);
purgeWithDefaultParameters();
assertWorkflowJobPurged(wfJob);
assertWorkflowActionPurged(wfAction);
assertWorkflowJobPurged(subwfJob);
assertWorkflowActionPurged(subwfAction);
}
/**
* Test : The subworkflow should get purged, and the workflow parent should get purged --> both will get purged
* There are more subworkflow children than the limit
*
* @throws Exception if cannot insert records to the database
*/
public void testPurgeableWorkflowOverTheLimitSucceededSubWorkflows() throws Exception {
WorkflowJobBean wfJob = addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED, WorkflowInstance.Status.SUCCEEDED);
WorkflowActionBean[] wfActions = new WorkflowActionBean[TEST_CHILD_NUM];
WorkflowJobBean[] subwfJobs = new WorkflowJobBean[TEST_CHILD_NUM];
WorkflowActionBean[] subwfActions = new WorkflowActionBean[TEST_CHILD_NUM];
for (int i=0; i<TEST_CHILD_NUM; ++i) {
wfActions[i] = addRecordToWfActionTable(wfJob.getId(), String.format("action%d", i), WorkflowAction.Status.OK);
subwfJobs[i] = addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED, WorkflowInstance.Status.SUCCEEDED,
wfJob.getId());
subwfActions[i] = addRecordToWfActionTable(subwfJobs[i].getId(), "1", WorkflowAction.Status.OK);
}
purgeWithSpecialParameters(LIMIT_3_ITEMS);
assertWorkflowJobPurged(wfJob);
assertWorkflowActionsPurged(wfActions);
assertWorkflowJobsPurged(subwfJobs);
assertWorkflowActionsPurged(subwfActions);
}
/**
* Test : The subsubworkflow shouldn't get purged,
* the subworkflow should get purged,
* the workflow parent should get purged --> neither will get purged
*
* @throws Exception if unable to create workflow job or action bean
*/
public void testPurgeableWorkflowPurgeableSubWorkflowUnpurgeableSubSubWorkflow() throws Exception {
WorkflowJobBean wfJob = addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED, WorkflowInstance.Status.SUCCEEDED);
WorkflowActionBean wfAction = addRecordToWfActionTable(wfJob.getId(), "1", WorkflowAction.Status.OK);
WorkflowJobBean subwfJob = addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED, WorkflowInstance.Status.SUCCEEDED,
wfJob.getId());
WorkflowActionBean subwfAction = addRecordToWfActionTable(subwfJob.getId(), "1", WorkflowAction.Status.OK);
WorkflowJobBean subsubwfJob = addRecordToWfJobTable(WorkflowJob.Status.RUNNING, WorkflowInstance.Status.RUNNING,
subwfJob.getId());
WorkflowActionBean subsubwfAction = addRecordToWfActionTable(subsubwfJob.getId(), "1", WorkflowAction.Status.RUNNING);
purgeWithDefaultParameters();
assertWorkflowJobNotPurged(wfJob);
assertWorkflowActionNotPurged(wfAction);
assertWorkflowJobNotPurged(subwfJob);
assertWorkflowActionNotPurged(subwfAction);
assertWorkflowJobNotPurged(subsubwfJob);
assertWorkflowActionNotPurged(subsubwfAction);
}
/**
* Test : The subsubworkflow should get purged,
* the subworkflow shouldn't get purged,
* the workflow parent should get purged --> neither will get purged
*
* @throws Exception if unable to create workflow job or action bean
*/
public void testPurgeableWorkflowUnpurgeableSubWorkflowPurgeableSubSubWorkflow() throws Exception {
WorkflowJobBean wfJob = addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED, WorkflowInstance.Status.SUCCEEDED);
WorkflowActionBean wfAction = addRecordToWfActionTable(wfJob.getId(), "1", WorkflowAction.Status.OK);
WorkflowJobBean subwfJob = addRecordToWfJobTable(WorkflowJob.Status.RUNNING, WorkflowInstance.Status.RUNNING,
wfJob.getId());
WorkflowActionBean subwfAction = addRecordToWfActionTable(subwfJob.getId(), "1", WorkflowAction.Status.RUNNING);
WorkflowJobBean subsubwfJob = addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED, WorkflowInstance.Status.SUCCEEDED,
subwfJob.getId());
WorkflowActionBean subsubwfAction = addRecordToWfActionTable(subsubwfJob.getId(), "1", WorkflowAction.Status.OK);
purgeWithDefaultParameters();
assertWorkflowJobNotPurged(wfJob);
assertWorkflowActionNotPurged(wfAction);
assertWorkflowJobNotPurged(subwfJob);
assertWorkflowActionNotPurged(subwfAction);
assertWorkflowJobNotPurged(subsubwfJob);
assertWorkflowActionNotPurged(subsubwfAction);
}
/**
* Test : The subsubworkflows should get purged,
* the subworkflow should get purged,
* the workflow parent should get purged --> all will get purged
*
* @throws Exception if unable to create workflow job or action bean
*/
public void testPurgeableWorkflowPurgeableSubWorkflowPurgeableSubSubWorkflow() throws Exception {
WorkflowJobBean wfJob = addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED, WorkflowInstance.Status.SUCCEEDED);
WorkflowActionBean wfAction = addRecordToWfActionTable(wfJob.getId(), "1", WorkflowAction.Status.OK);
WorkflowJobBean subwfJob = addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED, WorkflowInstance.Status.SUCCEEDED,
wfJob.getId());
WorkflowActionBean subwfAction = addRecordToWfActionTable(subwfJob.getId(), "1", WorkflowAction.Status.OK);
WorkflowJobBean subsub1wfJob = addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED, WorkflowInstance.Status.SUCCEEDED,
subwfJob.getId());
WorkflowJobBean subsub2wfJob = addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED, WorkflowInstance.Status.SUCCEEDED,
subwfJob.getId());
WorkflowActionBean subsub1wfAction = addRecordToWfActionTable(subsub1wfJob.getId(), "1", WorkflowAction.Status.OK);
WorkflowActionBean subsub2wfAction = addRecordToWfActionTable(subsub2wfJob.getId(), "1", WorkflowAction.Status.OK);
purgeWithDefaultParameters();
assertWorkflowJobPurged(wfJob);
assertWorkflowActionPurged(wfAction);
assertWorkflowJobPurged(subwfJob);
assertWorkflowActionPurged(subwfAction);
assertWorkflowJobPurged(subsub1wfJob);
assertWorkflowJobPurged(subsub2wfJob);
assertWorkflowActionPurged(subsub1wfAction);
assertWorkflowActionPurged(subsub2wfAction);
}
/**
* Test : The subworkflow should get purged, and the workflow parent should get purged --> both will get purged
* Subworkflow has terminated, last modified time is known, but end time is null
*
* @throws Exception if cannot insert records to the database
*/
public void testPurgeableWorkflowPurgeableSubWorkflowWithNullEndTimeValidLastModifiedTime() throws Exception {
WorkflowJobBean wfJob = addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED, WorkflowInstance.Status.SUCCEEDED);
WorkflowActionBean wfAction1 = addRecordToWfActionTable(wfJob.getId(), "1", WorkflowAction.Status.OK);
WorkflowJobBean subwfJob1 = addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED, WorkflowInstance.Status.SUCCEEDED,
wfJob.getId());
subwfJob1.setLastModifiedTime(wfJob.getEndTime());
subwfJob1.setEndTime(null);
final QueryExecutor<WorkflowJobBean, WorkflowJobQueryExecutor.WorkflowJobQuery> workflowJobQueryExecutor =
WorkflowJobQueryExecutor.getInstance();
workflowJobQueryExecutor.executeUpdate(WorkflowJobQuery.UPDATE_WORKFLOW, subwfJob1);
purgeWithSpecialParameters(LIMIT_3_ITEMS);
assertWorkflowJobPurged(wfJob);
assertWorkflowActionPurged(wfAction1);
assertWorkflowJobPurged(subwfJob1);
}
/**
* Test : The subworkflow and workflow should get purged, but the coordinator parent shouldn't get purged --> none will get
* purged
*
* @throws Exception if cannot insert records to the database
*/
public void testUnpurgeableCoordinatorPurgeableWorkflowPurgeableSubWorkflow() throws Exception {
CoordinatorJobBean coordJob = addRecordToCoordJobTable(CoordinatorJob.Status.SUCCEEDED, false, false);
WorkflowJobBean wfJob = addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED, WorkflowInstance.Status.SUCCEEDED);
WorkflowActionBean wfAction = addRecordToWfActionTable(wfJob.getId(), "1", WorkflowAction.Status.OK);
WorkflowJobBean subwfJob = addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED, WorkflowInstance.Status.SUCCEEDED,
wfJob.getId());
WorkflowActionBean subwfAction = addRecordToWfActionTable(subwfJob.getId(), "1", WorkflowAction.Status.OK);
CoordinatorActionBean coordAction = addRecordToCoordActionTable(coordJob.getId(), 1, CoordinatorAction.Status.SUCCEEDED,
"coord-action-get.xml", wfJob.getId(), "SUCCEEDED", 0);
purgeWithSpecialParameters(coordJob);
assertCoordinatorJobNotPurged(coordJob);
assertCoordinatorActionNotPurged(coordAction);
assertWorkflowJobNotPurged(wfJob);
assertWorkflowActionNotPurged(wfAction);
assertWorkflowJobNotPurged(subwfJob);
assertWorkflowActionNotPurged(subwfAction);
}
/**
* Test : The subworkflow and workflow should not get purged, but the coordinator parent should get purged --> none will get
* purged
*
* @throws Exception if cannot insert records to the database
*/
public void testPurgeableCoordinatorUnpurgeableWorkflowPurgeableSubWorkflow() throws Exception {
CoordinatorJobBean coordJob = addRecordToCoordJobTable(CoordinatorJob.Status.SUCCEEDED, false, false);
WorkflowJobBean wfJob = addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED, WorkflowInstance.Status.SUCCEEDED);
WorkflowActionBean wfAction = addRecordToWfActionTable(wfJob.getId(), "1", WorkflowAction.Status.OK);
WorkflowJobBean subwfJob = addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED, WorkflowInstance.Status.SUCCEEDED,
wfJob.getId());
WorkflowActionBean subwfAction = addRecordToWfActionTable(subwfJob.getId(), "1", WorkflowAction.Status.OK);
CoordinatorActionBean coordAction = addRecordToCoordActionTable(coordJob.getId(), 1, CoordinatorAction.Status.SUCCEEDED,
"coord-action-get.xml", wfJob.getId(), "SUCCEEDED", 0);
purgeWithSpecialParameters(wfJob);
assertCoordinatorJobNotPurged(coordJob);
assertCoordinatorActionNotPurged(coordAction);
assertWorkflowJobNotPurged(wfJob);
assertWorkflowActionNotPurged(wfAction);
assertWorkflowJobNotPurged(subwfJob);
assertWorkflowActionNotPurged(subwfAction);
}
/**
* Test : The subworkflow and workflow should get purged, and the coordinator parent should get purged --> all will get
* purged
*
* @throws Exception if cannot insert records to the database
*/
public void testPurgeableCoordinatorPurgeableWorkflowPurgeableSubWorkflow() throws Exception {
CoordinatorJobBean coordJob = addRecordToCoordJobTable(CoordinatorJob.Status.SUCCEEDED, false, false);
WorkflowJobBean wfJob = addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED, WorkflowInstance.Status.SUCCEEDED);
WorkflowActionBean wfAction = addRecordToWfActionTable(wfJob.getId(), "1", WorkflowAction.Status.OK);
WorkflowJobBean subwfJob = addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED, WorkflowInstance.Status.SUCCEEDED,
wfJob.getId());
WorkflowActionBean subwfAction = addRecordToWfActionTable(subwfJob.getId(), "1", WorkflowAction.Status.OK);
CoordinatorActionBean coordAction = addRecordToCoordActionTable(coordJob.getId(), 1, CoordinatorAction.Status.SUCCEEDED,
"coord-action-get.xml", wfJob.getId(), "SUCCEEDED", 0);
purgeWithDefaultParameters();
assertCoordinatorJobPurged(coordJob);
assertCoordinatorActionPurged(coordAction);
assertWorkflowJobPurged(wfJob);
assertWorkflowActionPurged(wfAction);
assertWorkflowJobPurged(subwfJob);
assertWorkflowActionPurged(subwfAction);
}
/**
* Test : The subworkflow and workflow should get purged, and the coordinator parent should get purged --> all will get
* purged
*
* Coordinator parent finished Workflow and its subworkflow have terminated, last modified time is known, but end time is null
* for workflow and subworkflow
*
* @throws Exception if cannot insert records to the database
*/
public void testPurgeableCoordinatorPurgeableWorkflowPurgeableSubWorkflowWithNullEndTimeValidLastModifiedTime()
throws Exception {
CoordinatorJobBean coordJob = addRecordToCoordJobTable(CoordinatorJob.Status.SUCCEEDED, false, false);
final QueryExecutor<WorkflowJobBean, WorkflowJobQueryExecutor.WorkflowJobQuery> workflowJobQueryExecutor =
WorkflowJobQueryExecutor.getInstance();
WorkflowJobBean wfJob = addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED, WorkflowInstance.Status.SUCCEEDED);
wfJob.setLastModifiedTime(wfJob.getEndTime());
wfJob.setEndTime(null);
workflowJobQueryExecutor.executeUpdate(WorkflowJobQuery.UPDATE_WORKFLOW, wfJob);
WorkflowActionBean wfAction = addRecordToWfActionTable(wfJob.getId(), "1", WorkflowAction.Status.OK);
WorkflowJobBean subwfJob = addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED, WorkflowInstance.Status.SUCCEEDED,
wfJob.getId());
subwfJob.setLastModifiedTime(subwfJob.getEndTime());
subwfJob.setEndTime(null);
workflowJobQueryExecutor.executeUpdate(WorkflowJobQuery.UPDATE_WORKFLOW, subwfJob);
WorkflowActionBean subwfAction = addRecordToWfActionTable(subwfJob.getId(), "1", WorkflowAction.Status.OK);
CoordinatorActionBean coordAction = addRecordToCoordActionTable(coordJob.getId(), 1, CoordinatorAction.Status.SUCCEEDED,
"coord-action-get.xml", wfJob.getId(), "SUCCEEDED", 0);
purgeWithDefaultParameters();
assertCoordinatorJobPurged(coordJob);
assertCoordinatorActionPurged(coordAction);
assertWorkflowJobPurged(wfJob);
assertWorkflowActionPurged(wfAction);
assertWorkflowJobPurged(subwfJob);
assertWorkflowActionPurged(subwfAction);
}
/**
* Test : The subworkflow, workflow, and coordinator should get purged, but the bundle parent shouldn't get purged --> none will
* get purged
*
* @throws Exception if cannot insert records to the database
*/
public void testUnpurgeableBundlePurgeableCoordiatorPurgeableWorkflowPurgeableSubWorkflow() throws Exception {
BundleJobBean bundleJob = addRecordToBundleJobTable(Job.Status.SUCCEEDED, DateUtils.parseDateOozieTZ("2011-01-01T01:00Z"));
CoordinatorJobBean coordJob = addRecordToCoordJobTable(CoordinatorJob.Status.SUCCEEDED, false, false);
WorkflowJobBean wfJob = addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED, WorkflowInstance.Status.SUCCEEDED);
WorkflowActionBean wfAction = addRecordToWfActionTable(wfJob.getId(), "1", WorkflowAction.Status.OK);
WorkflowJobBean subwfJob = addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED, WorkflowInstance.Status.SUCCEEDED,
wfJob.getId());
WorkflowActionBean subwfAction = addRecordToWfActionTable(subwfJob.getId(), "1", WorkflowAction.Status.OK);
CoordinatorActionBean coordAction = addRecordToCoordActionTable(coordJob.getId(), 1, CoordinatorAction.Status.SUCCEEDED,
"coord-action-get.xml", wfJob.getId(), "SUCCEEDED", 0);
BundleActionBean bundleAction = addRecordToBundleActionTable(bundleJob.getId(), coordJob.getId(), coordJob.getAppName(), 0,
Job.Status.SUCCEEDED);
purgeWithSpecialParameters(bundleJob);
assertBundleJobNotPurged(bundleJob);
assertBundleActionNotPurged(bundleAction);
assertCoordinatorJobNotPurged(coordJob);
assertCoordinatorActionNotPurged(coordAction);
assertWorkflowJobNotPurged(wfJob);
assertWorkflowActionNotPurged(wfAction);
assertWorkflowJobNotPurged(subwfJob);
assertWorkflowActionNotPurged(subwfAction);
}
/**
* Test : The subworkflow, workflow, and coordinator should not get purged, but the bundle parent should get purged --> none
* will get purged
*
* @throws Exception if cannot insert records to the database
*/
public void testPurgeableBundleUnpurgeableCoordinatorUnpurgebleWorkflowPurgeableSubWorkflow() throws Exception {
BundleJobBean bundleJob = addRecordToBundleJobTable(Job.Status.SUCCEEDED, DateUtils.parseDateOozieTZ("2011-01-01T01:00Z"));
CoordinatorJobBean coordJob = addRecordToCoordJobTable(CoordinatorJob.Status.SUCCEEDED, false, false);
WorkflowJobBean wfJob = addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED, WorkflowInstance.Status.SUCCEEDED);
WorkflowActionBean wfAction = addRecordToWfActionTable(wfJob.getId(), "1", WorkflowAction.Status.OK);
WorkflowJobBean subwfJob = addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED, WorkflowInstance.Status.SUCCEEDED,
wfJob.getId());
WorkflowActionBean subwfAction = addRecordToWfActionTable(subwfJob.getId(), "1", WorkflowAction.Status.OK);
CoordinatorActionBean coordAction = addRecordToCoordActionTable(coordJob.getId(), 1, CoordinatorAction.Status.SUCCEEDED,
"coord-action-get.xml", wfJob.getId(), "SUCCEEDED", 0);
BundleActionBean bundleAction = addRecordToBundleActionTable(bundleJob.getId(), coordJob.getId(), coordJob.getAppName(), 0,
Job.Status.SUCCEEDED);
purgeWithSpecialParameters(wfJob, coordJob);
assertBundleJobNotPurged(bundleJob);
assertBundleActionNotPurged(bundleAction);
assertCoordinatorJobNotPurged(coordJob);
assertCoordinatorActionNotPurged(coordAction);
assertWorkflowJobNotPurged(wfJob);
assertWorkflowActionNotPurged(wfAction);
assertWorkflowJobNotPurged(subwfJob);
assertWorkflowActionNotPurged(subwfAction);
}
/**
* Test : The subworkflow, workflow, and coordinator should get purged, and the bundle parent should get purged --> all
* will get purged
*
* @throws Exception if cannot insert records to the database
*/
public void testPurgeableBundlePurgeableCoordinatorPurgeableWorkflowPurgeableSubWorkflow() throws Exception {
BundleJobBean bundleJob = addRecordToBundleJobTable(Job.Status.SUCCEEDED, DateUtils.parseDateOozieTZ("2011-01-01T01:00Z"));
CoordinatorJobBean coordJob = addRecordToCoordJobTable(CoordinatorJob.Status.SUCCEEDED, false, false);
WorkflowJobBean wfJob = addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED, WorkflowInstance.Status.SUCCEEDED);
WorkflowActionBean wfAction = addRecordToWfActionTable(wfJob.getId(), "1", WorkflowAction.Status.OK);
WorkflowJobBean subwfJob = addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED, WorkflowInstance.Status.SUCCEEDED,
wfJob.getId());
WorkflowActionBean subwfAction = addRecordToWfActionTable(subwfJob.getId(), "1", WorkflowAction.Status.OK);
CoordinatorActionBean coordAction = addRecordToCoordActionTable(coordJob.getId(), 1, CoordinatorAction.Status.SUCCEEDED,
"coord-action-get.xml", wfJob.getId(), "SUCCEEDED", 0);
BundleActionBean bundleAction = addRecordToBundleActionTable(bundleJob.getId(), coordJob.getId(), coordJob.getAppName(), 0,
Job.Status.SUCCEEDED);
purgeWithDefaultParameters();
assertBundleJobPurged(bundleJob);
assertBundleActionPurged(bundleAction);
assertCoordinatorJobPurged(coordJob);
assertCoordinatorActionPurged(coordAction);
assertWorkflowJobPurged(wfJob);
assertWorkflowActionPurged(wfAction);
assertWorkflowJobPurged(subwfJob);
assertWorkflowActionPurged(subwfAction);
}
/**
* Test : Test purging a lot of jobs (with different parent-child relationships and purge-eligibility) in one go
*
* @throws Exception if cannot insert records to the database
*/
public void testComplexExample() throws Exception {
assertNotNull(jpaService);
/* Job relationships:
bundleJobA yes yes
coordJobA yes ^
wfJobA yes ^
subwfJobA yes ^
bundleJobB no no
coordJobB yes ^
wfJobB no ^
coordJobC no no
wfJobC no ^
subwfJobC yes ^
coordJobD yes no
wfJobD no ^
wfJobE yes yes
wfJobF yes no
subwfJobF no ^
*/
BundleJobBean bundleJobA = addRecordToBundleJobTable(Job.Status.SUCCEEDED,
DateUtils.parseDateOozieTZ("2011-05-01T01:00Z"));
BundleJobBean bundleJobB = addRecordToBundleJobTable(Job.Status.SUCCEEDED,
DateUtils.parseDateOozieTZ("2011-06-01T01:00Z"));
CoordinatorJobBean coordJobA = addRecordToCoordJobTable(CoordinatorJob.Status.SUCCEEDED, false, false);
setLastModifiedTime(coordJobA, "2011-03-01T01:00Z");
CoordinatorJobBean coordJobB = addRecordToCoordJobTable(CoordinatorJob.Status.SUCCEEDED, false, false);
setLastModifiedTime(coordJobB, "2011-03-01T01:00Z");
CoordinatorJobBean coordJobC = addRecordToCoordJobTable(CoordinatorJob.Status.SUCCEEDED, false, false);
setLastModifiedTime(coordJobC, "2011-07-01T01:00Z");
CoordinatorJobBean coordJobD = addRecordToCoordJobTable(CoordinatorJob.Status.SUCCEEDED, false, false);
setLastModifiedTime(coordJobD, "2011-06-01T01:00Z");
WorkflowJobBean wfJobA = addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED, WorkflowInstance.Status.SUCCEEDED);
setEndTime(wfJobA, "2011-02-01T01:00Z");
WorkflowJobBean wfJobB = addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED, WorkflowInstance.Status.SUCCEEDED);
setEndTime(wfJobB, "2011-03-01T01:00Z");
WorkflowJobBean wfJobC = addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED, WorkflowInstance.Status.SUCCEEDED);
setEndTime(wfJobC, "2011-06-01T01:00Z");
WorkflowJobBean wfJobD = addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED, WorkflowInstance.Status.SUCCEEDED);
setEndTime(wfJobD, "2011-06-01T01:00Z");
WorkflowJobBean wfJobE = addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED, WorkflowInstance.Status.SUCCEEDED);
setEndTime(wfJobE, "2011-01-01T01:00Z");
WorkflowJobBean wfJobF = addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED, WorkflowInstance.Status.SUCCEEDED);
setEndTime(wfJobF, "2011-01-01T01:00Z");
WorkflowActionBean wfActionA = addRecordToWfActionTable(wfJobA.getId(), "1", WorkflowAction.Status.OK);
WorkflowActionBean wfActionB = addRecordToWfActionTable(wfJobB.getId(), "1", WorkflowAction.Status.OK);
WorkflowActionBean wfActionC = addRecordToWfActionTable(wfJobC.getId(), "1", WorkflowAction.Status.OK);
WorkflowActionBean wfActionD = addRecordToWfActionTable(wfJobD.getId(), "1", WorkflowAction.Status.OK);
WorkflowActionBean wfActionE = addRecordToWfActionTable(wfJobE.getId(), "1", WorkflowAction.Status.OK);
WorkflowActionBean wfActionF = addRecordToWfActionTable(wfJobF.getId(), "1", WorkflowAction.Status.OK);
WorkflowJobBean subwfJobA = addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED, WorkflowInstance.Status.SUCCEEDED,
wfJobA.getId());
setEndTime(subwfJobA, "2011-01-01T01:00Z");
WorkflowJobBean subwfJobC = addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED, WorkflowInstance.Status.SUCCEEDED,
wfJobC.getId());
setEndTime(subwfJobC, "2011-01-01T01:00Z");
WorkflowJobBean subwfJobF = addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED, WorkflowInstance.Status.SUCCEEDED,
wfJobF.getId());
setEndTime(subwfJobF, "2011-04-01T01:00Z");
WorkflowActionBean subwfActionA = addRecordToWfActionTable(subwfJobA.getId(), "1", WorkflowAction.Status.OK);
WorkflowActionBean subwfActionC = addRecordToWfActionTable(subwfJobC.getId(), "1", WorkflowAction.Status.OK);
WorkflowActionBean subwfActionF = addRecordToWfActionTable(subwfJobF.getId(), "1", WorkflowAction.Status.OK);
CoordinatorActionBean coordActionA = addRecordToCoordActionTable(coordJobA.getId(), 1, CoordinatorAction.Status.SUCCEEDED,
"coord-action-get.xml", wfJobA.getId(), "SUCCEEDED", 0);
CoordinatorActionBean coordActionB = addRecordToCoordActionTable(coordJobB.getId(), 1, CoordinatorAction.Status.SUCCEEDED,
"coord-action-get.xml", wfJobB.getId(), "SUCCEEDED", 0);
CoordinatorActionBean coordActionC = addRecordToCoordActionTable(coordJobC.getId(), 1, CoordinatorAction.Status.SUCCEEDED,
"coord-action-get.xml", wfJobC.getId(), "SUCCEEDED", 0);
CoordinatorActionBean coordActionD = addRecordToCoordActionTable(coordJobD.getId(), 1, CoordinatorAction.Status.SUCCEEDED,
"coord-action-get.xml", wfJobD.getId(), "SUCCEEDED", 0);
BundleActionBean bundleActionA = addRecordToBundleActionTable(bundleJobA.getId(), coordJobA.getId(), coordJobA.getAppName(),
0,Job.Status.SUCCEEDED);
BundleActionBean bundleActionB = addRecordToBundleActionTable(bundleJobB.getId(), coordJobB.getId(), coordJobB.getAppName(),
0, Job.Status.SUCCEEDED);
purgeWithSpecialParameters(wfJobB, coordJobC, bundleJobB);
assertBundleJobPurged(bundleJobA);
assertBundleActionPurged(bundleActionA);
assertBundleJobNotPurged(bundleJobB);
assertBundleActionNotPurged(bundleActionB);
assertCoordinatorJobPurged(coordJobA);
assertCoordinatorActionPurged(coordActionA);
assertCoordinatorJobNotPurged(coordJobB);
assertCoordinatorActionNotPurged(coordActionB);
assertCoordinatorJobNotPurged(coordJobC);
assertCoordinatorActionNotPurged(coordActionC);
assertCoordinatorJobNotPurged(coordJobD);
assertCoordinatorActionNotPurged(coordActionD);
assertWorkflowJobPurged(wfJobA);
assertWorkflowActionPurged(wfActionA);
assertWorkflowJobNotPurged(wfJobB);
assertWorkflowActionNotPurged(wfActionB);
assertWorkflowJobNotPurged(wfJobC);
assertWorkflowActionNotPurged(wfActionC);
assertWorkflowJobNotPurged(wfJobD);
assertWorkflowActionNotPurged(wfActionD);
assertWorkflowJobPurged(wfJobE);
assertWorkflowActionPurged(wfActionE);
assertWorkflowJobNotPurged(wfJobF);
assertWorkflowActionNotPurged(wfActionF);
assertWorkflowJobPurged(subwfJobA);
assertWorkflowActionPurged(subwfActionA);
assertWorkflowJobNotPurged(subwfJobC);
assertWorkflowActionNotPurged(subwfActionC);
assertWorkflowJobNotPurged(subwfJobF);
assertWorkflowActionNotPurged(subwfActionF);
}
private static class PurgeParameters {
private int workflowDays;
private int coordinatorDays;
private int bundleDays;
private int limit;
private boolean purgeOldCoordAction;
private static PurgeParameters createDefaultParameters() {
PurgeParameters purgeParameters = new PurgeParameters();
purgeParameters.workflowDays = 30;
purgeParameters.coordinatorDays = 7;
purgeParameters.bundleDays = 7;
purgeParameters.limit = 100;
purgeParameters.purgeOldCoordAction = false;
return purgeParameters;
}
private PurgeParameters withPurgeOldCordAction() {
purgeOldCoordAction = true;
return this;
}
private PurgeParameters withKeepingJobs(JsonBean... jsonBeans) {
for (JsonBean jsonBean : jsonBeans) {
if (jsonBean instanceof WorkflowJobBean) {
workflowDays = getNumDaysToNotBePurged(((WorkflowJobBean)jsonBean).getEndTime());
}
else if (jsonBean instanceof CoordinatorJobBean) {
coordinatorDays = getNumDaysToNotBePurged(((CoordinatorJobBean)jsonBean).getLastModifiedTime());
}
else if (jsonBean instanceof BundleJobBean) {
bundleDays = getNumDaysToNotBePurged(((BundleJobBean)jsonBean).getLastModifiedTime());
}
}
return this;
}
private PurgeParameters withLimit(int limit) {
this.limit = limit;
return this;
}
}
private void purgeWithDefaultParameters() throws CommandException {
purgeWithParameters(PurgeParameters.createDefaultParameters());
}
private void purgeWithSpecialParametersAndAlsoPurgeCoordActions(JsonBean... jsonBeans) throws CommandException {
purgeWithParameters(PurgeParameters.createDefaultParameters().withPurgeOldCordAction());
}
private void purgeWithSpecialParameters(JsonBean... jsonBeans) throws CommandException {
purgeWithParameters(PurgeParameters.createDefaultParameters().withKeepingJobs(jsonBeans));
}
private void purgeWithSpecialParameters(int limit, JsonBean... jsonBeans) throws CommandException {
purgeWithParameters(PurgeParameters.createDefaultParameters().withLimit(limit).withKeepingJobs(jsonBeans));
}
private void purgeWithParameters(PurgeParameters purgeParameters) throws CommandException {
new PurgeXCommand(purgeParameters.workflowDays, purgeParameters.coordinatorDays, purgeParameters.bundleDays,
purgeParameters.limit, purgeParameters.purgeOldCoordAction).call();
}
private void assertBundleActionsPurged(BundleActionBean... bundleActionBeans) {
for (BundleActionBean bean : bundleActionBeans) {
assertBundleActionPurged(bean);
}
}
private void assertBundleActionsNotPurged(BundleActionBean... bundleActionBeans) {
for (BundleActionBean bean : bundleActionBeans) {
assertBundleActionNotPurged(bean);
}
}
private void assertCoordinatorJobsPurged(CoordinatorJobBean... coordinatorJobBeans) {
for (CoordinatorJobBean bean : coordinatorJobBeans) {
assertCoordinatorJobPurged(bean);
}
}
private void assertCoordinatorJobsNotPurged(CoordinatorJobBean... coordinatorJobBeans) {
for (CoordinatorJobBean bean : coordinatorJobBeans) {
assertCoordinatorJobNotPurged(bean);
}
}
private void assertCoordinatorActionsPurged(CoordinatorActionBean... coordinatorActionBeans) {
for (CoordinatorActionBean bean : coordinatorActionBeans) {
assertCoordinatorActionPurged(bean);
}
}
private void assertCoordinatorActionsNotPurged(CoordinatorActionBean... coordinatorActionBeans) {
for (CoordinatorActionBean bean : coordinatorActionBeans) {
assertCoordinatorActionNotPurged(bean);
}
}
private void assertWorkflowJobsPurged(WorkflowJobBean... workflowJobBeans) {
for (WorkflowJobBean bean : workflowJobBeans) {
assertWorkflowJobPurged(bean);
}
}
private void assertWorkflowJobsNotPurged(WorkflowJobBean... workflowJobBeans) {
for (WorkflowJobBean bean : workflowJobBeans) {
assertWorkflowJobNotPurged(bean);
}
}
private void assertWorkflowActionsPurged(WorkflowActionBean... workflowActionBeans) {
for (WorkflowActionBean bean : workflowActionBeans) {
assertWorkflowActionPurged(bean);
}
}
private void assertWorkflowActionsNotPurged(WorkflowActionBean... workflowActionBeans) {
for (WorkflowActionBean bean : workflowActionBeans) {
assertWorkflowActionNotPurged(bean);
}
}
private void assertWorkflowJobNotPurged(WorkflowJobBean workflowJobBean) {
try {
WorkflowJobGetJPAExecutor jpaExecutor = new WorkflowJobGetJPAExecutor(workflowJobBean.getId());
jpaService.execute(jpaExecutor);
} catch (JPAExecutorException je) {
fail("Workflow job "+workflowJobBean.getId()+" should not have been purged");
}
}
private void assertWorkflowJobPurged(WorkflowJobBean workflowJobBean) {
try {
WorkflowJobGetJPAExecutor jpaExecutor = new WorkflowJobGetJPAExecutor(workflowJobBean.getId());
jpaService.execute(jpaExecutor);
fail("Workflow job "+workflowJobBean.getId()+" should have been purged");
} catch (JPAExecutorException je) {
assertEquals(ErrorCode.E0604, je.getErrorCode());
}
}
private void assertWorkflowActionNotPurged(WorkflowActionBean workflowActionBean) {
try {
WorkflowActionGetJPAExecutor jpaExecutor = new WorkflowActionGetJPAExecutor(workflowActionBean.getId());
jpaService.execute(jpaExecutor);
} catch (JPAExecutorException je) {
fail("Workflow action "+workflowActionBean.getId()+" should not have been purged");
}
}
private void assertWorkflowActionPurged(WorkflowActionBean workflowActionBean) {
try {
WorkflowActionGetJPAExecutor jpaExecutor = new WorkflowActionGetJPAExecutor(workflowActionBean.getId());
jpaService.execute(jpaExecutor);
fail("Workflow job "+workflowActionBean.getId()+" should have been purged");
} catch (JPAExecutorException je) {
assertEquals(ErrorCode.E0605, je.getErrorCode());
}
}
private void assertCoordinatorJobNotPurged(CoordinatorJobBean coordinatorJobBean) {
try {
CoordJobGetJPAExecutor jpaExecutor = new CoordJobGetJPAExecutor(coordinatorJobBean.getId());
jpaService.execute(jpaExecutor);
} catch (JPAExecutorException je) {
fail("Coordinator job "+ coordinatorJobBean.getId()+" should not have been purged");
}
}
private void assertCoordinatorJobPurged(CoordinatorJobBean coordinatorJobBean) {
try {
CoordJobGetJPAExecutor jpaExecutor = new CoordJobGetJPAExecutor(coordinatorJobBean.getId());
jpaService.execute(jpaExecutor);
fail("Coordinator job "+coordinatorJobBean.getId()+" should have been purged");
} catch (JPAExecutorException je) {
assertEquals(ErrorCode.E0604, je.getErrorCode());
}
}
private void assertCoordinatorActionNotPurged(CoordinatorActionBean coordinatorActionBean) {
try {
CoordActionGetJPAExecutor jpaExecutor = new CoordActionGetJPAExecutor(coordinatorActionBean.getId());
jpaService.execute(jpaExecutor);
} catch (JPAExecutorException je) {
fail("Coordinator action "+coordinatorActionBean.getId()+" should not have been purged");
}
}
private void assertCoordinatorActionPurged(CoordinatorActionBean coordinatorActionBean) {
try {
CoordActionGetJPAExecutor jpaExecutor = new CoordActionGetJPAExecutor(coordinatorActionBean.getId());
jpaService.execute(jpaExecutor);
fail("Coordinator action "+coordinatorActionBean.getId()+" should have been purged");
} catch (JPAExecutorException je) {
assertEquals(ErrorCode.E0605, je.getErrorCode());
}
}
private void assertBundleJobNotPurged(BundleJobBean bundleJobBean) {
try {
BundleJobGetJPAExecutor jpaExecutor = new BundleJobGetJPAExecutor(bundleJobBean.getId());
jpaService.execute(jpaExecutor);
} catch (JPAExecutorException je) {
fail("Bundle job "+bundleJobBean.getId()+" should not have been purged");
}
}
private void assertBundleJobPurged(BundleJobBean bundleJobBean) {
try {
BundleJobGetJPAExecutor jpaExecutor = new BundleJobGetJPAExecutor(bundleJobBean.getId());
jpaService.execute(jpaExecutor);
fail("Bundle job "+bundleJobBean.getId()+" should have been purged");
} catch (JPAExecutorException je) {
assertEquals(ErrorCode.E0604, je.getErrorCode());
}
}
private void assertBundleActionNotPurged(BundleActionBean bundleActionBean) {
try {
BundleActionGetJPAExecutor jpaExecutor = new BundleActionGetJPAExecutor(bundleActionBean.getBundleId(),
bundleActionBean.getCoordName());
jpaService.execute(jpaExecutor);
} catch (JPAExecutorException je) {
fail("Bundle action "+bundleActionBean.getBundleActionId()+" should not have been purged");
}
}
private void assertBundleActionPurged(BundleActionBean bundleActionBean) {
try {
BundleActionGetJPAExecutor jpaExecutor = new BundleActionGetJPAExecutor(bundleActionBean.getBundleId(),
bundleActionBean.getCoordName());
jpaService.execute(jpaExecutor);
fail("Bundle action "+bundleActionBean.getBundleActionId()+" should have been purged");
} catch (JPAExecutorException je) {
assertEquals(ErrorCode.E0605, je.getErrorCode());
}
}
private WorkflowJobBean addRecordToWfJobTableForNegCase(WorkflowJob.Status jobStatus,
WorkflowInstance.Status instanceStatus) throws Exception {
WorkflowApp app =
new LiteWorkflowApp("testApp", "<workflow-app/>",
new StartNodeDef(LiteWorkflowStoreService.LiteControlNodeHandler.class, "end")).
addNode(new EndNodeDef("end", LiteWorkflowStoreService.LiteControlNodeHandler.class));
Configuration conf = new Configuration();
Path appUri = new Path(getAppPath(), "workflow.xml");
conf.set(OozieClient.APP_PATH, appUri.toString());
conf.set(OozieClient.LOG_TOKEN, "testToken");
conf.set(OozieClient.USER_NAME, getTestUser());
WorkflowJobBean wfBean = createWorkflow(app, conf, jobStatus, instanceStatus);
//Set start time to 100 days from now
wfBean.setStartTime(new Date(System.currentTimeMillis() + (long)100*24*60*60*1000));
//Set end time to 100 days + 2 hours from now
wfBean.setEndTime(new Date(System.currentTimeMillis() + (long)100*24*60*60*1000 + (long)2*60*60*1000));
try {
WorkflowJobInsertJPAExecutor wfInsertCmd = new WorkflowJobInsertJPAExecutor(wfBean);
jpaService.execute(wfInsertCmd);
}
catch (JPAExecutorException ce) {
ce.printStackTrace();
fail("Unable to insert the test wf job record to table");
throw ce;
}
return wfBean;
}
@Override
protected WorkflowJobBean createWorkflow(WorkflowApp app, Configuration conf,
WorkflowJob.Status jobStatus, WorkflowInstance.Status instanceStatus) throws Exception {
WorkflowAppService wps = Services.get().get(WorkflowAppService.class);
Configuration protoActionConf = wps.createProtoActionConf(conf, true);
WorkflowLib workflowLib = Services.get().get(WorkflowStoreService.class).getWorkflowLibWithNoDB();
WorkflowInstance wfInstance = workflowLib.createInstance(app, conf);
((LiteWorkflowInstance) wfInstance).setStatus(instanceStatus);
WorkflowJobBean workflow = new WorkflowJobBean();
workflow.setId(Services.get().get(UUIDService.class).generateId(ApplicationType.WORKFLOW));
workflow.setAppName(app.getName());
workflow.setAppPath(conf.get(OozieClient.APP_PATH));
workflow.setConf(XmlUtils.prettyPrint(conf).toString());
workflow.setProtoActionConf(XmlUtils.prettyPrint(protoActionConf).toString());
workflow.setCreatedTime(new Date());
workflow.setLogToken(conf.get(OozieClient.LOG_TOKEN, ""));
workflow.setStatus(jobStatus);
workflow.setRun(0);
workflow.setUser(conf.get(OozieClient.USER_NAME));
workflow.setWorkflowInstance(wfInstance);
workflow.setStartTime(DateUtils.parseDateOozieTZ("2009-12-18T01:00Z"));
workflow.setEndTime(DateUtils.parseDateOozieTZ("2009-12-18T03:00Z"));
return workflow;
}
@Override
protected CoordinatorJobBean addRecordToCoordJobTable(CoordinatorJob.Status status, boolean pending, boolean doneMatd)
throws Exception {
CoordinatorJobBean coordJob = createCoordJob(status, pending, doneMatd);
coordJob.setLastModifiedTime(DateUtils.parseDateOozieTZ("2009-12-18T01:00Z"));
try {
CoordJobInsertJPAExecutor coordInsertCmd = new CoordJobInsertJPAExecutor(coordJob);
jpaService.execute(coordInsertCmd);
}
catch (JPAExecutorException je) {
je.printStackTrace();
fail("Unable to insert the test coord job record to table");
throw je;
}
return coordJob;
}
private BundleJobBean addRecordToBundleJobTable(Job.Status jobStatus, Date lastModifiedTime) throws Exception {
BundleJobBean bundle = createBundleJob(jobStatus, false);
bundle.setLastModifiedTime(lastModifiedTime);
try {
BundleJobInsertJPAExecutor bundleInsertjpa = new BundleJobInsertJPAExecutor(bundle);
jpaService.execute(bundleInsertjpa);
}
catch (JPAExecutorException je) {
je.printStackTrace();
fail("Unable to insert the test bundle job record to table");
throw je;
}
return bundle;
}
/**
* Return a number of days from today such that the job (workflow, coord, or bundle) ending at jobDate will NOT be purged
* @param jobDate The date that the job was last modified
* @return 5 + the number of days from today to jobDate
*/
public static int getNumDaysToNotBePurged(Date jobDate) {
int days = (int) (((new Date()).getTime() - jobDate.getTime()) / 1000 / 60 / 60 / 24);
return days + 5;
}
public static CoordinatorJobBean setLastModifiedTime(CoordinatorJobBean job, String date) throws Exception {
job.setLastModifiedTime(DateUtils.parseDateOozieTZ(date));
try {
CoordJobQueryExecutor.getInstance().executeUpdate(CoordJobQuery.UPDATE_COORD_JOB_LAST_MODIFIED_TIME, job);
}
catch (JPAExecutorException je) {
je.printStackTrace();
fail("Unable to update coord job last modified time");
throw je;
}
return job;
}
private static void setEndTime(WorkflowJobBean job, String date) throws Exception {
job.setEndTime(DateUtils.parseDateOozieTZ(date));
try {
WorkflowJobQueryExecutor.getInstance().executeUpdate(WorkflowJobQuery.UPDATE_WORKFLOW_STATUS_INSTANCE_MOD_END, job);
}
catch (JPAExecutorException je) {
je.printStackTrace();
fail("Unable to update workflow job last modified time");
throw je;
}
}
}