| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.oozie.command.coord; |
| |
| import java.io.File; |
| import java.io.FileInputStream; |
| import java.io.FileOutputStream; |
| import java.io.IOException; |
| import java.io.InputStream; |
| import java.io.OutputStream; |
| import java.io.OutputStreamWriter; |
| import java.io.PrintWriter; |
| import java.io.Reader; |
| import java.io.Writer; |
| import java.nio.charset.StandardCharsets; |
| import java.util.Date; |
| import java.util.HashMap; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Properties; |
| |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.fs.FileSystem; |
| import org.apache.hadoop.fs.Path; |
| import org.apache.oozie.CoordinatorActionBean; |
| import org.apache.oozie.CoordinatorJobBean; |
| import org.apache.oozie.ErrorCode; |
| import org.apache.oozie.WorkflowActionBean; |
| import org.apache.oozie.WorkflowJobBean; |
| import org.apache.oozie.action.hadoop.MapperReducerForTest; |
| import org.apache.oozie.client.CoordinatorAction; |
| import org.apache.oozie.client.CoordinatorJob; |
| import org.apache.oozie.client.OozieClient; |
| import org.apache.oozie.client.CoordinatorAction.Status; |
| import org.apache.oozie.command.CommandException; |
| import org.apache.oozie.executor.jpa.CoordActionGetForStartJPAExecutor; |
| import org.apache.oozie.executor.jpa.CoordActionGetJPAExecutor; |
| import org.apache.oozie.executor.jpa.CoordActionInsertJPAExecutor; |
| import org.apache.oozie.executor.jpa.JPAExecutorException; |
| import org.apache.oozie.executor.jpa.WorkflowActionGetJPAExecutor; |
| import org.apache.oozie.executor.jpa.WorkflowActionsGetForJobJPAExecutor; |
| import org.apache.oozie.executor.jpa.WorkflowJobGetJPAExecutor; |
| import org.apache.oozie.service.JPAService; |
| import org.apache.oozie.service.Services; |
| import org.apache.oozie.test.XDataTestCase; |
| import org.apache.oozie.util.DateUtils; |
| import org.apache.oozie.util.IOUtils; |
| import org.apache.oozie.util.XConfiguration; |
| |
| public class TestCoordActionStartXCommand extends XDataTestCase { |
| private Services services; |
| |
| @Override |
| protected void setUp() throws Exception { |
| super.setUp(); |
| services = new Services(); |
| services.init(); |
| } |
| |
| @Override |
| protected void tearDown() throws Exception { |
| services.destroy(); |
| super.tearDown(); |
| } |
| |
| /** |
| * Test the working of CoordActionStartXCommand with standard coord action |
| * XML and then check the action |
| * |
| * @throws IOException |
| * @throws JPAExecutorException |
| * @throws CommandException |
| */ |
| public void testActionStartCommand() throws IOException, JPAExecutorException, CommandException { |
| String actionId = new Date().getTime() + "-COORD-ActionStartCommand-C@1"; |
| addRecordToActionTable(actionId, 1, null); |
| new CoordActionStartXCommand(actionId, "test", "myapp", "myjob").call(); |
| checkCoordAction(actionId); |
| } |
| |
| /** |
| * Coord action XML contains non-supported parameterized action name and |
| * test that CoordActionStartXCommand stores error code and error message in |
| * action's table during error handling |
| * |
| * @throws IOException |
| * @throws JPAExecutorException |
| * @throws CommandException |
| */ |
| public void testActionStartWithErrorReported() throws IOException, JPAExecutorException, CommandException { |
| String actionId = new Date().getTime() + "-COORD-ActionStartCommand-C@1"; |
| String wfApp = "<start to='${someParam}' />"; |
| addRecordToActionTable(actionId, 1, wfApp); |
| new CoordActionStartXCommand(actionId, "test", "myapp", "myjob").call(); |
| final JPAService jpaService = Services.get().get(JPAService.class); |
| CoordinatorActionBean action = jpaService.execute(new CoordActionGetForStartJPAExecutor(actionId)); |
| if (action.getStatus() == CoordinatorAction.Status.SUBMITTED) { |
| fail("Expected status was FAILED due to incorrect XML element"); |
| } |
| assertEquals(action.getErrorCode(), ErrorCode.E0701.toString()); |
| assertTrue(action.getErrorMessage().contains( |
| "XML schema error, cvc-pattern-valid: Value '${someParam}' " |
| + "is not facet-valid with respect to pattern")); |
| } |
| |
| /** |
| * Coord action XML contains disallowed property change and |
| * test that CoordActionStartXCommand stores error code and error message in |
| * action's table during error handling |
| * |
| * @throws IOException |
| * @throws JPAExecutorException |
| * @throws CommandException |
| */ |
| public void testActionStartWithError1003Reported() throws IOException, JPAExecutorException, CommandException { |
| String actionId = new Date().getTime() + "-COORD-ActionStartCommand-C@1"; |
| String wfApp = "<start to='${someParam}' />"; |
| Map<String, String> additionalProperties = new HashMap<>(); |
| additionalProperties.put("user.name", "admin"); |
| addRecordToActionTable(actionId, 1, wfApp, additionalProperties); |
| new CoordActionStartXCommand(actionId, "test", "myapp", "myjob").call(); |
| final JPAService jpaService = Services.get().get(JPAService.class); |
| CoordinatorActionBean action = jpaService.execute(new CoordActionGetForStartJPAExecutor(actionId)); |
| assertEquals("Expected status was FAILED due to E1003 error code", CoordinatorAction.Status.FAILED, action.getStatus()); |
| assertEquals(action.getErrorCode(), ErrorCode.E1003.toString()); |
| assertTrue(action.getErrorMessage().contains("Invalid coordinator application attributes")); |
| } |
| |
| /** |
| * Test : configuration contains url string which should be escaped before put into the evaluator. |
| * If not escape, the error 'SAXParseException' will be thrown and workflow job will not be submitted. |
| * |
| * @throws Exception |
| */ |
| public void testActionStartWithEscapeStrings() throws Exception { |
| Date start = DateUtils.parseDateOozieTZ("2009-12-15T01:00Z"); |
| Date end = DateUtils.parseDateOozieTZ("2009-12-16T01:00Z"); |
| CoordinatorJobBean coordJob = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, start, end, false, |
| false, 1); |
| |
| CoordinatorActionBean action = addRecordToCoordActionTable(coordJob.getId(), 1, |
| CoordinatorAction.Status.SUBMITTED, "coord-action-start-escape-strings.xml", 0); |
| |
| String actionId = action.getId(); |
| new CoordActionStartXCommand(actionId, getTestUser(), "myapp", "myjob").call(); |
| |
| final JPAService jpaService = Services.get().get(JPAService.class); |
| action = jpaService.execute(new CoordActionGetJPAExecutor(actionId)); |
| |
| if (action.getStatus() == CoordinatorAction.Status.SUBMITTED) { |
| fail("CoordActionStartCommand didn't work because the status for action id" + actionId + " is :" |
| + action.getStatus() + " expected to be NOT SUBMITTED (i.e. RUNNING)"); |
| } |
| |
| final String wfId = action.getExternalId(); |
| |
| waitFor(3000, new Predicate() { |
| public boolean evaluate() throws Exception { |
| List<WorkflowActionBean> wfActions = jpaService.execute(new WorkflowActionsGetForJobJPAExecutor(wfId)); |
| return wfActions.size() > 0; |
| } |
| }); |
| List<WorkflowActionBean> wfActions = jpaService.execute(new WorkflowActionsGetForJobJPAExecutor(wfId)); |
| assertTrue(wfActions.size() > 0); |
| |
| final String wfActionId = wfActions.get(0).getId(); |
| waitFor(20 * 1000, new Predicate() { |
| public boolean evaluate() throws Exception { |
| WorkflowActionBean wfAction = jpaService.execute(new WorkflowActionGetJPAExecutor(wfActionId)); |
| return wfAction.getExternalId() != null; |
| } |
| }); |
| |
| WorkflowActionBean wfAction = jpaService.execute(new WorkflowActionGetJPAExecutor(wfActionId)); |
| assertNotNull(wfAction.getExternalId()); |
| } |
| |
| @Override |
| protected Configuration getCoordConf(Path coordAppPath) throws IOException { |
| Path wfAppPath = new Path(getFsTestCaseDir(), "app"); |
| FileSystem fs = getFileSystem(); |
| fs.mkdirs(new Path(wfAppPath, "lib")); |
| File jarFile = IOUtils.createJar(new File(getTestCaseDir()), "test.jar", MapperReducerForTest.class); |
| InputStream is = new FileInputStream(jarFile); |
| OutputStream os = fs.create(new Path(wfAppPath, "lib/test.jar")); |
| IOUtils.copyStream(is, os); |
| Path input = new Path(wfAppPath, "input"); |
| fs.mkdirs(input); |
| Writer writer = new OutputStreamWriter(fs.create(new Path(input, "test.txt")), |
| StandardCharsets.UTF_8); |
| writer.write("hello"); |
| writer.close(); |
| |
| final String APP1 = "<workflow-app xmlns='uri:oozie:workflow:0.1' name='app'>" + |
| "<start to='end'/>" + |
| "<end name='end'/>" + |
| "</workflow-app>"; |
| String subWorkflowAppPath = new Path(wfAppPath, "subwf").toString(); |
| fs.mkdirs(new Path(wfAppPath, "subwf")); |
| Writer writer2 = new OutputStreamWriter(fs.create(new Path(subWorkflowAppPath, "workflow.xml")), |
| StandardCharsets.UTF_8); |
| writer2.write(APP1); |
| writer2.close(); |
| |
| Reader reader = IOUtils.getResourceAsReader("wf-url-template.xml", -1); |
| Writer writer1 = new OutputStreamWriter(fs.create(new Path(wfAppPath, "workflow.xml")), |
| StandardCharsets.UTF_8); |
| IOUtils.copyCharStream(reader, writer1); |
| |
| Properties jobConf = new Properties(); |
| jobConf.setProperty(OozieClient.COORDINATOR_APP_PATH, coordAppPath.toString()); |
| jobConf.setProperty(OozieClient.USER_NAME, getTestUser()); |
| jobConf.setProperty(OozieClient.GROUP_NAME, getTestGroup()); |
| jobConf.setProperty("myJobTracker", getJobTrackerUri()); |
| jobConf.setProperty("myNameNode", getNameNodeUri()); |
| jobConf.setProperty("wfAppPath", new Path(wfAppPath, "workflow.xml").toString()); |
| jobConf.setProperty("mrclass", MapperReducerForTest.class.getName()); |
| jobConf.setProperty("delPath", new Path(wfAppPath, "output").toString()); |
| jobConf.setProperty("subWfApp", new Path(wfAppPath, "subwf/workflow.xml").toString()); |
| |
| |
| return new XConfiguration(jobConf); |
| } |
| |
| private void addRecordToActionTable(String actionId, int actionNum, String wfParam) |
| throws IOException, JPAExecutorException { |
| addRecordToActionTable(actionId, actionNum, wfParam, null); |
| } |
| |
| private void addRecordToActionTable(String actionId, int actionNum, String wfParam, Map<String, String> additionalProperties) |
| throws IOException, JPAExecutorException { |
| final JPAService jpaService = Services.get().get(JPAService.class); |
| CoordinatorActionBean action = new CoordinatorActionBean(); |
| action.setJobId(actionId); |
| action.setId(actionId); |
| action.setActionNumber(actionNum); |
| action.setNominalTime(new Date()); |
| action.setStatus(Status.SUBMITTED); |
| File appPath = new File(getTestCaseDir(), "coord/no-op/"); |
| String actionXml = "<coordinator-app xmlns='uri:oozie:coordinator:0.2' xmlns:sla='uri:oozie:sla:0.1' name='NAME' " + |
| "frequency=\"1\" start='2009-02-01T01:00Z' end='2009-02-03T23:59Z' timezone='UTC' freq_timeunit='DAY' " + |
| "end_of_duration='NONE' instance-number=\"1\" action-nominal-time=\"2009-02-01T01:00Z\">"; |
| actionXml += "<controls>"; |
| actionXml += "<timeout>10</timeout>"; |
| actionXml += "<concurrency>2</concurrency>"; |
| actionXml += "<execution>LIFO</execution>"; |
| actionXml += "</controls>"; |
| actionXml += "<input-events>"; |
| actionXml += "<data-in name='A' dataset='a'>"; |
| actionXml += "<dataset name='a' frequency='7' initial-instance='2009-02-01T01:00Z' timezone='UTC' freq_timeunit='DAY'" |
| + " end_of_duration='NONE'>"; |
| actionXml += "<uri-template>" + getTestCaseFileUri("coord/workflows/${YEAR}/${DAY}") + "</uri-template>"; |
| actionXml += "</dataset>"; |
| actionXml += "<instance>${coord:latest(0)}</instance>"; |
| actionXml += "</data-in>"; |
| actionXml += "</input-events>"; |
| actionXml += "<output-events>"; |
| actionXml += "<data-out name='LOCAL_A' dataset='local_a'>"; |
| actionXml += "<dataset name='local_a' frequency='7' initial-instance='2009-02-01T01:00Z' timezone='UTC' freq_timeunit=" |
| + "'DAY' end_of_duration='NONE'>"; |
| actionXml += "<uri-template>" + getTestCaseFileUri("coord/workflows/${YEAR}/${DAY}") + "</uri-template>"; |
| actionXml += "</dataset>"; |
| actionXml += "<instance>${coord:current(-1)}</instance>"; |
| actionXml += "</data-out>"; |
| actionXml += "</output-events>"; |
| actionXml += "<action>"; |
| actionXml += "<workflow>"; |
| actionXml += "<app-path>" + appPath.toURI() + "</app-path>"; |
| actionXml += "<configuration>"; |
| actionXml += "<property>"; |
| actionXml += "<name>inputA</name>"; |
| actionXml += "<value>" + getTestCaseFileUri("coord/US//2009/02/01") + "</value>"; |
| actionXml += "</property>"; |
| actionXml += "<property>"; |
| actionXml += "<name>inputB</name>"; |
| actionXml += "<value>" + getTestCaseFileUri("coord/US//2009/02/01") + "</value>"; |
| actionXml += "</property>"; |
| if (additionalProperties != null) { |
| for (Map.Entry<String, String> entry : additionalProperties.entrySet()) { |
| actionXml += "<property>"; |
| actionXml += "<name>" + entry.getKey() + "</name>"; |
| actionXml += "<value>" + entry.getValue() + "</value>"; |
| actionXml += "</property>"; |
| } |
| } |
| actionXml += "</configuration>"; |
| actionXml += "</workflow>"; |
| String slaXml = " <sla:info xmlns:sla='uri:oozie:sla:0.1'>" + " <sla:app-name>test-app</sla:app-name>" |
| + " <sla:nominal-time>2009-03-06T10:00Z</sla:nominal-time>" + " <sla:should-start>5</sla:should-start>" |
| + " <sla:should-end>120</sla:should-end>" |
| + " <sla:notification-msg>Notifying User for nominal time : 2009-03-06T10:00Z </sla:notification-msg>" |
| + " <sla:alert-contact>abc@example.com</sla:alert-contact>" |
| + " <sla:dev-contact>abc@example.com</sla:dev-contact>" |
| + " <sla:qa-contact>abc@example.com</sla:qa-contact>" + " <sla:se-contact>abc@example.com</sla:se-contact>" |
| + "</sla:info>"; |
| actionXml += slaXml; |
| actionXml += "</action>"; |
| actionXml += "</coordinator-app>"; |
| action.setActionXml(actionXml); |
| action.setSlaXml(slaXml); |
| |
| String createdConf = "<configuration> "; |
| createdConf += "<property> <name>execution_order</name> <value>LIFO</value> </property>"; |
| createdConf += "<property> <name>user.name</name> <value>" + getTestUser() + "</value> </property>"; |
| createdConf += "<property> <name>group.name</name> <value>other</value> </property>"; |
| createdConf += "<property> <name>app-path</name> " + "<value>" + appPath.toURI() + "/</value> </property>"; |
| createdConf += "<property> <name>jobTracker</name> "; |
| createdConf += "<value>localhost:9001</value></property>"; |
| createdConf += "<property> <name>nameNode</name> <value>hdfs://localhost:9000</value></property>"; |
| createdConf += "<property> <name>queueName</name> <value>default</value></property>"; |
| createdConf += "</configuration> "; |
| |
| action.setCreatedConf(createdConf); |
| jpaService.execute(new CoordActionInsertJPAExecutor(action)); |
| String content = "<workflow-app xmlns='uri:oozie:workflow:0.2' xmlns:sla='uri:oozie:sla:0.1' name='no-op-wf'>"; |
| if (wfParam != null) { |
| if (!wfParam.isEmpty()) { |
| content += wfParam; |
| } |
| } |
| else { |
| content += "<start to='end' />"; |
| } |
| String slaXml2 = " <sla:info>" |
| // + " <sla:client-id>axonite-blue</sla:client-id>" |
| + " <sla:app-name>test-app</sla:app-name>" + " <sla:nominal-time>2009-03-06T10:00Z</sla:nominal-time>" |
| + " <sla:should-start>5</sla:should-start>" + " <sla:should-end>${2 * HOURS}</sla:should-end>" |
| + " <sla:notification-msg>Notifying User for nominal time : 2009-03-06T10:00Z </sla:notification-msg>" |
| + " <sla:alert-contact>abc@example.com</sla:alert-contact>" |
| + " <sla:dev-contact>abc@example.com</sla:dev-contact>" |
| + " <sla:qa-contact>abc@example.com</sla:qa-contact>" + " <sla:se-contact>abc@example.com</sla:se-contact>" |
| + "</sla:info>"; |
| content += "<end name='end' />" + slaXml2 + "</workflow-app>"; |
| writeToFile(content, appPath.getAbsolutePath()); |
| } |
| |
| private void checkCoordAction(String actionId) { |
| try { |
| final JPAService jpaService = Services.get().get(JPAService.class); |
| CoordinatorActionBean action = jpaService.execute(new CoordActionGetJPAExecutor(actionId)); |
| if (action.getStatus() == CoordinatorAction.Status.SUBMITTED) { |
| fail("CoordActionStartCommand didn't work because the status for action id" + actionId + " is :" |
| + action.getStatus() + " expected to be NOT SUBMITTED (i.e. RUNNING)"); |
| } |
| if (action.getExternalId() != null) { |
| WorkflowJobBean wfJob = jpaService.execute(new WorkflowJobGetJPAExecutor(action.getExternalId())); |
| assertEquals(wfJob.getParentId(), action.getId()); |
| } |
| } |
| catch (JPAExecutorException je) { |
| fail("Action ID " + actionId + " was not stored properly in db"); |
| } |
| } |
| |
| private void writeToFile(String content, String appPath) throws IOException { |
| createDir(appPath); |
| File wf = new File(appPath, "workflow.xml"); |
| PrintWriter out = null; |
| try { |
| out = new PrintWriter(new OutputStreamWriter(new FileOutputStream(wf), StandardCharsets.UTF_8)); |
| out.println(content); |
| } |
| catch (IOException iex) { |
| iex.printStackTrace(); |
| throw iex; |
| } |
| finally { |
| if (out != null) { |
| out.close(); |
| } |
| } |
| |
| } |
| |
| private void createDir(String dir) { |
| new File(dir, "_SUCCESS").mkdirs(); |
| } |
| } |