blob: 1fe8d3b79fbe80c7d8cb817d6c7ce44495a2249d [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.oozie.command.coord;
import java.util.Arrays;
import java.util.Date;
import java.util.List;
import org.apache.oozie.CoordinatorActionBean;
import org.apache.oozie.CoordinatorJobBean;
import org.apache.oozie.WorkflowJobBean;
import org.apache.oozie.client.CoordinatorAction;
import org.apache.oozie.client.CoordinatorJob;
import org.apache.oozie.client.WorkflowJob;
import org.apache.oozie.command.CommandException;
import org.apache.oozie.command.PreconditionException;
import org.apache.oozie.coord.CoordELFunctions;
import org.apache.oozie.executor.jpa.CoordActionGetJPAExecutor;
import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor;
import org.apache.oozie.executor.jpa.CoordJobQueryExecutor;
import org.apache.oozie.executor.jpa.CoordJobQueryExecutor.CoordJobQuery;
import org.apache.oozie.service.CallableQueueService;
import org.apache.oozie.service.JPAService;
import org.apache.oozie.service.PartitionDependencyManagerService;
import org.apache.oozie.service.SchemaService;
import org.apache.oozie.service.Services;
import org.apache.oozie.service.StatusTransitService;
import org.apache.oozie.test.XDataTestCase;
import org.apache.oozie.util.DateUtils;
import org.apache.oozie.util.HCatURI;
import org.apache.oozie.workflow.WorkflowInstance;
public class TestCoordKillXCommand extends XDataTestCase {
private Services services;
private String[] excludedServices = { "org.apache.oozie.service.StatusTransitService",
"org.apache.oozie.service.PauseTransitService",
"org.apache.oozie.service.CoordMaterializeTriggerService", "org.apache.oozie.service.RecoveryService" };
@Override
protected void setUp() throws Exception {
super.setUp();
services = new Services();
setClassesToBeExcluded(services.getConf(), excludedServices);
services.init();
}
@Override
protected void tearDown() throws Exception {
services.destroy();
super.tearDown();
}
/**
* Test : kill job and action (READY) successfully
*
* @throws Exception
*/
public void testCoordKillSuccess1() throws Exception {
String currentDatePlusMonth = XDataTestCase.getCurrentDateafterIncrementingInMonths(1);
Date start = DateUtils.parseDateOozieTZ(currentDatePlusMonth);
Date end = DateUtils.parseDateOozieTZ(currentDatePlusMonth);
CoordinatorJobBean job = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, start, end, false, false, 0);
CoordinatorActionBean action = addRecordToCoordActionTable(job.getId(), 1, CoordinatorAction.Status.READY,
"coord-action-get.xml", 0);
JPAService jpaService = Services.get().get(JPAService.class);
assertNotNull(jpaService);
CoordJobGetJPAExecutor coordJobGetCmd = new CoordJobGetJPAExecutor(job.getId());
CoordActionGetJPAExecutor coordActionGetCmd = new CoordActionGetJPAExecutor(action.getId());
job = jpaService.execute(coordJobGetCmd);
action = jpaService.execute(coordActionGetCmd);
assertEquals(job.getStatus(), CoordinatorJob.Status.RUNNING);
assertEquals(action.getStatus(), CoordinatorAction.Status.READY);
assertFalse(job.isDoneMaterialization());
new CoordKillXCommand(job.getId()).call();
job = jpaService.execute(coordJobGetCmd);
action = jpaService.execute(coordActionGetCmd);
assertEquals(job.getStatus(), CoordinatorJob.Status.KILLED);
assertTrue(job.isDoneMaterialization());
assertNotNull(job.getLastModifiedTime());
assertEquals(action.getStatus(), CoordinatorAction.Status.KILLED);
// Change job status to RUNNINGWITHERROR to simulate StatusTransitService changing it to
// RUNNINGWITHERROR if it had loaded status and had it as RUNNING in memory when CoordKill
// executes and updates status to KILLED in database.
job.setStatus(CoordinatorJob.Status.RUNNINGWITHERROR);
CoordJobQueryExecutor.getInstance().executeUpdate(CoordJobQuery.UPDATE_COORD_JOB_STATUS, job);
job = jpaService.execute(coordJobGetCmd);
assertEquals(job.getStatus(), CoordinatorJob.Status.RUNNINGWITHERROR);
final CoordMaterializeTransitionXCommand transitionCmd = new CoordMaterializeTransitionXCommand(job.getId(), 3600);
try {
transitionCmd.loadState();
transitionCmd.verifyPrecondition();
fail();
}
catch (PreconditionException e) {
// Materialization should not happen as done materialization is set to true by coord kill
}
StatusTransitService.StatusTransitRunnable statusTransit = new StatusTransitService.StatusTransitRunnable();
statusTransit.run();
// StatusTransitService should change the job back to KILLED
job = jpaService.execute(coordJobGetCmd);
assertEquals(job.getStatus(), CoordinatorJob.Status.KILLED);
assertTrue(job.isDoneMaterialization());
assertNotNull(job.getLastModifiedTime());
}
/**
* Test : kill job and action (RUNNING) successfully
*
* @throws Exception
*/
public void testCoordKillSuccess2() throws Exception {
String currentDatePlusMonth = XDataTestCase.getCurrentDateafterIncrementingInMonths(1);
Date start = DateUtils.parseDateOozieTZ(currentDatePlusMonth);
Date end = DateUtils.parseDateOozieTZ(currentDatePlusMonth);
CoordinatorJobBean job = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, start, end, false, true, 0);
CoordinatorActionBean action = addRecordToCoordActionTable(job.getId(), 1, CoordinatorAction.Status.RUNNING,
"coord-action-get.xml", 0);
JPAService jpaService = Services.get().get(JPAService.class);
assertNotNull(jpaService);
CoordJobGetJPAExecutor coordJobGetCmd = new CoordJobGetJPAExecutor(job.getId());
CoordActionGetJPAExecutor coordActionGetCmd = new CoordActionGetJPAExecutor(action.getId());
job = jpaService.execute(coordJobGetCmd);
action = jpaService.execute(coordActionGetCmd);
assertEquals(job.getStatus(), CoordinatorJob.Status.RUNNING);
assertEquals(action.getStatus(), CoordinatorAction.Status.RUNNING);
new CoordKillXCommand(job.getId()).call();
job = jpaService.execute(coordJobGetCmd);
action = jpaService.execute(coordActionGetCmd);
assertEquals(job.getStatus(), CoordinatorJob.Status.KILLED);
assertNotNull(job.getLastModifiedTime());
assertEquals(action.getStatus(), CoordinatorAction.Status.KILLED);
}
/**
* Test : kill job successfully but failed to kill an already successful action
*
* @throws Exception
*/
public void testCoordKillFailedOnAction() throws Exception {
CoordinatorJobBean job = addRecordToCoordJobTable(CoordinatorJob.Status.SUCCEEDED, false, true);
CoordinatorActionBean action = addRecordToCoordActionTable(job.getId(), 1, CoordinatorAction.Status.SUCCEEDED,
"coord-action-get.xml", 0);
JPAService jpaService = Services.get().get(JPAService.class);
assertNotNull(jpaService);
CoordJobGetJPAExecutor coordJobGetCmd = new CoordJobGetJPAExecutor(job.getId());
CoordActionGetJPAExecutor coordActionGetCmd = new CoordActionGetJPAExecutor(action.getId());
job = jpaService.execute(coordJobGetCmd);
action = jpaService.execute(coordActionGetCmd);
assertEquals(job.getStatus(), CoordinatorJob.Status.SUCCEEDED);
assertEquals(action.getStatus(), CoordinatorAction.Status.SUCCEEDED);
new CoordKillXCommand(job.getId()).call();
job = jpaService.execute(coordJobGetCmd);
action = jpaService.execute(coordActionGetCmd);
assertEquals(job.getStatus(), CoordinatorJob.Status.SUCCEEDED);
assertEquals(action.getStatus(), CoordinatorAction.Status.SUCCEEDED);
}
/**
* Test : kill SUCCEEDED job successfully when CONF_BACKWARD_SUPPORT_FOR_COORD_STATUS is true and coordinator schema
* is 0.1
*
* @throws Exception
*/
public void testCoordKillForBackwardSupport() throws Exception {
Services.get().destroy();
setSystemProperty(StatusTransitService.CONF_BACKWARD_SUPPORT_FOR_COORD_STATUS, "true");
services = new Services();
setClassesToBeExcluded(services.getConf(), excludedServices);
services.init();
JPAService jpaService = Services.get().get(JPAService.class);
assertNotNull(jpaService);
CoordinatorJobBean job = addRecordToCoordJobTable(CoordinatorJob.Status.SUCCEEDED, false, true);
CoordinatorActionBean action = addRecordToCoordActionTable(job.getId(), 1, CoordinatorAction.Status.RUNNING,
"coord-action-get.xml", 0);
job.setAppNamespace(SchemaService.COORDINATOR_NAMESPACE_URI_1);
CoordJobQueryExecutor.getInstance().executeUpdate(CoordJobQuery.UPDATE_COORD_JOB_APPNAMESPACE, job);
CoordJobGetJPAExecutor coordJobGetCmd = new CoordJobGetJPAExecutor(job.getId());
CoordActionGetJPAExecutor coordActionGetCmd = new CoordActionGetJPAExecutor(action.getId());
job = jpaService.execute(coordJobGetCmd);
action = jpaService.execute(coordActionGetCmd);
assertEquals(CoordinatorJob.Status.SUCCEEDED, job.getStatus());
assertEquals(CoordinatorAction.Status.RUNNING, action.getStatus());
new CoordKillXCommand(job.getId()).call();
job = jpaService.execute(coordJobGetCmd);
action = jpaService.execute(coordActionGetCmd);
assertEquals(CoordinatorJob.Status.KILLED, job.getStatus());
assertEquals(CoordinatorAction.Status.KILLED, action.getStatus());
}
/**
* Test : kill job failed. Job does not exist.
*
* @throws Exception
*/
public void testCoordKillFailed() throws Exception {
final String testJobId = "0000001-" + new Date().getTime() + "-testCoordKill-C";
CoordinatorJobBean job = addRecordToCoordJobTable(CoordinatorJob.Status.SUCCEEDED, false, true);
CoordinatorActionBean action = addRecordToCoordActionTable(job.getId(), 1, CoordinatorAction.Status.READY,
"coord-action-get.xml", 0);
JPAService jpaService = Services.get().get(JPAService.class);
assertNotNull(jpaService);
CoordJobGetJPAExecutor coordJobGetCmd = new CoordJobGetJPAExecutor(job.getId());
CoordActionGetJPAExecutor coordActionGetCmd = new CoordActionGetJPAExecutor(action.getId());
job = jpaService.execute(coordJobGetCmd);
action = jpaService.execute(coordActionGetCmd);
assertEquals(job.getStatus(), CoordinatorJob.Status.SUCCEEDED);
assertEquals(action.getStatus(), CoordinatorAction.Status.READY);
try {
new CoordKillXCommand(testJobId).call();
fail("Job doesn't exist. Should fail.");
} catch (CommandException ce) {
//Job doesn't exist. Exception is expected.
}
}
/**
* Test: Kill a waiting coord action
* @throws Exception
*/
public void testCoordKillWaiting() throws Exception {
String currentDatePlusMonth = XDataTestCase.getCurrentDateafterIncrementingInMonths(1);
Date start = DateUtils.parseDateOozieTZ(currentDatePlusMonth);
Date end = DateUtils.parseDateOozieTZ(currentDatePlusMonth);
CoordinatorJobBean coordJob = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, start, end, false, true, 0);
// Create a workflow job with RUNNING status
WorkflowJobBean wfJob1 = this
.addRecordToWfJobTable(WorkflowJob.Status.RUNNING, WorkflowInstance.Status.RUNNING);
// Create a coordinator job with RUNNING status
CoordinatorActionBean action1 = addRecordToCoordActionTable(coordJob.getId(), 1,
CoordinatorAction.Status.RUNNING, "coord-action-get.xml", wfJob1.getId(), "RUNNING", 0);
// Create a coordinator job with WAITING status
CoordinatorActionBean action2 = addRecordToCoordActionTable(coordJob.getId(), 2,
CoordinatorAction.Status.WAITING, "coord-action-get.xml", null, null, 0);
JPAService jpaService = Services.get().get(JPAService.class);
assertNotNull(jpaService);
CoordJobGetJPAExecutor coordJobGetCmd = new CoordJobGetJPAExecutor(coordJob.getId());
CoordActionGetJPAExecutor coordActionGetCmd1 = new CoordActionGetJPAExecutor(action1.getId());
CoordActionGetJPAExecutor coordActionGetCmd2 = new CoordActionGetJPAExecutor(action2.getId());
coordJob = jpaService.execute(coordJobGetCmd);
action1 = jpaService.execute(coordActionGetCmd1);
action2 = jpaService.execute(coordActionGetCmd2);
// Make sure the status is updated
assertEquals(coordJob.getStatus(), CoordinatorJob.Status.RUNNING);
assertEquals(action1.getStatus(), CoordinatorAction.Status.RUNNING);
assertEquals(action2.getStatus(), CoordinatorAction.Status.WAITING);
// Issue the kill command
new CoordKillXCommand(coordJob.getId()).call();
coordJob = jpaService.execute(coordJobGetCmd);
action1 = jpaService.execute(coordActionGetCmd1);
action2 = jpaService.execute(coordActionGetCmd2);
// Check the status and pending flag after kill command is issued
assertEquals(coordJob.getStatus(), CoordinatorJob.Status.KILLED);
assertEquals(action1.getStatus(), CoordinatorAction.Status.KILLED);
// The wf job is running and can be killed, so pending for coord action
// kill should be true
assertEquals(action1.getPending(), 1);
assertEquals(action2.getStatus(), CoordinatorAction.Status.KILLED);
// The coord job is waiting and no wf created yet, so pending for coord
// action kill should be false
assertEquals(action2.getPending(), 0);
}
public class MyCoordKillXCommand extends CoordKillXCommand {
long executed = 0;
int wait;
public MyCoordKillXCommand(String jobId, int wait) {
super(jobId);
this.wait = wait;
}
@Override
public String toString() {
StringBuilder sb = new StringBuilder();
sb.append("Type:").append(getType());
sb.append(",Priority:").append(getPriority());
return sb.toString();
}
@Override
protected Void execute() throws CommandException {
super.execute();
try {
Thread.sleep(wait);
}
catch (InterruptedException e) {
}
executed = System.currentTimeMillis();
return null;
}
}
public void testCoordKillXCommandUniqueness() throws Exception {
String currentDatePlusMonth = XDataTestCase.getCurrentDateafterIncrementingInMonths(1);
Date start = DateUtils.parseDateOozieTZ(currentDatePlusMonth);
Date end = DateUtils.parseDateOozieTZ(currentDatePlusMonth);
CoordinatorJobBean job = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, start, end, false, false, 0);
final MyCoordKillXCommand callable1 = new MyCoordKillXCommand(job.getId(), 100);
final MyCoordKillXCommand callable2 = new MyCoordKillXCommand(job.getId(), 100);
final MyCoordKillXCommand callable3 = new MyCoordKillXCommand(job.getId(), 100);
List<MyCoordKillXCommand> callables = Arrays.asList(callable1, callable2, callable3);
CallableQueueService queueservice = services.get(CallableQueueService.class);
for (MyCoordKillXCommand c : callables) {
queueservice.queue(c);
}
waitFor(1000, new Predicate() {
public boolean evaluate() throws Exception {
return callable1.executed != 0 && callable2.executed == 0 && callable3.executed == 0;
}
});
assertTrue(callable1.executed != 0);
assertTrue(callable2.executed == 0);
assertTrue(callable3.executed == 0);
}
public void testCoordKillRemovePushMissingDeps() throws Exception {
try {
services.destroy();
services = super.setupServicesForHCatalog();
services.init();
String db = "default";
String table = "tablename";
String server = "hcatserver";
String newHCatDependency1 = "hcat://" + server + "/" + db + "/" + table + "/dt=20120430;country=brazil";
String newHCatDependency2 = "hcat://" + server + "/" + db + "/" + table + "/dt=20120430;country=usa";
String pushMissingDeps = newHCatDependency1 + CoordELFunctions.INSTANCE_SEPARATOR + newHCatDependency2;
PartitionDependencyManagerService pdms = Services.get().get(PartitionDependencyManagerService.class);
CoordinatorJobBean job = addRecordToCoordJobTableForWaiting("coord-job-for-action-input-check.xml",
CoordinatorJob.Status.RUNNING, false, true);
CoordinatorActionBean action1 = addRecordToCoordActionTableForWaiting(job.getId(), 1,
CoordinatorAction.Status.WAITING, "coord-action-for-action-input-check.xml", null, pushMissingDeps,
"Z");
String newHCatDependency3 = "hcat://" + server + "/" + db + "/" + table + "/dt=20120430;country=russia";
CoordinatorActionBean action2 = addRecordToCoordActionTableForWaiting(job.getId(), 2,
CoordinatorAction.Status.WAITING, "coord-action-for-action-input-check.xml", null,
newHCatDependency3, "Z");
HCatURI hcatURI1, hcatURI2, hcatURI3;
hcatURI1 = new HCatURI(newHCatDependency1);
hcatURI2 = new HCatURI(newHCatDependency2);
hcatURI3 = new HCatURI(newHCatDependency3);
pdms.addMissingDependency(hcatURI1, action1.getId());
pdms.addMissingDependency(hcatURI2, action1.getId());
pdms.addMissingDependency(hcatURI3, action2.getId());
assertTrue(pdms.getWaitingActions(new HCatURI(newHCatDependency1)).contains(action1.getId()));
assertTrue(pdms.getWaitingActions(new HCatURI(newHCatDependency2)).contains(action1.getId()));
assertTrue(pdms.getWaitingActions(new HCatURI(newHCatDependency3)).contains(action2.getId()));
new CoordKillXCommand(job.getId()).call();
assertNull(pdms.getWaitingActions(new HCatURI(newHCatDependency1)));
assertNull(pdms.getWaitingActions(new HCatURI(newHCatDependency2)));
assertNull(pdms.getWaitingActions(new HCatURI(newHCatDependency3)));
}
catch (Exception e) {
e.printStackTrace();
fail(e.getMessage());
}
}
}