blob: 15e93550148b33b4b69ecd744ae7734dd47eba5e [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.oozie.command.coord;
import java.io.File;
import java.sql.Timestamp;
import java.text.DateFormat;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Arrays;
import java.util.Calendar;
import java.util.Date;
import java.util.List;
import java.util.TimeZone;
import org.apache.hadoop.conf.Configuration;
import org.apache.oozie.CoordinatorActionBean;
import org.apache.oozie.CoordinatorJobBean;
import org.apache.oozie.ErrorCode;
import org.apache.oozie.SLAEventBean;
import org.apache.oozie.client.CoordinatorJob;
import org.apache.oozie.client.CoordinatorJob.Timeunit;
import org.apache.oozie.client.OozieClient;
import org.apache.oozie.command.CommandException;
import org.apache.oozie.coord.CoordELFunctions;
import org.apache.oozie.coord.TimeUnit;
import org.apache.oozie.executor.jpa.CoordActionGetJPAExecutor;
import org.apache.oozie.executor.jpa.CoordJobGetActionsJPAExecutor;
import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor;
import org.apache.oozie.executor.jpa.CoordJobGetRunningActionsCountJPAExecutor;
import org.apache.oozie.executor.jpa.CoordJobQueryExecutor;
import org.apache.oozie.executor.jpa.CoordJobQueryExecutor.CoordJobQuery;
import org.apache.oozie.executor.jpa.JPAExecutorException;
import org.apache.oozie.executor.jpa.CoordJobGetActionsSubsetJPAExecutor;
import org.apache.oozie.executor.jpa.SLAEventsGetForSeqIdJPAExecutor;
import org.apache.oozie.service.JPAService;
import org.apache.oozie.service.SchedulerService;
import org.apache.oozie.service.Services;
import org.apache.oozie.test.XDataTestCase;
import org.apache.oozie.util.DateUtils;
import org.apache.oozie.util.XConfiguration;
import org.apache.oozie.util.XmlUtils;
import org.jdom.Element;
@SuppressWarnings("deprecation")
public class TestCoordMaterializeTransitionXCommand extends XDataTestCase {
private static final int TIME_IN_MIN = 60 * 1000;
private static final int TIME_IN_HOURS = TIME_IN_MIN * 60;
private static final int TIME_IN_DAY = TIME_IN_HOURS * 24;
private JPAService jpaService;
@Override
protected void setUp() throws Exception {
super.setUp();
new Services().init();
Services.get().setService(FakeCallableQueueService.class);
Services.get().get(SchedulerService.class).destroy();
jpaService = Services.get().get(JPAService.class);
}
@Override
protected void tearDown() throws Exception {
Services.get().destroy();
super.tearDown();
}
public void testActionMater() throws Exception {
Date startTime = DateUtils.parseDateOozieTZ("2009-03-06T010:00Z");
Date endTime = DateUtils.parseDateOozieTZ("2009-03-11T10:00Z");
CoordinatorJobBean job = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, startTime, endTime, false, false, 0);
new CoordMaterializeTransitionXCommand(job.getId(), hoursToSeconds(1)).call();
checkCoordAction(job.getId() + "@1");
}
public void testActionMaterForHcatalog() throws Exception {
Services.get().destroy();
Services services = super.setupServicesForHCatalog();
services.init();
Date startTime = DateUtils.parseDateOozieTZ("2009-03-06T010:00Z");
Date endTime = DateUtils.parseDateOozieTZ("2009-03-11T10:00Z");
CoordinatorJobBean job = addRecordToCoordJobTableForWaiting("coord-job-for-matd-hcat.xml",
CoordinatorJob.Status.RUNNING, startTime, endTime, false, false, 0);
new CoordMaterializeTransitionXCommand(job.getId(), hoursToSeconds(1)).call();
CoordinatorActionBean actionBean = getCoordAction(job.getId() + "@1");
assertEquals("file://dummyhdfs/2009/05/_SUCCESS" + CoordCommandUtils.RESOLVED_UNRESOLVED_SEPARATOR
+ "${coord:latestRange(-1,0)}", actionBean.getMissingDependencies());
assertEquals("hcat://dummyhcat:1000/db1/table1/ds=2009-12" + CoordELFunctions.INSTANCE_SEPARATOR
+ "hcat://dummyhcat:1000/db3/table3/ds=2009-05" + CoordELFunctions.INSTANCE_SEPARATOR
+ "hcat://dummyhcat:1000/db3/table3/ds=2009-26", actionBean.getPushMissingDependencies());
}
public void testActionMaterForHcatalogIncorrectURI() throws Exception {
Services.get().destroy();
Services services = super.setupServicesForHCatalog();
services.init();
Date startTime = DateUtils.parseDateOozieTZ("2009-03-06T010:00Z");
Date endTime = DateUtils.parseDateOozieTZ("2009-03-11T10:00Z");
CoordinatorJobBean job = addRecordToCoordJobTableForWaiting("coord-job-for-matd-neg-hcat.xml",
CoordinatorJob.Status.RUNNING, startTime, endTime, false, false, 0);
try {
new CoordMaterializeTransitionXCommand(job.getId(), hoursToSeconds(1)).call();
fail("Expected Command exception but didn't catch any");
}
catch (CommandException e) {
e.printStackTrace();
job = services.get(JPAService.class).execute(new CoordJobGetJPAExecutor(job.getId()));
assertEquals(CoordinatorJob.Status.FAILED, job.getStatus());
assertEquals(ErrorCode.E1012, e.getErrorCode());
}
catch (Exception e) {
fail("Unexpected exception " + e.getMessage());
}
}
public void testActionMaterForHcatalogRelativePath() throws Exception {
Date startTime = DateUtils.parseDateOozieTZ("2009-03-06T010:00Z");
Date endTime = DateUtils.parseDateOozieTZ("2009-03-11T10:00Z");
CoordinatorJobBean job = addRecordToCoordJobTableForWaiting("coord-job-for-matd-relative.xml",
CoordinatorJob.Status.RUNNING, startTime, endTime, false, false, 0);
new CoordMaterializeTransitionXCommand(job.getId(), hoursToSeconds(1)).call();
}
public void testActionMaterWithCronFrequency1() throws Exception {
Date startTime = DateUtils.parseDateOozieTZ("2013-07-18T00:00Z");
Date endTime = DateUtils.parseDateOozieTZ("2013-07-18T01:00Z");
CoordinatorJobBean job = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, startTime, endTime, null,
"10,20 * * * *");
new CoordMaterializeTransitionXCommand(job.getId(), hoursToSeconds(1)).call();
Date[] nominalTimes = new Date[] {DateUtils.parseDateOozieTZ("2013-07-18T00:10Z"),
DateUtils.parseDateOozieTZ("2013-07-18T00:20Z")};
final int expectedNominalTimeCount = 2;
checkCoordActionsNominalTime(job.getId(), expectedNominalTimeCount, nominalTimes);
try {
job = jpaService.execute(new CoordJobGetJPAExecutor(job.getId()));
assertTrue(job.isDoneMaterialization());
assertEquals(job.getLastActionNumber(), expectedNominalTimeCount);
assertEquals(job.getNextMaterializedTime(), DateUtils.parseDateOozieTZ("2013-07-18T01:10Z"));
}
catch (JPAExecutorException se) {
se.printStackTrace();
fail("Job ID " + job.getId() + " was not stored properly in db");
}
}
public void testActionMaterWithCronFrequency2() throws Exception {
Date startTime = DateUtils.parseDateOozieTZ("2013-07-18T00:00Z");
Date endTime = DateUtils.parseDateOozieTZ("2013-07-18T01:00Z");
CoordinatorJobBean job = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, startTime, endTime, null,
"10-20 * * * *");
new CoordMaterializeTransitionXCommand(job.getId(), hoursToSeconds(1)).call();
Date[] nominalTimes = new Date[] {DateUtils.parseDateOozieTZ("2013-07-18T00:10Z"),
DateUtils.parseDateOozieTZ("2013-07-18T00:11Z"),
DateUtils.parseDateOozieTZ("2013-07-18T00:12Z"),
DateUtils.parseDateOozieTZ("2013-07-18T00:13Z"),
DateUtils.parseDateOozieTZ("2013-07-18T00:14Z"),
DateUtils.parseDateOozieTZ("2013-07-18T00:15Z"),
DateUtils.parseDateOozieTZ("2013-07-18T00:16Z"),
DateUtils.parseDateOozieTZ("2013-07-18T00:17Z"),
DateUtils.parseDateOozieTZ("2013-07-18T00:18Z"),
DateUtils.parseDateOozieTZ("2013-07-18T00:19Z"),
DateUtils.parseDateOozieTZ("2013-07-18T00:20Z"),};
final int expectedNominalTimeCount = 11;
checkCoordActionsNominalTime(job.getId(), expectedNominalTimeCount, nominalTimes);
try {
job = jpaService.execute(new CoordJobGetJPAExecutor(job.getId()));
assertTrue(job.isDoneMaterialization());
assertEquals(job.getLastActionNumber(), expectedNominalTimeCount);
assertEquals(job.getNextMaterializedTime(), DateUtils.parseDateOozieTZ("2013-07-18T01:10Z"));
}
catch (JPAExecutorException se) {
se.printStackTrace();
fail("Job ID " + job.getId() + " was not stored properly in db");
}
}
public void testActionMaterWithCronFrequency3() throws Exception {
Date startTime = DateUtils.parseDateOozieTZ("2013-07-18T00:00Z");
Date endTime = DateUtils.parseDateOozieTZ("2013-07-18T01:00Z");
CoordinatorJobBean job = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, startTime, endTime, null,
"0/15 2 * 5-7 4,5");
new CoordMaterializeTransitionXCommand(job.getId(), hoursToSeconds(1)).call();
final int expectedNominalTimeCount = 0;
checkCoordActionsNominalTime(job.getId(), expectedNominalTimeCount, new Date[]{});
try {
job = jpaService.execute(new CoordJobGetJPAExecutor(job.getId()));
assertTrue(job.isDoneMaterialization());
assertEquals(job.getLastActionNumber(), expectedNominalTimeCount);
assertEquals(job.getNextMaterializedTime(), DateUtils.parseDateOozieTZ("2013-07-18T02:00Z"));
}
catch (JPAExecutorException se) {
se.printStackTrace();
fail("Job ID " + job.getId() + " was not stored properly in db");
}
}
public void testActionMaterWithCronFrequency4() throws Exception {
Date startTime = DateUtils.parseDateOozieTZ("2013-07-18T00:00Z");
Date endTime = DateUtils.parseDateOozieTZ("2013-07-18T01:00Z");
CoordinatorJobBean job = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, startTime, endTime, null,
"0/15 * * 5-7 4,5");
new CoordMaterializeTransitionXCommand(job.getId(), hoursToSeconds(1)).call();
Date[] nominalTimes = new Date[] {DateUtils.parseDateOozieTZ("2013-07-18T00:00Z"),
DateUtils.parseDateOozieTZ("2013-07-18T00:15Z"),
DateUtils.parseDateOozieTZ("2013-07-18T00:30Z"),
DateUtils.parseDateOozieTZ("2013-07-18T00:45Z"),};
final int expectedNominalTimeCount = 4;
checkCoordActionsNominalTime(job.getId(), expectedNominalTimeCount, nominalTimes);
try {
job = jpaService.execute(new CoordJobGetJPAExecutor(job.getId()));
assertTrue(job.isDoneMaterialization());
assertEquals(job.getLastActionNumber(), expectedNominalTimeCount);
assertEquals(job.getNextMaterializedTime(), DateUtils.parseDateOozieTZ("2013-07-18T01:00Z"));
}
catch (JPAExecutorException se) {
se.printStackTrace();
fail("Job ID " + job.getId() + " was not stored properly in db");
}
}
public void testActionMaterWithCronFrequency5() throws Exception {
Date startTime = DateUtils.parseDateOozieTZ("2013-07-18T00:00Z");
Date endTime = DateUtils.parseDateOozieTZ("2013-07-18T01:00Z");
CoordinatorJobBean job = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, startTime, endTime, null,
"20/15 * * 5-7 4,5");
new CoordMaterializeTransitionXCommand(job.getId(), hoursToSeconds(1)).call();
Date[] nominalTimes = new Date[] {DateUtils.parseDateOozieTZ("2013-07-18T00:20Z"),
DateUtils.parseDateOozieTZ("2013-07-18T00:35Z"),
DateUtils.parseDateOozieTZ("2013-07-18T00:50Z"),};
final int expectedNominalTimeCount = 3;
checkCoordActionsNominalTime(job.getId(), expectedNominalTimeCount, nominalTimes);
try {
job = jpaService.execute(new CoordJobGetJPAExecutor(job.getId()));
assertTrue(job.isDoneMaterialization());
assertEquals(job.getLastActionNumber(), expectedNominalTimeCount);
assertEquals(job.getNextMaterializedTime(), DateUtils.parseDateOozieTZ("2013-07-18T01:20Z"));
}
catch (JPAExecutorException se) {
se.printStackTrace();
fail("Job ID " + job.getId() + " was not stored properly in db");
}
}
public void testActionMaterWithCronFrequency6() throws Exception {
Date startTime = DateUtils.parseDateOozieTZ("2013-07-18T00:00Z");
Date endTime = DateUtils.parseDateOozieTZ("2013-07-18T01:00Z");
CoordinatorJobBean job = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, startTime, endTime, null,
"20");
new CoordMaterializeTransitionXCommand(job.getId(), hoursToSeconds(1)).call();
Date[] nominalTimes = new Date[] {DateUtils.parseDateOozieTZ("2013-07-18T00:00Z"),
DateUtils.parseDateOozieTZ("2013-07-18T00:20Z"),
DateUtils.parseDateOozieTZ("2013-07-18T00:40Z"),};
final int expectedNominalTimeCount = 3;
checkCoordActionsNominalTime(job.getId(), expectedNominalTimeCount, nominalTimes);
try {
job = jpaService.execute(new CoordJobGetJPAExecutor(job.getId()));
assertTrue(job.isDoneMaterialization());
assertEquals(job.getLastActionNumber(), expectedNominalTimeCount);
assertEquals(job.getNextMaterializedTime(), DateUtils.parseDateOozieTZ("2013-07-18T01:00Z"));
}
catch (JPAExecutorException se) {
se.printStackTrace();
fail("Job ID " + job.getId() + " was not stored properly in db");
}
}
public void testActionMaterWithCronFrequency7() throws Exception {
Date startTime = DateUtils.parseDateOozieTZ("2013-07-18T00:00Z");
Date endTime = DateUtils.parseDateOozieTZ("2013-07-18T01:00Z");
CoordinatorJobBean job = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, startTime, endTime, null,
"20/15 * * 7,10 THU");
new CoordMaterializeTransitionXCommand(job.getId(), hoursToSeconds(1)).call();
Date[] nominalTimes = new Date[] {DateUtils.parseDateOozieTZ("2013-07-18T00:20Z"),
DateUtils.parseDateOozieTZ("2013-07-18T00:35Z"),
DateUtils.parseDateOozieTZ("2013-07-18T00:50Z"),};
final int expectedNominalTimeCount = 3;
checkCoordActionsNominalTime(job.getId(), expectedNominalTimeCount, nominalTimes);
try {
job = jpaService.execute(new CoordJobGetJPAExecutor(job.getId()));
assertTrue(job.isDoneMaterialization());
assertEquals(expectedNominalTimeCount, job.getLastActionNumber());
assertEquals(job.getNextMaterializedTime(), DateUtils.parseDateOozieTZ("2013-07-18T01:20Z"));
}
catch (JPAExecutorException se) {
se.printStackTrace();
fail("Job ID " + job.getId() + " was not stored properly in db");
}
}
public void testActionMaterwithCronFrequencyWithThrottle() throws Exception {
Date startTime = DateUtils.parseDateOozieTZ("2013-07-18T00:00Z");
Date endTime = DateUtils.parseDateOozieTZ("2013-07-18T01:00Z");
CoordinatorJobBean job = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, startTime, endTime, null,
"0/10 * * * *");
job.setMatThrottling(3);
CoordJobQueryExecutor.getInstance().executeUpdate(CoordJobQuery.UPDATE_COORD_JOB, job);
new CoordMaterializeTransitionXCommand(job.getId(), hoursToSeconds(1)).call();
Date[] nominalTimes = new Date[] {DateUtils.parseDateOozieTZ("2013-07-18T00:00Z"),
DateUtils.parseDateOozieTZ("2013-07-18T00:10Z"),
DateUtils.parseDateOozieTZ("2013-07-18T00:20Z")};
final int expectedNominalTimeCount = 3;
checkCoordActionsNominalTime(job.getId(), expectedNominalTimeCount, nominalTimes);
try {
job = jpaService.execute(new CoordJobGetJPAExecutor(job.getId()));
assertFalse(job.isDoneMaterialization());
assertEquals(expectedNominalTimeCount, job.getLastActionNumber());
assertEquals(DateUtils.parseDateOozieTZ("2013-07-18T00:30Z"), job.getNextMaterializedTime());
}
catch (JPAExecutorException se) {
se.printStackTrace();
fail("Job ID " + job.getId() + " was not stored properly in db");
}
}
public void testCronFrequencyCatchupThrottleLessThanDuration() throws Exception {
final String startInThePast = "2013-03-10T08:00Z";
final String startPlusOneDay = "2013-03-11T08:00Z";
final Date startTime = DateUtils.parseDateOozieTZ(startInThePast);
final Date endTime = DateUtils.parseDateOozieTZ(startPlusOneDay);
CoordinatorJobBean job = addRecordToCoordJobTable(CoordinatorJob.Status.PREP, startTime, endTime, false, false, 0);
job.setNextMaterializedTime(startTime);
job.setMatThrottling(3);
final String everyHour = "0 * * * *";
job.setFrequency(everyHour);
job.setTimeUnit(Timeunit.CRON);
CoordJobQueryExecutor.getInstance().executeUpdate(CoordJobQuery.UPDATE_COORD_JOB, job);
new CoordMaterializeTransitionXCommand(job.getId(), hoursToSeconds(1)).call();
final String startPlusOneHour = "2013-03-10T09:00Z";
final String startPlusTwoHours = "2013-03-10T10:00Z";
final Date[] nominalTimes = new Date[] {DateUtils.parseDateOozieTZ(startInThePast),
DateUtils.parseDateOozieTZ(startPlusOneHour),
DateUtils.parseDateOozieTZ(startPlusTwoHours)};
final int expectedNominalTimeCount = 3;
checkCoordActionsNominalTime(job.getId(), expectedNominalTimeCount, nominalTimes);
try {
job = jpaService.execute(new CoordJobGetJPAExecutor(job.getId()));
assertFalse("coordinator job shouldn't have yet been materialized", job.isDoneMaterialization());
assertEquals("coordinator action count mismatch", expectedNominalTimeCount, job.getLastActionNumber());
final String startPlusThreeHours = "2013-03-10T11:00Z";
assertEquals("coordinator next materialization time mismatch",
DateUtils.parseDateOozieTZ(startPlusThreeHours), job.getNextMaterializedTime());
}
catch (final JPAExecutorException se) {
se.printStackTrace();
fail("Job ID " + job.getId() + " was not stored properly in db");
}
}
public void testCronFrequencyCatchupThrottleEqualsDurationDSTChange() throws Exception {
final String startInThePast = "2013-03-10T08:00Z";
final Date startTimeBeforeDSTChange = DateUtils.parseDateOozieTZ(startInThePast);
final String startPlusTwoHoursAndSome = "2013-03-10T10:01Z";
final Date endTimeAfterDSTChange = DateUtils.parseDateOozieTZ(startPlusTwoHoursAndSome);
CoordinatorJobBean job = addRecordToCoordJobTable(CoordinatorJob.Status.PREP,
startTimeBeforeDSTChange,
endTimeAfterDSTChange,
false,
false,
0);
job.setNextMaterializedTime(startTimeBeforeDSTChange);
job.setMatThrottling(3);
final String everyHour = "0 * * * *";
job.setFrequency(everyHour);
job.setTimeUnit(Timeunit.CRON);
CoordJobQueryExecutor.getInstance().executeUpdate(CoordJobQuery.UPDATE_COORD_JOB, job);
new CoordMaterializeTransitionXCommand(job.getId(), hoursToSeconds(1)).call();
final String startPlusOneHour = "2013-03-10T09:00Z";
final String startPlusTwoHours = "2013-03-10T10:00Z";
final Date[] nominalTimesWithDSTChange = new Date[] {DateUtils.parseDateOozieTZ(startInThePast),
DateUtils.parseDateOozieTZ(startPlusOneHour),
DateUtils.parseDateOozieTZ(startPlusTwoHours)
};
final int expectedNominalTimeCount = 3;
checkCoordActionsNominalTime(job.getId(), expectedNominalTimeCount, nominalTimesWithDSTChange);
checkTwoActionsAfterCatchup(job, expectedNominalTimeCount, "2013-03-10T11:00Z");
}
private void checkTwoActionsAfterCatchup(CoordinatorJobBean job, int expectedJobCount, String nextMaterialization)
throws ParseException {
try {
job = jpaService.execute(new CoordJobGetJPAExecutor(job.getId()));
assertTrue("coordinator job should have already been materialized", job.isDoneMaterialization());
assertEquals("coordinator action count mismatch", expectedJobCount, job.getLastActionNumber());
assertEquals("coordinator next materialization time mismatch",
DateUtils.parseDateOozieTZ(nextMaterialization), job.getNextMaterializedTime());
}
catch (final JPAExecutorException se) {
se.printStackTrace();
fail("Job ID " + job.getId() + " was not stored properly in db");
}
}
public void testCronFrequencyCatchupThrottleMoreThanDurationNoDSTChange() throws Exception {
final String startInThePast = "2013-03-10T08:00Z";
final Date startTime = DateUtils.parseDateOozieTZ(startInThePast);
final String startPlusOneHourAndSome = "2013-03-10T09:01Z";
final Date endTime = DateUtils.parseDateOozieTZ(startPlusOneHourAndSome);
CoordinatorJobBean job = addRecordToCoordJobTable(CoordinatorJob.Status.PREP, startTime, endTime, false, false, 0);
job.setNextMaterializedTime(startTime);
job.setMatThrottling(5);
final String everyHour = "0 * * * *";
job.setFrequency(everyHour);
job.setTimeUnit(Timeunit.CRON);
CoordJobQueryExecutor.getInstance().executeUpdate(CoordJobQuery.UPDATE_COORD_JOB, job);
new CoordMaterializeTransitionXCommand(job.getId(), hoursToSeconds(1)).call();
final String startPlusOneHour = "2013-03-10T09:00Z";
final Date[] nominalTimesWithoutDSTChange = new Date[] {DateUtils.parseDateOozieTZ(startInThePast),
DateUtils.parseDateOozieTZ(startPlusOneHour)};
final int expectedNominalTimeCount = 2;
checkCoordActionsNominalTime(job.getId(), expectedNominalTimeCount, nominalTimesWithoutDSTChange);
checkTwoActionsAfterCatchup(job, expectedNominalTimeCount, "2013-03-10T10:00Z");
}
public void testActionMaterWithDST1() throws Exception {
Date startTime = DateUtils.parseDateOozieTZ("2013-03-10T08:00Z");
Date endTime = DateUtils.parseDateOozieTZ("2013-03-10T12:00Z");
CoordinatorJobBean job = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, startTime, endTime, null,
"0 * * * *");
new CoordMaterializeTransitionXCommand(job.getId(), hoursToSeconds(4)).call();
Date[] nominalTimes = new Date[] {DateUtils.parseDateOozieTZ("2013-03-10T08:00Z"),
DateUtils.parseDateOozieTZ("2013-03-10T09:00Z"),
DateUtils.parseDateOozieTZ("2013-03-10T10:00Z"),
DateUtils.parseDateOozieTZ("2013-03-10T11:00Z"),
};
final int expectedNominalTimeCount = 4;
checkCoordActionsNominalTime(job.getId(), expectedNominalTimeCount, nominalTimes);
try {
job = jpaService.execute(new CoordJobGetJPAExecutor(job.getId()));
assertTrue(job.isDoneMaterialization());
assertEquals(expectedNominalTimeCount, job.getLastActionNumber());
assertEquals(DateUtils.parseDateOozieTZ("2013-03-10T12:00Z"), job.getNextMaterializedTime());
}
catch (JPAExecutorException se) {
se.printStackTrace();
fail("Job ID " + job.getId() + " was not stored properly in db");
}
}
public void testActionMaterWithDST2() throws Exception {
Date startTime = DateUtils.parseDateOozieTZ("2012-11-04T07:00Z");
Date endTime = DateUtils.parseDateOozieTZ("2012-11-04T11:00Z");
CoordinatorJobBean job = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, startTime, endTime, null,
"0 * * * *");
new CoordMaterializeTransitionXCommand(job.getId(), hoursToSeconds(4)).call();
Date[] nominalTimes = new Date[] {DateUtils.parseDateOozieTZ("2012-11-04T07:00Z"),
DateUtils.parseDateOozieTZ("2012-11-04T08:00Z"),
DateUtils.parseDateOozieTZ("2012-11-04T09:00Z"),
DateUtils.parseDateOozieTZ("2012-11-04T10:00Z"),
};
final int expectedNominalTimeCount = 4;
checkCoordActionsNominalTime(job.getId(), expectedNominalTimeCount, nominalTimes);
try {
job = jpaService.execute(new CoordJobGetJPAExecutor(job.getId()));
assertTrue(job.isDoneMaterialization());
assertEquals(job.getLastActionNumber(), expectedNominalTimeCount);
assertEquals(job.getNextMaterializedTime(), DateUtils.parseDateOozieTZ("2012-11-04T11:00Z"));
}
catch (JPAExecutorException se) {
se.printStackTrace();
fail("Job ID " + job.getId() + " was not stored properly in db");
}
}
public void testActionMaterWithPauseTime1() throws Exception {
Date startTime = DateUtils.parseDateOozieTZ("2009-03-06T10:00Z");
Date endTime = DateUtils.parseDateOozieTZ("2009-03-06T10:14Z");
Date pauseTime = DateUtils.parseDateOozieTZ("2009-03-06T10:04Z");
CoordinatorJobBean job = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, startTime, endTime, pauseTime, "5");
new CoordMaterializeTransitionXCommand(job.getId(), hoursToSeconds(1)).call();
Date[] nominalTimes = new Date[] {DateUtils.parseDateOozieTZ("2009-03-06T10:00Z")};
checkCoordActionsNominalTime(job.getId(), 1, nominalTimes);
}
public void testActionMaterWithPauseTime2() throws Exception {
Date startTime = DateUtils.parseDateOozieTZ("2009-03-06T10:00Z");
Date endTime = DateUtils.parseDateOozieTZ("2009-03-06T10:14Z");
Date pauseTime = DateUtils.parseDateOozieTZ("2009-03-06T10:08Z");
CoordinatorJobBean job = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, startTime, endTime, pauseTime, "5");
new CoordMaterializeTransitionXCommand(job.getId(), hoursToSeconds(1)).call();
Date[] nominalTimes = new Date[] {DateUtils.parseDateOozieTZ("2009-03-06T10:00Z"),
DateUtils.parseDateOozieTZ("2009-03-06T10:05Z")};
checkCoordActionsNominalTime(job.getId(), 2, nominalTimes);
}
public void testGetDryrun() throws Exception {
Date startTime = DateUtils.parseDateOozieTZ("2009-03-06T10:00Z");
Date endTime = DateUtils.parseDateOozieTZ("2009-03-06T10:14Z");
CoordinatorJobBean job = createCoordJob(CoordinatorJob.Status.RUNNING, startTime, endTime, false, false, 0);
job.setFrequency("5");
job.setTimeUnit(Timeunit.MINUTE);
job.setMatThrottling(20);
String dryRunOutput = new CoordMaterializeTransitionXCommand(job, hoursToSeconds(1), startTime, endTime)
.materializeActions(true);
String[] actions = dryRunOutput.split("action for new instance");
assertEquals(3, actions.length -1);
for(int i = 1; i < actions.length; i++) {
assertTrue(actions[i].contains("action-nominal-time"));
}
}
public void testTimeout() throws Exception {
Date startTime = DateUtils.parseDateOozieTZ("2009-03-06T10:00Z");
Date endTime = DateUtils.parseDateOozieTZ("2009-03-06T10:14Z");
Date pauseTime = null;
CoordinatorJobBean job = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, startTime, endTime,
pauseTime, 300, "5", Timeunit.MINUTE);
new CoordMaterializeTransitionXCommand(job.getId(), hoursToSeconds(1)).call();
checkCoordActionsTimeout(job.getId() + "@1", 300);
}
public void testMatLookupCommand1() throws Exception {
Date startTime = DateUtils.parseDateOozieTZ("2009-02-01T01:00Z");
Date endTime = DateUtils.parseDateOozieTZ("2009-02-03T23:59Z");
CoordinatorJobBean job = addRecordToCoordJobTable(CoordinatorJob.Status.PREP, startTime, endTime, false, false, 0);
new CoordMaterializeTransitionXCommand(job.getId(), hoursToSeconds(1)).call();
checkCoordJobs(job.getId(), CoordinatorJob.Status.RUNNING);
}
public void testMatThrottle() throws Exception {
Date startTime = DateUtils.parseDateOozieTZ("2009-02-01T01:00Z");
Date endTime = DateUtils.parseDateOozieTZ("2009-02-03T23:59Z");
CoordinatorJobBean job = addRecordToCoordJobTable(CoordinatorJob.Status.PREP, startTime, endTime, false, false, 0);
new CoordMaterializeTransitionXCommand(job.getId(), hoursToSeconds(1)).call();
checkCoordWaiting(job.getId(), job.getMatThrottling());
}
/**
* Test a coordinator job that will run in far future,
* materialization should not happen.
*
* @throws Exception
*/
public void testMatLookupCommand2() throws Exception {
Date startTime = DateUtils.parseDateOozieTZ("2099-02-01T01:00Z");
Date endTime = DateUtils.parseDateOozieTZ("2099-02-03T23:59Z");
CoordinatorJobBean job = addRecordToCoordJobTable(CoordinatorJob.Status.PREP, startTime, endTime, false, false, 0);
new CoordMaterializeTransitionXCommand(job.getId(), hoursToSeconds(1)).call();
checkCoordJobs(job.getId(), CoordinatorJob.Status.PREP);
}
/**
* Test a coordinator job that will run within 5 minutes from now,
* materilization should happen.
*
* @throws Exception
*/
public void testMatLookupCommand3() throws Exception {
Date startTime = DateUtils.toDate(new Timestamp(System.currentTimeMillis() + 180 * 1000));
Date endTime = DateUtils.parseDateOozieTZ("2099-02-03T23:59Z");
CoordinatorJobBean job = addRecordToCoordJobTable(CoordinatorJob.Status.PREP, startTime, endTime, false, false, 0);
new CoordMaterializeTransitionXCommand(job.getId(), hoursToSeconds(1)).call();
checkCoordJobs(job.getId(), CoordinatorJob.Status.RUNNING);
}
/**
* Test a coordinator does not materialize actions upon CommandException
* leading to FAILED state
*
* @throws Exception
*/
public void testFailedJobNotMaterializeActions() throws Exception {
String coordXml = "<coordinator-app xmlns=\"uri:oozie:coordinator:0.4\"" + " name=\"NAME\" frequency=\"5\""
+ " start=\"#start\" end=\"#end\" timezone=\"America/Los_Angeles\""
+ " freq_timeunit=\"DAY\" end_of_duration=\"NONE\">" + "<input-events>"
+ "<data-in name=\"a\" dataset=\"a\">"
+ "<dataset name=\"a\" frequency=\"7\" initial-instance=\"2010-01-01T00:00Z\" timezone=\"UTC\" "
+ "freq_timeunit=\"MINUTE\" end_of_duration=\"NONE\">"
+ "<uri-template>${hcatNode}/${db}/${table}/ds=${YEAR}-${MONTH}-${DAY};region=${region}</uri-template>"
+ "</dataset>" + "<start-instance>${coord:current(0)}</start-instance>"
+ "<end-instance>${coord:latest(0)}</end-instance>" + "</data-in>" + "</input-events>" + "<action>"
+ "<workflow>" + "<app-path>hdfs:///tmp/workflows/</app-path>" + "</workflow>" + "</action>"
+ "</coordinator-app>";
CoordinatorJobBean job = addRecordToCoordJobTable(coordXml);
new CoordMaterializeTransitionXCommand(job.getId(), hoursToSeconds(1)).call();
job = jpaService.execute(new CoordJobGetJPAExecutor(job.getId()));
assertEquals(CoordinatorJob.Status.FAILED, job.getStatus());
// GetActions for coord job, should be none
int actions = jpaService.execute(new CoordJobGetActionsJPAExecutor(job.getId()));
assertEquals(0, actions);
}
/**
* Test a coordinator job that will run beyond 5 minutes from now,
* materilization should not happen.
*
* @throws Exception
*/
public void testMatLookupCommand4() throws Exception {
Date startTime = DateUtils.toDate(new Timestamp(System.currentTimeMillis() + 360 * 1000));
Date endTime = DateUtils.parseDateOozieTZ("2099-02-03T23:59Z");
CoordinatorJobBean job = addRecordToCoordJobTable(CoordinatorJob.Status.PREP, startTime, endTime, false, false, 0);
new CoordMaterializeTransitionXCommand(job.getId(), hoursToSeconds(1)).call();
checkCoordJobs(job.getId(), CoordinatorJob.Status.PREP);
}
/**
* Test lookup materialization for catchup jobs
*
* @throws Exception
*/
public void testMaterializationLookupDaysFixedDate() throws Exception {
// test with days
Date startTime = DateUtils.parseDateOozieTZ("2009-02-01T01:00Z");
Date endTime = DateUtils.parseDateOozieTZ("2009-05-03T23:59Z");
CoordinatorJobBean job = addRecordToCoordJobTable(CoordinatorJob.Status.PREP, startTime, endTime, false, false,
0);
job.setNextMaterializedTime(startTime);
job.setMatThrottling(3);
job.setFrequency("1");
job.setTimeUnitStr("DAY");
CoordJobQueryExecutor.getInstance().executeUpdate(CoordJobQuery.UPDATE_COORD_JOB, job);
new CoordMaterializeTransitionXCommand(job.getId(), hoursToSeconds(1)).call();
job = jpaService.execute(new CoordJobGetJPAExecutor(job.getId()));
assertEquals(new Date(startTime.getTime() + TIME_IN_DAY * 3), job.getNextMaterializedTime());
}
public void testMaterializationLookupHours() throws Exception {
Date startTime = DateUtils.parseDateOozieTZ("2009-02-01T01:00Z");
Date endTime = DateUtils.parseDateOozieTZ("2009-05-03T23:59Z");
CoordinatorJobBean job = addRecordToCoordJobTable(CoordinatorJob.Status.PREP, startTime, endTime, false, false, 0);
job.setNextMaterializedTime(startTime);
job.setMatThrottling(10);
job.setFrequency("1");
job.setTimeUnitStr("HOUR");
CoordJobQueryExecutor.getInstance().executeUpdate(CoordJobQuery.UPDATE_COORD_JOB, job);
new CoordMaterializeTransitionXCommand(job.getId(), hoursToSeconds(1)).call();
job = jpaService.execute(new CoordJobGetJPAExecutor(job.getId()));
assertEquals(new Date(startTime.getTime() + TIME_IN_HOURS * 10), job.getNextMaterializedTime());
}
public void testMaterializationLookupRelativeDays1() throws Exception {
Date currentDate = new Date();
Date startTime = new Date(currentDate.getTime() - TIME_IN_DAY * 3);
Date endTime = new Date(startTime.getTime() + TIME_IN_DAY * 3);
CoordinatorJobBean job = addRecordToCoordJobTable(CoordinatorJob.Status.PREP, startTime, endTime, false, false, 0);
job.setNextMaterializedTime(startTime);
job.setMatThrottling(10);
job.setFrequency("1");
job.setTimeUnitStr("DAY");
CoordJobQueryExecutor.getInstance().executeUpdate(CoordJobQuery.UPDATE_COORD_JOB, job);
new CoordMaterializeTransitionXCommand(job.getId(), hoursToSeconds(1)).call();
job = jpaService.execute(new CoordJobGetJPAExecutor(job.getId()));
Date next = new Date(startTime.getTime() + TIME_IN_DAY * 3);
adjustExpectedMaterializationDateForDSTSwitch(next, startTime, job);
assertEquals(next, job.getNextMaterializedTime());
}
public void testMaterializationLookupRelativeDays2() throws Exception {
Date startTime = new Date(new Date().getTime());
Date endTime = new Date(startTime.getTime() + TIME_IN_DAY * 3);
CoordinatorJobBean job = addRecordToCoordJobTable(CoordinatorJob.Status.PREP, startTime, endTime, false, false, 0);
job.setNextMaterializedTime(startTime);
job.setMatThrottling(10);
job.setFrequency("1");
job.setTimeUnitStr("DAY");
CoordJobQueryExecutor.getInstance().executeUpdate(CoordJobQuery.UPDATE_COORD_JOB, job);
new CoordMaterializeTransitionXCommand(job.getId(), hoursToSeconds(1)).call();
job = jpaService.execute(new CoordJobGetJPAExecutor(job.getId()));
Date next = new Date(startTime.getTime() + TIME_IN_DAY);
adjustExpectedMaterializationDateForDSTSwitch(next, startTime, job);
assertEquals(next, job.getNextMaterializedTime());
}
public void testMaterializationLookupMinute() throws Exception {
// for current job in min, should not exceed hour windows
Date startTime = new Date(new Date().getTime());
Date endTime = new Date(startTime.getTime() + TIME_IN_HOURS * 24);
CoordinatorJobBean job = addRecordToCoordJobTable(CoordinatorJob.Status.PREP, startTime, endTime, false, false, 0);
job.setMatThrottling(20);
job.setFrequency("5");
job.setTimeUnitStr("MINUTE");
CoordJobQueryExecutor.getInstance().executeUpdate(CoordJobQuery.UPDATE_COORD_JOB, job);
new CoordMaterializeTransitionXCommand(job.getId(), hoursToSeconds(1)).call();
job = jpaService.execute(new CoordJobGetJPAExecutor(job.getId()));
Date next = new Date(startTime.getTime() + TIME_IN_HOURS);
adjustExpectedMaterializationDateForDSTSwitch(next, startTime, job);
assertEquals(next, job.getNextMaterializedTime());
}
public void testMaterializationLookupRelativeDays3() throws Exception {
Date startTime = new Date(new Date().getTime());
Date endTime = new Date(startTime.getTime() + TIME_IN_DAY * 24);
CoordinatorJobBean job = addRecordToCoordJobTable(CoordinatorJob.Status.PREP, startTime, endTime, false, false, 0);
job.setMatThrottling(20);
job.setFrequency("1");
job.setTimeUnitStr("DAY");
CoordJobQueryExecutor.getInstance().executeUpdate(CoordJobQuery.UPDATE_COORD_JOB, job);
new CoordMaterializeTransitionXCommand(job.getId(), hoursToSeconds(1)).call();
job = jpaService.execute(new CoordJobGetJPAExecutor(job.getId()));
Date next = new Date(startTime.getTime() + TIME_IN_DAY);
adjustExpectedMaterializationDateForDSTSwitch(next, startTime, job);
assertEquals(next, job.getNextMaterializedTime());
}
public void testMaterializationLookupDaylightStartStandardMaterialization() throws Exception {
Date startTime = getDaylightCalendar().getTime();
Date endTime = new Date(startTime.getTime() + TIME_IN_DAY * 3);
CoordinatorJobBean job = addRecordToCoordJobTable(CoordinatorJob.Status.PREP, startTime, endTime, false, false, 0);
job.setNextMaterializedTime(startTime);
job.setMatThrottling(10);
job.setFrequency("1");
job.setTimeUnitStr("DAY");
CoordJobQueryExecutor.getInstance().executeUpdate(CoordJobQuery.UPDATE_COORD_JOB, job);
new CoordMaterializeTransitionXCommand(job.getId(), hoursToSeconds(1)).call();
job = jpaService.execute(new CoordJobGetJPAExecutor(job.getId()));
Date next = new Date(startTime.getTime() + TIME_IN_DAY * 3);
adjustExpectedMaterializationDateForDSTSwitch(next, startTime, job);
assertEquals(next, job.getNextMaterializedTime());
}
public void testMaterializationLookupStandardTimeStartDaylightMaterialization() throws Exception {
Calendar c = getStandardCalendar();
Date startTime = c.getTime();
Date endTime = new Date(startTime.getTime() + TIME_IN_DAY * 3);
CoordinatorJobBean job = addRecordToCoordJobTable(CoordinatorJob.Status.PREP, startTime, endTime, false, false, 0);
job.setNextMaterializedTime(startTime);
job.setMatThrottling(10);
job.setFrequency("1");
job.setTimeUnitStr("DAY");
CoordJobQueryExecutor.getInstance().executeUpdate(CoordJobQuery.UPDATE_COORD_JOB, job);
new CoordMaterializeTransitionXCommand(job, hoursToSeconds(1), startTime, endTime).call();
job = jpaService.execute(new CoordJobGetJPAExecutor(job.getId()));
Date next = new Date(startTime.getTime() + TIME_IN_DAY * 4);
adjustExpectedMaterializationDateForDSTSwitch(next, startTime, job);
assertEquals(next, job.getNextMaterializedTime());
}
private void adjustExpectedMaterializationDateForDSTSwitch(Date date, Date startTime, CoordinatorJobBean job) {
// If the startTime and endTime straddle a DST shift (the Coord is in "America/Los_Angeles"), then we need to adjust for
// that because startTime and endTime assume GMT
TimeZone tz = TimeZone.getTimeZone(job.getTimeZone());
long offset = DaylightOffsetCalculator.getDSTOffset(tz, startTime, date);
Date currentDate = new Date();
Date shiftedDate = new Date(date.getTime() + offset);
if (offset>0 && date.before(currentDate) && shiftedDate.after(currentDate)) {
offset -= TIME_IN_DAY;
}
date.setTime(date.getTime() + offset);
}
private Calendar getDaylightCalendar() {
final Calendar daylight = Calendar.getInstance();
daylight.set(2012, 10, 2, 15, 28, 00);
return daylight;
}
private Calendar getStandardCalendar() {
final Calendar standard = Calendar.getInstance();
standard.set(2013, 2, 9, 15, 28, 00);
return standard;
}
public void testWhenChangingDSTCronAndELMonthlyFrequenciesEqual() throws Exception {
String dstAwareMonthlyCron = "10 23 1 1-12 *";
Date startTime = DateUtils.parseDateOozieTZ("2016-03-01T23:10Z");
Date endTime = DateUtils.parseDateOozieTZ("2016-12-03T00:00Z");;
Date[] nominalTimesWithTwoDstChange = new Date[]{
DateUtils.parseDateOozieTZ(convertLATimeToUTC("2016-03-01T15:10")),
DateUtils.parseDateOozieTZ(convertLATimeToUTC("2016-04-01T15:10")), // DST started
DateUtils.parseDateOozieTZ(convertLATimeToUTC("2016-05-01T15:10")),
DateUtils.parseDateOozieTZ(convertLATimeToUTC("2016-06-01T15:10")),
DateUtils.parseDateOozieTZ(convertLATimeToUTC("2016-07-01T15:10")),
DateUtils.parseDateOozieTZ(convertLATimeToUTC("2016-08-01T15:10")),
DateUtils.parseDateOozieTZ(convertLATimeToUTC("2016-09-01T15:10")),
DateUtils.parseDateOozieTZ(convertLATimeToUTC("2016-10-01T15:10")),
DateUtils.parseDateOozieTZ(convertLATimeToUTC("2016-11-01T15:10")),
DateUtils.parseDateOozieTZ(convertLATimeToUTC("2016-12-01T15:10")), // DST ended
};
testELAndCronNominalTimesEqual(startTime, endTime, nominalTimesWithTwoDstChange, dstAwareMonthlyCron, "1", Timeunit.MONTH);
}
public void testWhenChangingDSTCronAndELDailyFrequenciesEqual() throws Exception {
String dstAwareDailyCron = "10 23 * * *";
Date startTime = DateUtils.parseDateOozieTZ("2016-03-11T23:10Z");
Date endTime = DateUtils.parseDateOozieTZ("2016-03-15T02:00Z");
Date[] nominalTimesWithTwoDstChange = new Date[]{
DateUtils.parseDateOozieTZ(convertLATimeToUTC("2016-03-11T15:10")),
DateUtils.parseDateOozieTZ(convertLATimeToUTC("2016-03-12T15:10")),
DateUtils.parseDateOozieTZ(convertLATimeToUTC("2016-03-13T15:10")), // DST started
DateUtils.parseDateOozieTZ(convertLATimeToUTC("2016-03-14T15:10")),
};
testELAndCronNominalTimesEqual(startTime, endTime, nominalTimesWithTwoDstChange, dstAwareDailyCron, "1", Timeunit.DAY);
}
public void testWhenChangingDSTELEveryTwentyFourthHour() throws Exception {
Date startTime = DateUtils.parseDateOozieTZ("2016-03-11T23:10Z");
Date endTime = DateUtils.parseDateOozieTZ("2016-03-15T02:00Z");
Date[] nominalTimesWithTwoDstChange = new Date[]{
DateUtils.parseDateOozieTZ(convertLATimeToUTC("2016-03-11T15:10")),
DateUtils.parseDateOozieTZ(convertLATimeToUTC("2016-03-12T15:10")),
DateUtils.parseDateOozieTZ(convertLATimeToUTC("2016-03-13T16:10")), // DST started
DateUtils.parseDateOozieTZ(convertLATimeToUTC("2016-03-14T16:10")),
};
testELNominalTimes(startTime, endTime, nominalTimesWithTwoDstChange,"24", Timeunit.HOUR);
}
public void testWhenBeginningDSTCronAndELHourlyFrequenciesEqual() throws Exception {
Date startTime = DateUtils.parseDateOozieTZ("2017-03-12T07:10Z");
Date endTime = DateUtils.parseDateOozieTZ("2017-03-12T12:30Z");
String everyHourAtTen = "10 * * * *";
Date[] nominalTimesWithOneDstChange = new Date[]{
DateUtils.parseDateOozieTZ(convertLATimeToUTC("2017-03-11T23:10")),
DateUtils.parseDateOozieTZ(convertLATimeToUTC("2017-03-12T00:10")),
DateUtils.parseDateOozieTZ(convertLATimeToUTC("2017-03-12T01:10")),
DateUtils.parseDateOozieTZ(convertLATimeToUTC("2017-03-12T03:10")), // DST started
DateUtils.parseDateOozieTZ(convertLATimeToUTC("2017-03-12T04:10")),
DateUtils.parseDateOozieTZ(convertLATimeToUTC("2017-03-12T05:10")),
};
testELAndCronNominalTimesEqual(startTime, endTime, nominalTimesWithOneDstChange, everyHourAtTen, "1", Timeunit.HOUR);
}
public void testWhenEndingDSTCronAndELHourlyFrequenciesEqual() throws Exception {
Date startTime = DateUtils.parseDateOozieTZ("2017-11-05T07:10Z");
Date endTime = DateUtils.parseDateOozieTZ("2017-11-05T10:30Z");
String everyHourAtTen = "10 * * * *";
Date[] nominalTimesWithOneDstChange = new Date[]{
DateUtils.parseDateOozieTZ("2017-11-05T07:10Z"), // LA time: 2017-11-05T00:10
DateUtils.parseDateOozieTZ("2017-11-05T08:10Z"), // LA time: 2017-11-05T01:10
DateUtils.parseDateOozieTZ("2017-11-05T09:10Z"), // LA time: 2017-11-05T01:10, DST ended
DateUtils.parseDateOozieTZ("2017-11-05T10:10Z"), // LA time: 2017-11-05T02:10
};
testELAndCronNominalTimesEqual(startTime, endTime, nominalTimesWithOneDstChange, everyHourAtTen, "1", Timeunit.HOUR);
}
public void testWhenChangingDSTELEveryTwentiethDay() throws Exception {
Date startTime = DateUtils.parseDateOozieTZ("2016-02-01T13:10Z");
Date endTime = DateUtils.parseDateOozieTZ("2016-12-03T00:00Z");
Date[] nominalTimesWithTwoDstChange = new Date[]{
DateUtils.parseDateOozieTZ(convertLATimeToUTC("2016-02-01T05:10")),
DateUtils.parseDateOozieTZ(convertLATimeToUTC("2016-02-21T05:10")),
DateUtils.parseDateOozieTZ(convertLATimeToUTC("2016-03-12T05:10")),
DateUtils.parseDateOozieTZ(convertLATimeToUTC("2016-04-01T05:10")), // DST started
DateUtils.parseDateOozieTZ(convertLATimeToUTC("2016-04-21T05:10")),
DateUtils.parseDateOozieTZ(convertLATimeToUTC("2016-05-11T05:10")),
DateUtils.parseDateOozieTZ(convertLATimeToUTC("2016-05-31T05:10")),
DateUtils.parseDateOozieTZ(convertLATimeToUTC("2016-06-20T05:10")),
DateUtils.parseDateOozieTZ(convertLATimeToUTC("2016-07-10T05:10")),
DateUtils.parseDateOozieTZ(convertLATimeToUTC("2016-07-30T05:10")),
DateUtils.parseDateOozieTZ(convertLATimeToUTC("2016-08-19T05:10")),
DateUtils.parseDateOozieTZ(convertLATimeToUTC("2016-09-08T05:10")),
DateUtils.parseDateOozieTZ(convertLATimeToUTC("2016-09-28T05:10")),
DateUtils.parseDateOozieTZ(convertLATimeToUTC("2016-10-18T05:10")),
DateUtils.parseDateOozieTZ(convertLATimeToUTC("2016-11-07T05:10")), // DST ended
DateUtils.parseDateOozieTZ(convertLATimeToUTC("2016-11-27T05:10")),
};
testELNominalTimes(startTime, endTime, nominalTimesWithTwoDstChange,"20", Timeunit.DAY);
}
public void testWhenChangingDSTCronEveryTwentiethDay() throws Exception {
Date startTime = DateUtils.parseDateOozieTZ("2016-02-01T13:10Z");
Date endTime = DateUtils.parseDateOozieTZ("2016-12-03T00:00Z");
String everyTwentiethDayAroundDstShift = "10 13 */20 2-3,11,12 *";
Date[] nominalTimesWithTwoDstChange = new Date[]{
DateUtils.parseDateOozieTZ(convertLATimeToUTC("2016-02-01T05:10")),
DateUtils.parseDateOozieTZ(convertLATimeToUTC("2016-02-21T05:10")),
DateUtils.parseDateOozieTZ(convertLATimeToUTC("2016-03-01T05:10")),
DateUtils.parseDateOozieTZ(convertLATimeToUTC("2016-03-21T05:10")), // DST started
DateUtils.parseDateOozieTZ(convertLATimeToUTC("2016-11-01T05:10")),
DateUtils.parseDateOozieTZ(convertLATimeToUTC("2016-11-21T05:10")), // DST ended
DateUtils.parseDateOozieTZ(convertLATimeToUTC("2016-12-01T05:10")),
};
testCronNominalTimes(startTime, endTime, nominalTimesWithTwoDstChange, everyTwentiethDayAroundDstShift);
}
public void testWhenChangingDSTCronAndELEveryThirdMonthFrequenciesEqual() throws Exception {
Date startTime = DateUtils.parseDateOozieTZ("2016-01-01T13:10Z");
Date endTime = DateUtils.parseDateOozieTZ("2017-12-03T00:00Z");
String everyThirdMonth = "10 13 1 */3 *";
Date[] nominalTimesWithTwoDstChange = new Date[]{
DateUtils.parseDateOozieTZ(convertLATimeToUTC("2016-01-01T05:10")),
DateUtils.parseDateOozieTZ(convertLATimeToUTC("2016-04-01T05:10")), // DST started on 13th March
DateUtils.parseDateOozieTZ(convertLATimeToUTC("2016-07-01T05:10")),
DateUtils.parseDateOozieTZ(convertLATimeToUTC("2016-10-01T05:10")),
DateUtils.parseDateOozieTZ(convertLATimeToUTC("2017-01-01T05:10")), // DST ended on 6th of November
DateUtils.parseDateOozieTZ(convertLATimeToUTC("2017-04-01T05:10")), // DST started again on 12th March
DateUtils.parseDateOozieTZ(convertLATimeToUTC("2017-07-01T05:10")),
DateUtils.parseDateOozieTZ(convertLATimeToUTC("2017-10-01T05:10")),
};
testELAndCronNominalTimesEqual(startTime, endTime, nominalTimesWithTwoDstChange, everyThirdMonth, "3", Timeunit.MONTH);
}
public void testWhenDSTStartsCronFrequencyEveryTwentiethHour() throws Exception {
Date startTime = DateUtils.parseDateOozieTZ("2016-01-01T13:10Z");
Date endTime = DateUtils.parseDateOozieTZ("2016-12-03T00:00Z");
String everyTwentiethHourNearDSTShift = "10 */20 12-14 3 *";
Date[] nominalTimesWithTwoDstChange = new Date[]{
DateUtils.parseDateOozieTZ(convertLATimeToUTC("2016-03-11T16:10")),
DateUtils.parseDateOozieTZ(convertLATimeToUTC("2016-03-12T12:10")),
DateUtils.parseDateOozieTZ(convertLATimeToUTC("2016-03-12T16:10")),
DateUtils.parseDateOozieTZ(convertLATimeToUTC("2016-03-13T13:10")), // DST change
DateUtils.parseDateOozieTZ(convertLATimeToUTC("2016-03-13T17:10")),
DateUtils.parseDateOozieTZ(convertLATimeToUTC("2016-03-14T13:10")),
};
testCronNominalTimes(startTime, endTime, nominalTimesWithTwoDstChange, everyTwentiethHourNearDSTShift);
}
public void testWhenDSTStartsELFrequencyEveryTwentiethHour() throws Exception {
Date startTime = DateUtils.parseDateOozieTZ("2016-03-12T13:10Z");
Date endTime = DateUtils.parseDateOozieTZ("2016-03-16T00:00Z");
Date[] nominalTimesWithTwoDstChange = new Date[]{
DateUtils.parseDateOozieTZ(convertLATimeToUTC("2016-03-12T05:10")),
DateUtils.parseDateOozieTZ(convertLATimeToUTC("2016-03-13T01:10")),
DateUtils.parseDateOozieTZ(convertLATimeToUTC("2016-03-13T22:10")), // DST started
DateUtils.parseDateOozieTZ(convertLATimeToUTC("2016-03-14T18:10")),
DateUtils.parseDateOozieTZ(convertLATimeToUTC("2016-03-15T14:10")),
};
testELNominalTimes(startTime, endTime, nominalTimesWithTwoDstChange, "20", Timeunit.HOUR);
}
public void testWhenDSTSEndsCronFrequencyEveryTwentiethHour() throws Exception {
Date startTime = DateUtils.parseDateOozieTZ("2016-01-01T13:10Z");
Date endTime = DateUtils.parseDateOozieTZ("2016-12-03T00:00Z");
String everyTwentiethHourNearDSTShift = "10 */20 5-7 11 *";
Date[] nominalTimesWithTwoDstChange = new Date[]{
DateUtils.parseDateOozieTZ(convertLATimeToUTC("2016-11-04T16:10")),
DateUtils.parseDateOozieTZ(convertLATimeToUTC("2016-11-05T13:10")),
DateUtils.parseDateOozieTZ(convertLATimeToUTC("2016-11-05T17:10")),
DateUtils.parseDateOozieTZ(convertLATimeToUTC("2016-11-06T12:10")), // DST change
DateUtils.parseDateOozieTZ(convertLATimeToUTC("2016-11-06T16:10")),
DateUtils.parseDateOozieTZ(convertLATimeToUTC("2016-11-07T12:10")),
};
testCronNominalTimes(startTime, endTime, nominalTimesWithTwoDstChange, everyTwentiethHourNearDSTShift);
}
public void testWhenDSTEndsELFrequencyEveryTwentiethHour() throws Exception {
Date startTime = DateUtils.parseDateOozieTZ("2016-11-04T23:10Z");
Date endTime = DateUtils.parseDateOozieTZ("2016-11-08T22:00Z");
Date[] nominalTimesWithTwoDstChange = new Date[]{
DateUtils.parseDateOozieTZ(convertLATimeToUTC("2016-11-04T16:10")),
DateUtils.parseDateOozieTZ(convertLATimeToUTC("2016-11-05T12:10")),
DateUtils.parseDateOozieTZ(convertLATimeToUTC("2016-11-06T07:10")), // DST ended
DateUtils.parseDateOozieTZ(convertLATimeToUTC("2016-11-07T03:10")),
DateUtils.parseDateOozieTZ(convertLATimeToUTC("2016-11-07T23:10")),
};
testELNominalTimes(startTime, endTime, nominalTimesWithTwoDstChange, "20", Timeunit.HOUR);
}
public void testWhenDSTSwitchELAndCronFrequencyEveryThirtiethMinute() throws Exception {
Date startTime = DateUtils.parseDateOozieTZ("2016-03-13T08:00Z");
Date endTime = DateUtils.parseDateOozieTZ("2016-03-13T13:00Z");
String everyThirtiethMinuteCron = "*/30 * * * *";
Date[] nominalTimesWithTwoDstChange = new Date[]{
DateUtils.parseDateOozieTZ(convertLATimeToUTC("2016-03-13T00:00")),
DateUtils.parseDateOozieTZ(convertLATimeToUTC("2016-03-13T00:30")),
DateUtils.parseDateOozieTZ(convertLATimeToUTC("2016-03-13T01:00")),
DateUtils.parseDateOozieTZ(convertLATimeToUTC("2016-03-13T01:30")),
DateUtils.parseDateOozieTZ(convertLATimeToUTC("2016-03-13T02:00")), // DST change
DateUtils.parseDateOozieTZ(convertLATimeToUTC("2016-03-13T02:30")),
DateUtils.parseDateOozieTZ(convertLATimeToUTC("2016-03-13T04:00")),
DateUtils.parseDateOozieTZ(convertLATimeToUTC("2016-03-13T04:30")),
DateUtils.parseDateOozieTZ(convertLATimeToUTC("2016-03-13T05:00")),
DateUtils.parseDateOozieTZ(convertLATimeToUTC("2016-03-13T05:30")),
};
testELAndCronNominalTimesEqual(startTime, endTime, nominalTimesWithTwoDstChange,everyThirtiethMinuteCron,
"30", Timeunit.MINUTE);
}
private String convertLATimeToUTC (String localTime) throws Exception {
DateFormat LATimeFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm");
LATimeFormat.setTimeZone(TimeZone.getTimeZone("America/Los_Angeles"));
Date date = LATimeFormat.parse(localTime);
DateFormat utcFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm'Z'");
utcFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
return utcFormat.format(date);
}
private void testELAndCronNominalTimesEqual (Date startTime, Date endTime, Date[] nominalTimes, String cronFrequency,
String elFrequency, Timeunit elTimeUnit) throws Exception {
CoordinatorJobBean elJob = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, startTime, endTime, null,
elFrequency, elTimeUnit);
CoordinatorJobBean cronJob = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, startTime, endTime, null,
cronFrequency);
new CoordMaterializeTransitionXCommand(elJob.getId(), TIME_IN_HOURS).call();
new CoordMaterializeTransitionXCommand(cronJob.getId(), TIME_IN_HOURS).call();
elJob = jpaService.execute(new CoordJobGetJPAExecutor(elJob.getId()));
cronJob = jpaService.execute(new CoordJobGetJPAExecutor(cronJob.getId()));
checkCoordActionsNominalTime(cronJob.getId(), nominalTimes.length, nominalTimes);
checkCoordActionsNominalTime(elJob.getId(), nominalTimes.length, nominalTimes);
assertTrue("Cron and EL job materialization should both be complete",
elJob.isDoneMaterialization() && cronJob.isDoneMaterialization());
}
private void testCronNominalTimes (Date startTime, Date endTime, Date[] nominalTimes, String cronFrequency) throws Exception {
CoordinatorJobBean cronJob = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, startTime, endTime, null,
cronFrequency);
new CoordMaterializeTransitionXCommand(cronJob.getId(), TIME_IN_HOURS).call();
cronJob = jpaService.execute(new CoordJobGetJPAExecutor(cronJob.getId()));
checkCoordActionsNominalTime(cronJob.getId(), nominalTimes.length, nominalTimes);
assertTrue("Cron job materialization should be complete", cronJob.isDoneMaterialization());
}
private void testELNominalTimes (Date startTime, Date endTime, Date[] nominalTimes, String elFrequency, Timeunit elTimeUnit)
throws Exception {
CoordinatorJobBean elJob = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, startTime, endTime, null,
elFrequency, elTimeUnit);
new CoordMaterializeTransitionXCommand(elJob.getId(), TIME_IN_HOURS).call();
elJob = jpaService.execute(new CoordJobGetJPAExecutor(elJob.getId()));
checkCoordActionsNominalTime(elJob.getId(), nominalTimes.length, nominalTimes);
assertTrue("EL job materialization should be complete", elJob.isDoneMaterialization());
}
public void testLastOnlyMaterialization() throws Exception {
long now = System.currentTimeMillis();
Date startTime = DateUtils.toDate(new Timestamp(now - 180 * 60 * 1000)); // 3 hours ago
Date endTime = DateUtils.toDate(new Timestamp(now + 180 * 60 * 1000)); // 3 hours from now
CoordinatorJobBean job = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, startTime, endTime, null, -1, "10",
CoordinatorJob.Execution.LAST_ONLY);
// This would normally materialize the throttle amount and within a 1 hour window; however, with LAST_ONLY this should
// ignore those parameters and materialize everything in the past
new CoordMaterializeTransitionXCommand(job.getId(), hoursToSeconds(1)).call();
checkCoordJobs(job.getId(), CoordinatorJob.Status.RUNNING);
CoordinatorActionBean.Status[] expectedStatuses = new CoordinatorActionBean.Status[19];
Arrays.fill(expectedStatuses, CoordinatorActionBean.Status.WAITING);
checkCoordActionsStatus(job.getId(), expectedStatuses);
startTime = DateUtils.toDate(new Timestamp(now)); // now
job = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, startTime, endTime, null, -1, "10",
CoordinatorJob.Execution.LAST_ONLY);
// We're starting from "now" this time (i.e. present/future), so it should materialize things normally
new CoordMaterializeTransitionXCommand(job.getId(), hoursToSeconds(1)).call();
checkCoordJobs(job.getId(), CoordinatorJob.Status.RUNNING);
expectedStatuses = new CoordinatorActionBean.Status[6];
Arrays.fill(expectedStatuses, CoordinatorActionBean.Status.WAITING);
checkCoordActionsStatus(job.getId(), expectedStatuses);
}
public void testCurrentTimeCheck() throws Exception {
long now = System.currentTimeMillis();
Date startTime = DateUtils.toDate(new Timestamp(now)); // now
Date endTime = DateUtils.toDate(new Timestamp(now + 3 * 60 * 60 * 1000)); // 3 secondsFromHours from now
CoordinatorJobBean job = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, startTime, endTime, null, "5",
20);
new CoordMaterializeTransitionXCommand(job.getId(), hoursToSeconds(1)).call();
checkCoordJobs(job.getId(), CoordinatorJob.Status.RUNNING);
job = CoordJobQueryExecutor.getInstance().get(CoordJobQuery.GET_COORD_JOB, job.getId());
assertEquals(job.getLastActionNumber(), 12);
new CoordMaterializeTransitionXCommand(job.getId(), hoursToSeconds(1)).call();
// unfortunatily XCommand doesn't throw exception on precondition
// assertEquals(e.getErrorCode(), ErrorCode.E1100);
// assertTrue(e.getMessage().contains("Request is for future time. Lookup time is"));
job = CoordJobQueryExecutor.getInstance().get(CoordJobQuery.GET_COORD_JOB, job.getId());
// getLastActionNumber should 12, last CoordMaterializeTransitionXCommand have failed
assertEquals(job.getLastActionNumber(), 12);
}
public void testMaterizationEndOfMonths() throws Exception {
Configuration conf = new XConfiguration();
File appPathFile = new File(getTestCaseDir(), "coordinator.xml");
String appXml = "<coordinator-app name=\"test\" frequency=\"${coord:endOfMonths(1)}\" start=\"2009-02-01T01:00Z\" "
+ "end=\"2009-02-03T23:59Z\" timezone=\"UTC\" "
+ "xmlns=\"uri:oozie:coordinator:0.2\"> <controls> "
+ "<execution>LIFO</execution> </controls> <datasets> "
+ "<dataset name=\"a\" frequency=\"${coord:days(7)}\" initial-instance=\"2009-02-01T01:00Z\" "
+ "timezone=\"UTC\"> <uri-template>"
+ getTestCaseFileUri("coord/workflows/${YEAR}/${DAY}")
+ "</uri-template> "
+ "</dataset> "
+ "<dataset name=\"local_a\" frequency=\"${coord:days(7)}\" initial-instance=\"2009-02-01T01:00Z\" "
+ "timezone=\"UTC\"> <uri-template>"
+ getTestCaseFileUri("coord/workflows/${YEAR}/${DAY}")
+ "</uri-template> "
+ " </dataset> "
+ "</datasets> <input-events> "
+ "<data-in name=\"A\" dataset=\"a\"> <instance>${coord:latest(0)}</instance> </data-in> "
+ "</input-events> "
+ "<output-events> <data-out name=\"LOCAL_A\" dataset=\"local_a\"> "
+ "<instance>${coord:current(-1)}</instance> </data-out> </output-events> <action> <workflow> "
+ "<app-path>hdfs:///tmp/workflows/</app-path> "
+ "<configuration> <property> <name>inputA</name> <value>${coord:dataIn('A')}</value> </property> "
+ "<property> <name>inputB</name> <value>${coord:dataOut('LOCAL_A')}</value> "
+ "</property></configuration> </workflow> </action> </coordinator-app>";
writeToFile(appXml, appPathFile);
conf.set(OozieClient.COORDINATOR_APP_PATH, appPathFile.toURI().toString());
conf.set(OozieClient.USER_NAME, getTestUser());
CoordSubmitXCommand sc = new CoordSubmitXCommand(conf);
String jobId = sc.call();
Calendar cal = Calendar.getInstance(DateUtils.getOozieProcessingTimeZone());
cal.add(Calendar.MONTH, -3);
Date startTime = cal.getTime();
cal = Calendar.getInstance(DateUtils.getOozieProcessingTimeZone());
cal.add(Calendar.MONTH, 3);
Date endTime = cal.getTime();
CoordinatorJobBean job = CoordJobQueryExecutor.getInstance().get(CoordJobQuery.GET_COORD_JOB, jobId);
assertEquals(job.getLastActionNumber(), 0);
job.setStartTime(startTime);
job.setEndTime(endTime);
job.setMatThrottling(10);
CoordJobQueryExecutor.getInstance().executeUpdate(CoordJobQuery.UPDATE_COORD_JOB, job);
new CoordMaterializeTransitionXCommand(job.getId(), hoursToSeconds(1)).call();
job = CoordJobQueryExecutor.getInstance().get(CoordJobQuery.GET_COORD_JOB, job.getId());
assertEquals(job.getLastActionNumber(), 3);
String jobXml = job.getJobXml();
Element eJob = XmlUtils.parseXml(jobXml);
TimeZone appTz = DateUtils.getTimeZone(job.getTimeZone());
TimeUnit endOfFlag = TimeUnit.valueOf(eJob.getAttributeValue("end_of_duration"));
TimeUnit freqTU = TimeUnit.valueOf(job.getTimeUnitStr());
Calendar origStart = Calendar.getInstance(appTz);
origStart.setTime(job.getStartTimestamp());
// Move to the End of duration, if needed.
DateUtils.moveToEnd(origStart, endOfFlag);
origStart.add(freqTU.getCalendarUnit(), 3 * Integer.parseInt(job.getFrequency()));
assertEquals(job.getNextMaterializedTime(), origStart.getTime());
}
public void testActionMaterEndOfWeeks() throws Exception {
Configuration conf = new XConfiguration();
File appPathFile = new File(getTestCaseDir(), "coordinator.xml");
String appXml = "<coordinator-app name=\"test\" frequency=\"${coord:endOfWeeks(1)}\" start=\"2016-02-03T01:00Z\" "
+ "end=\"2016-03-03T23:59Z\" timezone=\"UTC\" " + "xmlns=\"uri:oozie:coordinator:0.2\"> <controls> "
+ "<execution>LIFO</execution> </controls> <datasets> "
+ "<dataset name=\"a\" frequency=\"${coord:endOfWeeks(1)}\" initial-instance=\"2016-01-01T01:00Z\" "
+ "timezone=\"UTC\"> <uri-template>" + getTestCaseFileUri("coord/workflows/${YEAR}/${DAY}")
+ "</uri-template> " + "</dataset> "
+ "<dataset name=\"local_a\" frequency=\"${coord:endOfWeeks(1)}\" initial-instance=\"2016-01-01T01:00Z\" "
+ "timezone=\"UTC\"> <uri-template>" + getTestCaseFileUri("coord/workflows/${YEAR}/${DAY}")
+ "</uri-template> " + " </dataset> " + "</datasets> <input-events> "
+ "<data-in name=\"A\" dataset=\"a\"> <instance>${coord:latest(0)}</instance> </data-in> "
+ "</input-events> " + "<output-events> <data-out name=\"LOCAL_A\" dataset=\"local_a\"> "
+ "<instance>${coord:current(-1)}</instance> </data-out> </output-events> <action> <workflow> "
+ "<app-path>hdfs:///tmp/workflows/</app-path> "
+ "<configuration> <property> <name>inputA</name> <value>${coord:dataIn('A')}</value> </property> "
+ "<property> <name>inputB</name> <value>${coord:dataOut('LOCAL_A')}</value> "
+ "</property></configuration> </workflow> </action> </coordinator-app>";
writeToFile(appXml, appPathFile);
conf.set(OozieClient.COORDINATOR_APP_PATH, appPathFile.toURI().toString());
conf.set(OozieClient.USER_NAME, getTestUser());
CoordSubmitXCommand sc = new CoordSubmitXCommand(conf);
String jobId = sc.call();
CoordinatorJobBean job = CoordJobQueryExecutor.getInstance().get(CoordJobQuery.GET_COORD_JOB, jobId);
assertEquals(job.getLastActionNumber(), 0);
job.setMatThrottling(10);
CoordJobQueryExecutor.getInstance().executeUpdate(CoordJobQuery.UPDATE_COORD_JOB, job);
new CoordMaterializeTransitionXCommand(job.getId(), hoursToSeconds(1)).call();
job = CoordJobQueryExecutor.getInstance().get(CoordJobQuery.GET_COORD_JOB, job.getId());
assertEquals(4, job.getLastActionNumber());
String jobXml = job.getJobXml();
Element eJob = XmlUtils.parseXml(jobXml);
TimeZone appTz = DateUtils.getTimeZone(job.getTimeZone());
TimeUnit endOfFlag = TimeUnit.valueOf(eJob.getAttributeValue("end_of_duration"));
TimeUnit freqTU = TimeUnit.valueOf(job.getTimeUnitStr());
Calendar origStart = Calendar.getInstance(appTz);
origStart.setTime(job.getStartTimestamp());
// Move to the End of duration, if needed.
DateUtils.moveToEnd(origStart, endOfFlag);
origStart.add(freqTU.getCalendarUnit(), 4 * Integer.parseInt(job.getFrequency()));
assertEquals(origStart.getTime(), job.getNextMaterializedTime());
}
private void checkCoordJobs(String jobId, CoordinatorJob.Status expectedStatus) {
try {
CoordinatorJobBean job = jpaService.execute(new CoordJobGetJPAExecutor(jobId));
if (job.getStatus() != expectedStatus) {
fail("CoordJobMatLookupCommand didn't work because the status for job id"
+ jobId + " is : " + job.getStatusStr() + "; however expected status is : " + expectedStatus.toString());
}
}
catch (JPAExecutorException se) {
fail("Job ID " + jobId + " was not stored properly in db");
}
}
private void checkCoordWaiting(String jobId, int expectedValue) {
try {
int numWaitingActions = jpaService.execute(new CoordJobGetRunningActionsCountJPAExecutor(jobId));
assert (numWaitingActions <= expectedValue);
}
catch (JPAExecutorException se) {
fail("Job ID " + jobId + " was not stored properly in db");
}
}
private CoordinatorActionBean checkCoordAction(String actionId) throws JPAExecutorException {
long lastSeqId[] = new long[1];
List<SLAEventBean> slaEventList = jpaService.execute(new SLAEventsGetForSeqIdJPAExecutor(-1, 10, lastSeqId));
if (slaEventList.size() == 0) {
fail("Unable to GET any record of sequence id greater than 0");
}
CoordinatorActionBean actionBean;
actionBean = jpaService.execute(new CoordActionGetJPAExecutor(actionId));
return actionBean;
}
private CoordinatorActionBean getCoordAction(String actionId) throws JPAExecutorException {
CoordinatorActionBean actionBean;
jpaService = Services.get().get(JPAService.class);
actionBean = jpaService.execute(new CoordActionGetJPAExecutor(actionId));
return actionBean;
}
private void checkCoordActionsNominalTime(String jobId, int number, Date[] nominalTimes) {
try {
List<CoordinatorActionBean> actions = jpaService.execute(new CoordJobGetActionsSubsetJPAExecutor(jobId,
null, 1, 1000, false));
if (actions.size() != number) {
fail("Should have " + number + " actions created for job " + jobId + ", but has " + actions.size() + " actions.");
}
for (int i=0; i < nominalTimes.length; i++ ) {
assertEquals(nominalTimes[i], actions.get(i).getNominalTime());
}
}
catch (JPAExecutorException se) {
se.printStackTrace();
fail("Job ID " + jobId + " was not stored properly in db");
}
}
private void checkCoordActionsStatus(String jobId, CoordinatorActionBean.Status[] statuses) {
try {
List<CoordinatorActionBean> actions = jpaService.execute(new CoordJobGetActionsSubsetJPAExecutor(jobId,
null, 1, 1000, false));
if (actions.size() != statuses.length) {
fail("Should have " + statuses.length + " actions created for job " + jobId + ", but has " + actions.size()
+ " actions.");
}
for (int i=0; i < statuses.length; i++ ) {
assertEquals(statuses[i], actions.get(i).getStatus());
}
}
catch (JPAExecutorException se) {
se.printStackTrace();
fail("Job ID " + jobId + " was not stored properly in db");
}
}
private void checkCoordActionsTimeout(String actionId, int expected) {
try {
CoordinatorActionBean action = jpaService.execute(new CoordActionGetJPAExecutor(actionId));
assertEquals(action.getTimeOut(), expected);
}
catch (JPAExecutorException se) {
se.printStackTrace();
fail("Action ID " + actionId + " was not stored properly in db");
}
}
/**
* Test a coordinator SLA define EL functions as variable
*
* @throws Exception
*/
public void testSuccessedJobSlaParseElFunctionVariableInMaterializeActions() throws Exception {
Configuration conf = new XConfiguration();
File appPathFile = new File(getTestCaseDir(), "coordinator.xml");
String coordXml = "<coordinator-app name=\"NAME\" frequency=\"0 * * * *\""
+ " start=\"2017-06-12T01:00Z\" end=\"2017-06-12T02:00Z\" timezone=\"Asia/Shanghai\""
+ " xmlns=\"uri:oozie:coordinator:0.4\" xmlns:sla=\"uri:oozie:sla:0.2\">"
+ "<controls> <execution>FIFO</execution> </controls>"
+ "<action>"
+ " <workflow> <app-path>hdfs:///tmp/workflows/</app-path> </workflow> "
+ " <sla:info>"
+ " <sla:nominal-time>${NOMINAL_TIME}</sla:nominal-time>"
+ " <sla:should-start>${SHOULD_START}</sla:should-start>"
+ " <sla:should-end>${SHOULD_END}</sla:should-end>"
+ " </sla:info>"
+ "</action>"
+ "</coordinator-app>";
writeToFile(coordXml, appPathFile);
conf.set(OozieClient.COORDINATOR_APP_PATH, appPathFile.toURI().toString());
conf.set(OozieClient.USER_NAME, getTestUser());
conf.set("NOMINAL_TIME", "${coord:nominalTime()}");
conf.set("SHOULD_START", "${5 * MINUTES}");
conf.set("SHOULD_END", "${ SLA_OFFSET * HOURS}");
conf.set("SLA_OFFSET", "10");
CoordSubmitXCommand sc = new CoordSubmitXCommand(conf);
String jobId = sc.call();
new CoordMaterializeTransitionXCommand(jobId, 60).call();
}
}