| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.oozie.test; |
| |
| import java.io.ByteArrayInputStream; |
| import java.io.File; |
| import java.io.FileOutputStream; |
| import java.io.IOException; |
| import java.io.InputStreamReader; |
| import java.io.OutputStreamWriter; |
| import java.io.PrintWriter; |
| import java.io.Reader; |
| import java.io.UnsupportedEncodingException; |
| import java.io.Writer; |
| import java.nio.charset.StandardCharsets; |
| import java.util.Calendar; |
| import java.util.Collections; |
| import java.util.Date; |
| import java.util.List; |
| import java.util.Set; |
| import java.util.regex.Matcher; |
| |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.fs.FileSystem; |
| import org.apache.hadoop.fs.Path; |
| import org.apache.oozie.AppType; |
| import org.apache.oozie.BundleActionBean; |
| import org.apache.oozie.BundleJobBean; |
| import org.apache.oozie.CoordinatorActionBean; |
| import org.apache.oozie.CoordinatorJobBean; |
| import org.apache.oozie.SLAEventBean; |
| import org.apache.oozie.WorkflowActionBean; |
| import org.apache.oozie.WorkflowJobBean; |
| import org.apache.oozie.action.hadoop.MapperReducerForTest; |
| import org.apache.oozie.client.BundleJob; |
| import org.apache.oozie.client.CoordinatorAction; |
| import org.apache.oozie.client.CoordinatorJob; |
| import org.apache.oozie.client.CoordinatorJob.Execution; |
| import org.apache.oozie.client.CoordinatorJob.Timeunit; |
| import org.apache.oozie.client.event.SLAEvent.EventStatus; |
| import org.apache.oozie.client.event.SLAEvent.SLAStatus; |
| import org.apache.oozie.client.Job; |
| import org.apache.oozie.client.OozieClient; |
| import org.apache.oozie.client.SLAEvent; |
| import org.apache.oozie.client.WorkflowAction; |
| import org.apache.oozie.client.WorkflowJob; |
| import org.apache.oozie.executor.jpa.BundleActionInsertJPAExecutor; |
| import org.apache.oozie.executor.jpa.BundleJobInsertJPAExecutor; |
| import org.apache.oozie.executor.jpa.CoordActionGetJPAExecutor; |
| import org.apache.oozie.executor.jpa.CoordActionInsertJPAExecutor; |
| import org.apache.oozie.executor.jpa.CoordActionQueryExecutor; |
| import org.apache.oozie.executor.jpa.CoordActionQueryExecutor.CoordActionQuery; |
| import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor; |
| import org.apache.oozie.executor.jpa.CoordJobInsertJPAExecutor; |
| import org.apache.oozie.executor.jpa.CoordJobQueryExecutor; |
| import org.apache.oozie.executor.jpa.CoordJobQueryExecutor.CoordJobQuery; |
| import org.apache.oozie.executor.jpa.JPAExecutorException; |
| import org.apache.oozie.executor.jpa.SLAEventInsertJPAExecutor; |
| import org.apache.oozie.executor.jpa.SLARegistrationQueryExecutor; |
| import org.apache.oozie.executor.jpa.SLASummaryQueryExecutor; |
| import org.apache.oozie.executor.jpa.WorkflowActionInsertJPAExecutor; |
| import org.apache.oozie.executor.jpa.WorkflowJobGetJPAExecutor; |
| import org.apache.oozie.executor.jpa.WorkflowJobInsertJPAExecutor; |
| import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor; |
| import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor.WorkflowJobQuery; |
| import org.apache.oozie.service.CallableQueueService; |
| import org.apache.oozie.service.JPAService; |
| import org.apache.oozie.service.LiteWorkflowStoreService; |
| import org.apache.oozie.service.Service; |
| import org.apache.oozie.service.Services; |
| import org.apache.oozie.service.UUIDService; |
| import org.apache.oozie.service.UUIDService.ApplicationType; |
| import org.apache.oozie.service.WorkflowAppService; |
| import org.apache.oozie.service.WorkflowStoreService; |
| import org.apache.oozie.sla.SLARegistrationBean; |
| import org.apache.oozie.sla.SLASummaryBean; |
| import org.apache.oozie.util.DateUtils; |
| import org.apache.oozie.util.IOUtils; |
| import org.apache.oozie.util.XCallable; |
| import org.apache.oozie.util.XConfiguration; |
| import org.apache.oozie.util.XLog; |
| import org.apache.oozie.util.XmlUtils; |
| import org.apache.oozie.workflow.WorkflowApp; |
| import org.apache.oozie.workflow.WorkflowInstance; |
| import org.apache.oozie.workflow.WorkflowLib; |
| import org.apache.oozie.workflow.lite.EndNodeDef; |
| import org.apache.oozie.workflow.lite.LiteWorkflowApp; |
| import org.apache.oozie.workflow.lite.LiteWorkflowInstance; |
| import org.apache.oozie.workflow.lite.StartNodeDef; |
| import org.jdom.Element; |
| import org.jdom.JDOMException; |
| |
| import com.google.common.annotations.VisibleForTesting; |
| |
| @SuppressWarnings("deprecation") |
| public abstract class XDataTestCase extends XHCatTestCase { |
| |
| protected static String slaXml = " <sla:info xmlns:sla='uri:oozie:sla:0.1'>" |
| + " <sla:app-name>test-app</sla:app-name>" + " <sla:nominal-time>2009-03-06T10:00Z</sla:nominal-time>" |
| + " <sla:should-start>5</sla:should-start>" + " <sla:should-end>120</sla:should-end>" |
| + " <sla:notification-msg>Notifying User for nominal time : 2009-03-06T10:00Z </sla:notification-msg>" |
| + " <sla:alert-contact>abc@example.com</sla:alert-contact>" |
| + " <sla:dev-contact>abc@example.com</sla:dev-contact>" |
| + " <sla:qa-contact>abc@example.com</sla:qa-contact>" + " <sla:se-contact>abc@example.com</sla:se-contact>" |
| + "</sla:info>"; |
| |
| protected String bundleName; |
| protected String bundleId; |
| protected String CREATE_TIME = "2012-07-22T00:00Z"; |
| |
| /** |
| * Class is used to change the queueservice, as that one meddles with the actions in the background. |
| */ |
| protected static class FakeCallableQueueService extends CallableQueueService implements Service { |
| @Override |
| public void init(Services services) { |
| } |
| |
| @Override |
| public void destroy() { |
| } |
| |
| @Override |
| public synchronized boolean queueSerial(List<? extends XCallable<?>> callables, long delay) { |
| return false; |
| } |
| |
| @Override |
| public Set<String> getInterruptTypes() { |
| return Collections.emptySet(); |
| } |
| } |
| |
| public XDataTestCase() { |
| } |
| |
| /* |
| * The following 2 methods are "published" versions of |
| * #setUp() and #tearDown() respectively. Created to be able to |
| * use them in composition. |
| */ |
| @VisibleForTesting |
| public void setUpPub() throws Exception { |
| setUp(); |
| } |
| |
| @VisibleForTesting |
| public void tearDownPub() throws Exception { |
| tearDown(); |
| } |
| |
| protected int hoursToSeconds(final int hours) { |
| return new Long(java.util.concurrent.TimeUnit.HOURS.toSeconds(hours)).intValue(); |
| } |
| |
| |
| /** |
| * Inserts the passed coord job |
| * @param coord job bean |
| * @throws Exception |
| */ |
| protected void addRecordToCoordJobTable(CoordinatorJobBean coordJob) throws Exception { |
| try { |
| JPAService jpaService = Services.get().get(JPAService.class); |
| assertNotNull(jpaService); |
| CoordJobInsertJPAExecutor coordInsertCmd = new CoordJobInsertJPAExecutor(coordJob); |
| jpaService.execute(coordInsertCmd); |
| } |
| catch (JPAExecutorException je) { |
| je.printStackTrace(); |
| fail("Unable to insert the test coord job record to table"); |
| throw je; |
| } |
| } |
| |
| protected CoordinatorJobBean addRecordToCoordJobTable(CoordinatorJob.Status status, Date startTime, Date endTime, |
| Date pauseTime, String freq) throws Exception { |
| return addRecordToCoordJobTable(status, startTime, endTime, pauseTime, -1, freq, Timeunit.MINUTE); |
| } |
| protected CoordinatorJobBean addRecordToCoordJobTable(CoordinatorJob.Status status, Date startTime, Date endTime, |
| Date pauseTime, String freq, Timeunit timeUnit) throws Exception { |
| return addRecordToCoordJobTable(status, startTime, endTime, pauseTime, -1, freq, timeUnit); |
| } |
| |
| protected CoordinatorJobBean addRecordToCoordJobTable(CoordinatorJob.Status status, Date startTime, Date endTime, |
| Date pauseTime, String freq, int matThrottling) throws Exception { |
| return addRecordToCoordJobTable(status, startTime, endTime, pauseTime, -1, freq, Timeunit.MINUTE, |
| CoordinatorJob.Execution.FIFO, matThrottling); |
| } |
| |
| protected CoordinatorJobBean addRecordToCoordJobTable(CoordinatorJob.Status status, Date startTime, Date endTime, |
| Date pauseTime, int timeout, String freq, Timeunit timeUnit) |
| throws Exception { |
| return addRecordToCoordJobTable(status, startTime, endTime, pauseTime, timeout, freq, timeUnit, |
| CoordinatorJob.Execution.FIFO, 20); |
| } |
| |
| protected CoordinatorJobBean addRecordToCoordJobTable(CoordinatorJob.Status status, Date startTime, Date endTime, |
| Date pauseTime, int timeout, String freq, |
| CoordinatorJob.Execution execution) throws Exception { |
| return addRecordToCoordJobTable(status, startTime, endTime, pauseTime, timeout, freq, Timeunit.MINUTE, execution, 20); |
| } |
| |
| /** |
| * Insert coord job for testing. |
| * |
| * @param status coord job status |
| * @param pending true if pending is true |
| * @param doneMatd true if doneMaterialization is true |
| * @return coord job bean |
| * @throws Exception |
| */ |
| protected CoordinatorJobBean addRecordToCoordJobTable(CoordinatorJob.Status status, boolean pending, |
| boolean doneMatd) throws Exception { |
| CoordinatorJobBean coordJob = createCoordJob(status, pending, doneMatd); |
| |
| try { |
| JPAService jpaService = Services.get().get(JPAService.class); |
| assertNotNull(jpaService); |
| CoordJobInsertJPAExecutor coordInsertCmd = new CoordJobInsertJPAExecutor(coordJob); |
| jpaService.execute(coordInsertCmd); |
| } |
| catch (JPAExecutorException je) { |
| je.printStackTrace(); |
| fail("Unable to insert the test coord job record to table"); |
| throw je; |
| } |
| |
| return coordJob; |
| } |
| |
| protected CoordinatorJobBean addRecordToCoordJobTable(String appXml) throws Exception { |
| Date start = DateUtils.parseDateOozieTZ("2009-02-01T01:00Z"); |
| Date end = DateUtils.parseDateOozieTZ("2009-02-03T23:59Z"); |
| appXml = appXml.replaceAll("#start", DateUtils.formatDateOozieTZ(start)); |
| appXml = appXml.replaceAll("#end", DateUtils.formatDateOozieTZ(end)); |
| Path appPath = new Path(getFsTestCaseDir(), "coord"); |
| writeToFile(appXml, appPath, "coordinator.xml"); |
| CoordinatorJobBean coordJob = createCoordBean(appPath, appXml, CoordinatorJob.Status.PREP, start, end, false, |
| false, 0); |
| |
| try { |
| JPAService jpaService = Services.get().get(JPAService.class); |
| assertNotNull(jpaService); |
| CoordJobInsertJPAExecutor coordInsertCmd = new CoordJobInsertJPAExecutor(coordJob); |
| jpaService.execute(coordInsertCmd); |
| } |
| catch (JPAExecutorException je) { |
| je.printStackTrace(); |
| fail("Unable to insert the test coord job record to table"); |
| throw je; |
| } |
| |
| return coordJob; |
| } |
| |
| /** |
| * Insert coord job for testing. |
| * |
| * @param status coord job status |
| * @param start start time |
| * @param end end time |
| * @param created Time |
| * @param pending true if pending is true |
| * @param doneMatd true if doneMaterialization is true |
| * @param lastActionNum last action number |
| * @return coord job bean |
| * @throws Exception |
| */ |
| protected CoordinatorJobBean addRecordToCoordJobTable(CoordinatorJob.Status status, Date start, Date end, |
| boolean pending, boolean doneMatd, int lastActionNum) throws Exception { |
| return addRecordToCoordJobTable(status, start, end, new Date(), pending, doneMatd, lastActionNum); |
| } |
| |
| /** |
| * Insert coord job for testing. |
| * |
| * @param status coord job status |
| * @param start start time |
| * @param end end time |
| * @param created Time |
| * @param pending true if pending is true |
| * @param doneMatd true if doneMaterialization is true |
| * @param lastActionNum last action number |
| * @return coord job bean |
| * @throws Exception |
| */ |
| protected CoordinatorJobBean addRecordToCoordJobTable(CoordinatorJob.Status status, Date start, Date end, |
| Date createdTime, boolean pending, boolean doneMatd, int lastActionNum) throws Exception { |
| CoordinatorJobBean coordJob = createCoordJob(status, start, end, createdTime, pending, doneMatd, lastActionNum); |
| try { |
| JPAService jpaService = Services.get().get(JPAService.class); |
| assertNotNull(jpaService); |
| CoordJobInsertJPAExecutor coordInsertCmd = new CoordJobInsertJPAExecutor(coordJob); |
| jpaService.execute(coordInsertCmd); |
| } |
| catch (JPAExecutorException je) { |
| je.printStackTrace(); |
| fail("Unable to insert the test coord job record to table"); |
| throw je; |
| } |
| |
| return coordJob; |
| |
| } |
| |
| /** |
| * Insert coord job for testing. |
| * |
| * @param testFileName test file name |
| * @param status coord job status |
| * @param start start time |
| * @param end end time |
| * @param pending true if pending is true |
| * @param doneMatd true if doneMaterialization is true |
| * @param lastActionNum last action number |
| * @return coord job bean |
| * @throws Exception |
| */ |
| protected CoordinatorJobBean addRecordToCoordJobTable(String testFileName, CoordinatorJob.Status status, |
| Date start, Date end, boolean pending, boolean doneMatd, int lastActionNum) throws Exception { |
| CoordinatorJobBean coordJob = createCoordJob(testFileName, status, start, end, pending, doneMatd, lastActionNum); |
| |
| try { |
| JPAService jpaService = Services.get().get(JPAService.class); |
| assertNotNull(jpaService); |
| CoordJobInsertJPAExecutor coordInsertCmd = new CoordJobInsertJPAExecutor(coordJob); |
| jpaService.execute(coordInsertCmd); |
| } |
| catch (JPAExecutorException je) { |
| je.printStackTrace(); |
| fail("Unable to insert the test coord job record to table"); |
| throw je; |
| } |
| |
| return coordJob; |
| |
| } |
| |
| /** |
| * Add coordinator job bean with bundle id info. |
| * |
| * @param bundleId bundle id |
| * @param coordId coord id and coord name |
| * @param status job status |
| * @param pending true if pending is true |
| * @param doneMatd true if doneMaterialization is true |
| * @param lastActionNumber last action number |
| * @return coordinator job bean |
| * @throws Exception |
| */ |
| protected CoordinatorJobBean addRecordToCoordJobTableWithBundle(String bundleId, String coordId, |
| CoordinatorJob.Status status, boolean pending, boolean doneMatd, int lastActionNumber) throws Exception { |
| CoordinatorJobBean coordJob = createCoordJob(status, pending, doneMatd); |
| coordJob.setBundleId(bundleId); |
| // coord id and coord name are the same |
| coordJob.setId(coordId); |
| coordJob.setAppName(coordId); |
| coordJob.setLastActionNumber(lastActionNumber); |
| try { |
| JPAService jpaService = Services.get().get(JPAService.class); |
| assertNotNull(jpaService); |
| CoordJobInsertJPAExecutor coordInsertCmd = new CoordJobInsertJPAExecutor(coordJob); |
| jpaService.execute(coordInsertCmd); |
| } |
| catch (JPAExecutorException je) { |
| je.printStackTrace(); |
| fail("Unable to insert the test coord job record to table"); |
| throw je; |
| } |
| |
| return coordJob; |
| } |
| |
| /** |
| * Add coordinator job bean with bundle id info. |
| * |
| * @param bundleId bundle id |
| * @param coordId coord id and coord name |
| * @param status job status |
| * @param start start time |
| * @param end end time |
| * @param pending true if pending is true |
| * @param doneMatd true if doneMaterialization is true |
| * @param lastActionNumber last action number |
| * @return coordinator job bean |
| * @throws Exception |
| */ |
| protected CoordinatorJobBean addRecordToCoordJobTableWithBundle(String bundleId, String coordId, |
| CoordinatorJob.Status status, Date start, Date end, boolean pending, boolean doneMatd, int lastActionNumber) |
| throws Exception { |
| CoordinatorJobBean coordJob = createCoordJob(status, start, end, pending, doneMatd, 0); |
| coordJob.setBundleId(bundleId); |
| // coord id and coord name are the same |
| coordJob.setId(coordId); |
| coordJob.setAppName(coordId); |
| coordJob.setLastActionNumber(lastActionNumber); |
| try { |
| JPAService jpaService = Services.get().get(JPAService.class); |
| assertNotNull(jpaService); |
| CoordJobInsertJPAExecutor coordInsertCmd = new CoordJobInsertJPAExecutor(coordJob); |
| jpaService.execute(coordInsertCmd); |
| } |
| catch (JPAExecutorException je) { |
| je.printStackTrace(); |
| fail("Unable to insert the test coord job record to table"); |
| throw je; |
| } |
| |
| return coordJob; |
| } |
| |
| protected CoordinatorJobBean addRecordToCoordJobTable(CoordinatorJob.Status status, Date startTime, Date endTime, |
| Date pauseTime, int timeout, String freq, Timeunit timeUnit, |
| CoordinatorJob.Execution execution, int matThrottling) |
| throws Exception { |
| CoordinatorJobBean coordJob = createCoordJob(status, startTime, endTime, false, false, 0); |
| coordJob.setStartTime(startTime); |
| coordJob.setEndTime(endTime); |
| coordJob.setPauseTime(pauseTime); |
| coordJob.setFrequency(freq); |
| coordJob.setTimeUnit(timeUnit); |
| coordJob.setTimeout(timeout); |
| coordJob.setConcurrency(3); |
| coordJob.setMatThrottling(matThrottling); |
| coordJob.setExecutionOrder(execution); |
| |
| try { |
| JPAService jpaService = Services.get().get(JPAService.class); |
| assertNotNull(jpaService); |
| CoordJobInsertJPAExecutor coordInsertCmd = new CoordJobInsertJPAExecutor(coordJob); |
| jpaService.execute(coordInsertCmd); |
| } catch (JPAExecutorException ex) { |
| ex.printStackTrace(); |
| fail("Unable to insert the test coord job record to table"); |
| throw ex; |
| } |
| |
| return coordJob; |
| } |
| |
| /** |
| * Create coord job bean |
| * |
| * @param status coord job status |
| * @param pending true if pending is true |
| * @param doneMatd true if doneMaterialization is true |
| * @return coord job bean |
| * @throws IOException |
| */ |
| protected CoordinatorJobBean createCoordJob(CoordinatorJob.Status status, boolean pending, boolean doneMatd) |
| throws Exception { |
| Path appPath = new Path(getFsTestCaseDir(), "coord"); |
| String appXml = writeCoordXml(appPath); |
| |
| // Set the start and end time in future |
| String currentDatePlusMonth = XDataTestCase.getCurrentDateafterIncrementingInMonths(1); |
| Date start = DateUtils.parseDateOozieTZ(currentDatePlusMonth); |
| Date end = DateUtils.parseDateOozieTZ(currentDatePlusMonth); |
| |
| return createCoordBean(appPath, appXml, status, start, end, pending, doneMatd, 0); |
| } |
| |
| /** |
| * Create coord job bean |
| * |
| * @param status coord job status |
| * @param start start time |
| * @param end end time |
| * @param pending true if pending is true |
| * @param doneMatd true if doneMaterialization is true |
| * @param lastActionNum last action number |
| * @return coord job bean |
| * @throws IOException |
| */ |
| protected CoordinatorJobBean createCoordJob(CoordinatorJob.Status status, Date start, Date end, boolean pending, |
| boolean doneMatd, int lastActionNum) throws Exception { |
| Path appPath = new Path(getFsTestCaseDir(), "coord"); |
| String appXml = writeCoordXml(appPath, start, end); |
| |
| return createCoordBean(appPath, appXml, status, start, end, pending, doneMatd, lastActionNum); |
| } |
| /** |
| * Create coord job bean |
| * |
| * @param status coord job status |
| * @param start start time |
| * @param end end time |
| * @param created Time |
| * @param pending true if pending is true |
| * @param doneMatd true if doneMaterialization is true |
| * @param lastActionNum last action number |
| * @return coord job bean |
| * @throws IOException |
| */ |
| protected CoordinatorJobBean createCoordJob( |
| CoordinatorJob.Status status, Date start, Date end, Date createTime, boolean pending, |
| boolean doneMatd, int lastActionNum) throws Exception { |
| Path appPath = new Path(getFsTestCaseDir(), "coord"); |
| String appXml = writeCoordXml(appPath, start, end); |
| |
| return createCoordBean(appPath, appXml, status, start, end, createTime, pending, doneMatd, lastActionNum); |
| } |
| |
| /** |
| * Create coord job bean |
| * |
| * @param testFileName test file name |
| * @param status coord job status |
| * @param start start time |
| * @param end end time |
| * @param pending true if pending is true |
| * @param doneMatd true if doneMaterialization is true |
| * @param lastActionNum last action number |
| * @return coord job bean |
| * @throws IOException |
| */ |
| protected CoordinatorJobBean createCoordJob(String testFileName, CoordinatorJob.Status status, Date start, |
| Date end, boolean pending, boolean doneMatd, int lastActionNum) throws Exception { |
| Path appPath = new Path(getFsTestCaseDir(), "coord"); |
| String appXml = writeCoordXml(appPath, testFileName); |
| |
| return createCoordBean(appPath, appXml, status, start, end, pending, doneMatd, lastActionNum); |
| } |
| |
| private CoordinatorJobBean createCoordBean(Path appPath, String appXml, CoordinatorJob.Status status, Date start, |
| Date end, boolean pending, boolean doneMatd, int lastActionNum) throws Exception { |
| return createCoordBean(appPath, appXml, status, start, end, new Date(), pending, doneMatd, lastActionNum); |
| } |
| |
| private CoordinatorJobBean createCoordBean(Path appPath, String appXml, CoordinatorJob.Status status, Date start, |
| Date end, Date createdTime, boolean pending, boolean doneMatd, int lastActionNum) throws Exception { |
| CoordinatorJobBean coordJob = new CoordinatorJobBean(); |
| coordJob.setId(Services.get().get(UUIDService.class).generateId(ApplicationType.COORDINATOR)); |
| coordJob.setAppName("COORD-TEST"); |
| coordJob.setAppPath(appPath.toString()); |
| coordJob.setStatus(status); |
| coordJob.setTimeZone("America/Los_Angeles"); |
| coordJob.setCreatedTime(createdTime); |
| coordJob.setLastModifiedTime(new Date()); |
| coordJob.setUser(getTestUser()); |
| coordJob.setGroup(getTestGroup()); |
| if (pending) { |
| coordJob.setPending(); |
| } |
| if (doneMatd) { |
| coordJob.setDoneMaterialization(); |
| } |
| coordJob.setLastActionNumber(lastActionNum); |
| |
| Configuration conf = getCoordConf(appPath); |
| coordJob.setConf(XmlUtils.prettyPrint(conf).toString()); |
| coordJob.setJobXml(appXml); |
| coordJob.setFrequency("1"); |
| coordJob.setTimeUnit(Timeunit.DAY); |
| coordJob.setExecutionOrder(Execution.FIFO); |
| coordJob.setConcurrency(1); |
| coordJob.setMatThrottling(1); |
| try { |
| coordJob.setStartTime(start); |
| coordJob.setEndTime(end); |
| } |
| catch (Exception e) { |
| e.printStackTrace(); |
| fail("Could not set Date/time"); |
| } |
| return coordJob; |
| } |
| |
| /** |
| * Write coordinator xml |
| * |
| * @param appPath app path |
| * @throws IOException thrown if unable to write xml |
| * @throws UnsupportedEncodingException thrown if encoding failed |
| */ |
| protected String writeCoordXml(Path appPath) throws IOException, UnsupportedEncodingException { |
| String appXml = getCoordJobXml(appPath); |
| |
| FileSystem fs = getFileSystem(); |
| |
| Writer writer = new OutputStreamWriter(fs.create(new Path(appPath + "/coordinator.xml")), |
| StandardCharsets.UTF_8); |
| byte[] bytes = appXml.getBytes(StandardCharsets.UTF_8); |
| ByteArrayInputStream bais = new ByteArrayInputStream(bytes); |
| Reader reader2 = new InputStreamReader(bais, StandardCharsets.UTF_8); |
| IOUtils.copyCharStream(reader2, writer); |
| return appXml; |
| } |
| |
| /** |
| * Write coordinator xml |
| * |
| * @param appPath app path |
| * @param start start time |
| * @param end end time |
| * @throws IOException thrown if unable to write xml |
| * @throws UnsupportedEncodingException thrown if encoding failed |
| */ |
| protected String writeCoordXml(Path appPath, Date start, Date end) throws IOException, UnsupportedEncodingException { |
| String appXml = getCoordJobXml(appPath, start, end); |
| |
| FileSystem fs = getFileSystem(); |
| |
| Writer writer = new OutputStreamWriter(fs.create(new Path(appPath + "/coordinator.xml")), |
| StandardCharsets.UTF_8); |
| byte[] bytes = appXml.getBytes(StandardCharsets.UTF_8); |
| ByteArrayInputStream bais = new ByteArrayInputStream(bytes); |
| Reader reader2 = new InputStreamReader(bais, StandardCharsets.UTF_8); |
| IOUtils.copyCharStream(reader2, writer); |
| return appXml; |
| } |
| |
| /** |
| * Write coordinator xml |
| * |
| * @param appPath app path |
| * @return testFileName test file name |
| * @throws IOException thrown if unable to write xml |
| * @throws UnsupportedEncodingException thrown if encoding failed |
| */ |
| protected String writeCoordXml(Path appPath, String testFileName) throws IOException, UnsupportedEncodingException { |
| String appXml = getCoordJobXml(testFileName); |
| writeToFile(appXml, appPath, "coordinator.xml"); |
| return appXml; |
| } |
| |
| /** |
| * Insert coord action for testing. |
| * |
| * @param jobId coord job id |
| * @param actionNum action number |
| * @param status coord action status |
| * @param resourceXmlName xml file name |
| * @param pending pending counter |
| * @return coord action bean |
| * @throws Exception thrown if unable to create coord action bean |
| */ |
| protected CoordinatorActionBean addRecordToCoordActionTable(String jobId, int actionNum, |
| CoordinatorAction.Status status, String resourceXmlName, int pending) throws Exception { |
| return addRecordToCoordActionTable(jobId, actionNum, status, resourceXmlName, pending, null); |
| } |
| |
| /** |
| * Insert coord action for testing. |
| * |
| * @param jobId coord job id |
| * @param actionNum action number |
| * @param status coord action status |
| * @param resourceXmlName xml file name |
| * @param pending pending counter |
| * @param actionNominalTime |
| * @return coord action bean |
| * @throws Exception thrown if unable to create coord action bean |
| */ |
| protected CoordinatorActionBean addRecordToCoordActionTable(String jobId, int actionNum, |
| CoordinatorAction.Status status, String resourceXmlName, int pending, Date actionNominalTime) |
| throws Exception { |
| CoordinatorActionBean action = createCoordAction(jobId, actionNum, status, resourceXmlName, pending, |
| actionNominalTime); |
| |
| try { |
| JPAService jpaService = Services.get().get(JPAService.class); |
| assertNotNull(jpaService); |
| CoordActionInsertJPAExecutor coordActionInsertCmd = new CoordActionInsertJPAExecutor(action); |
| jpaService.execute(coordActionInsertCmd); |
| } |
| catch (JPAExecutorException je) { |
| je.printStackTrace(); |
| fail("Unable to insert the test coord action record to table"); |
| throw je; |
| } |
| return action; |
| } |
| |
| /** |
| * Insert coord action and workflow id as external id for testing. |
| * |
| * @param jobId coord job id |
| * @param actionNum action number |
| * @param status coord action status |
| * @param resourceXmlName xml file name |
| * @param wfId wf id |
| * @param wfStatus wf status |
| * @param pending pending counter |
| * @return coord action bean |
| * @throws Exception thrown if unable to create coord action bean |
| */ |
| protected CoordinatorActionBean addRecordToCoordActionTable(String jobId, int actionNum, |
| CoordinatorAction.Status status, String resourceXmlName, String wfId, String wfStatus, int pending) |
| throws Exception { |
| CoordinatorActionBean action = createCoordAction(jobId, actionNum, status, resourceXmlName, pending); |
| action.setExternalId(wfId); |
| action.setExternalStatus(wfStatus); |
| try { |
| JPAService jpaService = Services.get().get(JPAService.class); |
| assertNotNull(jpaService); |
| CoordActionInsertJPAExecutor coordActionInsertExecutor = new CoordActionInsertJPAExecutor(action); |
| jpaService.execute(coordActionInsertExecutor); |
| |
| if (wfId != null) { |
| WorkflowJobBean wfJob = jpaService.execute(new WorkflowJobGetJPAExecutor(wfId)); |
| wfJob.setParentId(action.getId()); |
| WorkflowJobQueryExecutor.getInstance().executeUpdate(WorkflowJobQuery.UPDATE_WORKFLOW_PARENT_MODIFIED, wfJob); |
| } |
| } |
| catch (JPAExecutorException je) { |
| je.printStackTrace(); |
| fail("Unable to insert the test coord action record to table"); |
| throw je; |
| } |
| return action; |
| } |
| |
| protected CoordinatorActionBean addRecordToCoordActionTable(CoordinatorActionBean action, String wfId) |
| throws Exception { |
| try { |
| JPAService jpaService = Services.get().get(JPAService.class); |
| assertNotNull(jpaService); |
| CoordActionInsertJPAExecutor coordActionInsertExecutor = new CoordActionInsertJPAExecutor(action); |
| jpaService.execute(coordActionInsertExecutor); |
| |
| if (wfId != null) { |
| WorkflowJobBean wfJob = jpaService.execute(new WorkflowJobGetJPAExecutor(wfId)); |
| wfJob.setParentId(action.getId()); |
| WorkflowJobQueryExecutor.getInstance().executeUpdate(WorkflowJobQuery.UPDATE_WORKFLOW_PARENT_MODIFIED, wfJob); |
| } |
| } |
| catch (JPAExecutorException je) { |
| je.printStackTrace(); |
| fail("Unable to insert the test coord action record to table"); |
| throw je; |
| } |
| return action; |
| } |
| |
| protected CoordinatorJobBean getCoordinatorJob(String jobId) throws Exception{ |
| CoordinatorJobBean coordJob = null; |
| try { |
| coordJob = CoordJobQueryExecutor.getInstance().get(CoordJobQueryExecutor.CoordJobQuery.GET_COORD_JOB, jobId); |
| } |
| catch (JPAExecutorException e) { |
| throw new Exception(e); |
| } |
| return coordJob; |
| } |
| |
| protected CoordinatorActionBean getCoordinatorAction(String actionId) throws Exception{ |
| CoordinatorActionBean cAction = null; |
| try { |
| cAction = CoordActionQueryExecutor.getInstance().get(CoordActionQuery.GET_COORD_ACTION, actionId); |
| } |
| catch (JPAExecutorException e) { |
| throw new Exception(e); |
| } |
| return cAction; |
| } |
| |
| protected CoordinatorActionBean createCoordAction(String jobId, int actionNum, CoordinatorAction.Status status, |
| String resourceXmlName, int pending) throws Exception { |
| return createCoordAction(jobId, actionNum, status, resourceXmlName, pending, "Z", null); |
| } |
| |
| protected CoordinatorActionBean createCoordAction(String jobId, int actionNum, CoordinatorAction.Status status, |
| String resourceXmlName, int pending, Date actionNominalTime) throws Exception { |
| return createCoordAction(jobId, actionNum, status, resourceXmlName, pending, "Z", actionNominalTime); |
| } |
| |
| |
| /** |
| * Create coord action bean |
| * |
| * @param jobId coord job id |
| * @param actionNum action number |
| * @param status coord action status |
| * @param resourceXmlName xml file name |
| * @param pending pending counter |
| * @return coord action bean |
| * @throws Exception thrown if unable to create coord action bean |
| */ |
| protected CoordinatorActionBean createCoordAction(String jobId, int actionNum, CoordinatorAction.Status status, |
| String resourceXmlName, int pending, String oozieTimeZoneMask, Date actionNominalTime) throws Exception { |
| String actionId = Services.get().get(UUIDService.class).generateChildId(jobId, actionNum + ""); |
| Path appPath = new Path(getFsTestCaseDir(), "coord"); |
| String actionXml = getCoordActionXml(appPath, resourceXmlName); |
| actionXml = actionXml.replace("${TZ}", oozieTimeZoneMask); |
| |
| CoordinatorActionBean action = new CoordinatorActionBean(); |
| action.setId(actionId); |
| if(status != CoordinatorAction.Status.SUBMITTED && status != CoordinatorAction.Status.READY) { |
| action.setExternalId(actionId + "_E"); |
| } |
| action.setJobId(jobId); |
| action.setActionNumber(actionNum); |
| action.setPending(pending); |
| try { |
| if (actionNominalTime == null) { |
| String nominalTime = getActionNominalTime(actionXml); |
| |
| action.setNominalTime(DateUtils.parseDateOozieTZ(nominalTime)); |
| } |
| else { |
| action.setNominalTime(actionNominalTime); |
| } |
| } |
| catch (Exception e) { |
| e.printStackTrace(); |
| fail("Unable to get action nominal time"); |
| throw new IOException(e); |
| } |
| action.setLastModifiedTime(new Date()); |
| action.setCreatedTime(new Date()); |
| action.setStatus(status); |
| action.setActionXml(actionXml); |
| action.setTimeOut(10); |
| |
| Configuration conf = getCoordConf(appPath); |
| action.setCreatedConf(XmlUtils.prettyPrint(conf).toString()); |
| action.setRunConf(XmlUtils.prettyPrint(conf).toString()); |
| return action; |
| } |
| |
| /** |
| * Insert subwf job for testing. |
| * |
| * @param jobStatus workflow job status |
| * @param instanceStatus workflow instance status |
| * @param parentId the id of the parent workflow |
| * @return workflow job bean |
| * @throws Exception thrown if unable to create workflow job bean |
| */ |
| protected WorkflowJobBean addRecordToWfJobTable(WorkflowJob.Status jobStatus, WorkflowInstance.Status instanceStatus, |
| String parentId) throws Exception { |
| WorkflowJobBean subwfBean = addRecordToWfJobTable(jobStatus, instanceStatus); |
| subwfBean.setParentId(parentId); |
| try { |
| JPAService jpaService = Services.get().get(JPAService.class); |
| assertNotNull(jpaService); |
| WorkflowJobQueryExecutor.getInstance().executeUpdate(WorkflowJobQuery.UPDATE_WORKFLOW_PARENT_MODIFIED, subwfBean); |
| } |
| catch (JPAExecutorException je) { |
| je.printStackTrace(); |
| fail("Unable to insert the test wf job record to table"); |
| throw je; |
| } |
| return subwfBean; |
| } |
| |
| /** |
| * Insert wf job for testing. |
| * |
| * @param jobStatus workflow job status |
| * @param instanceStatus workflow instance status |
| * @return workflow job bean |
| * @throws Exception thrown if unable to create workflow job bean |
| */ |
| protected WorkflowJobBean addRecordToWfJobTable(WorkflowJob.Status jobStatus, WorkflowInstance.Status instanceStatus) |
| throws Exception { |
| WorkflowApp app = new LiteWorkflowApp("testApp", "<workflow-app/>", new StartNodeDef( |
| LiteWorkflowStoreService.LiteControlNodeHandler.class, "end")).addNode(new EndNodeDef("end", |
| LiteWorkflowStoreService.LiteControlNodeHandler.class)); |
| return addRecordToWfJobTable(app, jobStatus, instanceStatus); |
| } |
| |
| protected WorkflowJobBean addRecordToWfJobTable(WorkflowApp app, WorkflowJob.Status jobStatus, |
| WorkflowInstance.Status instanceStatus) throws Exception { |
| Configuration conf = new Configuration(); |
| Path appUri = new Path(getAppPath(), "workflow.xml"); |
| conf.set(OozieClient.APP_PATH, appUri.toString()); |
| conf.set(OozieClient.LOG_TOKEN, "testToken"); |
| conf.set(OozieClient.USER_NAME, getTestUser()); |
| |
| WorkflowJobBean wfBean = createWorkflow(app, conf, jobStatus, instanceStatus); |
| |
| try { |
| JPAService jpaService = Services.get().get(JPAService.class); |
| assertNotNull(jpaService); |
| WorkflowJobInsertJPAExecutor wfInsertCmd = new WorkflowJobInsertJPAExecutor(wfBean); |
| jpaService.execute(wfInsertCmd); |
| } |
| catch (JPAExecutorException je) { |
| je.printStackTrace(); |
| fail("Unable to insert the test wf job record to table"); |
| throw je; |
| } |
| return wfBean; |
| } |
| |
| protected Path getAppPath() { |
| Path baseDir = getFsTestCaseDir(); |
| return new Path(baseDir, "app"); |
| } |
| |
| /** |
| * Insert wf action for testing. |
| * |
| * @param wfId workflow id |
| * @param actionName workflow action name |
| * @param status workflow action status |
| * @return workflow action bean |
| * @throws Exception thrown if unable to create workflow action bean |
| */ |
| protected WorkflowActionBean addRecordToWfActionTable(String wfId, String actionName, WorkflowAction.Status status) |
| throws Exception { |
| return addRecordToWfActionTable(wfId, actionName, status, "", false); |
| } |
| |
| protected WorkflowActionBean addRecordToWfActionTable(String wfId, String actionName, WorkflowAction.Status status, |
| boolean pending) throws Exception { |
| return addRecordToWfActionTable(wfId, actionName, status, "", pending); |
| } |
| |
| protected WorkflowActionBean addRecordToWfActionTable(String wfId, String actionName, WorkflowAction.Status status, |
| String execPath, boolean pending) throws Exception { |
| WorkflowActionBean action = createWorkflowAction(wfId, actionName, status, pending); |
| action.setExecutionPath(execPath); |
| try { |
| JPAService jpaService = Services.get().get(JPAService.class); |
| assertNotNull(jpaService); |
| WorkflowActionInsertJPAExecutor actionInsertCmd = new WorkflowActionInsertJPAExecutor(action); |
| jpaService.execute(actionInsertCmd); |
| } |
| catch (JPAExecutorException je) { |
| je.printStackTrace(); |
| fail("Unable to insert the test wf action record to table"); |
| throw je; |
| } |
| return action; |
| } |
| |
| /** |
| * Insert sla event for testing. |
| * |
| * @param slaId sla id |
| * @param status sla status |
| * @throws Exception thrown if unable to create sla bean |
| */ |
| @Deprecated |
| protected void addRecordToSLAEventTable(String slaId, SLAEvent.Status status, Date today) throws Exception { |
| addRecordToSLAEventTable(slaId, "app-name", status, today); |
| } |
| |
| /** |
| * Insert sla event for testing. |
| * |
| * @param slaId sla id |
| * @param slaId app name |
| * @param status sla status |
| * @throws Exception thrown if unable to create sla bean |
| */ |
| @Deprecated |
| protected void addRecordToSLAEventTable(String slaId, String appName, SLAEvent.Status status, Date today) |
| throws Exception { |
| SLAEventBean sla = new SLAEventBean(); |
| sla.setSlaId(slaId); |
| sla.setAppName(appName); |
| sla.setParentClientId("parent-client-id"); |
| sla.setParentSlaId("parent-sla-id"); |
| sla.setExpectedStart(today); |
| sla.setExpectedEnd(today); |
| sla.setNotificationMsg("notification-msg"); |
| sla.setAlertContact("alert-contact"); |
| sla.setDevContact("dev-contact"); |
| sla.setQaContact("qa-contact"); |
| sla.setSeContact("se-contact"); |
| sla.setAlertFrequency("alert-frequency"); |
| sla.setAlertPercentage("alert-percentage"); |
| sla.setUpstreamApps("upstream-apps"); |
| sla.setAppType(SLAEvent.SlaAppType.WORKFLOW_JOB); |
| sla.setUser(getTestUser()); |
| sla.setGroupName(getTestGroup()); |
| sla.setJobStatus(status); |
| sla.setStatusTimestamp(today); |
| |
| try { |
| JPAService jpaService = Services.get().get(JPAService.class); |
| assertNotNull(jpaService); |
| SLAEventInsertJPAExecutor slaInsertCmd = new SLAEventInsertJPAExecutor(sla); |
| jpaService.execute(slaInsertCmd); |
| } |
| catch (JPAExecutorException je) { |
| je.printStackTrace(); |
| fail("Unable to insert the test sla event record to table"); |
| throw je; |
| } |
| } |
| |
| /** |
| * Insert sla summary for test |
| * @param appName application name |
| * @param status sla status |
| * @return |
| * @throws Exception |
| */ |
| protected SLASummaryBean addRecordToSLASummaryTable(String appName, SLAStatus status) |
| throws Exception { |
| SLASummaryBean sla = new SLASummaryBean(); |
| Date today = new Date(); |
| sla.setId(Services.get().get(UUIDService.class).generateId(ApplicationType.COORDINATOR)); |
| sla.setAppName(appName); |
| sla.setAppType(AppType.COORDINATOR_JOB); |
| sla.setCreatedTime(today); |
| sla.setNominalTime(today); |
| sla.setExpectedStart(today); |
| sla.setExpectedEnd(today); |
| sla.setExpectedDuration(100); |
| sla.setJobStatus("RUNNING"); |
| sla.setSLAStatus(status); |
| sla.setEventStatus(EventStatus.START_MET); |
| sla.setLastModifiedTime(today); |
| sla.setUser("oozie"); |
| sla.setParentId(Services.get().get(UUIDService.class).generateId(ApplicationType.BUNDLE)); |
| sla.setEventProcessed(1); |
| sla.setActualStart(today); |
| sla.setActualEnd(today); |
| sla.setActualDuration(100); |
| try { |
| SLASummaryQueryExecutor.getInstance().insert(sla); |
| } |
| catch (JPAExecutorException je) { |
| je.printStackTrace(); |
| fail("Unable to insert the test sla event record to table"); |
| throw je; |
| } |
| return sla; |
| } |
| |
| /** |
| * Insert sla registration for test |
| * @param appName application name |
| * @param status sla status |
| * @return |
| * @throws Exception |
| */ |
| protected SLARegistrationBean addRecordToSLARegistrationTable(String appName, SLAStatus status) |
| throws Exception { |
| SLARegistrationBean sla = new SLARegistrationBean(); |
| Date today = new Date(); |
| sla.setId(Services.get().get(UUIDService.class).generateId(ApplicationType.COORDINATOR)); |
| sla.setAppName(appName); |
| sla.setAppType(AppType.COORDINATOR_JOB); |
| sla.setExpectedDuration(100); |
| sla.setExpectedEnd(today); |
| sla.setExpectedStart(today); |
| sla.setJobData("test-job-data"); |
| sla.setCreatedTime(today); |
| sla.setNominalTime(today); |
| sla.setNotificationMsg("test-sla-notification-msg"); |
| sla.setParentId(Services.get().get(UUIDService.class).generateId(ApplicationType.BUNDLE)); |
| sla.setSlaConfig("alert-events"); |
| sla.setUpstreamApps("test-upstream-apps"); |
| sla.setUser("oozie"); |
| try { |
| SLARegistrationQueryExecutor.getInstance().insert(sla); |
| } |
| catch (JPAExecutorException je) { |
| je.printStackTrace(); |
| fail("Unable to insert the test sla registration record to table"); |
| throw je; |
| } |
| return sla; |
| } |
| |
| /** |
| * Insert bundle job for testing. |
| * |
| * @param jobStatus job status |
| * @param pending true if pending |
| * @return bundle job bean |
| * @throws Exception |
| */ |
| public BundleJobBean addRecordToBundleJobTable(Job.Status jobStatus, boolean pending) throws Exception { |
| BundleJobBean bundle = createBundleJob(jobStatus, pending); |
| try { |
| JPAService jpaService = Services.get().get(JPAService.class); |
| assertNotNull(jpaService); |
| BundleJobInsertJPAExecutor bundleInsertjpa = new BundleJobInsertJPAExecutor(bundle); |
| jpaService.execute(bundleInsertjpa); |
| } |
| catch (JPAExecutorException ce) { |
| ce.printStackTrace(); |
| fail("Unable to insert the test bundle job record to table"); |
| throw ce; |
| } |
| return bundle; |
| } |
| |
| /** |
| * Insert a bad bundle job for testing negative cases. |
| * |
| * @param jobStatus job status |
| * @return bundle job bean |
| * @throws Exception |
| */ |
| protected BundleJobBean addRecordToBundleJobTableNegative(Job.Status jobStatus) throws Exception { |
| BundleJobBean bundle = createBundleJobNegative(jobStatus); |
| try { |
| JPAService jpaService = Services.get().get(JPAService.class); |
| assertNotNull(jpaService); |
| BundleJobInsertJPAExecutor bundleInsertjpa = new BundleJobInsertJPAExecutor(bundle); |
| jpaService.execute(bundleInsertjpa); |
| } |
| catch (JPAExecutorException ce) { |
| ce.printStackTrace(); |
| fail("Unable to insert the test bundle job record to table"); |
| throw ce; |
| } |
| return bundle; |
| } |
| |
| /** |
| * Insert bundle job for testing. |
| * |
| * @param jobStatus job status |
| * @param pending true if pending |
| * @return bundle job bean |
| * @throws Exception |
| */ |
| protected BundleJobBean addRecordToBundleJobTableDisabledCoord(Job.Status jobStatus) throws Exception { |
| BundleJobBean bundle = createBundleJobCoordDisabled(jobStatus); |
| try { |
| JPAService jpaService = Services.get().get(JPAService.class); |
| assertNotNull(jpaService); |
| BundleJobInsertJPAExecutor bundleInsertjpa = new BundleJobInsertJPAExecutor(bundle); |
| jpaService.execute(bundleInsertjpa); |
| } |
| catch (JPAExecutorException ce) { |
| ce.printStackTrace(); |
| fail("Unable to insert the test bundle job record to table"); |
| throw ce; |
| } |
| return bundle; |
| } |
| |
| /** |
| * Creates bundle job bean with one disabled coordinator |
| * |
| * @param jobStatus job status |
| * @return bundle job bean |
| * @throws Exception |
| */ |
| protected BundleJobBean createBundleJobCoordDisabled(Job.Status jobStatus) throws Exception { |
| return createBundleJobCoordDisabled(Services.get().get(UUIDService.class).generateId(ApplicationType.BUNDLE), |
| jobStatus); |
| } |
| |
| /** |
| * Creates bundle job bean with one disabled coordinator |
| * |
| * @param jobID |
| * @param jobStatus job status |
| * @return bundle job bean |
| * @throws Exception |
| */ |
| protected BundleJobBean createBundleJobCoordDisabled(String jobID, Job.Status jobStatus) throws Exception { |
| Path coordPath1 = new Path(getFsTestCaseDir(), "coord1"); |
| Path coordPath2 = new Path(getFsTestCaseDir(), "coord2"); |
| writeCoordXml(coordPath1, "coord-job-bundle.xml"); |
| writeCoordXml(coordPath2, "coord-job-bundle.xml"); |
| |
| Path bundleAppPath = new Path(getFsTestCaseDir(), "bundle"); |
| String bundleAppXml = getBundleXml("bundle-submit-job.xml"); |
| assertNotNull(bundleAppXml); |
| assertTrue(bundleAppXml.length() > 0); |
| |
| bundleAppXml = bundleAppXml.replaceAll("#app_path1", |
| Matcher.quoteReplacement(new Path(coordPath1.toString(), "coordinator.xml").toString())); |
| bundleAppXml = bundleAppXml.replaceAll("#app_path2", |
| Matcher.quoteReplacement(new Path(coordPath2.toString(), "coordinator.xml").toString())); |
| |
| writeToFile(bundleAppXml, bundleAppPath, "bundle.xml"); |
| |
| Configuration conf = new XConfiguration(); |
| conf.set(OozieClient.BUNDLE_APP_PATH, bundleAppPath.toString()); |
| conf.set(OozieClient.USER_NAME, getTestUser()); |
| conf.set("jobTracker", getJobTrackerUri()); |
| conf.set("nameNode", getNameNodeUri()); |
| conf.set("appName", "bundle-app-name"); |
| conf.set("coordName1", "coord1"); |
| conf.set("coordName2", "coord2"); |
| conf.set("isEnabled", "false"); |
| |
| BundleJobBean bundle = new BundleJobBean(); |
| bundle.setId(jobID); |
| bundle.setAppName("BUNDLE-TEST"); |
| bundle.setAppPath(bundleAppPath.toString()); |
| bundle.setConf(XmlUtils.prettyPrint(conf).toString()); |
| bundle.setConsoleUrl("consoleUrl"); |
| bundle.setCreatedTime(new Date()); |
| bundle.setJobXml(bundleAppXml); |
| bundle.setLastModifiedTime(new Date()); |
| bundle.setOrigJobXml(bundleAppXml); |
| bundle.resetPending(); |
| bundle.setStatus(jobStatus); |
| bundle.setUser(conf.get(OozieClient.USER_NAME)); |
| bundle.setGroup(conf.get(OozieClient.GROUP_NAME)); |
| return bundle; |
| } |
| |
| /** |
| * Create bundle action bean and save to db |
| * |
| * @param jobId bundle job id |
| * @param coordName coordinator name |
| * @param pending true if action is pending |
| * @param status job status |
| * @return bundle action bean |
| * @throws Exception |
| */ |
| protected BundleActionBean addRecordToBundleActionTable(String jobId, String coordName, int pending, |
| Job.Status status) throws Exception { |
| BundleActionBean action = createBundleAction(jobId, null, coordName, pending, status); |
| |
| try { |
| JPAService jpaService = Services.get().get(JPAService.class); |
| assertNotNull(jpaService); |
| BundleActionInsertJPAExecutor bundleActionJPAExecutor = new BundleActionInsertJPAExecutor(action); |
| jpaService.execute(bundleActionJPAExecutor); |
| } |
| catch (JPAExecutorException ex) { |
| ex.printStackTrace(); |
| fail("Unable to insert the test bundle action record to table"); |
| throw ex; |
| } |
| |
| return action; |
| } |
| |
| /** |
| * Create bundle action bean and save to db |
| * |
| * @param jobId bundle job id |
| * @param coordId coordinator id |
| * @param coordName coordinator name |
| * @param pending true if action is pending |
| * @param status job status |
| * @return bundle action bean |
| * @throws Exception |
| */ |
| protected BundleActionBean addRecordToBundleActionTable(String jobId, String coordId, String coordName, int pending, |
| Job.Status status) throws Exception { |
| BundleActionBean action = createBundleAction(jobId, coordId, coordName, pending, status); |
| |
| try { |
| JPAService jpaService = Services.get().get(JPAService.class); |
| assertNotNull(jpaService); |
| BundleActionInsertJPAExecutor bundleActionJPAExecutor = new BundleActionInsertJPAExecutor(action); |
| jpaService.execute(bundleActionJPAExecutor); |
| |
| CoordinatorJobBean coordJob = jpaService.execute(new CoordJobGetJPAExecutor(coordId)); |
| coordJob.setBundleId(jobId); |
| CoordJobQueryExecutor.getInstance().executeUpdate(CoordJobQuery.UPDATE_COORD_JOB_BUNDLEID, coordJob); |
| } |
| catch (JPAExecutorException ex) { |
| ex.printStackTrace(); |
| fail("Unable to insert the test bundle action record to table"); |
| throw ex; |
| } |
| |
| return action; |
| } |
| |
| /** |
| * Create bundle action bean |
| * |
| * @param jobId bundle job id |
| * @param coordId coordinator id |
| * @param coordName coordinator name |
| * @param pending true if action is pending |
| * @param status job status |
| * @return bundle action bean |
| * @throws Exception |
| */ |
| protected BundleActionBean createBundleAction(String jobId, String coordId, String coordName, int pending, Job.Status status) |
| throws Exception { |
| BundleActionBean action = new BundleActionBean(); |
| action.setBundleId(jobId); |
| action.setBundleActionId(jobId + "_" + coordName); |
| action.setPending(pending); |
| action.setCoordId(coordId); |
| action.setCoordName(coordName); |
| action.setStatus(status); |
| action.setLastModifiedTime(new Date()); |
| |
| return action; |
| } |
| |
| /** |
| * Read coord job xml from test resources |
| * |
| * @param appPath application path |
| * @return content of coord job xml |
| */ |
| protected String getCoordJobXml(Path appPath) { |
| String inputTemplate = appPath + "/coord-input/${YEAR}/${MONTH}/${DAY}/${HOUR}/${MINUTE}"; |
| inputTemplate = Matcher.quoteReplacement(inputTemplate); |
| String outputTemplate = appPath + "/coord-input/${YEAR}/${MONTH}/${DAY}/${HOUR}/${MINUTE}"; |
| outputTemplate = Matcher.quoteReplacement(outputTemplate); |
| try { |
| Reader reader = IOUtils.getResourceAsReader("coord-rerun-job.xml", -1); |
| String appXml = IOUtils.getReaderAsString(reader, -1); |
| appXml = appXml.replaceAll("#inputTemplate", inputTemplate); |
| appXml = appXml.replaceAll("#outputTemplate", outputTemplate); |
| return appXml; |
| } |
| catch (IOException ioe) { |
| throw new RuntimeException(XLog.format("Could not get coord-rerun-job.xml", ioe)); |
| } |
| } |
| |
| /** |
| * Read coord job xml from test resources |
| * |
| * @param appPath application path |
| * @param start start time |
| * @param end end time |
| * @return content of coord job xml |
| */ |
| protected String getCoordJobXml(Path appPath, Date start, Date end) { |
| String startDateStr = null, endDateStr = null; |
| try { |
| startDateStr = DateUtils.formatDateOozieTZ(start); |
| endDateStr = DateUtils.formatDateOozieTZ(end); |
| } |
| catch (Exception ex) { |
| ex.printStackTrace(); |
| fail("Could not format dates"); |
| } |
| try { |
| Reader reader = IOUtils.getResourceAsReader("coord-matd-job.xml", -1); |
| String appXml = IOUtils.getReaderAsString(reader, -1); |
| appXml = appXml.replaceAll("#start", startDateStr); |
| appXml = appXml.replaceAll("#end", endDateStr); |
| return appXml; |
| } |
| catch (IOException ioe) { |
| throw new RuntimeException(XLog.format("Could not get coord-matd-job.xml", ioe)); |
| } |
| } |
| |
| /** |
| * Read coord job xml from test resources |
| * |
| * @param testFileName file name of coord job xml |
| * @return content of coord job xml |
| */ |
| protected String getCoordJobXml(String testFileName) { |
| try { |
| Reader reader = IOUtils.getResourceAsReader(testFileName, -1); |
| String appXml = IOUtils.getReaderAsString(reader, -1); |
| return appXml; |
| } |
| catch (IOException ioe) { |
| throw new RuntimeException(XLog.format("Could not get [{0}]", testFileName, ioe)); |
| } |
| } |
| |
| /** |
| * Read coord action xml from test resources |
| * |
| * @param appPath application path |
| * @param resourceXmlName file name of coord action xml |
| * @return content of coord action xml |
| */ |
| protected String getCoordActionXml(Path appPath, String resourceXmlName) { |
| String inputTemplate = appPath + "/coord-input/${YEAR}/${MONTH}/${DAY}/${HOUR}/${MINUTE}"; |
| inputTemplate = Matcher.quoteReplacement(inputTemplate); |
| String outputTemplate = appPath + "/coord-input/${YEAR}/${MONTH}/${DAY}/${HOUR}/${MINUTE}"; |
| outputTemplate = Matcher.quoteReplacement(outputTemplate); |
| |
| String inputDir = appPath + "/coord-input/2010/07/05/01/00"; |
| inputDir = Matcher.quoteReplacement(inputDir); |
| String outputDir = appPath + "/coord-input/2009/12/14/11/00"; |
| outputDir = Matcher.quoteReplacement(outputDir); |
| try { |
| Reader reader = IOUtils.getResourceAsReader(resourceXmlName, -1); |
| String appXml = IOUtils.getReaderAsString(reader, -1); |
| appXml = appXml.replaceAll("#inputTemplate", inputTemplate); |
| appXml = appXml.replaceAll("#outputTemplate", outputTemplate); |
| appXml = appXml.replaceAll("#inputDir", inputDir); |
| appXml = appXml.replaceAll("#outputDir", outputDir); |
| return appXml; |
| } |
| catch (IOException ioe) { |
| throw new RuntimeException(XLog.format("Could not get " + resourceXmlName, ioe)); |
| } |
| } |
| |
| /** |
| * Get coordinator configuration |
| * |
| * @param appPath application path |
| * @return coordinator configuration |
| * @throws IOException thrown if unable to get coord conf |
| */ |
| protected Configuration getCoordConf(Path appPath) throws IOException { |
| Path wfAppPath = new Path(getFsTestCaseDir(), "coord"); |
| |
| Configuration jobConf = new XConfiguration(); |
| jobConf.set(OozieClient.COORDINATOR_APP_PATH, appPath.toString()); |
| jobConf.set(OozieClient.USER_NAME, getTestUser()); |
| jobConf.set("jobTracker", getJobTrackerUri()); |
| jobConf.set("nameNode", getNameNodeUri()); |
| jobConf.set("wfAppPath", wfAppPath.toString()); |
| |
| String content = "<workflow-app xmlns='uri:oozie:workflow:0.1' xmlns:sla='uri:oozie:sla:0.1' name='no-op-wf'>"; |
| content += "<start to='end' />"; |
| content += "<end name='end' /></workflow-app>"; |
| writeToFile(content, wfAppPath, "workflow.xml"); |
| |
| return jobConf; |
| } |
| |
| protected void writeToFile(String content, Path appPath, String fileName) throws IOException { |
| FileSystem fs = getFileSystem(); |
| Writer writer = new OutputStreamWriter(fs.create(new Path(appPath, fileName), true), |
| StandardCharsets.UTF_8); |
| writer.write(content); |
| writer.close(); |
| } |
| |
| /** |
| * Get action nominal time. |
| * |
| * @param actionXml |
| * @return |
| */ |
| protected String getActionNominalTime(String actionXml) { |
| Element eAction; |
| try { |
| eAction = XmlUtils.parseXml(actionXml); |
| } |
| catch (JDOMException je) { |
| throw new RuntimeException(XLog.format("Could not parse actionXml :" + actionXml, je)); |
| } |
| String actionNomialTime = eAction.getAttributeValue("action-nominal-time"); |
| |
| return actionNomialTime; |
| } |
| |
| /** |
| * Create workflow job bean |
| * |
| * @param app workflow app |
| * @param conf workflow configuration |
| * @param authToken auth token |
| * @param jobStatus workflow job status |
| * @param instanceStatus workflow instance status |
| * @return workflow job bean |
| * @throws Exception thrown if unable to create workflow job bean |
| */ |
| protected WorkflowJobBean createWorkflow(WorkflowApp app, Configuration conf, |
| WorkflowJob.Status jobStatus, WorkflowInstance.Status instanceStatus) throws Exception { |
| WorkflowAppService wps = Services.get().get(WorkflowAppService.class); |
| Configuration protoActionConf = wps.createProtoActionConf(conf, true); |
| WorkflowLib workflowLib = Services.get().get(WorkflowStoreService.class).getWorkflowLibWithNoDB(); |
| WorkflowInstance wfInstance = workflowLib.createInstance(app, conf); |
| ((LiteWorkflowInstance) wfInstance).setStatus(instanceStatus); |
| WorkflowJobBean workflow = new WorkflowJobBean(); |
| workflow.setId(wfInstance.getId()); |
| workflow.setExternalId("extid"); |
| workflow.setAppName(app.getName()); |
| workflow.setAppPath(conf.get(OozieClient.APP_PATH)); |
| workflow.setConf(XmlUtils.prettyPrint(conf).toString()); |
| workflow.setProtoActionConf(XmlUtils.prettyPrint(protoActionConf).toString()); |
| workflow.setCreatedTime(new Date()); |
| workflow.setLogToken(conf.get(OozieClient.LOG_TOKEN, "")); |
| workflow.setStatus(jobStatus); |
| workflow.setRun(0); |
| workflow.setUser(conf.get(OozieClient.USER_NAME)); |
| workflow.setGroup(conf.get(OozieClient.GROUP_NAME)); |
| workflow.setWorkflowInstance(wfInstance); |
| workflow.setSlaXml("<sla></sla>"); |
| return workflow; |
| } |
| |
| /** |
| * Create workflow action bean |
| * |
| * @param wfId workflow job id |
| * @param actionName workflow action name |
| * @param status workflow action status |
| * @return workflow action bean |
| * @throws Exception thrown if unable to create workflow action bean |
| */ |
| protected WorkflowActionBean createWorkflowAction(String wfId, String actionName, WorkflowAction.Status status, |
| boolean pending) throws Exception { |
| WorkflowActionBean action = new WorkflowActionBean(); |
| action.setName(actionName); |
| action.setId(Services.get().get(UUIDService.class).generateChildId(wfId, actionName)); |
| action.setJobId(wfId); |
| action.setType("map-reduce"); |
| action.setTransition("transition"); |
| action.setStatus(status); |
| final Date currDate = new Date(); |
| action.setCreatedTime(currDate); |
| action.setStartTime(currDate); |
| action.setEndTime(currDate); |
| action.setLastCheckTime(currDate); |
| action.setStats("dummyStats"); |
| if (pending) { |
| action.setPending(); |
| } |
| else { |
| action.resetPending(); |
| } |
| |
| Path inputDir = new Path(getFsTestCaseDir(), "input"); |
| Path outputDir = new Path(getFsTestCaseDir(), "output"); |
| |
| FileSystem fs = getFileSystem(); |
| Writer w = new OutputStreamWriter(fs.create(new Path(inputDir, "data.txt")), StandardCharsets.UTF_8); |
| w.write("dummy\n"); |
| w.write("dummy\n"); |
| w.close(); |
| |
| String actionXml = "<map-reduce>" + "<job-tracker>" + getJobTrackerUri() + "</job-tracker>" + "<name-node>" |
| + getNameNodeUri() + "</name-node>" + "<configuration>" |
| + "<property><name>mapred.mapper.class</name><value>" + MapperReducerForTest.class.getName() |
| + "</value></property>" + "<property><name>mapred.reducer.class</name><value>" |
| + MapperReducerForTest.class.getName() + "</value></property>" |
| + "<property><name>mapred.input.dir</name><value>" + inputDir.toString() + "</value></property>" |
| + "<property><name>mapred.output.dir</name><value>" + outputDir.toString() + "</value></property>" |
| + "</configuration>" + "</map-reduce>"; |
| action.setConf(actionXml); |
| action.setSlaXml("<sla></sla>"); |
| action.setData("dummy data"); |
| action.setStats("dummy stats"); |
| action.setExternalChildIDs("job_201601011800_0001"); |
| action.setRetries(2); |
| action.setUserRetryCount(1); |
| action.setUserRetryMax(2); |
| action.setUserRetryInterval(1); |
| action.setErrorInfo("dummyErrorCode", "dummyErrorMessage"); |
| action.setExternalId("application_1234567890123_0001"); |
| action.setExternalStatus("RUNNING"); |
| |
| return action; |
| } |
| |
| protected WorkflowActionBean createWorkflowAction(String wfId, String actionName, WorkflowAction.Status status) |
| throws Exception { |
| return createWorkflowAction(wfId, actionName, status, false); |
| } |
| |
| /** |
| * Create bundle job bean |
| * |
| * @param jobStatus job status |
| * @param pending true if pending |
| * @return bundle job bean |
| * @throws Exception |
| */ |
| protected BundleJobBean createBundleJob(String jobID, Job.Status jobStatus, boolean pending) throws Exception { |
| Path coordPath1 = new Path(getFsTestCaseDir(), "coord1"); |
| Path coordPath2 = new Path(getFsTestCaseDir(), "coord2"); |
| writeCoordXml(coordPath1, "coord-job-bundle.xml"); |
| writeCoordXml(coordPath2, "coord-job-bundle.xml"); |
| |
| Path bundleAppPath = new Path(getFsTestCaseDir(), "bundle"); |
| String bundleAppXml = getBundleXml("bundle-submit-job.xml"); |
| assertNotNull(bundleAppXml); |
| assertTrue(bundleAppXml.length() > 0); |
| |
| bundleAppXml = bundleAppXml |
| .replaceAll("#app_path1", Matcher.quoteReplacement(new Path(coordPath1.toString(), "coordinator.xml").toString())); |
| bundleAppXml = bundleAppXml |
| .replaceAll("#app_path2", Matcher.quoteReplacement(new Path(coordPath2.toString(), "coordinator.xml").toString())); |
| |
| writeToFile(bundleAppXml, bundleAppPath, "bundle.xml"); |
| |
| Configuration conf = new XConfiguration(); |
| conf.set(OozieClient.BUNDLE_APP_PATH, bundleAppPath.toString()); |
| conf.set(OozieClient.USER_NAME, getTestUser()); |
| conf.set("jobTracker", getJobTrackerUri()); |
| conf.set("nameNode", getNameNodeUri()); |
| conf.set("appName", "bundle-app-name"); |
| conf.set("coordName1", "coord1"); |
| conf.set("coordName2", "coord2"); |
| conf.set("isEnabled", "true"); |
| |
| BundleJobBean bundle = new BundleJobBean(); |
| bundle.setId(jobID); |
| bundle.setAppName("BUNDLE-TEST"); |
| bundle.setAppPath(bundleAppPath.toString()); |
| bundle.setConf(XmlUtils.prettyPrint(conf).toString()); |
| bundle.setConsoleUrl("consoleUrl"); |
| bundle.setCreatedTime(new Date()); |
| // TODO bundle.setStartTime(startTime); |
| // TODO bundle.setEndTime(endTime); |
| // TODO bundle.setExternalId(externalId); |
| bundle.setJobXml(bundleAppXml); |
| bundle.setLastModifiedTime(new Date()); |
| bundle.setOrigJobXml(bundleAppXml); |
| if (pending) { |
| bundle.setPending(); |
| } |
| else { |
| bundle.resetPending(); |
| } |
| bundle.setStatus(jobStatus); |
| bundle.setUser(conf.get(OozieClient.USER_NAME)); |
| bundle.setGroup(conf.get(OozieClient.GROUP_NAME)); |
| |
| return bundle; |
| } |
| |
| /** |
| * Create bundle job bean |
| * @param jobStatus |
| * @param pending |
| * @return |
| * @throws Exception |
| */ |
| protected BundleJobBean createBundleJob(Job.Status jobStatus, boolean pending) throws Exception { |
| return createBundleJob(Services.get().get(UUIDService.class).generateId(ApplicationType.BUNDLE), jobStatus, pending); |
| } |
| |
| |
| /** |
| * Create bundle job that contains bad coordinator jobs |
| * |
| * @param jobStatus |
| * @return bundle job bean |
| * @throws Exception |
| */ |
| protected BundleJobBean createBundleJobNegative(Job.Status jobStatus) throws Exception { |
| Path coordPath1 = new Path(getFsTestCaseDir(), "coord1"); |
| Path coordPath2 = new Path(getFsTestCaseDir(), "coord2"); |
| writeCoordXml(coordPath1, "coord-job-bundle-negative.xml"); |
| writeCoordXml(coordPath2, "coord-job-bundle-negative.xml"); |
| |
| Path bundleAppPath = new Path(getFsTestCaseDir(), "bundle"); |
| String bundleAppXml = getBundleXml("bundle-submit-job.xml"); |
| |
| bundleAppXml = bundleAppXml |
| .replaceAll("#app_path1", new Path(coordPath1.toString(), "coordinator.xml").toString()); |
| bundleAppXml = bundleAppXml |
| .replaceAll("#app_path2", new Path(coordPath1.toString(), "coordinator.xml").toString()); |
| writeToFile(bundleAppXml, bundleAppPath, "bundle.xml"); |
| |
| Configuration conf = new XConfiguration(); |
| conf.set(OozieClient.BUNDLE_APP_PATH, bundleAppPath.toString()); |
| conf.set(OozieClient.USER_NAME, getTestUser()); |
| conf.set("jobTracker", getJobTrackerUri()); |
| conf.set("nameNode", getNameNodeUri()); |
| conf.set("coordName1", "coord1"); |
| conf.set("coordName2", "coord2"); |
| conf.set("coord1.starttime","2009-02-01T00:00Z"); |
| conf.set("isEnabled", "true"); |
| |
| BundleJobBean bundle = new BundleJobBean(); |
| bundle.setId(Services.get().get(UUIDService.class).generateId(ApplicationType.BUNDLE)); |
| bundle.setAppName("BUNDLE-TEST"); |
| bundle.setAppPath(bundleAppPath.toString()); |
| bundle.setConf(XmlUtils.prettyPrint(conf).toString()); |
| bundle.setConsoleUrl("consoleUrl"); |
| bundle.setCreatedTime(new Date()); |
| bundle.setJobXml(bundleAppXml); |
| bundle.setLastModifiedTime(new Date()); |
| bundle.setOrigJobXml(bundleAppXml); |
| bundle.resetPending(); |
| bundle.setStatus(jobStatus); |
| bundle.setUser(conf.get(OozieClient.USER_NAME)); |
| bundle.setGroup(conf.get(OozieClient.GROUP_NAME)); |
| |
| return bundle; |
| } |
| |
| protected String getBundleXml(String resourceXmlName) { |
| try { |
| Reader reader = IOUtils.getResourceAsReader(resourceXmlName, -1); |
| String appXml = IOUtils.getReaderAsString(reader, -1); |
| return appXml; |
| } |
| catch (IOException ioe) { |
| throw new RuntimeException(XLog.format("Could not get " + resourceXmlName, ioe)); |
| } |
| } |
| |
| /** |
| * Inserts a record to coord action table |
| * |
| * @param action the record to be inserted |
| * @throws Exception |
| */ |
| protected void insertRecordCoordAction(CoordinatorActionBean action) throws Exception { |
| try { |
| JPAService jpaService = Services.get().get(JPAService.class); |
| assertNotNull(jpaService); |
| CoordActionInsertJPAExecutor coordActionInsertCmd = new CoordActionInsertJPAExecutor(action); |
| jpaService.execute(coordActionInsertCmd); |
| } |
| catch (JPAExecutorException je) { |
| je.printStackTrace(); |
| fail("Unable to insert the test coord action record to table"); |
| throw je; |
| } |
| } |
| |
| // Exclude some of the services classes from loading so they dont interfere |
| // while the test case is running |
| protected void setClassesToBeExcluded(Configuration conf, String[] excludedServices) { |
| String classes = conf.get(Services.CONF_SERVICE_CLASSES); |
| StringBuilder builder = new StringBuilder(classes); |
| for (String s : excludedServices) { |
| int index = builder.indexOf(s); |
| if (index != -1) { |
| builder.replace(index, index + s.length() + 1, ""); |
| } |
| } |
| conf.set(Services.CONF_SERVICE_CLASSES, new String(builder)); |
| } |
| |
| /** |
| * Add a particular service class to be run in addition to default ones |
| * @param conf |
| * @param serviceName |
| */ |
| protected void addServiceToRun(Configuration conf, String serviceName) { |
| String classes = conf.get(Services.CONF_SERVICE_CLASSES); |
| StringBuilder builder = new StringBuilder(classes); |
| builder.append("," + serviceName); |
| conf.set(Services.CONF_SERVICE_CLASSES, new String(builder)); |
| } |
| |
| /** |
| * Adds the db records for the Bulk Monitor tests |
| */ |
| protected void addRecordsForBulkMonitor() throws Exception { |
| JPAService jpaService = Services.get().get(JPAService.class); |
| assertNotNull(jpaService); |
| // adding the bundle job |
| BundleJobBean bundle = addRecordToBundleJobTable(BundleJob.Status.RUNNING, false); |
| bundleId = bundle.getId(); |
| bundleName = bundle.getAppName(); |
| addCoordForBulkMonitor(bundleId); |
| } |
| |
| protected void addCoordForBulkMonitor(String bundleId) throws Exception { |
| JPAService jpaService = Services.get().get(JPAService.class); |
| assertNotNull(jpaService); |
| // adding coordinator job(s) for this bundle |
| addRecordToCoordJobTableWithBundle(bundleId, "Coord1", CoordinatorJob.Status.RUNNING, true, true, 2); |
| addRecordToCoordJobTableWithBundle(bundleId, "Coord2", CoordinatorJob.Status.RUNNING, true, true, 1); |
| addRecordToCoordJobTableWithBundle(bundleId, "Coord3", CoordinatorJob.Status.RUNNING, true, true, 1); |
| |
| // adding coordinator action #1 to Coord#1 |
| CoordinatorActionBean action1 = new CoordinatorActionBean(); |
| action1.setId("Coord1@1"); |
| action1.setStatus(CoordinatorAction.Status.FAILED); |
| action1.setCreatedTime(DateUtils.parseDateUTC(CREATE_TIME)); |
| action1.setJobId("Coord1"); |
| |
| Calendar cal = Calendar.getInstance(); |
| cal.setTime(DateUtils.parseDateUTC(CREATE_TIME)); |
| cal.add(Calendar.DATE, -1); |
| |
| action1.setNominalTime(cal.getTime()); |
| CoordActionInsertJPAExecutor actionInsert = new CoordActionInsertJPAExecutor(action1); |
| jpaService.execute(actionInsert); |
| |
| // adding coordinator action #2 to Coord#1 |
| CoordinatorActionBean action2 = new CoordinatorActionBean(); |
| action2.setId("Coord1@2"); |
| action2.setStatus(CoordinatorAction.Status.KILLED); |
| action2.setCreatedTime(DateUtils.parseDateUTC(CREATE_TIME)); |
| action2.setJobId("Coord1"); |
| |
| cal.setTime(DateUtils.parseDateUTC(CREATE_TIME)); |
| cal.add(Calendar.DATE, -1); |
| |
| action2.setNominalTime(cal.getTime()); |
| actionInsert = new CoordActionInsertJPAExecutor(action2); |
| jpaService.execute(actionInsert); |
| |
| // adding coordinator action #3 to Coord#2 |
| CoordinatorActionBean action3 = new CoordinatorActionBean(); |
| action3.setId("Coord2@1"); |
| action3.setStatus(CoordinatorAction.Status.KILLED); |
| action3.setCreatedTime(DateUtils.parseDateUTC(CREATE_TIME)); |
| action3.setJobId("Coord2"); |
| |
| cal.setTime(DateUtils.parseDateUTC(CREATE_TIME)); |
| cal.add(Calendar.DATE, -1); |
| |
| action3.setNominalTime(cal.getTime()); |
| actionInsert = new CoordActionInsertJPAExecutor(action3); |
| jpaService.execute(actionInsert); |
| } |
| |
| /** |
| * Add a month to the current time |
| * |
| * @param incrementMonth |
| * @return |
| */ |
| protected static String getCurrentDateafterIncrementingInMonths(int incrementMonth) { |
| Calendar currentDate = Calendar.getInstance(); |
| currentDate.set(Calendar.MONTH, currentDate.get(Calendar.MONTH) + incrementMonth); |
| return DateUtils.formatDateOozieTZ(currentDate); |
| } |
| |
| protected String addInitRecords(String pushMissingDependencies) throws Exception { |
| return addInitRecords(null, pushMissingDependencies, "Z"); |
| } |
| |
| protected String addInitRecords(String missingDependencies, String pushMissingDependencies, String oozieTimeZoneMask) |
| throws Exception { |
| CoordinatorJobBean job = addRecordToCoordJobTableForWaiting("coord-job-for-action-input-check.xml", |
| CoordinatorJob.Status.RUNNING, false, true); |
| |
| return addInitRecords(missingDependencies, pushMissingDependencies, oozieTimeZoneMask, job, 1); |
| } |
| |
| protected String addInitRecords(String missingDependencies, String pushMissingDependencies, String oozieTimeZoneMask, |
| CoordinatorJobBean job, int actionNum) throws Exception { |
| CoordinatorActionBean action = addRecordToCoordActionTableForWaiting(job.getId(), actionNum, |
| CoordinatorAction.Status.WAITING, "coord-action-for-action-input-check.xml", missingDependencies, |
| pushMissingDependencies, oozieTimeZoneMask); |
| return action.getId(); |
| } |
| |
| protected CoordinatorActionBean addRecordToCoordActionTableForWaiting(String jobId, int actionNum, |
| CoordinatorAction.Status status, String resourceXmlName, String missingDependencies, |
| String pushMissingDependencies, String oozieTimeZoneMask) throws Exception { |
| CoordinatorActionBean action = createCoordAction(jobId, actionNum, status, resourceXmlName, 0, |
| oozieTimeZoneMask, null); |
| action.setMissingDependencies(missingDependencies); |
| action.setPushMissingDependencies(pushMissingDependencies); |
| try { |
| JPAService jpaService = Services.get().get(JPAService.class); |
| assertNotNull(jpaService); |
| CoordActionInsertJPAExecutor coordActionInsertCmd = new CoordActionInsertJPAExecutor(action); |
| jpaService.execute(coordActionInsertCmd); |
| } |
| catch (JPAExecutorException je) { |
| je.printStackTrace(); |
| fail("Unable to insert the test coord action record to table"); |
| throw je; |
| } |
| return action; |
| } |
| |
| protected CoordinatorJobBean addRecordToCoordJobTableForWaiting(String testFileName, CoordinatorJob.Status status, |
| boolean pending, boolean doneMatd) throws Exception { |
| CoordinatorJobBean coordJob = createCoordJob(status, pending, doneMatd); |
| return addRecordToCoordJobTableForWaiting(testFileName, coordJob); |
| } |
| |
| protected CoordinatorJobBean addRecordToCoordJobTableForWaiting(String testFileName, CoordinatorJob.Status status, |
| Date start, Date end, boolean pending, boolean doneMatd, int lastActionNum) throws Exception { |
| CoordinatorJobBean coordJob = createCoordJob(testFileName, status, start, end, pending, doneMatd, lastActionNum); |
| return addRecordToCoordJobTableForWaiting(testFileName, coordJob); |
| } |
| |
| protected CoordinatorJobBean addRecordToCoordJobTableForWaiting(String testFileName, CoordinatorJobBean coordJob) |
| throws Exception { |
| |
| String testDir = getTestCaseDir(); |
| String appXml = getCoordJobXmlForWaiting(testFileName, testDir); |
| coordJob.setJobXml(appXml); |
| |
| try { |
| JPAService jpaService = Services.get().get(JPAService.class); |
| assertNotNull(jpaService); |
| CoordJobInsertJPAExecutor coordInsertCmd = new CoordJobInsertJPAExecutor(coordJob); |
| jpaService.execute(coordInsertCmd); |
| } |
| catch (JPAExecutorException je) { |
| je.printStackTrace(); |
| fail("Unable to insert the test coord job record to table"); |
| throw je; |
| } |
| |
| return coordJob; |
| } |
| |
| protected String getCoordJobXmlForWaiting(String testFileName, String testDir) { |
| try { |
| Reader reader = IOUtils.getResourceAsReader(testFileName, -1); |
| String appXml = IOUtils.getReaderAsString(reader, -1); |
| appXml = appXml.replaceAll("#testDir", testDir); |
| return appXml; |
| } |
| catch (IOException ioe) { |
| throw new RuntimeException(XLog.format("Could not get " + testFileName, ioe)); |
| } |
| } |
| |
| protected void setCoordActionCreationTime(String actionId, long actionCreationTime) throws Exception { |
| JPAService jpaService = Services.get().get(JPAService.class); |
| CoordinatorActionBean action = jpaService.execute(new CoordActionGetJPAExecutor(actionId)); |
| action.setCreatedTime(new Date(actionCreationTime)); |
| CoordActionQueryExecutor.getInstance().executeUpdate(CoordActionQuery.UPDATE_COORD_ACTION, action); |
| } |
| |
| protected void setCoordActionNominalTime(String actionId, long actionNominalTime) throws Exception { |
| JPAService jpaService = Services.get().get(JPAService.class); |
| CoordinatorActionBean action = jpaService.execute(new CoordActionGetJPAExecutor(actionId)); |
| action.setNominalTime(new Date(actionNominalTime)); |
| CoordActionQueryExecutor.getInstance().executeUpdate(CoordActionQuery.UPDATE_COORD_ACTION, action); |
| } |
| |
| protected void setCoordActionStatus(String actionId, CoordinatorAction.Status status) throws Exception { |
| JPAService jpaService = Services.get().get(JPAService.class); |
| CoordinatorActionBean action = jpaService.execute(new CoordActionGetJPAExecutor(actionId)); |
| action.setStatus(status); |
| CoordActionQueryExecutor.getInstance().executeUpdate(CoordActionQuery.UPDATE_COORD_ACTION, action); |
| } |
| |
| protected void setMissingDependencies(String actionId, String missingDependencies) throws Exception { |
| JPAService jpaService = Services.get().get(JPAService.class); |
| CoordinatorActionBean action = jpaService.execute(new CoordActionGetJPAExecutor(actionId)); |
| action.setMissingDependencies(missingDependencies); |
| CoordActionQueryExecutor.getInstance().executeUpdate(CoordActionQuery.UPDATE_COORD_ACTION, action); |
| } |
| |
| protected void modifyCoordForRunning(CoordinatorJobBean coord) throws Exception { |
| String wfXml = IOUtils.getResourceAsString("wf-credentials.xml", -1); |
| writeToFile(wfXml, getFsTestCaseDir(), "workflow.xml"); |
| String coordXml = coord.getJobXml(); |
| coord.setJobXml(coordXml.replace("hdfs:///tmp/workflows/", getFsTestCaseDir() + "/workflow.xml")); |
| CoordJobQueryExecutor.getInstance().executeUpdate(CoordJobQuery.UPDATE_COORD_JOB, coord); |
| } |
| |
| protected void writeToFile(String appXml, File appPathFile) throws Exception { |
| PrintWriter out = null; |
| try { |
| out = new PrintWriter(new OutputStreamWriter(new FileOutputStream(appPathFile), |
| StandardCharsets.UTF_8)); |
| out.println(appXml); |
| } |
| catch (IOException iex) { |
| throw iex; |
| } |
| finally { |
| if (out != null) { |
| out.close(); |
| } |
| } |
| } |
| |
| protected CoordinatorActionBean checkCoordActionStatus(final String actionId, final CoordinatorAction.Status stat) |
| throws Exception { |
| final JPAService jpaService = Services.get().get(JPAService.class); |
| waitFor(5 * 1000, new Predicate() { |
| @Override |
| public boolean evaluate() throws Exception { |
| try { |
| CoordinatorActionBean action = jpaService.execute(new CoordActionGetJPAExecutor(actionId)); |
| return stat.equals(action.getStatus()); |
| } |
| catch (JPAExecutorException se) { |
| throw new Exception("Action ID " + actionId + " was not stored properly in db"); |
| } |
| } |
| }); |
| CoordinatorActionBean action = jpaService.execute(new CoordActionGetJPAExecutor(actionId)); |
| assertEquals(stat, action.getStatus()); |
| return action; |
| } |
| } |