blob: a12335f2d83c1fa9f34b809686d0eb4f762118f6 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.oozie.command.wf;
import java.io.File;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.oozie.ForTestingActionExecutor;
import org.apache.oozie.WorkflowActionBean;
import org.apache.oozie.client.OozieClient;
import org.apache.oozie.client.rest.JsonTags;
import org.apache.oozie.client.rest.JsonUtils;
import org.apache.oozie.command.CommandException;
import org.apache.oozie.executor.jpa.WorkflowActionsGetForJobJPAExecutor;
import org.apache.oozie.service.ActionService;
import org.apache.oozie.service.ExtendedCallableQueueService;
import org.apache.oozie.service.JPAService;
import org.apache.oozie.service.LiteWorkflowStoreService;
import org.apache.oozie.service.SchemaService;
import org.apache.oozie.service.Services;
import org.apache.oozie.test.XDataTestCase;
import org.apache.oozie.util.XConfiguration;
public class TestWorkflowActionRetryInfoXCommand extends XDataTestCase {
private Services services;
@Override
protected void setUp() throws Exception {
super.setUp();
setSystemProperty(SchemaService.WF_CONF_EXT_SCHEMAS, "wf-ext-schema.xsd");
setSystemProperty(LiteWorkflowStoreService.CONF_USER_RETRY_ERROR_CODE_EXT, ForTestingActionExecutor.TEST_ERROR);
setSystemProperty(Services.CONF_SERVICE_EXT_CLASSES, ExtendedCallableQueueService.class.getName());
services = new Services();
services.init();
services.get(ActionService.class).registerAndInitExecutor(ForTestingActionExecutor.class);
}
@Override
protected void tearDown() throws Exception {
services.destroy();
super.tearDown();
}
public void testRetryConsoleUrl() throws Exception {
//@formatter:off
String wfXml = "<workflow-app xmlns=\"uri:oozie:workflow:0.3\" name=\"wf-fork\">"
+ "<start to=\"action1\"/>"
+"<action name=\"action1\" retry-max=\"2\" retry-interval=\"0\">"
+ "<test xmlns=\"uri:test\">"
+ "<signal-value>${wf:conf('signal-value')}</signal-value>"
+ "<external-status>${wf:conf('external-status')}</external-status> "
+ "<external-childIds>${wf:conf('external-status')}</external-childIds> "
+ "<error>${wf:conf('error')}</error>"
+ "<avoid-set-execution-data>${wf:conf('avoid-set-execution-data')}</avoid-set-execution-data>"
+ "<avoid-set-end-data>${wf:conf('avoid-set-end-data')}</avoid-set-end-data>"
+ "<running-mode>async-error</running-mode>"
+ "</test>"
+ "<ok to=\"end\"/>"
+ "<error to=\"kill\"/>"
+ "</action>"
+ "<kill name=\"kill\"><message>killed</message></kill>"
+ "<end name=\"end\"/>"
+ "</workflow-app>";
//@Formatter:on
validateRetryConsoleUrl(wfXml);
}
public void testRetryConsoleUrlForked() throws Exception {
//@formatter:off
String wfXml = "<workflow-app xmlns=\"uri:oozie:workflow:0.3\" name=\"wf-fork\">"
+ "<start to=\"fork1\"/>"
+ "<fork name=\"fork1\">"
+ "<path start=\"action1\"/>"
+ "<path start=\"action2\"/>"
+ "</fork>"
+"<action name=\"action1\" retry-max=\"2\" retry-interval=\"0\">"
+ "<test xmlns=\"uri:test\">"
+ "<signal-value>${wf:conf('signal-value')}</signal-value>"
+ "<external-status>${wf:conf('external-status')}</external-status> "
+ "<external-childIds>${wf:conf('external-status')}</external-childIds> "
+ "<error>${wf:conf('error')}</error>"
+ "<avoid-set-execution-data>${wf:conf('avoid-set-execution-data')}</avoid-set-execution-data>"
+ "<avoid-set-end-data>${wf:conf('avoid-set-end-data')}</avoid-set-end-data>"
+ "<running-mode>async-error</running-mode>"
+ "</test>"
+ "<ok to=\"join\"/>"
+ "<error to=\"kill\"/>"
+ "</action>"
+"<action name=\"action2\" retry-max=\"2\" retry-interval=\"0\">"
+ "<test xmlns=\"uri:test\">"
+ "<signal-value>${wf:conf('signal-value')}</signal-value>"
+ "<external-status>${wf:conf('external-status')}</external-status> "
+ "<external-childIds>${wf:conf('external-status')}</external-childIds> "
+ "<error>${wf:conf('error')}</error>"
+ "<avoid-set-execution-data>${wf:conf('avoid-set-execution-data')}</avoid-set-execution-data>"
+ "<avoid-set-end-data>${wf:conf('avoid-set-end-data')}</avoid-set-end-data>"
+ "<running-mode>async-error</running-mode>"
+ "</test>"
+ "<ok to=\"join\"/>"
+ "<error to=\"kill\"/>"
+ "</action>"
+ "<join name=\"join\" to=\"end\"/>"
+ "<kill name=\"kill\"><message>killed</message></kill>"
+ "<end name=\"end\"/>"
+ "</workflow-app>";
//@Formatter:on
validateRetryConsoleUrl(wfXml);
}
private void validateRetryConsoleUrl(String wfXml) throws Exception {
Configuration conf = new XConfiguration();
File workflowUri = new File(getTestCaseDir(), "workflow.xml");
writeToFile(wfXml, workflowUri);
conf.set(OozieClient.APP_PATH, workflowUri.toURI().toString());
conf.set(OozieClient.USER_NAME, getTestUser());
conf.set("external-status", "error");
conf.set("signal-value", "based_on_action_status");
conf.set("external-childIds", "1");
SubmitXCommand sc = new SubmitXCommand(conf);
final String jobId = sc.call();
new StartXCommand(jobId).call();
final WorkflowActionsGetForJobJPAExecutor actionsGetExecutor = new WorkflowActionsGetForJobJPAExecutor(jobId);
final JPAService jpaService = Services.get().get(JPAService.class);
waitFor(20 * 1000, new Predicate() {
public boolean evaluate() throws Exception {
List<WorkflowActionBean> actions = jpaService.execute(actionsGetExecutor);
for (WorkflowActionBean bean : actions) {
if (bean.getType().equals("test") && bean.getUserRetryCount() == 2) {
return true;
}
}
return false;
}
});
List<WorkflowActionBean> actions = jpaService.execute(actionsGetExecutor);
WorkflowActionBean action1 = null;
WorkflowActionBean action2 = null;
for (WorkflowActionBean bean : actions) {
if (bean.getType().equals("test") && bean.getName().equals("action1")) {
action1 = bean;
}
else if (bean.getType().equals("test") && bean.getName().equals("action2")) {
action2 = bean;
}
}
List<Map<String, String>> retries1List = getRetryList(action1);
int retries1Num = retries1List == null ? 0 : retries1List.size();
List<Map<String, String>> retries2List = getRetryList(action2);
int retries2Num = retries2List == null ? 0 : retries2List.size();
assertTrue( String.format("Invalid number of retries %d and %d", retries1Num, retries2Num),
retries1Num == 2 || retries2Num == 2);
if (retries1List != null) {
assertEquals("Invalid action attempt", retries1List.get(0).get(JsonTags.ACTION_ATTEMPT), "1");
assertEquals("Invalid workflow action start time", retries1List.get(0).get(JsonTags.WORKFLOW_ACTION_START_TIME),
JsonUtils.formatDateRfc822(action1.getStartTime()));
for (Map<String,String> retry : retries1List) {
assertNotNull("Workflow action console URL should not be null", retry.get(JsonTags.WORKFLOW_ACTION_CONSOLE_URL));
assertNotNull("Missing external child ids", retry.get(JsonTags.WORKFLOW_ACTION_EXTERNAL_CHILD_IDS));
}
if (retries1List.size() >= 2) {
final Date action1EndTime = action1.getEndTime();
final Date secondRetry1EndTime = JsonUtils.parseDateRfc822(retries1List.get(1).get(
JsonTags.WORKFLOW_ACTION_END_TIME));
assertTrue("action end time should be within ten seconds of second retry end time",
endTimeIsWithinExpectedTime(action1EndTime.toInstant(), secondRetry1EndTime.toInstant().plusSeconds(10)));
}
}
if (retries2List != null) {
assertEquals("Invalid action attempt", retries2List.get(0).get(JsonTags.ACTION_ATTEMPT), "1");
assertEquals("Invalid workflow action start time", retries2List.get(0).get(JsonTags.WORKFLOW_ACTION_START_TIME),
JsonUtils.formatDateRfc822(action2.getStartTime()));
if (retries2List.size() >= 2) {
final Date action2EndTime = action2.getEndTime();
final Date secondRetry2EndTime = JsonUtils.parseDateRfc822(retries2List.get(1).get(
JsonTags.WORKFLOW_ACTION_END_TIME));
assertTrue("action end time should be within ten seconds of second retry end time",
endTimeIsWithinExpectedTime(action2EndTime.toInstant(), secondRetry2EndTime.toInstant().plusSeconds(10)));
}
}
}
private List<Map<String, String>> getRetryList(WorkflowActionBean action) throws CommandException {
if (action == null) {
return null;
}
WorkflowActionRetryInfoXCommand command = new WorkflowActionRetryInfoXCommand(action.getId());
return command.call();
}
private boolean endTimeIsWithinExpectedTime(Instant endTime, Instant expectedTime) {
return ChronoUnit.MILLIS.between(endTime, expectedTime) >= 0;
}
}