blob: d640150e08c2c285f0d318cb57e0b0c60f042589 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.oozie.command.wf;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.io.Reader;
import java.io.Writer;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Properties;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.Appender;
import org.apache.log4j.Layout;
import org.apache.log4j.Logger;
import org.apache.log4j.SimpleLayout;
import org.apache.log4j.WriterAppender;
import org.apache.oozie.DagEngine;
import org.apache.oozie.ForTestingActionExecutor;
import org.apache.oozie.WorkflowActionBean;
import org.apache.oozie.client.OozieClient;
import org.apache.oozie.client.WorkflowAction;
import org.apache.oozie.client.WorkflowJob;
import org.apache.oozie.executor.jpa.WorkflowActionsGetForJobJPAExecutor;
import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor;
import org.apache.oozie.local.LocalOozie;
import org.apache.oozie.service.CallableQueueService;
import org.apache.oozie.service.ConfigurationService;
import org.apache.oozie.service.ExtendedCallableQueueService;
import org.apache.oozie.service.Services;
import org.apache.oozie.service.SchemaService;
import org.apache.oozie.service.LiteWorkflowStoreService;
import org.apache.oozie.service.ActionService;
import org.apache.oozie.service.JPAService;
import org.apache.oozie.test.XDataTestCase;
import org.apache.oozie.util.IOUtils;
import org.apache.oozie.util.XConfiguration;
import org.apache.oozie.workflow.lite.LiteWorkflowAppParser;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.apache.oozie.util.XLog;
import org.apache.oozie.util.XCallable;
import org.apache.oozie.service.RecoveryService;
public class TestSignalXCommand extends XDataTestCase {
private Services services;
@Override
protected void setUp() throws Exception {
super.setUp();
services = new Services();
services.init();
ConfigurationService.setBoolean(LiteWorkflowAppParser.VALIDATE_FORK_JOIN, false);
}
@Override
protected void tearDown() throws Exception {
services.destroy();
super.tearDown();
}
public void testJoinFail() throws Exception{
ConfigurationService.setBoolean(SignalXCommand.FORK_PARALLEL_JOBSUBMISSION, true);
_testJoinFail();
ConfigurationService.setBoolean(SignalXCommand.FORK_PARALLEL_JOBSUBMISSION, false);
_testJoinFail();
}
public void testSuspendPoints() throws Exception{
ConfigurationService.setBoolean(SignalXCommand.FORK_PARALLEL_JOBSUBMISSION, true);
_testSuspendPoints();
services.destroy();
services = new Services();
services.init();
ConfigurationService.setBoolean(SignalXCommand.FORK_PARALLEL_JOBSUBMISSION, false);
_testSuspendPoints();
}
public void testSuspendPointsAll() throws Exception{
ConfigurationService.setBoolean(SignalXCommand.FORK_PARALLEL_JOBSUBMISSION, true);
_testSuspendPointsAll();
services.destroy();
services = new Services();
services.init();
ConfigurationService.setBoolean(SignalXCommand.FORK_PARALLEL_JOBSUBMISSION, false);
_testSuspendPointsAll();
}
public void _testJoinFail() throws Exception {
Logger logger = Logger.getLogger(SignalXCommand.class);
ByteArrayOutputStream out = new ByteArrayOutputStream();
Layout layout = new SimpleLayout();
Appender appender = new WriterAppender(layout, out);
logger.addAppender(appender);
FileSystem fs = getFileSystem();
Path appPath = new Path(getFsTestCaseDir(), "app");
fs.mkdirs(appPath);
Reader reader = IOUtils.getResourceAsReader("wf-fork.xml", -1);
Writer writer = new OutputStreamWriter(fs.create(new Path(appPath, "workflow.xml")),
StandardCharsets.UTF_8);
IOUtils.copyCharStream(reader, writer);
writer.close();
reader.close();
final DagEngine engine = new DagEngine("u");
XConfiguration conf = new XConfiguration();
conf.set(OozieClient.APP_PATH, new Path(appPath, "workflow.xml").toString());
conf.set(OozieClient.USER_NAME, getTestUser());
final String jobId = engine.submitJob(conf, false);
assertNotNull(jobId);
engine.start(jobId);
Thread.sleep(2000);
assertFalse(out.toString(StandardCharsets.UTF_8.name()).contains("EntityExistsException"));
}
public void _testSuspendPoints() throws Exception {
services.destroy();
LocalOozie.start();
FileSystem fs = getFileSystem();
Path appPath = new Path(getFsTestCaseDir(), "app");
fs.mkdirs(appPath);
Reader reader = IOUtils.getResourceAsReader("wf-suspendpoints.xml", -1);
Writer writer = new OutputStreamWriter(fs.create(new Path(appPath, "workflow.xml")),
StandardCharsets.UTF_8);
IOUtils.copyCharStream(reader, writer);
writer.close();
reader.close();
final OozieClient oc = LocalOozie.getClient();
Properties conf = oc.createConfiguration();
conf.setProperty(OozieClient.APP_PATH, new Path(appPath, "workflow.xml").toString());
conf.setProperty(OozieClient.USER_NAME, getTestUser());
conf.setProperty("oozie.suspend.on.nodes", "action1,nonexistant_action_name,decision1, action3,join1 ,fork1,action4b");
final String jobId = oc.submit(conf);
assertNotNull(jobId);
WorkflowJob wf = oc.getJobInfo(jobId);
assertEquals(WorkflowJob.Status.PREP, wf.getStatus());
long beforeStart = System.currentTimeMillis();
oc.start(jobId);
checkSuspendActions(wf, oc, jobId, WorkflowJob.Status.SUSPENDED,
new String[]{"action1"},
new String[]{":start:"});
// Check for creation time
long afterStart = System.currentTimeMillis();
WorkflowJob wf1 = oc.getJobInfo(jobId);
for (WorkflowAction action : wf1.getActions()) {
WorkflowActionBean bean = (WorkflowActionBean) action;
assertNotNull(bean.getCreatedTime());
assertTrue((bean.getCreatedTime().getTime() > beforeStart) && (bean.getCreatedTime().getTime() < afterStart));
}
oc.resume(jobId);
checkSuspendActions(wf, oc, jobId, WorkflowJob.Status.SUSPENDED,
new String[]{"decision1"},
new String[]{":start:", "action1", "action2"});
oc.resume(jobId);
checkSuspendActions(wf, oc, jobId, WorkflowJob.Status.SUSPENDED,
new String[]{"action3"},
new String[]{":start:", "action1", "action2", "decision1"});
oc.resume(jobId);
checkSuspendActions(wf, oc, jobId, WorkflowJob.Status.SUSPENDED,
new String[]{"fork1"},
new String[]{":start:", "action1", "action2", "decision1", "action3"});
oc.resume(jobId);
checkSuspendActions(wf, oc, jobId, WorkflowJob.Status.SUSPENDED,
new String[]{"action4a", "action4b", "action4c"},
new String[]{":start:", "action1", "action2", "decision1", "action3", "fork1"});
oc.resume(jobId);
checkSuspendActions(wf, oc, jobId, WorkflowJob.Status.SUSPENDED,
new String[]{"join1"},
new String[]{":start:", "action1", "action2", "decision1", "action3", "fork1", "action4a", "action4b", "action4c"});
oc.resume(jobId);
checkSuspendActions(wf, oc, jobId, WorkflowJob.Status.SUCCEEDED,
new String[]{},
new String[]{":start:", "action1", "action2", "decision1", "action3", "fork1", "action4a", "action4b", "action4c",
"join1", "end"});
LocalOozie.stop();
}
public void _testSuspendPointsAll() throws Exception {
services.destroy();
LocalOozie.start();
FileSystem fs = getFileSystem();
Path appPath = new Path(getFsTestCaseDir(), "app");
fs.mkdirs(appPath);
Reader reader = IOUtils.getResourceAsReader("wf-suspendpoints.xml", -1);
Writer writer = new OutputStreamWriter(fs.create(new Path(appPath, "workflow.xml")),
StandardCharsets.UTF_8);
IOUtils.copyCharStream(reader, writer);
writer.close();
reader.close();
final OozieClient oc = LocalOozie.getClient();
Properties conf = oc.createConfiguration();
conf.setProperty(OozieClient.APP_PATH, new Path(appPath, "workflow.xml").toString());
conf.setProperty(OozieClient.USER_NAME, getTestUser());
conf.setProperty("oozie.suspend.on.nodes", "*");
final String jobId = oc.submit(conf);
assertNotNull(jobId);
WorkflowJob wf = oc.getJobInfo(jobId);
assertEquals(WorkflowJob.Status.PREP, wf.getStatus());
oc.start(jobId);
checkSuspendActions(wf, oc, jobId, WorkflowJob.Status.SUSPENDED,
new String[]{"action1"},
new String[]{":start:"});
oc.resume(jobId);
checkSuspendActions(wf, oc, jobId, WorkflowJob.Status.SUSPENDED,
new String[]{"action2"},
new String[]{":start:", "action1"});
oc.resume(jobId);
checkSuspendActions(wf, oc, jobId, WorkflowJob.Status.SUSPENDED,
new String[]{"decision1"},
new String[]{":start:", "action1", "action2"});
oc.resume(jobId);
checkSuspendActions(wf, oc, jobId, WorkflowJob.Status.SUSPENDED,
new String[]{"action3"},
new String[]{":start:", "action1", "action2", "decision1"});
oc.resume(jobId);
checkSuspendActions(wf, oc, jobId, WorkflowJob.Status.SUSPENDED,
new String[]{"fork1"},
new String[]{":start:", "action1", "action2", "decision1", "action3"});
oc.resume(jobId);
checkSuspendActions(wf, oc, jobId, WorkflowJob.Status.SUSPENDED,
new String[]{"action4a", "action4b", "action4c"},
new String[]{":start:", "action1", "action2", "decision1", "action3", "fork1"});
oc.resume(jobId);
checkSuspendActions(wf, oc, jobId, WorkflowJob.Status.SUSPENDED,
new String[]{"join1"},
new String[]{":start:", "action1", "action2", "decision1", "action3", "fork1", "action4a", "action4b", "action4c"});
oc.resume(jobId);
checkSuspendActions(wf, oc, jobId, WorkflowJob.Status.SUSPENDED,
new String[]{"end"},
new String[]{":start:", "action1", "action2", "decision1", "action3", "fork1", "action4a", "action4b", "action4c",
"join1"});
oc.resume(jobId);
checkSuspendActions(wf, oc, jobId, WorkflowJob.Status.SUCCEEDED,
new String[]{},
new String[]{":start:", "action1", "action2", "decision1", "action3", "fork1", "action4a", "action4b", "action4c",
"join1", "end"});
LocalOozie.stop();
}
private void checkSuspendActions(WorkflowJob wf, final OozieClient oc, final String jobId, final WorkflowJob.Status status,
String[] prepActions, String[] okActions) throws Exception {
// Wait for the WF to transition to status
waitFor(30 * 1000, new Predicate() {
public boolean evaluate() throws Exception {
WorkflowJob wf = oc.getJobInfo(jobId);
return wf.getStatus() == status;
}
});
wf = oc.getJobInfo(jobId);
assertEquals(status, wf.getStatus());
// Check the actions' statuses
int numPrep = 0;
int numOK = 0;
for (WorkflowAction action : wf.getActions()) {
boolean checked = false;
for (String name : prepActions) {
if (!checked && name.equals(action.getName())) {
assertEquals("action [" + action.getName() + "] had incorrect status",
WorkflowAction.Status.PREP, action.getStatus());
numPrep++;
checked = true;
}
}
if (!checked) {
for (String name : okActions) {
if (!checked && name.equals(action.getName())) {
assertEquals("action [" + action.getName() + "] had incorrect status",
WorkflowAction.Status.OK, action.getStatus());
numOK++;
checked = true;
}
}
}
if (!checked) {
fail("Unexpected action [" + action.getName() + "] with status [" + action.getStatus() + "]");
}
}
assertEquals(prepActions.length, numPrep);
assertEquals(okActions.length, numOK);
}
private void writeToFile(String appXml, String appPath) throws IOException {
File wf = new File(URI.create(appPath));
try(PrintWriter out = new PrintWriter(new OutputStreamWriter(new FileOutputStream(wf), StandardCharsets.UTF_8))) {
out.println(appXml);
}
}
/*
* This test case is just to test possible dead-lock when
* the conf of oozie.workflow.parallel.fork.action.start
* is enabled.
*
* Details could be linked to OOZIE-3646
*/
public void testPossibleDeadLock() throws Exception {
setSystemProperty(Services.CONF_SERVICE_EXT_CLASSES, ExtendedCallableQueueService.class.getName());
services = new Services();
Configuration servicesConf = services.getConf();
servicesConf.setInt(CallableQueueService.CONF_THREADS, 1);
services.init();
ConfigurationService.setBoolean(SignalXCommand.FORK_PARALLEL_JOBSUBMISSION, true);
Configuration conf = new XConfiguration();
String workflowUri = getTestCaseFileUri("workflow.xml");
//@formatter:off
String appXml = "<workflow-app xmlns=\"uri:oozie:workflow:0.4\" name=\"wf-fork\">"
+ "<start to=\"fork1\"/>"
+ "<fork name=\"fork1\">"
+ "<path start=\"action1\"/>"
+ "<path start=\"action2\"/>"
+ "<path start=\"action3\"/>"
+ "<path start=\"action4\"/>"
+ "<path start=\"action5\"/>"
+ "</fork>"
+ "<action name=\"action1\">"
+ "<fs></fs>"
+ "<ok to=\"join1\"/>"
+ "<error to=\"kill\"/>"
+ "</action>"
+ "<action name=\"action2\">"
+ "<fs></fs><ok to=\"join1\"/>"
+ "<error to=\"kill\"/>"
+ "</action>"
+ "<action name=\"action3\">"
+ "<fs></fs><ok to=\"join1\"/>"
+ "<error to=\"kill\"/>"
+ "</action>"
+ "<action name=\"action4\">"
+ "<fs></fs><ok to=\"join1\"/>"
+ "<error to=\"kill\"/>"
+ "</action>"
+ "<action name=\"action5\">"
+ "<fs></fs><ok to=\"join1\"/>"
+ "<error to=\"kill\"/>"
+ "</action>"
+ "<join name=\"join1\" to=\"end\"/>"
+ "<kill name=\"kill\"><message>killed</message>"
+ "</kill><"
+ "end name=\"end\"/>"
+ "</workflow-app>";
//@Formatter:on
writeToFile(appXml, workflowUri);
conf.set(OozieClient.APP_PATH, workflowUri);
conf.set(OozieClient.USER_NAME, getTestUser());
SubmitXCommand sc = new SubmitXCommand(conf);
final String jobId = sc.call();
new StartXCommand(jobId).call();
waitFor(20 * 1000, new Predicate() {
@Override
public boolean evaluate() throws Exception {
return WorkflowJobQueryExecutor.getInstance()
.get(WorkflowJobQueryExecutor.WorkflowJobQuery.GET_WORKFLOW, jobId)
.getStatus() == WorkflowJob.Status.SUCCEEDED;
}
});
assertEquals(WorkflowJobQueryExecutor.getInstance()
.get(WorkflowJobQueryExecutor.WorkflowJobQuery.GET_WORKFLOW, jobId)
.getStatus(),
WorkflowJob.Status.SUCCEEDED);
}
/**
* Test : fork parallel submit, one transition fail, and the job is failed but the other transition
* always RUNNING or PREP.
* verify the PreconditionException is thrown when action2 = RUNNING or PREP and job = FAIL
*
*/
public void testForkParallelSubmitFail() throws Exception {
_testForkSubmitRunFail(true);
}
/**
* Test : fork serial submit, one transition fail, and the job is failed but the other transition
* always RUNNING or PREP.
* verify the PreconditionException is thrown when action2 = RUNNING or PREP and job = FAIL
*
*/
public void testForkSerialSubmitFail() throws Exception {
_testForkSubmitRunFail(false);
}
/**
* Test : fork parallel submit, one transition fail, and the job is failed but the other transition
* always RUNNING or PREP.
* verify the PreconditionException is thrown when action2 = RUNNING or PREP and job = FAIL
*
* @param isForkParallelSubmit ("ture" or "fail")
*/
private void _testForkSubmitRunFail(boolean isForkParallelSubmit) throws Exception {
services.destroy();
setSystemProperty(SchemaService.WF_CONF_EXT_SCHEMAS, "wf-ext-schema.xsd");
setSystemProperty(LiteWorkflowStoreService.CONF_USER_RETRY_ERROR_CODE_EXT, ForTestingActionExecutor.TEST_ERROR);
services = new Services();
services.init();
services.get(ActionService.class).registerAndInitExecutor(ForTestingActionExecutor.class);
ConfigurationService.setBoolean(SignalXCommand.FORK_PARALLEL_JOBSUBMISSION, isForkParallelSubmit);
String workflowUri = getTestCaseFileUri("workflow.xml");
//@formatter:off
String appXml = "<workflow-app xmlns=\"uri:oozie:workflow:1.0\" name=\"wf-fork-submit\">\n" +
" <start to=\"fork1\"/>\n" +
" <fork name=\"fork1\">\n" +
" <path start=\"action_to_be_failed\"/>\n" +
" <path start=\"action_to_be_succeeded_or_killed\"/>\n" +
" </fork>\n" +
" <action name=\"action_to_be_failed\">\n" +
" <test xmlns=\"uri:test\">\n" +
" <signal-value>${wf:conf('signal-value')}</signal-value>\n" +
" <external-status>${wf:conf('external-status')}</external-status>\n" +
" <error>${wf:conf('error')}</error>\n" +
" <avoid-set-execution-data>${wf:conf('avoid-set-execution-data')}</avoid-set-execution-data>\n" +
" <avoid-set-end-data>${wf:conf('avoid-set-end-data')}</avoid-set-end-data>\n" +
" <running-mode>${wf:conf('running-mode')}</running-mode>\n" +
" </test>\n" +
" <ok to=\"join1\"/>\n" +
" <error to=\"kill\"/>\n" +
" </action>\n" +
" <action name=\"action_to_be_succeeded_or_killed\">\n" +
" <test xmlns=\"uri:test\">\n" +
" <signal-value>based_on_action_status</signal-value>\n" +
" <external-status>ok</external-status>\n" +
" <error>ok</error>\n" +
" <avoid-set-execution-data>true</avoid-set-execution-data>\n" +
" <avoid-set-end-data>false</avoid-set-end-data>\n" +
" <running-mode>async</running-mode>\n" +
" </test>\n" +
" <ok to=\"join1\"/>\n" +
" <error to=\"kill\"/>\n" +
" </action>\n" +
" <join name=\"join1\" to=\"end\"/>\n" +
" <kill name=\"kill\">\n" +
" <message>killed</message>\n" +
" </kill>\n" +
" <end name=\"end\"/>\n" +
"</workflow-app>";
//@Formatter:on
writeToFile(appXml, workflowUri);
final DagEngine engine = new DagEngine("u");
Configuration conf = new Configuration();
conf.set(OozieClient.APP_PATH, workflowUri);
conf.set(OozieClient.USER_NAME, getTestUser());
conf.set(OozieClient.LOG_TOKEN, "t");
conf.set("error", "start.fail");
conf.set("external-status", "error");
conf.set("signal-value", "based_on_action_status");
final String jobId = engine.submitJob(conf, true);
final WorkflowActionsGetForJobJPAExecutor actionsGetExecutor = new WorkflowActionsGetForJobJPAExecutor(jobId);
final JPAService jpaService = Services.get().get(JPAService.class);
waitFor(30 * 1000, new Predicate() {
public boolean evaluate() throws Exception {
return WorkflowJob.Status.FAILED.equals(engine.getJob(jobId).getStatus());
}
});
// wait for execute KillXCommand, and all actions has finished
waitFor(30 * 1000, new Predicate() {
public boolean evaluate() throws Exception {
List<WorkflowActionBean> actions = jpaService.execute(actionsGetExecutor);
for (WorkflowActionBean action : actions) {
if (WorkflowAction.Status.PREP.equals(action.getStatus()) ||
WorkflowAction.Status.RUNNING.equals(action.getStatus()) ){
return false;
}
}
return true;
}
});
List<WorkflowActionBean> actions = jpaService.execute(actionsGetExecutor);
assertEquals("action size [" + actions.size() + "] had incorrect", 4, actions.size());
for (WorkflowActionBean action : actions) {
if ("action_to_be_failed".equals(action.getName())){
assertEquals("action [" + action.getName() + "] had incorrect status",
WorkflowAction.Status.FAILED, action.getStatus());
}
if ("action_to_be_succeeded_or_killed".equals(action.getName())){
// 1.The "action_to_be_failed" action submit fail, and the "action_to_be_succeeded_or_killed" action
// has finished, so the "action_to_be_succeeded_or_killed" action should be OK
// 2.The "action_to_be_failed" action submit fail, and the "action_to_be_succeeded_or_killed" action is
// PREP or RUNNING, so the "action_to_be_succeeded_or_killed" action should be KILLED
if (!WorkflowAction.Status.KILLED.equals(action.getStatus()) &&
!WorkflowAction.Status.OK.equals(action.getStatus())) {
fail("Unexpected action [" + action.getName() + "] with status [" + action.getStatus() + "]");
}
}
}
}
/**
* for test {@link #testDeadlockForForkParallelSubmit()}
*/
public static class TestRecoverForkStartActionCallableQueueService extends CallableQueueService{
private final XLog log = XLog.getLog(getClass());
public static TestSignalXCommand testSignalXCommand;
/**
* Overwrite for test the same action's ActionStartXCommand in queue and the ForkedActionStartXCommand wouldn't lose,
* if ActionStartXCommand and ForkedActionStartXCommand has the same name, ForkedActionStartXCommand couldn't enqueue
* after a ActionStartXCommand in queue waiting for running.
*/
public class CallableWrapper<E> extends CallableQueueService.CallableWrapper<E> {
boolean forkedActionStartXCommandFirstEnter;
public CallableWrapper(XCallable<E> callable, long delay) {
super(callable,delay);
forkedActionStartXCommandFirstEnter = callable instanceof ForkedActionStartXCommand;
}
public void run() {
XCallable<?> callable = getElement();
if (forkedActionStartXCommandFirstEnter && callable instanceof ForkedActionStartXCommand){
// make sure there has a ActionStartXCommand in the queue wait to run,
// and then ForkedActionStartXCommand enqueue
testSignalXCommand.waitFor(15 * 1000, new Predicate() {
@Override
public boolean evaluate() throws Exception {
return !filterDuplicates();
}
},200);
log.warn("max concurrency for callable [{0}] exceeded, enqueueing with [{1}]ms delay", callable
.getType(), CONCURRENCY_DELAY);
setDelay(CONCURRENCY_DELAY, TimeUnit.MILLISECONDS);
try {
Method queue = CallableQueueService.class.getDeclaredMethod("queue",
CallableQueueService.CallableWrapper.class, boolean.class);
queue.setAccessible(true);
queue.invoke(TestRecoverForkStartActionCallableQueueService.this,this,true);
} catch (NoSuchMethodException | InvocationTargetException | IllegalAccessException e) {
throw new RuntimeException(e);
}
forkedActionStartXCommandFirstEnter = false;
}else {
super.run();
}
}
}
/**
* Replace CallableQueueService.CallableWrapper to TestRecoverForkStartActionCallableQueueService.CallableWrapper
* for text, in order to call TestRecoverForkStartActionCallableQueueService.CallableWrapper for wait
* ActionStartXCommand enqueue before.
*
*/
public <T> Future<T> submit(CallableQueueService.CallableWrapper<T> task) throws InterruptedException {
return super.submit(new TestRecoverForkStartActionCallableQueueService.CallableWrapper<T>(task.getElement(),
task.getInitialDelay()));
}
}
/**
* Test : fork parallel submit, the action has the same XCommand in queue, there will skip enqueue by
* {@link CallableQueueService.CallableWrapper#filterDuplicates()} so if the ActionStartXCommand and
* ForkedActionStartXCommand has the same name, it would be lost.
*
* Note : RecoveryService will check the pending action and try to start it. So if the action's ForkedActionStartXCommand
* wait for run, there may be a ActionStartXCommand add for the same action.
*
*/
public void testDeadlockForForkParallelSubmit() throws Exception {
setSystemProperty(Services.CONF_SERVICE_EXT_CLASSES, TestRecoverForkStartActionCallableQueueService.class.getName());
TestRecoverForkStartActionCallableQueueService.testSignalXCommand = this;
services = new Services();
Configuration servicesConf = services.getConf();
servicesConf.setInt(RecoveryService.CONF_WF_ACTIONS_OLDER_THAN, 0);
servicesConf.setInt(RecoveryService.CONF_SERVICE_INTERVAL, 10);
services.init();
ConfigurationService.setBoolean(SignalXCommand.FORK_PARALLEL_JOBSUBMISSION, true);
Configuration conf = new XConfiguration();
String workflowUri = getTestCaseFileUri("workflow.xml");
//@formatter:off
String appXml = "<workflow-app xmlns=\"uri:oozie:workflow:1.0\" name=\"wf-fork\">"
+ "<start to=\"fork1\"/>"
+ "<fork name=\"fork1\">"
+ "<path start=\"action1\"/>"
+ "<path start=\"action2\"/>"
+ "<path start=\"action3\"/>"
+ "<path start=\"action4\"/>"
+ "<path start=\"action5\"/>"
+ "</fork>"
+ "<action name=\"action1\">"
+ "<fs></fs>"
+ "<ok to=\"join1\"/>"
+ "<error to=\"kill\"/>"
+ "</action>"
+ "<action name=\"action2\">"
+ "<fs></fs><ok to=\"join1\"/>"
+ "<error to=\"kill\"/>"
+ "</action>"
+ "<action name=\"action3\">"
+ "<fs></fs><ok to=\"join1\"/>"
+ "<error to=\"kill\"/>"
+ "</action>"
+ "<action name=\"action4\">"
+ "<fs></fs><ok to=\"join1\"/>"
+ "<error to=\"kill\"/>"
+ "</action>"
+ "<action name=\"action5\">"
+ "<fs></fs><ok to=\"join1\"/>"
+ "<error to=\"kill\"/>"
+ "</action>"
+ "<join name=\"join1\" to=\"end\"/>"
+ "<kill name=\"kill\"><message>killed</message>"
+ "</kill><"
+ "end name=\"end\"/>"
+ "</workflow-app>";
//@Formatter:on
writeToFile(appXml, workflowUri);
conf.set(OozieClient.APP_PATH, workflowUri);
conf.set(OozieClient.USER_NAME, getTestUser());
SubmitXCommand sc = new SubmitXCommand(conf);
final String jobId = sc.call();
new StartXCommand(jobId).call();
waitFor(30 * 1000, new Predicate() {
@Override
public boolean evaluate() throws Exception {
return WorkflowJobQueryExecutor.getInstance()
.get(WorkflowJobQueryExecutor.WorkflowJobQuery.GET_WORKFLOW, jobId)
.getStatus() == WorkflowJob.Status.SUCCEEDED;
}
});
assertEquals(WorkflowJobQueryExecutor.getInstance()
.get(WorkflowJobQueryExecutor.WorkflowJobQuery.GET_WORKFLOW, jobId)
.getStatus(),
WorkflowJob.Status.SUCCEEDED);
}
}