OOZIE-2064 coord job with frequency coord:endOfMonths doesn't materialize
diff --git a/core/src/main/java/org/apache/oozie/command/coord/CoordMaterializeTransitionXCommand.java b/core/src/main/java/org/apache/oozie/command/coord/CoordMaterializeTransitionXCommand.java
index 6c9d4ae..623c2d3 100644
--- a/core/src/main/java/org/apache/oozie/command/coord/CoordMaterializeTransitionXCommand.java
+++ b/core/src/main/java/org/apache/oozie/command/coord/CoordMaterializeTransitionXCommand.java
@@ -253,7 +253,7 @@
endMatInstance = (Calendar) startInstance.clone();
endMatInstance.add(freqTU.getCalendarUnit(), i * frequency);
if (endMatInstance.getTime().compareTo(new Date()) >= 0) {
- if (previousInstance.after(currentMatTime)) {
+ if (previousInstance.getTime().after(currentMatTime)) {
return previousInstance.getTime();
}
else {
@@ -424,41 +424,45 @@
boolean isCronFrequency = false;
+ Calendar effStart = (Calendar) start.clone();
try {
- Integer.parseInt(coordJob.getFrequency());
- } catch (NumberFormatException e) {
+ int intFrequency = Integer.parseInt(coordJob.getFrequency());
+ effStart = (Calendar) origStart.clone();
+ effStart.add(freqTU.getCalendarUnit(), lastActionNumber * intFrequency);
+ }
+ catch (NumberFormatException e) {
isCronFrequency = true;
}
boolean firstMater = true;
- while (start.compareTo(end) < 0 && (ignoreMaxActions || maxActionToBeCreated-- > 0)) {
- if (pause != null && start.compareTo(pause) >= 0) {
+ while (effStart.compareTo(end) < 0 && (ignoreMaxActions || maxActionToBeCreated-- > 0)) {
+ if (pause != null && effStart.compareTo(pause) >= 0) {
break;
}
- Date nextTime = start.getTime();
+ Date nextTime = effStart.getTime();
if (isCronFrequency) {
- if (start.getTime().compareTo(startMatdTime) == 0 && firstMater) {
- start.add(Calendar.MINUTE, -1);
+ if (effStart.getTime().compareTo(startMatdTime) == 0 && firstMater) {
+ effStart.add(Calendar.MINUTE, -1);
firstMater = false;
}
- nextTime = CoordCommandUtils.getNextValidActionTimeForCronFrequency(start.getTime(), coordJob);
- start.setTime(nextTime);
+ nextTime = CoordCommandUtils.getNextValidActionTimeForCronFrequency(effStart.getTime(), coordJob);
+ effStart.setTime(nextTime);
}
- if (start.compareTo(end) < 0) {
+ if (effStart.compareTo(end) < 0) {
- if (pause != null && start.compareTo(pause) >= 0) {
+ if (pause != null && effStart.compareTo(pause) >= 0) {
break;
}
CoordinatorActionBean actionBean = new CoordinatorActionBean();
lastActionNumber++;
int timeout = coordJob.getTimeout();
- LOG.debug("Materializing action for time=" + DateUtils.formatDateOozieTZ(start.getTime()) + ", lastactionnumber=" + lastActionNumber
- + " timeout=" + timeout + " minutes");
+ LOG.debug("Materializing action for time=" + DateUtils.formatDateOozieTZ(effStart.getTime())
+ + ", lastactionnumber=" + lastActionNumber + " timeout=" + timeout + " minutes");
Date actualTime = new Date();
action = CoordCommandUtils.materializeOneInstance(jobId, dryrun, (Element) eJob.clone(),
nextTime, actualTime, lastActionNumber, jobConf, actionBean);
@@ -478,22 +482,22 @@
}
if (!isCronFrequency) {
- start = (Calendar) origStart.clone();
- start.add(freqTU.getCalendarUnit(), lastActionNumber * Integer.parseInt(coordJob.getFrequency()));
+ effStart = (Calendar) origStart.clone();
+ effStart.add(freqTU.getCalendarUnit(), lastActionNumber * Integer.parseInt(coordJob.getFrequency()));
}
}
if (isCronFrequency) {
- if (start.compareTo(end) < 0 && !(ignoreMaxActions || maxActionToBeCreated-- > 0)) {
+ if (effStart.compareTo(end) < 0 && !(ignoreMaxActions || maxActionToBeCreated-- > 0)) {
//Since we exceed the throttle, we need to move the nextMadtime forward
//to avoid creating duplicate actions
if (!firstMater) {
- start.setTime(CoordCommandUtils.getNextValidActionTimeForCronFrequency(start.getTime(), coordJob));
+ effStart.setTime(CoordCommandUtils.getNextValidActionTimeForCronFrequency(effStart.getTime(), coordJob));
}
}
}
- endMatdTime = start.getTime();
+ endMatdTime = effStart.getTime();
if (!dryrun) {
return action;
diff --git a/core/src/test/java/org/apache/oozie/command/coord/TestCoordMaterializeTransitionXCommand.java b/core/src/test/java/org/apache/oozie/command/coord/TestCoordMaterializeTransitionXCommand.java
index 4a852cf..4286bcb 100644
--- a/core/src/test/java/org/apache/oozie/command/coord/TestCoordMaterializeTransitionXCommand.java
+++ b/core/src/test/java/org/apache/oozie/command/coord/TestCoordMaterializeTransitionXCommand.java
@@ -17,19 +17,25 @@
*/
package org.apache.oozie.command.coord;
+import java.io.File;
import java.sql.Timestamp;
import java.util.Arrays;
+import java.util.Calendar;
import java.util.Date;
import java.util.List;
+import java.util.TimeZone;
+import org.apache.hadoop.conf.Configuration;
import org.apache.oozie.CoordinatorActionBean;
import org.apache.oozie.CoordinatorJobBean;
import org.apache.oozie.ErrorCode;
import org.apache.oozie.SLAEventBean;
import org.apache.oozie.client.CoordinatorJob;
import org.apache.oozie.client.CoordinatorJob.Timeunit;
+import org.apache.oozie.client.OozieClient;
import org.apache.oozie.command.CommandException;
import org.apache.oozie.coord.CoordELFunctions;
+import org.apache.oozie.coord.TimeUnit;
import org.apache.oozie.executor.jpa.CoordActionGetJPAExecutor;
import org.apache.oozie.executor.jpa.CoordActionQueryExecutor;
import org.apache.oozie.executor.jpa.CoordJobGetActionsJPAExecutor;
@@ -46,6 +52,9 @@
import org.apache.oozie.service.Services;
import org.apache.oozie.test.XDataTestCase;
import org.apache.oozie.util.DateUtils;
+import org.apache.oozie.util.XConfiguration;
+import org.apache.oozie.util.XmlUtils;
+import org.jdom.Element;
@SuppressWarnings("deprecation")
public class TestCoordMaterializeTransitionXCommand extends XDataTestCase {
@@ -550,7 +559,7 @@
CoordJobQueryExecutor.getInstance().executeUpdate(CoordJobQuery.UPDATE_COORD_JOB, job);
new CoordMaterializeTransitionXCommand(job.getId(), 3600).call();
job = jpaService.execute(new CoordJobGetJPAExecutor(job.getId()));
- assertEquals(new Date(startTime.getTime() + TIME_IN_DAY ), job.getNextMaterializedTime());
+ assertEquals(new Date(startTime.getTime() + TIME_IN_DAY * 3), job.getNextMaterializedTime());
// test with hours, time should not pass the current time.
startTime = new Date(new Date().getTime());
@@ -627,6 +636,64 @@
Date pauseTime, int timeout, String freq) throws Exception {
return addRecordToCoordJobTable(status, startTime, endTime, pauseTime, timeout, freq, CoordinatorJob.Execution.FIFO);
}
+ public void testMaterizationEndOfMonths() throws Exception {
+ Configuration conf = new XConfiguration();
+ File appPathFile = new File(getTestCaseDir(), "coordinator.xml");
+ String appXml = "<coordinator-app name=\"test\" frequency=\"${coord:endOfMonths(1)}\" start=\"2009-02-01T01:00Z\" "
+ + "end=\"2009-02-03T23:59Z\" timezone=\"UTC\" "
+ + "xmlns=\"uri:oozie:coordinator:0.2\"> <controls> "
+ + "<execution>LIFO</execution> </controls> <datasets> "
+ + "<dataset name=\"a\" frequency=\"${coord:days(7)}\" initial-instance=\"2009-02-01T01:00Z\" "
+ + "timezone=\"UTC\"> <uri-template>"
+ + getTestCaseFileUri("coord/workflows/${YEAR}/${DAY}")
+ + "</uri-template> "
+ + "</dataset> "
+ + "<dataset name=\"local_a\" frequency=\"${coord:days(7)}\" initial-instance=\"2009-02-01T01:00Z\" "
+ + "timezone=\"UTC\"> <uri-template>"
+ + getTestCaseFileUri("coord/workflows/${YEAR}/${DAY}")
+ + "</uri-template> "
+ + " </dataset> "
+ + "</datasets> <input-events> "
+ + "<data-in name=\"A\" dataset=\"a\"> <instance>${coord:latest(0)}</instance> </data-in> "
+ + "</input-events> "
+ + "<output-events> <data-out name=\"LOCAL_A\" dataset=\"local_a\"> "
+ + "<instance>${coord:current(-1)}</instance> </data-out> </output-events> <action> <workflow> "
+ + "<app-path>hdfs:///tmp/workflows/</app-path> "
+ + "<configuration> <property> <name>inputA</name> <value>${coord:dataIn('A')}</value> </property> "
+ + "<property> <name>inputB</name> <value>${coord:dataOut('LOCAL_A')}</value> "
+ + "</property></configuration> </workflow> </action> </coordinator-app>";
+ writeToFile(appXml, appPathFile);
+ conf.set(OozieClient.COORDINATOR_APP_PATH, appPathFile.toURI().toString());
+ conf.set(OozieClient.USER_NAME, getTestUser());
+ CoordSubmitXCommand sc = new CoordSubmitXCommand(conf);
+ String jobId = sc.call();
+
+ Date currentTime = new Date();
+ Date startTime = org.apache.commons.lang.time.DateUtils.addMonths(currentTime, -3);
+ Date endTime = org.apache.commons.lang.time.DateUtils.addMonths(currentTime, 3);
+ CoordinatorJobBean job = CoordJobQueryExecutor.getInstance().get(CoordJobQuery.GET_COORD_JOB, jobId);
+ assertEquals(job.getLastActionNumber(), 0);
+
+ job.setStartTime(startTime);
+ job.setEndTime(endTime);
+ job.setMatThrottling(10);
+ CoordJobQueryExecutor.getInstance().executeUpdate(CoordJobQuery.UPDATE_COORD_JOB, job);
+ new CoordMaterializeTransitionXCommand(job.getId(), 3600).call();
+ job = CoordJobQueryExecutor.getInstance().get(CoordJobQuery.GET_COORD_JOB, job.getId());
+ assertEquals(job.getLastActionNumber(), 3);
+
+ String jobXml = job.getJobXml();
+ Element eJob = XmlUtils.parseXml(jobXml);
+ TimeZone appTz = DateUtils.getTimeZone(job.getTimeZone());
+ TimeUnit endOfFlag = TimeUnit.valueOf(eJob.getAttributeValue("end_of_duration"));
+ TimeUnit freqTU = TimeUnit.valueOf(job.getTimeUnitStr());
+ Calendar origStart = Calendar.getInstance(appTz);
+ origStart.setTime(job.getStartTimestamp());
+ // Move to the End of duration, if needed.
+ DateUtils.moveToEnd(origStart, endOfFlag);
+ origStart.add(freqTU.getCalendarUnit(), 3 * Integer.parseInt(job.getFrequency()));
+ assertEquals(job.getNextMaterializedTime(), origStart.getTime());
+ }
protected CoordinatorJobBean addRecordToCoordJobTable(CoordinatorJob.Status status, Date startTime, Date endTime,
Date pauseTime, int timeout, String freq, CoordinatorJob.Execution execution) throws Exception {
diff --git a/core/src/test/java/org/apache/oozie/command/coord/TestCoordSubmitXCommand.java b/core/src/test/java/org/apache/oozie/command/coord/TestCoordSubmitXCommand.java
index fedf4a8..82c9e99 100644
--- a/core/src/test/java/org/apache/oozie/command/coord/TestCoordSubmitXCommand.java
+++ b/core/src/test/java/org/apache/oozie/command/coord/TestCoordSubmitXCommand.java
@@ -19,8 +19,6 @@
import java.io.File;
import java.io.FileWriter;
-import java.io.IOException;
-import java.io.PrintWriter;
import java.io.Reader;
import java.io.Writer;
import java.net.URI;
@@ -1219,22 +1217,6 @@
return null;
}
- private void writeToFile(String appXml, File appPathFile) throws Exception {
- PrintWriter out = null;
- try {
- out = new PrintWriter(new FileWriter(appPathFile));
- out.println(appXml);
- }
- catch (IOException iex) {
- throw iex;
- }
- finally {
- if (out != null) {
- out.close();
- }
- }
- }
-
/**
* Test timeout setting
*
diff --git a/core/src/test/java/org/apache/oozie/test/XDataTestCase.java b/core/src/test/java/org/apache/oozie/test/XDataTestCase.java
index 7614f03..b72c641 100644
--- a/core/src/test/java/org/apache/oozie/test/XDataTestCase.java
+++ b/core/src/test/java/org/apache/oozie/test/XDataTestCase.java
@@ -18,9 +18,12 @@
package org.apache.oozie.test;
import java.io.ByteArrayInputStream;
+import java.io.File;
+import java.io.FileWriter;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
+import java.io.PrintWriter;
import java.io.Reader;
import java.io.UnsupportedEncodingException;
import java.io.Writer;
@@ -1638,4 +1641,20 @@
CoordJobQueryExecutor.getInstance().executeUpdate(CoordJobQuery.UPDATE_COORD_JOB, coord);
}
+ protected void writeToFile(String appXml, File appPathFile) throws Exception {
+ PrintWriter out = null;
+ try {
+ out = new PrintWriter(new FileWriter(appPathFile));
+ out.println(appXml);
+ }
+ catch (IOException iex) {
+ throw iex;
+ }
+ finally {
+ if (out != null) {
+ out.close();
+ }
+ }
+ }
+
}
diff --git a/release-log.txt b/release-log.txt
index 65d454c..a50a3f5 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -1,5 +1,6 @@
-- Oozie 4.1.0 release (4.1 - unreleased)
+OOZIE-2064 coord job with frequency coord:endOfMonths doesn't materialize (puru)
OOZIE-2063 Cron syntax creates duplicate actions (bzhang)
OOZIE-2032 If using SSL, the port reported by Oozie is incorrect for HA tasks (rkanter)
OOZIE-1959 TestZKUtilsWithSecurity fails (rkanter)