| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.oozie.command.wf; |
| |
| import java.io.File; |
| import java.io.FileOutputStream; |
| import java.io.OutputStreamWriter; |
| import java.io.Reader; |
| import java.io.Writer; |
| import java.nio.charset.StandardCharsets; |
| import java.util.List; |
| |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.oozie.CoordinatorActionBean; |
| import org.apache.oozie.CoordinatorJobBean; |
| import org.apache.oozie.DagEngine; |
| import org.apache.oozie.ForTestingActionExecutor; |
| import org.apache.oozie.WorkflowActionBean; |
| import org.apache.oozie.WorkflowJobBean; |
| import org.apache.oozie.action.control.KillActionExecutor; |
| import org.apache.oozie.client.CoordinatorJob; |
| import org.apache.oozie.client.CoordinatorAction; |
| import org.apache.oozie.client.OozieClient; |
| import org.apache.oozie.client.WorkflowAction; |
| import org.apache.oozie.client.WorkflowJob; |
| import org.apache.oozie.executor.jpa.CoordActionGetForExternalIdJPAExecutor; |
| import org.apache.oozie.executor.jpa.WorkflowActionsGetForJobJPAExecutor; |
| import org.apache.oozie.executor.jpa.WorkflowJobGetJPAExecutor; |
| import org.apache.oozie.service.ActionService; |
| import org.apache.oozie.service.JPAService; |
| import org.apache.oozie.service.LiteWorkflowStoreService; |
| import org.apache.oozie.service.SchemaService; |
| import org.apache.oozie.service.Services; |
| import org.apache.oozie.service.WorkflowStoreService; |
| import org.apache.oozie.store.WorkflowStore; |
| import org.apache.oozie.test.XDataTestCase; |
| import org.apache.oozie.util.IOUtils; |
| import org.apache.oozie.util.XConfiguration; |
| import org.apache.oozie.workflow.WorkflowInstance; |
| |
| /** |
| * Test cases for checking correct functionality in case of errors while executing Actions. |
| */ |
| public class TestActionErrors extends XDataTestCase { |
| |
| private Services services; |
| |
| @Override |
| protected void setUp() throws Exception { |
| super.setUp(); |
| setSystemProperty(SchemaService.WF_CONF_EXT_SCHEMAS, "wf-ext-schema.xsd"); |
| setSystemProperty(LiteWorkflowStoreService.CONF_USER_RETRY_ERROR_CODE_EXT, ForTestingActionExecutor.TEST_ERROR); |
| services = new Services(); |
| services.init(); |
| services.get(ActionService.class).registerAndInitExecutor(ForTestingActionExecutor.class); |
| } |
| |
| @Override |
| protected void tearDown() throws Exception { |
| services.destroy(); |
| super.tearDown(); |
| } |
| |
| /** |
| * Tests for correct functionality when a {@link org.apache.oozie.action.ActionExecutorException.ErrorType#NON_TRANSIENT} |
| * error is generated while attempting to start an action. </p> It first generates a {@link |
| * org.apache.oozie.action.ActionExecutorException.ErrorType#NON_TRANSIENT} error and checks for the job to go into |
| * {@link org.apache.oozie.client.WorkflowJob.Status#SUSPENDED} state. The state of the single action in the job is |
| * checked to be at {@link org.apache.oozie.WorkflowActionBean.Status#START_MANUAL} and it's error code and error |
| * message are verified. </p> The job is subsequently fixed to not generate any errors, and is resumed. The job |
| * state and the action state are verified to be {@link org.apache.oozie.client.WorkflowJob.Status#SUCCEEDED} and |
| * {@link org.apache.oozie.WorkflowActionBean.Status#OK} respectively. The action error code and error message are |
| * checked to be emtpy. |
| * |
| * @throws Exception |
| */ |
| public void testStartNonTransient() throws Exception { |
| _testNonTransient("start.non-transient", WorkflowActionBean.Status.START_MANUAL, "start"); |
| assertTrue(true); |
| } |
| |
| /** |
| * Tests for correct functionality when a {@link org.apache.oozie.action.ActionExecutorException.ErrorType#NON_TRANSIENT} |
| * error is generated while attempting to start an action. </p> It first generates a {@link |
| * org.apache.oozie.action.ActionExecutorException.ErrorType#NON_TRANSIENT} error and checks for the job to go into |
| * {@link org.apache.oozie.client.WorkflowJob.Status#SUSPENDED} state. Then checks for the coordinator action to go into |
| * {@link org.apache.oozie.client.CoordinatorAction.Status#SUSPENDED} state. |
| * |
| * @throws Exception |
| */ |
| public void testStartNonTransientWithCoordActionUpdate() throws Exception { |
| _testNonTransientWithCoordActionUpdate("start.non-transient", WorkflowActionBean.Status.START_MANUAL, "start"); |
| assertTrue(true); |
| } |
| |
| /** |
| * Tests for correct functionality when a {@link org.apache.oozie.action.ActionExecutorException.ErrorType#NON_TRANSIENT} |
| * error is generated while attempting to end an action. </p> It first generates a {@link |
| * org.apache.oozie.action.ActionExecutorException.ErrorType#NON_TRANSIENT} error and checks for the job to go into |
| * {@link org.apache.oozie.client.WorkflowJob.Status#SUSPENDED} state. The state of the single action in the job is |
| * checked to be at {@link org.apache.oozie.WorkflowActionBean.Status#END_MANUAL} and it's error code and error |
| * message are verified. </p> The job is subsequently fixed to not generate any errors, and is resumed. The job |
| * state and the action state are verified to be {@link org.apache.oozie.client.WorkflowJob.Status#SUCCEEDED} and |
| * {@link org.apache.oozie.WorkflowActionBean.Status#OK} respectively. The action error code and error message are |
| * checked to be emtpy. |
| * |
| * @throws Exception |
| */ |
| public void testEndNonTransient() throws Exception { |
| _testNonTransient("end.non-transient", WorkflowActionBean.Status.END_MANUAL, "end"); |
| assertTrue(true); |
| } |
| |
| /** |
| * Tests for correct functionality when a {@link org.apache.oozie.action.ActionExecutorException.ErrorType#NON_TRANSIENT} |
| * error is generated while attempting to end an action. </p> It first generates a {@link |
| * org.apache.oozie.action.ActionExecutorException.ErrorType#NON_TRANSIENT} error and checks for the job to go into |
| * {@link org.apache.oozie.client.WorkflowJob.Status#SUSPENDED} state. Then checks for the coordinator action to go into |
| * {@link org.apache.oozie.client.CoordinatorAction.Status#SUSPENDED} state. |
| * |
| * @throws Exception |
| */ |
| public void testEndNonTransientWithCoordActionUpdate() throws Exception { |
| _testNonTransientWithCoordActionUpdate("end.non-transient", WorkflowActionBean.Status.END_MANUAL, "end"); |
| assertTrue(true); |
| } |
| |
| /** |
| * Tests for correct functionality when a {@link org.apache.oozie.action.ActionExecutorException.ErrorType#TRANSIENT} |
| * error is generated when trying to start an action. </p> It first generates a {@link |
| * org.apache.oozie.action.ActionExecutorException.ErrorType#TRANSIENT} error. 2 retries with an interval of 10 |
| * seconds between them are allowed. The state of the action is checked after each attempt to be at {@link |
| * org.apache.oozie.WorkflowActionBean.Status#START_RETRY}. Error message and Error code for the action are |
| * verified. </p> After the configured number of retry attempts, the job and actions status are checked to be {@link |
| * org.apache.oozie.client.WorkflowJob.Status#SUSPENDED} and {@link org.apache.oozie.WorkflowActionBean.Status#END_MANUAL} |
| * respectively. The error message and code are verified again. |
| * |
| * @throws Exception |
| */ |
| public void testStartTransient() throws Exception { |
| _testTransient("start.transient", WorkflowActionBean.Status.START_RETRY, WorkflowActionBean.Status.START_MANUAL, "start"); |
| assertTrue(true); |
| } |
| |
| /** |
| * Tests for correct functionality when a {@link org.apache.oozie.action.ActionExecutorException.ErrorType#TRANSIENT} |
| * error is generated when trying to end an action. </p> It first generates a {@link |
| * org.apache.oozie.action.ActionExecutorException.ErrorType#TRANSIENT} error. 2 retries with an interval of 10 |
| * seconds between them are allowed. The state of the action is checked after each attempt to be at {@link |
| * org.apache.oozie.WorkflowActionBean.Status#END_RETRY}. Error message and Error code for the action are verified. |
| * </p> After the configured number of retry attempts, the job and actions status are checked to be {@link |
| * org.apache.oozie.client.WorkflowJob.Status#SUSPENDED} and {@link org.apache.oozie.WorkflowActionBean.Status#START_MANUAL} |
| * respectively. The error message and code are verified again. |
| * |
| * @throws Exception |
| */ |
| public void testEndTransient() throws Exception { |
| _testTransient("end.transient", WorkflowActionBean.Status.END_RETRY, WorkflowActionBean.Status.END_MANUAL, "end"); |
| assertTrue(true); |
| } |
| |
| /** |
| * Tests for correct functionality when a {@link org.apache.oozie.action.ActionExecutorException.ErrorType#ERROR} is |
| * generated when executing start. </p> Checks for the job to go into {@link org.apache.oozie.client.WorkflowJob.Status#KILLED} |
| * state. |
| * |
| * @throws Exception |
| */ |
| public void testStartError() throws Exception { |
| _testError("start.error", "error", "based_on_action_status"); |
| assertTrue(true); |
| } |
| |
| /** |
| * Tests for correct functionality when a {@link org.apache.oozie.action.ActionExecutorException.ErrorType#ERROR} is |
| * generated when executing end. </p> Checks for the job to go into {@link org.apache.oozie.client.WorkflowJob.Status#KILLED} |
| * state. |
| * |
| * @throws Exception |
| */ |
| public void testEndError() throws Exception { |
| _testError("end.error", "ok", "OK"); |
| assertTrue(true); |
| } |
| |
| /** |
| * Tests for correct functionality when a {@link org.apache.oozie.action.ActionExecutorException.ErrorType#ERROR} is |
| * generated when executing start. </p> Checks for user retry is applied to actions for specified retry-max=2. |
| * |
| * @throws Exception |
| */ |
| public void testStartErrorWithUserRetry() throws Exception { |
| _testErrorWithUserRetry("start.error", "error", "based_on_action_status"); |
| assertTrue(true); |
| } |
| |
| /** |
| * Tests for correct functionality when a {@link org.apache.oozie.action.ActionExecutorException.ErrorType#ERROR} is |
| * generated when executing end. </p> Checks for user retry is applied to actions for specified retry-max=2. |
| * |
| * @throws Exception |
| */ |
| public void testEndErrorWithUserRetry() throws Exception { |
| _testErrorWithUserRetry("end.error", "ok", "OK"); |
| assertTrue(true); |
| } |
| |
| /** |
| * Tests for the job to be KILLED and status set to FAILED in case an Action Handler does not call setExecutionData |
| * in it's start() implementation. |
| * |
| * @throws Exception |
| */ |
| public void testExecutionDataNotSet() throws Exception { |
| _testDataNotSet("avoid-set-execution-data", ActionStartXCommand.START_DATA_MISSING); |
| } |
| |
| /** |
| * Tests for the job to be KILLED and status set to FAILED in case an Action Handler does not call setEndData in |
| * it's end() implementation. |
| * |
| * @throws Exception |
| */ |
| public void testEndDataNotSet() throws Exception { |
| _testDataNotSet("avoid-set-end-data", ActionEndXCommand.END_DATA_MISSING); |
| } |
| |
| /** |
| * Provides functionality to test kill node message |
| * |
| * @throws Exception |
| */ |
| public void testKillNodeErrorMessage() throws Exception { |
| WorkflowActionBean killAction = _testKillNodeErrorMessage("wf-test-kill-node-message.xml"); |
| assertEquals("E0729", killAction.getErrorCode()); |
| assertEquals("[end]", killAction.getErrorMessage()); |
| assertEquals(WorkflowAction.Status.OK, killAction.getStatus()); |
| } |
| |
| /** |
| * Provides functionality to test kill node message when there's an error processing the message itself |
| * |
| * @throws Exception |
| */ |
| public void testKillNodeErrorMessageError() throws Exception { |
| WorkflowActionBean killAction = _testKillNodeErrorMessage("wf-test-kill-node-message-error.xml"); |
| assertEquals("E0756", killAction.getErrorCode()); |
| assertTrue("Incorrect error message", |
| killAction.getErrorMessage().startsWith("E0756: Exception parsing Kill node message")); |
| assertEquals(WorkflowAction.Status.ERROR, killAction.getStatus()); |
| } |
| |
| /** |
| * Provides functionality to test kill node message when there's an error processing the message itself |
| * |
| * @throws Exception |
| */ |
| public void testKillNodeErrorMessageError2() throws Exception { |
| WorkflowActionBean killAction = _testKillNodeErrorMessage("wf-test-kill-node-message-error2.xml"); |
| assertEquals("E0756", killAction.getErrorCode()); |
| assertEquals("E0756: Exception parsing Kill node message [variable [bar] cannot be resolved]", |
| killAction.getErrorMessage()); |
| assertEquals(WorkflowAction.Status.ERROR, killAction.getStatus()); |
| } |
| |
| private WorkflowActionBean _testKillNodeErrorMessage(String workflowXmlFile) throws Exception { |
| String workflowPath = getTestCaseFileUri("workflow.xml"); |
| Reader reader = IOUtils.getResourceAsReader(workflowXmlFile, -1); |
| Writer writer = new OutputStreamWriter(new FileOutputStream( |
| new File(getTestCaseDir(), "workflow.xml")), StandardCharsets.UTF_8); |
| IOUtils.copyCharStream(reader, writer); |
| |
| final DagEngine engine = new DagEngine("u"); |
| Configuration conf = new XConfiguration(); |
| conf.set(OozieClient.APP_PATH, workflowPath); |
| conf.set(OozieClient.USER_NAME, getTestUser()); |
| |
| conf.set(OozieClient.LOG_TOKEN, "t"); |
| conf.set("error", "end.error"); |
| conf.set("external-status", "FAILED/KILLED"); |
| conf.set("signal-value", "fail"); |
| |
| final String jobId = engine.submitJob(conf, true); |
| |
| final JPAService jpaService = Services.get().get(JPAService.class); |
| final WorkflowJobGetJPAExecutor wfJobGetCmd = new WorkflowJobGetJPAExecutor(jobId); |
| |
| waitFor(50000, new Predicate() { |
| public boolean evaluate() throws Exception { |
| WorkflowJobBean job = jpaService.execute(wfJobGetCmd); |
| return (job.getWorkflowInstance().getStatus() == WorkflowInstance.Status.KILLED); |
| } |
| }); |
| |
| WorkflowJobBean job = jpaService.execute(wfJobGetCmd); |
| assertEquals(WorkflowJob.Status.KILLED, job.getStatus()); |
| |
| WorkflowActionsGetForJobJPAExecutor wfActionsGetCmd = new WorkflowActionsGetForJobJPAExecutor(jobId); |
| List<WorkflowActionBean> actions = jpaService.execute(wfActionsGetCmd); |
| |
| WorkflowActionBean action = null; |
| WorkflowActionBean killAction = null; |
| for (WorkflowActionBean bean : actions) { |
| if (bean.getType().equals("test")) { |
| action = bean; |
| } |
| if (bean.getType().equals(KillActionExecutor.TYPE)) { |
| killAction = bean; |
| } |
| } |
| assertNotNull(action); |
| assertEquals("TEST_ERROR", action.getErrorCode()); |
| assertEquals("end", action.getErrorMessage()); |
| assertEquals(WorkflowAction.Status.ERROR, action.getStatus()); |
| assertNotNull(killAction); |
| return killAction; |
| } |
| |
| /** |
| * Provides functionality to test non transient failures. |
| * |
| * @param errorType the error type. (start.non-transient, end.non-transient) |
| * @param expStatus1 expected status. (START_MANUAL, END_MANUAL) |
| * @param expErrorMsg expected error message. |
| * @throws Exception |
| */ |
| private void _testNonTransient(String errorType, WorkflowActionBean.Status expStatus1, String expErrorMsg) throws Exception { |
| String workflowPath = getTestCaseFileUri("workflow.xml"); |
| Reader reader = IOUtils.getResourceAsReader("wf-ext-schema-valid.xml", -1); |
| Writer writer = new OutputStreamWriter(new FileOutputStream( |
| new File(getTestCaseDir(), "workflow.xml")), StandardCharsets.UTF_8); |
| IOUtils.copyCharStream(reader, writer); |
| |
| final DagEngine engine = new DagEngine("u"); |
| Configuration conf = new XConfiguration(); |
| conf.set(OozieClient.APP_PATH, workflowPath); |
| conf.set(OozieClient.USER_NAME, getTestUser()); |
| |
| conf.set(OozieClient.LOG_TOKEN, "t"); |
| conf.set("signal-value", "OK"); |
| conf.set("external-status", "ok"); |
| conf.set("error", errorType); |
| |
| final String jobId = engine.submitJob(conf, true); |
| |
| waitFor(5000, new Predicate() { |
| public boolean evaluate() throws Exception { |
| return (engine.getJob(jobId).getStatus() == WorkflowJob.Status.SUSPENDED); |
| } |
| }); |
| |
| final WorkflowStore store = Services.get().get(WorkflowStoreService.class).create(); |
| store.beginTrx(); |
| List<WorkflowActionBean> actions = store.getActionsForWorkflow(jobId, true); |
| int n = actions.size(); |
| WorkflowActionBean action = actions.get(n - 1); |
| assertEquals("TEST_ERROR", action.getErrorCode()); |
| assertEquals(expErrorMsg, action.getErrorMessage()); |
| assertEquals(expStatus1, action.getStatus()); |
| assertTrue(action.isPending() == false); |
| |
| assertTrue(engine.getJob(jobId).getStatus() == WorkflowJob.Status.SUSPENDED); |
| |
| String actionConf = action.getConf(); |
| String fixedActionConf = actionConf.replaceAll(errorType, "none"); |
| action.setConf(fixedActionConf); |
| store.updateAction(action); |
| store.commitTrx(); |
| store.closeTrx(); |
| |
| engine.resume(jobId); |
| |
| waitFor(5000, new Predicate() { |
| public boolean evaluate() throws Exception { |
| return (engine.getJob(jobId).getStatus() == WorkflowJob.Status.SUCCEEDED); |
| } |
| }); |
| |
| assertEquals(WorkflowJob.Status.SUCCEEDED, engine.getJob(jobId).getStatus()); |
| |
| final WorkflowStore store2 = Services.get().get(WorkflowStoreService.class).create(); |
| store2.beginTrx(); |
| actions = store2.getActionsForWorkflow(jobId, false); |
| action = actions.get(0); |
| assertEquals(null, action.getErrorCode()); |
| assertEquals(null, action.getErrorMessage()); |
| assertEquals(WorkflowActionBean.Status.OK, action.getStatus()); |
| store2.commitTrx(); |
| store2.closeTrx(); |
| } |
| |
| /** |
| * Provides functionality to test non transient failures and coordinator action update |
| * |
| * @param errorType the error type. (start.non-transient, end.non-transient) |
| * @param expStatus1 expected status. (START_MANUAL, END_MANUAL) |
| * @param expErrorMsg expected error message. |
| * @throws Exception |
| */ |
| private void _testNonTransientWithCoordActionUpdate( |
| String errorType, WorkflowActionBean.Status expStatus1, String expErrorMsg) throws Exception { |
| String workflowPath = getTestCaseFileUri("workflow.xml"); |
| Reader reader = IOUtils.getResourceAsReader("wf-ext-schema-valid.xml", -1); |
| Writer writer = new OutputStreamWriter(new FileOutputStream( |
| new File(getTestCaseDir(), "workflow.xml")), StandardCharsets.UTF_8); |
| IOUtils.copyCharStream(reader, writer); |
| |
| final DagEngine engine = new DagEngine("u"); |
| Configuration conf = new XConfiguration(); |
| conf.set(OozieClient.APP_PATH, workflowPath); |
| conf.set(OozieClient.USER_NAME, getTestUser()); |
| |
| conf.set(OozieClient.LOG_TOKEN, "t"); |
| conf.set("signal-value", "OK"); |
| conf.set("external-status", "ok"); |
| conf.set("error", errorType); |
| |
| final String jobId = engine.submitJob(conf, false); |
| |
| final JPAService jpaService = Services.get().get(JPAService.class); |
| final CoordinatorJobBean coordJob = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, false, false); |
| CoordinatorActionBean coordAction = addRecordToCoordActionTable(coordJob.getId(), 1, |
| CoordinatorAction.Status.RUNNING, "coord-action-get.xml", jobId, "RUNNING", 0); |
| |
| engine.start(jobId); |
| |
| waitFor(5000, new Predicate() { |
| public boolean evaluate() throws Exception { |
| return (engine.getJob(jobId).getStatus() == WorkflowJob.Status.SUSPENDED); |
| } |
| }); |
| |
| assertNotNull(jpaService); |
| WorkflowJobGetJPAExecutor wfGetCmd = new WorkflowJobGetJPAExecutor(jobId); |
| WorkflowJobBean job = jpaService.execute(wfGetCmd); |
| WorkflowActionsGetForJobJPAExecutor actionsGetExe = new WorkflowActionsGetForJobJPAExecutor(jobId); |
| List<WorkflowActionBean> actionsList = jpaService.execute(actionsGetExe); |
| |
| int n = actionsList.size(); |
| WorkflowActionBean action = actionsList.get(n - 1); |
| assertEquals("TEST_ERROR", action.getErrorCode()); |
| assertEquals(expErrorMsg, action.getErrorMessage()); |
| assertEquals(expStatus1, action.getStatus()); |
| assertFalse(action.isPending()); |
| |
| assertEquals (WorkflowJob.Status.SUSPENDED, job.getStatus()); |
| |
| waitFor(5000, new Predicate() { |
| public boolean evaluate() throws Exception { |
| CoordinatorActionBean coordAction2 = jpaService.execute(new CoordActionGetForExternalIdJPAExecutor(jobId)); |
| return coordAction2.getStatus().equals(CoordinatorAction.Status.SUSPENDED); |
| } |
| }); |
| |
| coordAction = jpaService.execute(new CoordActionGetForExternalIdJPAExecutor(jobId)); |
| assertEquals (CoordinatorAction.Status.SUSPENDED, coordAction.getStatus()); |
| } |
| |
| /** |
| * Provides functionality to test transient failures. |
| * |
| * @param errorType the error type. (start.transient, end.transient) |
| * @param expStatus1 expected status after the first step (START_RETRY, END_RETRY) |
| * @param expStatus2 expected status after the second step (START_MANUAL, END_MANUAL) |
| * @param expErrorMsg the expected error message. |
| * @throws Exception |
| */ |
| private void _testTransient(String errorType, WorkflowActionBean.Status expStatus1, |
| final WorkflowActionBean.Status expStatus2, String expErrorMsg) throws Exception { |
| String workflowPath = getTestCaseFileUri("workflow.xml"); |
| Reader reader = IOUtils.getResourceAsReader("wf-ext-schema-valid.xml", -1); |
| Writer writer = new OutputStreamWriter(new FileOutputStream( |
| new File(getTestCaseDir(), "workflow.xml")), StandardCharsets.UTF_8); |
| IOUtils.copyCharStream(reader, writer); |
| |
| final int maxRetries = 2; |
| final int retryInterval = 10; |
| |
| final DagEngine engine = new DagEngine("u"); |
| Configuration conf = new XConfiguration(); |
| conf.set(OozieClient.APP_PATH, workflowPath); |
| conf.set(OozieClient.USER_NAME, getTestUser()); |
| |
| conf.set(OozieClient.LOG_TOKEN, "t"); |
| conf.set("signal-value", "OK"); |
| conf.set("external-status", "ok"); |
| conf.set("error", errorType); |
| conf.setInt(OozieClient.ACTION_MAX_RETRIES, maxRetries); |
| conf.setInt(OozieClient.ACTION_RETRY_INTERVAL, retryInterval); |
| |
| final String jobId = engine.submitJob(conf, true); |
| |
| int retryCount = 1; |
| WorkflowActionBean.Status expectedStatus = expStatus1; |
| int expectedRetryCount = 2; |
| |
| Thread.sleep(20000); |
| String aId = null; |
| final WorkflowStore store = Services.get().get(WorkflowStoreService.class).create(); |
| store.beginTrx(); |
| while (retryCount <= maxRetries) { |
| List<WorkflowActionBean> actions = store.getActionsForWorkflow(jobId, false); |
| WorkflowActionBean action = null; |
| for (WorkflowActionBean bean : actions) { |
| if (bean.getType().equals("test")) { |
| action = bean; |
| break; |
| } |
| } |
| assertNotNull(action); |
| aId = action.getId(); |
| assertEquals(expectedStatus, action.getStatus()); |
| assertEquals(expectedRetryCount, action.getRetries()); |
| assertEquals("TEST_ERROR", action.getErrorCode()); |
| assertEquals(expErrorMsg, action.getErrorMessage()); |
| if (action.getRetries() == maxRetries) { |
| expectedRetryCount = 0; |
| expectedStatus = expStatus2; |
| break; |
| } |
| else { |
| expectedRetryCount++; |
| } |
| Thread.sleep(retryInterval * 1000); |
| retryCount++; |
| } |
| store.commitTrx(); |
| store.closeTrx(); |
| Thread.sleep(5000); |
| |
| final String actionId = aId; |
| |
| waitFor(5000, new Predicate() { |
| public boolean evaluate() throws Exception { |
| return (engine.getWorkflowAction(actionId).getStatus() == expStatus2); |
| } |
| }); |
| |
| final WorkflowStore store2 = Services.get().get(WorkflowStoreService.class).create(); |
| store2.beginTrx(); |
| WorkflowActionBean action = engine.getWorkflowAction(actionId); |
| assertEquals("TEST_ERROR", action.getErrorCode()); |
| assertEquals(expErrorMsg, action.getErrorMessage()); |
| assertEquals(expStatus2, action.getStatus()); |
| assertTrue(action.isPending() == false); |
| assertEquals(WorkflowJob.Status.SUSPENDED, engine.getJob(jobId).getStatus()); |
| store2.commitTrx(); |
| store2.closeTrx(); |
| } |
| |
| /** |
| * Provides functionality to test errors |
| * |
| * @param errorType the error type. (start.non-transient, end.non-transient) |
| * @param externalStatus the external status to set. |
| * @param signalValue the signal value to set. |
| * @throws Exception |
| */ |
| private void _testError(String errorType, String externalStatus, String signalValue) throws Exception { |
| String workflowPath = getTestCaseFileUri("workflow.xml"); |
| Reader reader = IOUtils.getResourceAsReader("wf-ext-schema-valid.xml", -1); |
| Writer writer = new OutputStreamWriter(new FileOutputStream( |
| new File(getTestCaseDir(), "workflow.xml")), StandardCharsets.UTF_8); |
| IOUtils.copyCharStream(reader, writer); |
| |
| final DagEngine engine = new DagEngine("u"); |
| Configuration conf = new XConfiguration(); |
| conf.set(OozieClient.APP_PATH, workflowPath); |
| conf.set(OozieClient.USER_NAME, getTestUser()); |
| |
| conf.set(OozieClient.LOG_TOKEN, "t"); |
| conf.set("error", errorType); |
| conf.set("external-status", externalStatus); |
| conf.set("signal-value", signalValue); |
| |
| final String jobId = engine.submitJob(conf, true); |
| |
| final WorkflowStore store = Services.get().get(WorkflowStoreService.class).create(); |
| store.beginTrx(); |
| waitFor(5000, new Predicate() { |
| public boolean evaluate() throws Exception { |
| WorkflowJobBean bean = store.getWorkflow(jobId, false); |
| return (bean.getWorkflowInstance().getStatus() == WorkflowInstance.Status.KILLED); |
| } |
| }); |
| assertEquals(WorkflowJob.Status.KILLED, engine.getJob(jobId).getStatus()); |
| store.commitTrx(); |
| store.closeTrx(); |
| } |
| |
| /** |
| * Provides functionality to test user retry |
| * |
| * @param errorType the error type. (start.non-transient, end.non-transient) |
| * @param externalStatus the external status to set. |
| * @param signalValue the signal value to set. |
| * @throws Exception |
| */ |
| private void _testErrorWithUserRetry(String errorType, String externalStatus, String signalValue) throws Exception { |
| String workflowPath = getTestCaseFileUri("workflow.xml"); |
| Reader reader = IOUtils.getResourceAsReader("wf-ext-schema-valid-user-retry.xml", -1); |
| Writer writer = new OutputStreamWriter(new FileOutputStream( |
| new File(getTestCaseDir(), "workflow.xml")), StandardCharsets.UTF_8); |
| IOUtils.copyCharStream(reader, writer); |
| |
| final DagEngine engine = new DagEngine("u"); |
| Configuration conf = new XConfiguration(); |
| conf.set(OozieClient.APP_PATH, workflowPath); |
| conf.set(OozieClient.USER_NAME, getTestUser()); |
| |
| conf.set(OozieClient.LOG_TOKEN, "t"); |
| conf.set("error", errorType); |
| conf.set("external-status", externalStatus); |
| conf.set("signal-value", signalValue); |
| |
| final String jobId = engine.submitJob(conf, true); |
| |
| final JPAService jpaService = Services.get().get(JPAService.class); |
| final WorkflowJobGetJPAExecutor wfJobGetCmd = new WorkflowJobGetJPAExecutor(jobId); |
| |
| final WorkflowActionsGetForJobJPAExecutor actionsGetExecutor = new WorkflowActionsGetForJobJPAExecutor(jobId); |
| waitFor(5000, new Predicate() { |
| public boolean evaluate() throws Exception { |
| List<WorkflowActionBean> actions = jpaService.execute(actionsGetExecutor); |
| WorkflowActionBean action = null; |
| for (WorkflowActionBean bean : actions) { |
| if (bean.getType().equals("test")) { |
| action = bean; |
| break; |
| } |
| } |
| return (action != null && action.getUserRetryCount() == 2); |
| } |
| }); |
| |
| List<WorkflowActionBean> actions = jpaService.execute(actionsGetExecutor); |
| WorkflowActionBean action = null; |
| for (WorkflowActionBean bean : actions) { |
| if (bean.getType().equals("test")) { |
| action = bean; |
| break; |
| } |
| } |
| assertNotNull(action); |
| assertEquals(2, action.getUserRetryCount()); |
| } |
| |
| /** |
| * Provides functionality to test for set*Data calls not being made by the Action Handler. |
| * |
| * @param avoidParam set*Data function call to avoid. |
| * @param expActionErrorCode the expected action error code. |
| * @throws Exception |
| */ |
| private void _testDataNotSet(String avoidParam, String expActionErrorCode) throws Exception { |
| String workflowPath = getTestCaseFileUri("workflow.xml"); |
| Reader reader = IOUtils.getResourceAsReader("wf-ext-schema-valid.xml", -1); |
| Writer writer = new OutputStreamWriter(new FileOutputStream( |
| new File(getTestCaseDir(), "workflow.xml")), StandardCharsets.UTF_8); |
| IOUtils.copyCharStream(reader, writer); |
| |
| final DagEngine engine = new DagEngine("u"); |
| Configuration conf = new XConfiguration(); |
| conf.set(OozieClient.APP_PATH, workflowPath); |
| conf.set(OozieClient.USER_NAME, getTestUser()); |
| |
| conf.set(OozieClient.LOG_TOKEN, "t"); |
| conf.set("external-status", "ok"); |
| conf.set("signal-value", "based_on_action_status"); |
| conf.set(avoidParam, "true"); |
| |
| final String jobId = engine.submitJob(conf, true); |
| |
| final WorkflowStore store = Services.get().get(WorkflowStoreService.class).create(); |
| store.beginTrx(); |
| Thread.sleep(2000); |
| |
| waitFor(5000, new Predicate() { |
| public boolean evaluate() throws Exception { |
| WorkflowJobBean bean = store.getWorkflow(jobId, false); |
| return (bean.getWorkflowInstance().getStatus() == WorkflowInstance.Status.FAILED); |
| } |
| }); |
| store.commitTrx(); |
| store.closeTrx(); |
| |
| final WorkflowStore store2 = Services.get().get(WorkflowStoreService.class).create(); |
| store2.beginTrx(); |
| assertEquals(WorkflowInstance.Status.FAILED, store2.getWorkflow(jobId, false).getWorkflowInstance().getStatus()); |
| assertEquals(WorkflowJob.Status.FAILED, engine.getJob(jobId).getStatus()); |
| |
| List<WorkflowActionBean> actions = store2.getActionsForWorkflow(jobId, false); |
| WorkflowActionBean action = null; |
| for (WorkflowActionBean bean : actions) { |
| if (bean.getType().equals("test")) { |
| action = bean; |
| break; |
| } |
| } |
| assertNotNull(action); |
| assertEquals(expActionErrorCode, action.getErrorCode()); |
| store2.commitTrx(); |
| store2.closeTrx(); |
| } |
| } |