blob: 9de76dcfa304d4f68e1e360dbfaa6374d5fb403c [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.oozie.coord;
import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.oozie.ErrorCode;
import org.apache.oozie.client.OozieClient;
import org.apache.oozie.command.CommandException;
import org.apache.oozie.coord.input.logic.CoordInputLogicEvaluatorUtil;
import org.apache.oozie.dependency.URIHandler;
import org.apache.oozie.dependency.URIHandler.Context;
import org.apache.oozie.service.ConfigurationService;
import org.apache.oozie.service.Services;
import org.apache.oozie.service.URIHandlerService;
import org.apache.oozie.util.DateUtils;
import org.apache.oozie.util.ELEvaluator;
import org.apache.oozie.util.ParamChecker;
import org.apache.oozie.util.XLog;
import org.jdom.JDOMException;
import java.net.URI;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.Collections;
import java.util.Date;
import java.util.GregorianCalendar;
import java.util.List;
import java.util.Objects;
import java.util.TimeZone;
import java.util.concurrent.atomic.AtomicInteger;
/**
* This class implements the EL functions related to the coordinator.
*/
public class CoordELFunctions {
/** Logger bound to this class. */
private static final XLog LOG = XLog.getLog(CoordELFunctions.class);
// Evaluator variable names; configureEvaluator() stores the dataset under DATASET and the
// coordinator action under COORD_ACTION.
public static final String DATASET = "oozie.coord.el.dataset.bean";
public static final String COORD_ACTION = "oozie.coord.el.app.bean";
// NOTE(review): presumably the evaluator variable holding the job configuration - confirm at the use site.
public static final String CONFIGURATION = "oozie.coord.el.conf";
// NOTE(review): looks like a configuration property name - confirm at the use site.
public static final String LATEST_EL_USE_CURRENT_TIME = "oozie.service.ELService.latest-el.use-current-time";
// INSTANCE_SEPARATOR is used to separate multiple directories into one tag.
public static final String INSTANCE_SEPARATOR = "#";
public static final String DIR_SEPARATOR = ",";
// TODO: in next release, support flexibility
private static String END_OF_OPERATION_INDICATOR_FILE = "_SUCCESS";
// Durations expressed in milliseconds; each one is derived from the previous.
public static final long MINUTE_MSEC = 60 * 1000L;
public static final long HOUR_MSEC = 60 * MINUTE_MSEC;
public static final long DAY_MSEC = 24 * HOUR_MSEC;
public static final long WEEK_MSEC = 7 * DAY_MSEC;
/**
 * Declares a frequency expressed in days.
 * <p>
 * The argument is validated with {@code ParamChecker.checkGTZero}; the current evaluator then gets
 * "timeunit" set to {@code TimeUnit.DAY} and "endOfDuration" set to {@code TimeUnit.NONE}.
 *
 * @param val frequency in number of days; must be greater than zero
 * @return the validated number of days
 */
public static int ph1_coord_days(int val) {
    final int days = ParamChecker.checkGTZero(val, "n");
    final ELEvaluator evaluator = ELEvaluator.getCurrent();
    evaluator.setVariable("timeunit", TimeUnit.DAY);
    evaluator.setVariable("endOfDuration", TimeUnit.NONE);
    return days;
}
/**
 * Declares a frequency expressed in months.
 * <p>
 * The argument is validated with {@code ParamChecker.checkGTZero}; the current evaluator then gets
 * "timeunit" set to {@code TimeUnit.MONTH} and "endOfDuration" set to {@code TimeUnit.NONE}.
 *
 * @param val frequency in number of months; must be greater than zero
 * @return the validated number of months
 */
public static int ph1_coord_months(int val) {
    final int months = ParamChecker.checkGTZero(val, "n");
    final ELEvaluator evaluator = ELEvaluator.getCurrent();
    evaluator.setVariable("timeunit", TimeUnit.MONTH);
    evaluator.setVariable("endOfDuration", TimeUnit.NONE);
    return months;
}
/**
 * Declares a frequency expressed in hours.
 * <p>
 * Hours are not stored as their own unit: the evaluator's "timeunit" is set to
 * {@code TimeUnit.MINUTE} ("endOfDuration" to {@code TimeUnit.NONE}) and the value is converted to minutes.
 *
 * @param val frequency in number of hours; must be greater than zero
 * @return the frequency converted to minutes ({@code val * 60})
 */
public static int ph1_coord_hours(int val) {
    final int hours = ParamChecker.checkGTZero(val, "n");
    final ELEvaluator evaluator = ELEvaluator.getCurrent();
    evaluator.setVariable("timeunit", TimeUnit.MINUTE);
    evaluator.setVariable("endOfDuration", TimeUnit.NONE);
    final int minutes = hours * 60;
    return minutes;
}
/**
 * Declares a frequency expressed in minutes.
 * <p>
 * The argument is validated with {@code ParamChecker.checkGTZero}; the current evaluator then gets
 * "timeunit" set to {@code TimeUnit.MINUTE} and "endOfDuration" set to {@code TimeUnit.NONE}.
 *
 * @param val frequency in number of minutes; must be greater than zero
 * @return the validated number of minutes
 */
public static int ph1_coord_minutes(int val) {
    final int minutes = ParamChecker.checkGTZero(val, "n");
    final ELEvaluator evaluator = ELEvaluator.getCurrent();
    evaluator.setVariable("timeunit", TimeUnit.MINUTE);
    evaluator.setVariable("endOfDuration", TimeUnit.NONE);
    return minutes;
}
/**
 * Declares a frequency in days together with the "end of day" flag.
 * <p>
 * The current evaluator gets "timeunit" set to {@code TimeUnit.DAY} and "endOfDuration" set to
 * {@code TimeUnit.END_OF_DAY}. According to the function's contract every instance then starts at
 * 00:00 of each day.
 *
 * @param val frequency in number of days; must be greater than zero
 * @return the validated number of days
 */
public static int ph1_coord_endOfDays(int val) {
    final int days = ParamChecker.checkGTZero(val, "n");
    final ELEvaluator evaluator = ELEvaluator.getCurrent();
    evaluator.setVariable("timeunit", TimeUnit.DAY);
    evaluator.setVariable("endOfDuration", TimeUnit.END_OF_DAY);
    return days;
}
/**
 * Declares a frequency in weeks together with the "end of week" flag.
 * <p>
 * The current evaluator gets "timeunit" set to {@code TimeUnit.WEEK} and "endOfDuration" set to
 * {@code TimeUnit.END_OF_WEEK}. According to the function's contract every instance then starts at
 * 00:00 of the start of the week.
 *
 * @param val frequency in number of weeks; must be greater than zero
 * @return the validated number of weeks
 */
public static int ph1_coord_endOfWeeks(int val) {
    final int weeks = ParamChecker.checkGTZero(val, "n");
    final ELEvaluator evaluator = ELEvaluator.getCurrent();
    evaluator.setVariable("timeunit", TimeUnit.WEEK);
    evaluator.setVariable("endOfDuration", TimeUnit.END_OF_WEEK);
    return weeks;
}
/**
 * Declares a frequency in months together with the "end of month" flag.
 * <p>
 * The current evaluator gets "timeunit" set to {@code TimeUnit.MONTH} and "endOfDuration" set to
 * {@code TimeUnit.END_OF_MONTH}. According to the function's contract every instance then starts on
 * the first day of each month at 00:00.
 *
 * @param val frequency in number of months; must be greater than zero
 * @return the validated number of months
 */
public static int ph1_coord_endOfMonths(int val) {
    final int months = ParamChecker.checkGTZero(val, "n");
    final ELEvaluator evaluator = ELEvaluator.getCurrent();
    evaluator.setVariable("timeunit", TimeUnit.MONTH);
    evaluator.setVariable("endOfDuration", TimeUnit.END_OF_MONTH);
    return months;
}
/**
 * Computes the timezone-offset difference, in minutes, between the dataset and the coordinator job.
 * <p>
 * Both offsets are taken at the action creation time, so daylight saving in effect at that moment is
 * reflected in the result.
 *
 * @return (dataset TZ offset - job TZ offset) in whole minutes
 * @throws NullPointerException if either the dataset timezone or the job timezone is null
 */
public static int ph2_coord_tzOffset() {
    final long createdAtMillis = getActionCreationtime().getTime();
    final TimeZone datasetZone = Objects.requireNonNull(getDatasetTZ(), "DatasetTZ cannot be null");
    final TimeZone jobZone = Objects.requireNonNull(getJobTZ(), "JobTZ cannot be null");
    final int datasetOffsetMillis = datasetZone.getOffset(createdAtMillis);
    final int jobOffsetMillis = jobZone.getOffset(createdAtMillis);
    final int millisPerMinute = 1000 * 60;
    // Integer division: any sub-minute remainder is truncated.
    return (datasetOffsetMillis - jobOffsetMillis) / millisPerMinute;
}
/**
 * ph3-prefixed variant of the timezone-offset function; it simply forwards to
 * {@link #ph2_coord_tzOffset()}.
 *
 * @return the value produced by {@code ph2_coord_tzOffset()}
 */
public static int ph3_coord_tzOffset() {
    final int offsetMinutes = ph2_coord_tzOffset();
    return offsetMinutes;
}
/**
 * Returns a date string that is shifted from 'strBaseDate' by {@code offset} units.
 * <p>
 * The unit name is resolved with {@code TimeUnit.valueOf(unit)}, so it has to be the exact name of a
 * {@code TimeUnit} constant (for example DAY, MONTH or MINUTE).
 *
 * @param strBaseDate The base date
 * @param offset amount to add; may be negative
 * @param unit name of the {@code TimeUnit} constant the offset is expressed in
 * @return the shifted date, formatted with {@code DateUtils.formatDateOozieTZ}
 * @throws Exception when getting calendar based on strBaseDate fails
 */
public static String ph2_coord_dateOffset(String strBaseDate, int offset, String unit) throws Exception {
    final Calendar shifted = DateUtils.getCalendar(strBaseDate);
    final int calendarField = TimeUnit.valueOf(unit).getCalendarUnit();
    shifted.add(calendarField, offset);
    // String.valueOf keeps the text identical to appending the formatted value to an empty builder.
    return String.valueOf(DateUtils.formatDateOozieTZ(shifted));
}
/**
 * ph3-prefixed variant of the date-offset function; forwards all arguments to
 * {@link #ph2_coord_dateOffset(String, int, String)}.
 *
 * @param strBaseDate The base date
 * @param offset amount to add
 * @param unit name of the time unit of the offset
 * @return the value produced by {@code ph2_coord_dateOffset}
 * @throws Exception whatever {@code ph2_coord_dateOffset} throws
 */
public static String ph3_coord_dateOffset(String strBaseDate, int offset, String unit) throws Exception {
    final String shiftedDate = ph2_coord_dateOffset(strBaseDate, offset, unit);
    return shiftedDate;
}
/**
 * Re-expresses 'strBaseDate' in the given timezone.
 * <p>
 * The base date is parsed into a calendar, the calendar's timezone is switched to the one named by
 * 'timezone', and the result is formatted with {@code DateUtils.formatDate}. Daylight saving is
 * therefore whatever applies to that date in that timezone.
 *
 * @param strBaseDate The base date
 * @param timezone the timezone
 * @return the offset date string
 * @throws Exception when getting calendar based on strBaseDate fails
 */
public static String ph2_coord_dateTzOffset(String strBaseDate, String timezone) throws Exception {
    final Calendar rezoned = DateUtils.getCalendar(strBaseDate);
    rezoned.setTimeZone(DateUtils.getTimeZone(timezone));
    // String.valueOf keeps the text identical to appending the formatted value to an empty builder.
    return String.valueOf(DateUtils.formatDate(rezoned));
}
/**
 * ph3-prefixed variant of the timezone date-offset function; forwards to
 * {@link #ph2_coord_dateTzOffset(String, String)}.
 *
 * @param strBaseDate The base date
 * @param timezone the timezone
 * @return the value produced by {@code ph2_coord_dateTzOffset}
 * @throws Exception whatever {@code ph2_coord_dateTzOffset} throws
 */
public static String ph3_coord_dateTzOffset(String strBaseDate, String timezone) throws Exception {
    final String rezonedDate = ph2_coord_dateTzOffset(strBaseDate, timezone);
    return rezonedDate;
}
/**
 * Determine the date-time in Oozie processing timezone of the n-th future available dataset instance
 * from nominal time, without looking beyond the limit given as 'instance'.
 * <p>
 * According to the function's contract the result depends on: dataset frequency, dataset time unit
 * (day, month, minute), dataset timezone/DST, the end day/month flag, the dataset initial instance,
 * the action creation time and the existence of the dataset's directory.
 * <p>
 * Only synchronous datasets are handled; the work is delegated to {@code coord_future_sync}.
 *
 * @param n instance count; must be &gt;= 0
 * @param instance how many future instances should be checked; must be &gt; 0
 * @return date-time in Oozie processing timezone of the n-th instance
 * @throws UnsupportedOperationException if the dataset is asynchronous
 * @throws Exception if resolving the instance fails
 */
public static String ph3_coord_future(int n, int instance) throws Exception {
    ParamChecker.checkGEZero(n, "future:n");
    ParamChecker.checkGTZero(instance, "future:instance");
    if (!isSyncDataSet()) {
        throw new UnsupportedOperationException("Asynchronous Dataset is not supported yet");
    }
    // For Sync Dataset
    return coord_future_sync(n, instance);
}
/**
 * Determine the date-times in Oozie processing timezone of the future available dataset instances
 * between the start and end offsets from nominal time, without looking beyond the limit given as
 * 'instance'.
 * <p>
 * According to the function's contract the result depends on: dataset frequency, dataset time unit
 * (day, month, minute), dataset timezone/DST, the end day/month flag, the dataset initial instance,
 * the action creation time and the existence of the dataset's directory.
 * <p>
 * Only synchronous datasets are handled; the work is delegated to {@code coord_futureRange_sync}.
 *
 * @param start start instance offset; must be &gt;= 0
 * @param end end instance offset; must be &gt;= 0
 * @param instance how many future instances should be checked; must be &gt; 0
 * @return date-time in Oozie processing timezone of the instances from start to end offsets
 *         delimited by comma.
 * @throws UnsupportedOperationException if the dataset is asynchronous
 * @throws Exception if resolving the instances fails
 */
public static String ph3_coord_futureRange(int start, int end, int instance) throws Exception {
    ParamChecker.checkGEZero(start, "future:n");
    ParamChecker.checkGEZero(end, "future:n");
    ParamChecker.checkGTZero(instance, "future:instance");
    if (!isSyncDataSet()) {
        throw new UnsupportedOperationException("Asynchronous Dataset is not supported yet");
    }
    // For Sync Dataset
    return coord_futureRange_sync(start, end, instance);
}
/**
 * Resolves a single future instance by asking for the degenerate range [n, n].
 *
 * @param n instance offset, used as both start and end of the range
 * @param instance how many future instances should be checked
 * @return the value produced by {@code coord_futureRange_sync(n, n, instance)}
 * @throws Exception whatever the range evaluation throws
 */
private static String coord_future_sync(int n, int instance) throws Exception {
    final int startOffset = n;
    final int endOffset = n;
    return coord_futureRange_sync(startOffset, endOffset, instance);
}
/**
 * Builds a {@code FutureEvaluator} for the given offsets and returns the result of its
 * {@code evaluate()} call.
 *
 * @param startOffset start instance offset
 * @param endOffset end instance offset
 * @param instance how many future instances should be checked
 * @return the evaluator's result
 * @throws Exception whatever {@code FutureEvaluator.evaluate()} throws
 */
private static String coord_futureRange_sync(final int startOffset, final int endOffset, final int instance) throws Exception {
    final FutureEvaluator futureEvaluator = new FutureEvaluator(startOffset, endOffset, instance);
    return futureEvaluator.evaluate();
}
/**
 * Returns the nominal time of the coordinator action bound to the current evaluator.
 * <p>
 * The action is read from the evaluator variable {@code COORD_ACTION} and its nominal time is
 * formatted with {@code DateUtils.formatDateOozieTZ}.
 *
 * @return the formatted nominal time of the coordinator action
 * @throws NullPointerException if no coordinator action is bound to the evaluator
 * @throws Exception if unable to format the Date object to String
 */
public static String ph2_coord_nominalTime() throws Exception {
    final SyncCoordAction coordAction = (SyncCoordAction) ELEvaluator.getCurrent().getVariable(COORD_ACTION);
    Objects.requireNonNull(coordAction, "Coordinator Action cannot be null");
    return DateUtils.formatDateOozieTZ(coordAction.getNominalTime());
}
/**
 * ph3-prefixed variant of the nominal-time function; forwards to {@link #ph2_coord_nominalTime()}.
 *
 * @return the value produced by {@code ph2_coord_nominalTime()}
 * @throws Exception whatever {@code ph2_coord_nominalTime()} throws
 */
public static String ph3_coord_nominalTime() throws Exception {
    final String nominalTime = ph2_coord_nominalTime();
    return nominalTime;
}
/**
 * Convert from standard date-time formatting to a desired format.
 * <p>
 * The input is parsed with {@code DateUtils.parseDateOozieTZ} and rendered with
 * {@code DateUtils.formatDateCustom}.
 *
 * @param dateTimeStr - A timestamp in standard (ISO8601) format.
 * @param format - A string representing the desired format.
 * @return the timestamp rendered in the requested format
 * @throws Exception if the timestamp cannot be parsed or formatted
 */
public static String ph2_coord_formatTime(String dateTimeStr, String format)
        throws Exception {
    final Date parsed = DateUtils.parseDateOozieTZ(dateTimeStr);
    final String rendered = DateUtils.formatDateCustom(parsed, format);
    return rendered;
}
/**
 * ph3-prefixed variant of the format-time function; forwards to
 * {@link #ph2_coord_formatTime(String, String)}.
 *
 * @param dateTimeStr - A timestamp in standard (ISO8601) format.
 * @param format - A string representing the desired format.
 * @return the value produced by {@code ph2_coord_formatTime}
 * @throws Exception whatever {@code ph2_coord_formatTime} throws
 */
public static String ph3_coord_formatTime(String dateTimeStr, String format)
        throws Exception {
    final String rendered = ph2_coord_formatTime(dateTimeStr, format);
    return rendered;
}
/**
 * Convert from standard date-time formatting to a Unix epoch time.
 * <p>
 * The input is parsed with {@code DateUtils.parseDateOozieTZ} and rendered with
 * {@code DateUtils.formatDateEpoch}. The 'millis' flag is interpreted with
 * {@code Boolean.valueOf}, so only the text "true" (ignoring case) enables it.
 *
 * @param dateTimeStr - A timestamp in standard (ISO8601) format.
 * @param millis - "true" to include millis; otherwise will only include seconds
 * @return the timestamp rendered as epoch time
 * @throws Exception if the timestamp cannot be parsed or formatted
 */
public static String ph2_coord_epochTime(String dateTimeStr, String millis)
        throws Exception {
    final Date parsed = DateUtils.parseDateOozieTZ(dateTimeStr);
    final Boolean includeMillis = Boolean.valueOf(millis);
    return DateUtils.formatDateEpoch(parsed, includeMillis);
}
/**
 * ph3-prefixed variant of the epoch-time function; forwards to
 * {@link #ph2_coord_epochTime(String, String)}.
 *
 * @param dateTimeStr - A timestamp in standard (ISO8601) format.
 * @param millis - "true" to include millis; otherwise will only include seconds
 * @return the value produced by {@code ph2_coord_epochTime}
 * @throws Exception whatever {@code ph2_coord_epochTime} throws
 */
public static String ph3_coord_epochTime(String dateTimeStr, String millis)
        throws Exception {
    final String epoch = ph2_coord_epochTime(dateTimeStr, millis);
    return epoch;
}
/**
 * Returns the id of the coordinator action bound to the current evaluator (variable
 * {@code COORD_ACTION}).
 *
 * @return coordinator action Id
 * @throws NullPointerException if no coordinator action is bound to the evaluator
 * @throws Exception declared for interface compatibility
 */
public static String ph2_coord_actionId() throws Exception {
    final SyncCoordAction coordAction = (SyncCoordAction) ELEvaluator.getCurrent().getVariable(COORD_ACTION);
    Objects.requireNonNull(coordAction, "Coordinator Action cannot be null");
    return coordAction.getActionId();
}
/**
 * ph3-prefixed variant of the action-id function; forwards to {@link #ph2_coord_actionId()}.
 *
 * @return the value produced by {@code ph2_coord_actionId()}
 * @throws Exception whatever {@code ph2_coord_actionId()} throws
 */
public static String ph3_coord_actionId() throws Exception {
    final String actionId = ph2_coord_actionId();
    return actionId;
}
/**
 * Returns the name reported by the coordinator action bound to the current evaluator (variable
 * {@code COORD_ACTION}).
 *
 * @return coordinator name
 * @throws NullPointerException if no coordinator action is bound to the evaluator
 * @throws Exception declared for interface compatibility
 */
public static String ph2_coord_name() throws Exception {
    final SyncCoordAction coordAction = (SyncCoordAction) ELEvaluator.getCurrent().getVariable(COORD_ACTION);
    Objects.requireNonNull(coordAction, "Coordinator Action cannot be null");
    return coordAction.getName();
}
/**
 * ph3-prefixed variant of the name function; forwards to {@link #ph2_coord_name()}.
 *
 * @return the value produced by {@code ph2_coord_name()}
 * @throws Exception whatever {@code ph2_coord_name()} throws
 */
public static String ph3_coord_name() throws Exception {
    final String coordName = ph2_coord_name();
    return coordName;
}
/**
 * Returns the actual time of the coordinator action bound to the current evaluator, formatted with
 * {@code DateUtils.formatDateOozieTZ}.
 * <p>
 * Unlike the sibling accessors, a missing action is reported with a plain {@link RuntimeException}.
 *
 * @return coordinator action start time
 * @throws RuntimeException if no coordinator action is bound under {@code COORD_ACTION}
 * @throws Exception if unable to format the Date object to String
 */
public static String ph2_coord_actualTime() throws Exception {
    final SyncCoordAction action = (SyncCoordAction) ELEvaluator.getCurrent().getVariable(COORD_ACTION);
    if (action != null) {
        return DateUtils.formatDateOozieTZ(action.getActualTime());
    }
    throw new RuntimeException("Associated Application instance should be defined with key " + COORD_ACTION);
}
/**
 * ph3-prefixed variant of the actual-time function; forwards to {@link #ph2_coord_actualTime()}.
 *
 * @return the value produced by {@code ph2_coord_actualTime()}
 * @throws Exception whatever {@code ph2_coord_actualTime()} throws
 */
public static String ph3_coord_actualTime() throws Exception {
    final String actualTime = ph2_coord_actualTime();
    return actualTime;
}
/**
 * Used to specify a list of URI's that are used as input dir to the workflow job.
 * <p>
 * Two evaluator-level variables are consulted:
 * <p>
 * A) {@code .datain.<DATAIN_NAME>} - the current list of URIs
 * <p>
 * B) {@code .datain.<DATAIN_NAME>.unresolved} - whether an EL function (i.e. latest) is still unresolved
 * <p>
 * When A is absent and the variable {@code .actionInputLogic} holds a non-empty value, the input
 * dependencies are computed by {@code CoordInputLogicEvaluatorUtil} instead. When B parses to
 * {@code true}, the original function call is echoed back; otherwise the URIs are returned.
 *
 * @param dataInName : Datain name
 * @return the list of URI's separated by INSTANCE_SEPARATOR <p> if there are unresolved EL function (i.e. latest)
 *         , echo back <p> the function without resolving the function.
 * @throws RuntimeException wrapping the {@code JDOMException} raised while evaluating the input logic
 */
public static String ph3_coord_dataIn(String dataInName) {
    ELEvaluator eval = ELEvaluator.getCurrent();
    final String dataInVariable = ".datain." + dataInName;
    if (eval.getVariable(dataInVariable) == null) {
        final Object inputLogic = eval.getVariable(".actionInputLogic");
        if (inputLogic != null && !StringUtils.isEmpty(inputLogic.toString())) {
            try {
                return new CoordInputLogicEvaluatorUtil().getInputDependencies(dataInName,
                        (SyncCoordAction) eval.getVariable(COORD_ACTION));
            }
            catch (JDOMException e) {
                XLog.getLog(CoordELFunctions.class).error(e);
                // Same message as before, but the cause is chained so the stack trace is not lost.
                throw new RuntimeException(e.getMessage(), e);
            }
        }
    }
    final String uris = (String) eval.getVariable(dataInVariable);
    final Object unResolvedObj = eval.getVariable(dataInVariable + ".unresolved");
    if (unResolvedObj != null && Boolean.parseBoolean(unResolvedObj.toString())) {
        // Something is still unresolved: echo the function back so that it can be evaluated later.
        return "${coord:dataIn('" + dataInName + "')}";
    }
    return uris;
}
/**
 * Used to specify a list of URI's that are output dir of the workflow job.
 * <p>
 * The value is read from the evaluator-level variable {@code .dataout.<DATAOUT_NAME>} and returned
 * as is (it is {@code null} when the variable is not set).
 *
 * @param dataOutName : Dataout name
 * @return the list of URI's separated by INSTANCE_SEPARATOR
 */
public static String ph3_coord_dataOut(String dataOutName) {
    final ELEvaluator evaluator = ELEvaluator.getCurrent();
    final String variableName = ".dataout." + dataOutName;
    return (String) evaluator.getVariable(variableName);
}
/**
 * Determine the date-time in Oozie processing timezone of the n-th dataset instance.
 * <p>
 * According to the function's contract the result depends on: dataset frequency, dataset time unit
 * (day, month, minute), dataset timezone/DST, the end day/month flag, the dataset initial instance
 * and the action creation time.
 * <p>
 * Only synchronous datasets are handled; the work is delegated to {@code coord_current_sync}.
 * NOTE(review): historically documented as returning 'null' when the n-th instance is earlier than
 * the initial instance of the dataset - confirm against {@code coord_current_sync}.
 *
 * @param n instance count domain: n is integer
 * @return date-time in Oozie processing timezone of the n-th instance
 * @throws UnsupportedOperationException if the dataset is asynchronous
 */
public static String ph2_coord_current(int n) {
    if (!isSyncDataSet()) {
        throw new UnsupportedOperationException("Asynchronous Dataset is not supported yet");
    }
    // For Sync Dataset
    return coord_current_sync(n);
}
/**
 * Determine the date-times in Oozie processing timezone of the current dataset instances between
 * the start and end offsets from the nominal time.
 * <p>
 * According to the function's contract the result depends on: dataset frequency, dataset time unit
 * (day, month, minute), dataset timezone/DST, the end day/month flag, the dataset initial instance
 * and the action creation time.
 * <p>
 * Only synchronous datasets are handled; the work is delegated to {@code coord_currentRange_sync}.
 * NOTE(review): historically documented as returning an empty string when the current instance is
 * earlier than the initial instance of the dataset, and as skipping instances of the range that are
 * earlier than it - confirm against {@code coord_currentRange_sync}.
 *
 * @param start :start instance offset <p> domain: start &lt;= 0, start is integer
 * @param end :end instance offset <p> domain: end &lt;= 0, end is integer
 * @return date-time in Oozie processing timezone of the instances from start to end offsets
 *         delimited by comma.
 * @throws UnsupportedOperationException if the dataset is asynchronous
 */
public static String ph2_coord_currentRange(int start, int end) {
    if (!isSyncDataSet()) {
        throw new UnsupportedOperationException("Asynchronous Dataset is not supported yet");
    }
    // For Sync Dataset
    return coord_currentRange_sync(start, end);
}
/**
 * Determine the date-time in Oozie processing timezone of the given offset from the dataset
 * effective nominal time.
 * <p>
 * According to the function's contract the result depends on: dataset frequency, dataset time unit,
 * dataset timezone/DST, the dataset initial instance and the action creation time.
 * <p>
 * Only synchronous datasets are handled; the work is delegated to {@code coord_offset_sync}.
 *
 * @param n offset amount (integer)
 * @param timeUnit TimeUnit for offset n ("MINUTE", "HOUR", "DAY", "MONTH", "YEAR")
 * @return date-time in Oozie processing timezone of the given offset from the dataset effective nominal time
 * @throws UnsupportedOperationException if the dataset is asynchronous
 * @throws Exception if there was a problem formatting
 */
public static String ph2_coord_offset(int n, String timeUnit) throws Exception {
    if (!isSyncDataSet()) {
        throw new UnsupportedOperationException("Asynchronous Dataset is not supported yet");
    }
    // For Sync Dataset
    return coord_offset_sync(n, timeUnit);
}
/**
 * Determine how many hours there are on the date of the n-th dataset instance.
 * <p>
 * The effective nominal time is moved by {@code n} dataset periods (dataset frequency, expressed in
 * the dataset time unit), switched to the dataset timezone and handed to
 * {@code DateUtils.hoursInDay}.
 * <p>
 * No comparison against the dataset initial instance is made here; -1 is returned only when
 * {@code getEffectiveNominalTime()} yields {@code null}.
 *
 * @param n instance count <p> domain: n is integer
 * @return number of hours on that day, or -1 when no effective nominal time is available
 */
public static int ph2_coord_hoursInDay(int n) {
    final int frequency = (int) getDSFrequency();
    final Calendar instanceCal = getEffectiveNominalTime();
    if (instanceCal == null) {
        return -1;
    }
    final int calendarField = getDSTimeUnit().getCalendarUnit();
    instanceCal.add(calendarField, frequency * n);
    // Use Dataset TZ
    instanceCal.setTimeZone(getDatasetTZ());
    return DateUtils.hoursInDay(instanceCal);
}
/**
 * ph3-prefixed variant of the hours-in-day function; forwards to {@link #ph2_coord_hoursInDay(int)}.
 *
 * @param n instance count
 * @return the value produced by {@code ph2_coord_hoursInDay(n)}
 * @throws Exception declared for interface compatibility
 */
public static int ph3_coord_hoursInDay(int n) throws Exception {
    final int hours = ph2_coord_hoursInDay(n);
    return hours;
}
/**
 * Calculate the number of days in the month of the n-th dataset instance.
 * <p>
 * The effective nominal time is moved by {@code n} dataset periods (dataset frequency, expressed in
 * the dataset time unit), switched to the dataset timezone and queried for the actual maximum of
 * {@code Calendar.DAY_OF_MONTH}.
 * <p>
 * No comparison against the dataset initial instance is made here; -1 is returned only when
 * {@code getEffectiveNominalTime()} yields {@code null}.
 *
 * @param n instance count. domain: n is integer
 * @return number of days in that month, or -1 when no effective nominal time is available
 */
public static int ph2_coord_daysInMonth(int n) {
    final int frequency = (int) getDSFrequency();
    final Calendar instanceCal = getEffectiveNominalTime();
    if (instanceCal == null) {
        return -1;
    }
    final int calendarField = getDSTimeUnit().getCalendarUnit();
    instanceCal.add(calendarField, frequency * n);
    // Use Dataset TZ
    instanceCal.setTimeZone(getDatasetTZ());
    return instanceCal.getActualMaximum(Calendar.DAY_OF_MONTH);
}
/**
 * ph3-prefixed variant of the days-in-month function; forwards to
 * {@link #ph2_coord_daysInMonth(int)}.
 *
 * @param n instance count
 * @return the value produced by {@code ph2_coord_daysInMonth(n)}
 */
public static int ph3_coord_daysInMonth(int n) {
    final int days = ph2_coord_daysInMonth(n);
    return days;
}
/**
 * Determine the date-time in Oozie processing timezone of the n-th latest available dataset
 * instance.
 * <p>
 * According to the function's contract the result depends on: dataset frequency, dataset time unit
 * (day, month, minute), dataset timezone/DST, the end day/month flag, the dataset initial instance,
 * the action creation time and the existence of the dataset's directory.
 * <p>
 * Only synchronous datasets are handled; the work is delegated to {@code coord_latest_sync}.
 * NOTE(review): historically documented as returning 'null' when the n-th instance is earlier than
 * the initial instance of the dataset - confirm against {@code coord_latest_sync}.
 *
 * @param n :instance count <p> domain: n &lt;= 0, n is integer
 * @return date-time in Oozie processing timezone of the n-th instance
 * @throws UnsupportedOperationException if the dataset is asynchronous
 * @throws Exception in case of wrong arguments, or HDFS access errors
 */
public static String ph3_coord_latest(int n) throws Exception {
    ParamChecker.checkLEZero(n, "latest:n");
    if (!isSyncDataSet()) {
        throw new UnsupportedOperationException("Asynchronous Dataset is not supported yet");
    }
    // For Sync Dataset
    return coord_latest_sync(n);
}
/**
 * Determine the date-times in Oozie processing timezone of the latest available dataset instances
 * between the start and end offsets from the nominal time.
 * <p>
 * According to the function's contract the result depends on: dataset frequency, dataset time unit
 * (day, month, minute), dataset timezone/DST, the end day/month flag, the dataset initial instance,
 * the action creation time and the existence of the dataset's directory.
 * <p>
 * Only synchronous datasets are handled; the work is delegated to {@code coord_latestRange_sync}.
 * NOTE(review): historically documented as returning 'null' when the start offset instance is
 * earlier than the initial instance of the dataset - confirm against {@code coord_latestRange_sync}.
 *
 * @param start :start instance offset <p> domain: start &lt;= 0, start is integer
 * @param end :end instance offset <p> domain: end &lt;= 0, end is integer
 * @return date-time in Oozie processing timezone of the instances from start to end offsets
 *         delimited by comma.
 * @throws UnsupportedOperationException if the dataset is asynchronous
 * @throws Exception in case of wrong arguments, or HDFS access errors
 */
public static String ph3_coord_latestRange(int start, int end) throws Exception {
    ParamChecker.checkLEZero(start, "latest:n");
    ParamChecker.checkLEZero(end, "latest:n");
    if (!isSyncDataSet()) {
        throw new UnsupportedOperationException("Asynchronous Dataset is not supported yet");
    }
    // For Sync Dataset
    return coord_latestRange_sync(start, end);
}
/**
 * Associates a dataset and a coordinator action with an evaluator.
 * <p>
 * The action is stored under the variable {@code COORD_ACTION} and the dataset under
 * {@code DATASET}, which is where the EL functions of this class look them up.
 *
 * @param evaluator : to set variables
 * @param ds : Data Set object
 * @param coordAction : Application instance
 */
public static void configureEvaluator(ELEvaluator evaluator, SyncCoordDataset ds, SyncCoordAction coordAction) {
    // Application instance first, then the dataset it is evaluated against.
    evaluator.setVariable(COORD_ACTION, coordAction);
    evaluator.setVariable(DATASET, ds);
}
/**
 * Evaluates an expression and, when requested, wraps the result in "${..}".
 * <p>
 * The evaluator variable ".wrap" is cleared before evaluation; if it is non-null afterwards the
 * result is returned as "${result}", otherwise it is returned unchanged.
 *
 * @param eval :EL evaluator
 * @param expr : expression to evaluate
 * @return Resolved expression or echo back the same expression
 * @throws Exception an {@code ElException} with code E1004 wrapping any failure raised while evaluating
 */
public static String evalAndWrap(ELEvaluator eval, String expr) throws Exception {
    try {
        eval.setVariable(".wrap", null);
        final String evaluated = eval.evaluate(expr, String.class);
        final boolean wrapRequested = eval.getVariable(".wrap") != null;
        return wrapRequested ? "${" + evaluated + "}" : evaluated;
    }
    catch (Exception e) {
        throw new ElException(ErrorCode.E1004, "Unable to evaluate :" + expr + ":\n", e);
    }
}
// Set of echo functions.
// Each one hands the EL function name and its argument text to echoUnResolved() and returns the
// result. NOTE(review): echoUnResolved() presumably re-emits the call unresolved - confirm there.

/** ph1 echo of {@code current(n)}. */
public static String ph1_coord_current_echo(String n) {
    final String args = n;
    return echoUnResolved("current", args);
}

/** ph1 echo of {@code absolute(date)}. */
public static String ph1_coord_absolute_echo(String date) {
    final String args = date;
    return echoUnResolved("absolute", args);
}

/** ph1 echo of {@code endOfMonths(date)}. */
public static String ph1_coord_endOfMonths_echo(String date) {
    final String args = date;
    return echoUnResolved("endOfMonths", args);
}

/** ph1 echo of {@code endOfWeeks(date)}. */
public static String ph1_coord_endOfWeeks_echo(String date) {
    final String args = date;
    return echoUnResolved("endOfWeeks", args);
}

/** ph1 echo of {@code endOfDays(date)}. */
public static String ph1_coord_endOfDays_echo(String date) {
    final String args = date;
    return echoUnResolved("endOfDays", args);
}

/** ph1 echo of {@code currentRange(start, end)}; arguments are joined with ", ". */
public static String ph1_coord_currentRange_echo(String start, String end) {
    final String args = start + ", " + end;
    return echoUnResolved("currentRange", args);
}

/** ph1 echo of {@code offset(n, timeUnit)}; arguments are joined with " , ". */
public static String ph1_coord_offset_echo(String n, String timeUnit) {
    final String args = n + " , " + timeUnit;
    return echoUnResolved("offset", args);
}

/** ph2 echo of {@code current(n)}. */
public static String ph2_coord_current_echo(String n) {
    final String args = n;
    return echoUnResolved("current", args);
}

/** ph2 echo of {@code currentRange(start, end)}; arguments are joined with ", ". */
public static String ph2_coord_currentRange_echo(String start, String end) {
    final String args = start + ", " + end;
    return echoUnResolved("currentRange", args);
}

/** ph2 echo of {@code offset(n, timeUnit)}; arguments are joined with " , ". */
public static String ph2_coord_offset_echo(String n, String timeUnit) {
    final String args = n + " , " + timeUnit;
    return echoUnResolved("offset", args);
}

/** ph2 echo of {@code absolute(date)}. */
public static String ph2_coord_absolute_echo(String date) {
    final String args = date;
    return echoUnResolved("absolute", args);
}

/** ph2 echo of {@code endOfMonths(date)}. */
public static String ph2_coord_endOfMonths_echo(String date) {
    final String args = date;
    return echoUnResolved("endOfMonths", args);
}

/** ph2 echo of {@code endOfWeeks(date)}. */
public static String ph2_coord_endOfWeeks_echo(String date) {
    final String args = date;
    return echoUnResolved("endOfWeeks", args);
}

/** ph2 echo of {@code endOfDays(date)}. */
public static String ph2_coord_endOfDays_echo(String date) {
    final String args = date;
    return echoUnResolved("endOfDays", args);
}
/**
 * Resolves a range of dataset instances whose start is given as an absolute date and whose end is
 * given as a current()-style offset relative to the nominal time.
 * <p>
 * The start date is converted to an offset relative to the nominal time by counting instances from
 * the dataset initial instance for both the start date and the action creation time and
 * subtracting the two counts. The range is then produced by
 * {@link #ph2_coord_currentRange(int, int)}.
 *
 * @param startInstance absolute date of the first instance; it has to fall exactly on a dataset
 *        instance and must not be earlier than the initial instance
 * @param end end instance offset relative to the nominal time
 * @return the value produced by {@code ph2_coord_currentRange(start, end)}
 * @throws CommandException (E1010) if the start instance or the nominal time is earlier than the
 *         initial instance, if the start instance is out of phase with the initial instance, or if
 *         the start instance is later than the end instance
 * @throws Exception if the start instance cannot be parsed
 */
public static String ph2_coord_absolute_range(String startInstance, int end) throws Exception {
    final AtomicInteger instanceCount = new AtomicInteger(0);
    Calendar startInstanceCal = DateUtils.getCalendar(startInstance);
    Calendar currentInstance = getCurrentInstance(startInstanceCal.getTime(), instanceCount);
    // getCurrentInstance() returns null, which means startInstance is less
    // than initial instance
    if (currentInstance == null) {
        throw new CommandException(ErrorCode.E1010,
                "initial-instance should be equal or earlier than the start-instance. initial-instance is "
                        + getInitialInstance() + " and start-instance is " + startInstance);
    }
    // The instance found for the start date must be the start date itself, otherwise the start
    // date does not fall on an instance boundary.
    if (currentInstance.getTimeInMillis() != startInstanceCal.getTimeInMillis()) {
        throw new CommandException(ErrorCode.E1010,
                "initial-instance is not in phase with start-instance. initial-instance is "
                        + DateUtils.formatDateOozieTZ(getInitialInstanceCal()) + " and start-instance is "
                        + DateUtils.formatDateOozieTZ(startInstanceCal));
    }
    final AtomicInteger nominalCount = new AtomicInteger(0);
    if (getCurrentInstance(getActionCreationtime(), nominalCount) == null) {
        throw new CommandException(ErrorCode.E1010,
                "initial-instance should be equal or earlier than the nominal time. initial-instance is "
                        + getInitialInstance() + " and nominal time is " + getActionCreationtime());
    }
    // getCurrentInstance return offset relative to initial instance.
    // start instance offset - nominal offset = start offset relative to
    // nominal time-stamp.
    int start = instanceCount.get() - nominalCount.get();
    if (start > end) {
        // Report the offset relative to the nominal time ('start'): that is the value compared
        // with 'end' and the one that corresponds to a current(...) argument.
        throw new CommandException(ErrorCode.E1010,
                "start-instance should be equal or earlier than the end-instance. startInstance is "
                        + startInstance + " which is equivalent to current (" + start
                        + ") but end is specified as current (" + end + ")");
    }
    return ph2_coord_currentRange(start, end);
}
/** ph1 echo of {@code dateOffset(n, offset, unit)}; arguments are joined with " , ". */
public static String ph1_coord_dateOffset_echo(String n, String offset, String unit) {
    final String args = n + " , " + offset + " , " + unit;
    return echoUnResolved("dateOffset", args);
}

/** ph1 echo of {@code dateTzOffset(n, timezone)}; arguments are joined with " , ". */
public static String ph1_coord_dateTzOffset_echo(String n, String timezone) {
    final String args = n + " , " + timezone;
    return echoUnResolved("dateTzOffset", args);
}

/** ph1 echo of {@code epochTime(dateTime, millis)}. */
public static String ph1_coord_epochTime_echo(String dateTime, String millis) {
    // Quote the dateTime value since it would contain a ':'.
    final String quotedDateTime = "'" + dateTime + "'";
    return echoUnResolved("epochTime", quotedDateTime + " , " + millis);
}

/** ph1 echo of {@code formatTime(dateTime, format)}. */
public static String ph1_coord_formatTime_echo(String dateTime, String format) {
    // Quote the dateTime value since it would contain a ':'.
    final String quotedDateTime = "'" + dateTime + "'";
    return echoUnResolved("formatTime", quotedDateTime + " , " + format);
}

/** ph1 echo of {@code latest(n)}. */
public static String ph1_coord_latest_echo(String n) {
    final String args = n;
    return echoUnResolved("latest", args);
}

/** ph2 echo of {@code latest(n)}; same result as the ph1 echo. */
public static String ph2_coord_latest_echo(String n) {
    final String echoed = ph1_coord_latest_echo(n);
    return echoed;
}

/** ph1 echo of {@code future(n, instance)}; arguments are joined with ", ". */
public static String ph1_coord_future_echo(String n, String instance) {
    final String args = n + ", " + instance + "";
    return echoUnResolved("future", args);
}

/** ph2 echo of {@code future(n, instance)}; same result as the ph1 echo. */
public static String ph2_coord_future_echo(String n, String instance) {
    final String echoed = ph1_coord_future_echo(n, instance);
    return echoed;
}

/** ph1 echo of {@code latestRange(start, end)}; arguments are joined with ", ". */
public static String ph1_coord_latestRange_echo(String start, String end) {
    final String args = start + ", " + end;
    return echoUnResolved("latestRange", args);
}

/** ph2 echo of {@code latestRange(start, end)}; same result as the ph1 echo. */
public static String ph2_coord_latestRange_echo(String start, String end) {
    final String echoed = ph1_coord_latestRange_echo(start, end);
    return echoed;
}

/** ph1 echo of {@code futureRange(start, end, instance)}; arguments are joined with ", ". */
public static String ph1_coord_futureRange_echo(String start, String end, String instance) {
    final String args = start + ", " + end + ", " + instance;
    return echoUnResolved("futureRange", args);
}

/** ph2 echo of {@code futureRange(start, end, instance)}; same result as the ph1 echo. */
public static String ph2_coord_futureRange_echo(String start, String end, String instance) {
    final String echoed = ph1_coord_futureRange_echo(start, end, instance);
    return echoed;
}
/**
 * ph1 echo of {@code dataIn(name)}, preceded by a validity check of the name.
 * <p>
 * The evaluator variable {@code oozie.dataname.<n>} has to hold the value "data-in"; otherwise the
 * problem is logged and reported with a {@link RuntimeException}.
 *
 * @param n data-in name
 * @return the result of {@code echoUnResolved("dataIn", "'" + n + "'")}
 * @throws RuntimeException if the name is not registered as "data-in"
 */
public static String ph1_coord_dataIn_echo(String n) {
    final String registeredKind = (String) ELEvaluator.getCurrent().getVariable("oozie.dataname." + n);
    if (!"data-in".equals(registeredKind)) {
        XLog.getLog(CoordELFunctions.class).error("data_in_name " + n + " is not valid");
        throw new RuntimeException("data_in_name " + n + " is not valid");
    }
    final String quotedName = "'" + n + "'";
    return echoUnResolved("dataIn", quotedName);
}
/**
 * ph1 echo of {@code dataOut(name)}, preceded by a validity check of the name.
 * <p>
 * The evaluator variable {@code oozie.dataname.<n>} has to hold the value "data-out"; otherwise the
 * problem is logged and reported with a {@link RuntimeException}.
 *
 * @param n data-out name
 * @return the result of {@code echoUnResolved("dataOut", "'" + n + "'")}
 * @throws RuntimeException if the name is not registered as "data-out"
 */
public static String ph1_coord_dataOut_echo(String n) {
    final String registeredKind = (String) ELEvaluator.getCurrent().getVariable("oozie.dataname." + n);
    if (!"data-out".equals(registeredKind)) {
        XLog.getLog(CoordELFunctions.class).error("data_out_name " + n + " is not valid");
        throw new RuntimeException("data_out_name " + n + " is not valid");
    }
    final String quotedName = "'" + n + "'";
    return echoUnResolved("dataOut", quotedName);
}
/**
 * Phase-1 echo for {@code coord:nominalTime()}: the call is returned unresolved.
 *
 * @return the unresolved text {@code coord:nominalTime()}
 */
public static String ph1_coord_nominalTime_echo() {
    return echoUnResolvedPre("nominalTime", "", "coord:");
}
/**
 * Phase-1 "wrap" echo for {@code coord:nominalTime()}: no resolution takes place, the call text is returned and
 * the evaluator's {@code .wrap} variable is set by the echo helper.
 *
 * @return the unresolved text {@code coord:nominalTime()}
 */
public static String ph1_coord_nominalTime_echo_wrap() {
    final String noArguments = "";
    return echoUnResolved("nominalTime", noArguments);
}
/**
 * Phase-1 echo that substitutes a fixed dummy value for the nominal time.
 *
 * @return the constant dummy text {@code 2009-03-06T010:00}
 */
public static String ph1_coord_nominalTime_echo_fixed() {
    final String dummyNominalTime = "2009-03-06T010:00"; // Dummy resolution
    return dummyNominalTime;
}
/**
 * Phase-1 "wrap" echo for {@code coord:actualTime()}: no resolution takes place, the call text is returned and
 * the evaluator's {@code .wrap} variable is set by the echo helper.
 *
 * @return the unresolved text {@code coord:actualTime()}
 */
public static String ph1_coord_actualTime_echo_wrap() {
    final String noArguments = "";
    return echoUnResolved("actualTime", noArguments);
}
/**
 * Phase-1 echo for {@code coord:actionId()}: the call is returned unresolved.
 *
 * @return the unresolved text {@code coord:actionId()}
 */
public static String ph1_coord_actionId_echo() {
    return echoUnResolvedPre("actionId", "", "coord:");
}
/**
 * Phase-1 echo for {@code coord:name()}: the call is returned unresolved.
 *
 * @return the unresolved text {@code coord:name()}
 */
public static String ph1_coord_name_echo() {
    return echoUnResolvedPre("name", "", "coord:");
}
// The following echo functions are not used in any phase yet.
// They are kept here for future use.
/**
 * Echo for {@code coord:minutes(n)}: the call is returned unresolved.
 *
 * @param n the argument of the call
 * @return the unresolved text {@code coord:minutes(n)}
 */
public static String coord_minutes_echo(String n) {
    return echoUnResolvedPre("minutes", n, "coord:");
}
/**
 * Echo for {@code coord:hours(n)}: the call is returned unresolved.
 *
 * @param n the argument of the call
 * @return the unresolved text {@code coord:hours(n)}
 */
public static String coord_hours_echo(String n) {
    return echoUnResolvedPre("hours", n, "coord:");
}
/**
 * Echo for {@code coord:days(n)}: the call is returned unresolved.
 *
 * @param n the argument of the call
 * @return the unresolved text {@code coord:days(n)}
 */
public static String coord_days_echo(String n) {
    return echoUnResolvedPre("days", n, "coord:");
}
/**
 * Echo for {@code coord:endOfDay(n)}: the call is returned unresolved.
 *
 * @param n the argument of the call
 * @return the unresolved text {@code coord:endOfDay(n)}
 */
public static String coord_endOfDay_echo(String n) {
    return echoUnResolvedPre("endOfDay", n, "coord:");
}
/**
 * Echo for {@code coord:months(n)}: the call is returned unresolved.
 *
 * @param n the argument of the call
 * @return the unresolved text {@code coord:months(n)}
 */
public static String coord_months_echo(String n) {
    return echoUnResolvedPre("months", n, "coord:");
}
/**
 * Echo for {@code coord:endOfMonth(n)}: the call is returned unresolved.
 *
 * @param n the argument of the call
 * @return the unresolved text {@code coord:endOfMonth(n)}
 */
public static String coord_endOfMonth_echo(String n) {
    return echoUnResolvedPre("endOfMonth", n, "coord:");
}
/**
 * Echo for {@code coord:actualTime()}: the call is returned unresolved.
 *
 * @return the unresolved text {@code coord:actualTime()}
 */
public static String coord_actualTime_echo() {
    return echoUnResolvedPre("actualTime", "", "coord:");
}
/**
 * Phase-1 validation echo for {@code coord:hoursInDay(n)}; always yields "24".
 * <p>
 * The value is for validation only and should not replace the original XML: a temporary string is created and
 * the function validated against it. This is required for evaluating an expression like
 * {@code coord:HoursInDay(0) + 3}. The actual evaluation will happen in phase 2 or phase 3.
 *
 * @param n the argument of the call (not used)
 * @return the constant text {@code 24}
 */
public static String ph1_coord_hoursInDay_echo(String n) {
    final String validationPlaceholder = "24";
    return validationPlaceholder;
}
/**
 * Phase-1 validation echo for {@code coord:daysInMonth(n)}; always yields "30".
 * <p>
 * The value is for validation only and should not replace the original XML: a temporary string is created and
 * the function validated against it. This is required for evaluating an expression like
 * {@code coord:daysInMonth(0) + 3}. The actual evaluation will happen in phase 2 or phase 3.
 *
 * @param n the argument of the call (not used)
 * @return the constant text {@code 30}
 */
public static String ph1_coord_daysInMonth_echo(String n) {
    final String validationPlaceholder = "30";
    return validationPlaceholder;
}
/**
 * Phase-1 validation echo for {@code coord:tzOffset()}; always yields "3".
 * <p>
 * The value is for validation only and should not replace the original XML: a temporary string is created and
 * the function validated against it. This is required for evaluating an expression like
 * {@code coord:tzOffset + 2}. The actual evaluation will happen in phase 2 or phase 3.
 *
 * @return the constant text {@code 3}
 */
public static String ph1_coord_tzOffset_echo() {
    final String validationPlaceholder = "3";
    return validationPlaceholder;
}
// Local methods
/**
 * Resolves a single dataset instance by evaluating the one-element range {@code [n, n]}.
 *
 * @param n the required instance position
 * @return the n-th instance date-time counted from the current instance of the dataset; the empty string ("") if
 * the action creation time or the n-th instance is earlier than the initial instance of the dataset
 */
private static String coord_current_sync(int n) {
    final int position = n;
    return coord_currentRange_sync(position, position);
}
/**
 * Resolves the dataset instances at positions {@code start} .. {@code end} (inclusive) relative to the current
 * instance, which is derived from the action creation time.
 * <p>
 * Instances that would fall before the dataset's initial instance are skipped with a warning. The remaining
 * instances are formatted, put in newest-first order and joined with {@code INSTANCE_SEPARATOR}.
 *
 * @param start position of the first instance relative to the current instance
 * @param end position of the last instance relative to the current instance
 * @return the joined instance date-times, newest first; the empty string ("") if no current instance exists
 * (the initial instance is later than the nominal time) or every requested instance precedes the initial instance
 */
private static String coord_currentRange_sync(int start, int end) {
    final XLog log = XLog.getLog(CoordELFunctions.class);
    // The frequency is applied below in units of the dataset's time unit.
    final int datasetFrequency = getDSFrequency();
    final TimeUnit dsTimeUnit = getDSTimeUnit();
    // Out-parameter: receives the index of the current instance counted from the initial instance.
    final AtomicInteger instCount = new AtomicInteger(0);
    if (getCurrentInstance(getActionCreationtime(), instCount) == null) {
        log.warn("If the initial instance of the dataset is later than the nominal time, an empty string is"
                + " returned. This means that no data is available at the current-instance specified by the user"
                + " and the user could try modifying his initial-instance to an earlier time.");
        return "";
    }
    final Calendar initInstance = getInitialInstanceCal();
    // Start from the initial instance and jump straight to the first requested position.
    final Calendar instanceCal = (Calendar) initInstance.clone();
    instanceCal.add(dsTimeUnit.getCalendarUnit(), (instCount.get() + start) * datasetFrequency);
    final List<String> instances = new ArrayList<String>();
    for (int i = start; i <= end; i++) {
        if (instanceCal.compareTo(initInstance) < 0) {
            // Report the position that is actually out of range (i), not the start of the range.
            log.warn("If the initial instance of the dataset is later than the current-instance specified,"
                    + " such as coord:current({0}) in this case, an empty string is returned. This means that"
                    + " no data is available at the current-instance specified by the user and the user could"
                    + " try modifying his initial-instance to an earlier time.", i);
        }
        else {
            instances.add(DateUtils.formatDateOozieTZ(instanceCal));
        }
        instanceCal.add(dsTimeUnit.getCalendarUnit(), datasetFrequency);
    }
    // Instances were collected oldest first; callers get them newest first.
    Collections.reverse(instances);
    return StringUtils.join(instances, CoordELFunctions.INSTANCE_SEPARATOR);
}
/**
 * Resolves the offset time from the effective nominal time and snaps it onto the dataset's frequency grid, which
 * is anchored at the dataset's initial instance. If the raw offset time does not fall exactly on an instance,
 * the latest instance earlier than it is used.
 *
 * @param n offset amount (integer)
 * @param timeUnit TimeUnit for offset n ("MINUTE", "HOUR", "DAY", "MONTH", "YEAR")
 * @return the offset time from the effective nominal time <p> return empty string ("") if the Action_Creation_time or the
 * offset instance <p> is earlier than the Initial_Instance of dataset.
 * @throws RuntimeException if the step-by-step result and the directly computed instance format differently
 */
private static String coord_offset_sync(int n, String timeUnit) {
// NOTE(review): TimeUnit.valueOf presumably rejects unknown names with IllegalArgumentException - confirm.
Calendar rawCal = resolveOffsetRawTime(n, TimeUnit.valueOf(timeUnit), null);
if (rawCal == null) {
// warning already logged by resolveOffsetRawTime()
return "";
}
int freq = getDSFrequency();
TimeUnit freqUnit = getDSTimeUnit();
// Number of frequency steps between the initial instance and the chosen instance (negative = before it).
int freqCount = 0;
// We're going to manually turn back/forward cal by decrements/increments of freq and then check that it gives the same
// time as rawCal; this is to check that the offset time resolves to a frequency offset of the effective nominal time
// In other words, that there exists an integer x, such that coord:offset(n, timeUnit) == coord:current(x) is true
// If not, then we'll "rewind" rawCal to the latest instance earlier than rawCal and use that.
Calendar cal = getInitialInstanceCal();
if (rawCal.before(cal)) {
// Walk backwards from the initial instance until cal is no longer after rawCal.
while (cal.after(rawCal)) {
cal.add(freqUnit.getCalendarUnit(), -freq);
freqCount--;
}
}
else if (rawCal.after(cal)) {
// Walk forwards from the initial instance until cal is no longer before rawCal.
while (cal.before(rawCal)) {
cal.add(freqUnit.getCalendarUnit(), freq);
freqCount++;
}
}
if (cal.before(rawCal)) {
// Backward walk stopped on an instance earlier than rawCal: use that instance.
rawCal = cal;
}
else if (cal.after(rawCal)) {
// Forward walk overshot rawCal: step back one instance and use it.
cal.add(freqUnit.getCalendarUnit(), -freq);
rawCal = cal;
freqCount--;
}
String rawCalStr = DateUtils.formatDateOozieTZ(rawCal);
// Recompute the same instance in a single jump from the initial instance.
Calendar nominalInstanceCal = getInitialInstanceCal();
nominalInstanceCal.add(freqUnit.getCalendarUnit(), freq * freqCount);
if (nominalInstanceCal.getTime().compareTo(getInitialInstance()) < 0) {
XLog.getLog(CoordELFunctions.class).warn("If the initial instance of the dataset is later than the offset instance"
+ " specified, such as coord:offset({0}, {1}) in this case, an empty string is returned. This means that no"
+ " data is available at the offset instance specified by the user and the user could try modifying his"
+ " initial-instance to an earlier time.", n, timeUnit);
return "";
}
String nominalCalStr = DateUtils.formatDateOozieTZ(nominalInstanceCal);
// Sanity check: the stepped calendar and the single-jump calendar must format identically.
if (!rawCalStr.equals(nominalCalStr)) {
throw new RuntimeException("Shouldn't happen");
}
return rawCalStr;
}
/**
 * Resolves a single {@code latest} instance by running a {@link LatestEvaluator} over the one-element range
 * {@code [offset, offset]}.
 *
 * @param offset offset
 * @return n-th available latest instance Date-Time for SYNC data-set
 * @throws Exception in case of wrong arguments, or HDFS access errors
 */
private static String coord_latest_sync(int offset) throws Exception {
    return new LatestEvaluator(offset, offset).evaluate();
}
/**
 * Resolves a range of {@code latest} instances by running a {@link LatestEvaluator} over the given offsets.
 *
 * @param startOffset first offset of the range
 * @param endOffset last offset of the range
 * @return whatever {@link LatestEvaluator#evaluate()} returns for the range
 * @throws Exception propagated from {@link LatestEvaluator#evaluate()}
 */
private static String coord_latestRange_sync(int startOffset, int endOffset) throws Exception {
    final LatestEvaluator evaluator = new LatestEvaluator(startOffset, endOffset);
    return evaluator.evaluate();
}
/**
 * Builds an evaluator carrying the {@code YEAR}, {@code MONTH}, {@code DAY}, {@code HOUR} and {@code MINUTE}
 * variables of the given calendar. All fields except the year are prefixed with "0" when below 10.
 * <p>
 * Note that the calendar passed in is switched to the Oozie processing time zone as a side effect.
 *
 * @param tm calendar
 * @return a new Evaluator to be used for URI-template evaluation
 */
private static ELEvaluator getUriEvaluator(Calendar tm) {
    tm.setTimeZone(DateUtils.getOozieProcessingTimeZone());
    final ELEvaluator uriEval = new ELEvaluator();
    uriEval.setVariable("YEAR", tm.get(Calendar.YEAR));

    // Calendar months are zero-based.
    final int month = tm.get(Calendar.MONTH) + 1;
    uriEval.setVariable("MONTH", month < 10 ? "0" + month : month);

    final int day = tm.get(Calendar.DAY_OF_MONTH);
    uriEval.setVariable("DAY", day < 10 ? "0" + day : day);

    final int hour = tm.get(Calendar.HOUR_OF_DAY);
    uriEval.setVariable("HOUR", hour < 10 ? "0" + hour : hour);

    final int minute = tm.get(Calendar.MINUTE);
    uriEval.setVariable("MINUTE", minute < 10 ? "0" + minute : minute);
    return uriEval;
}
/**
 * Tells whether the dataset registered under {@code DATASET} in the current evaluator has the type "SYNC"
 * (compared case-insensitively).
 *
 * @return whether a data set is SYNC or ASYNC
 * @throws RuntimeException if no dataset is registered
 */
private static boolean isSyncDataSet() {
    final SyncCoordDataset dataset = (SyncCoordDataset) ELEvaluator.getCurrent().getVariable(DATASET);
    if (dataset != null) {
        final String datasetType = dataset.getType();
        return datasetType.equalsIgnoreCase("SYNC");
    }
    throw new RuntimeException("Associated Dataset should be defined with key " + DATASET);
}
/**
 * Check whether a function should be resolved, based on the evaluator variable {@code resolve_<functionName>}.
 * <p>
 * When that variable is missing or equals "false" (ignoring case), the function is not resolved: the evaluator's
 * {@code .wrap} variable is set to "true" and the call text is handed back.
 *
 * @param functionName name of the function
 * @param n the function parameter
 * @return null if the functionName needs to be resolved otherwise return the calling function unresolved.
 */
private static String checkIfResolved(String functionName, String n) {
    final ELEvaluator current = ELEvaluator.getCurrent();
    final String resolveFlag = (String) current.getVariable("resolve_" + functionName);
    final boolean mustResolve = resolveFlag != null && !resolveFlag.equalsIgnoreCase("false");
    if (mustResolve) {
        return null; // Resolved it
    }
    current.setVariable(".wrap", "true");
    return "coord:" + functionName + "(" + n + ")"; // Unresolved
}
/**
 * Returns the call text {@code coord:functionName(n)} unresolved and sets the current evaluator's {@code .wrap}
 * variable to "true".
 *
 * @param functionName name of the function
 * @param n the argument text placed between the parentheses
 * @return the unresolved call text
 */
private static String echoUnResolved(String functionName, String n) {
    ELEvaluator.getCurrent().setVariable(".wrap", "true");
    return "coord:" + functionName + "(" + n + ")"; // Unresolved
}
/**
 * Returns the call text {@code <prefix>functionName(n)} unresolved and sets the current evaluator's
 * {@code .wrap} variable to "true".
 *
 * @param functionName name of the function
 * @param n the argument text placed between the parentheses
 * @param prefix text placed in front of the function name
 * @return the unresolved call text
 */
private static String echoUnResolvedPre(String functionName, String n, String prefix) {
    ELEvaluator.getCurrent().setVariable(".wrap", "true");
    final StringBuilder unresolved = new StringBuilder();
    unresolved.append(prefix).append(functionName).append("(").append(n).append(")");
    return unresolved.toString(); // Unresolved
}
/**
 * Looks up the dataset's initial instance using the current evaluator.
 *
 * @return the initial instance of a DataSet in DATE
 */
private static Date getInitialInstance() {
    return getInitialInstance(ELEvaluator.getCurrent());
}
/**
 * Looks up the dataset's initial instance using the given evaluator; this is the time held by the calendar that
 * {@code getInitialInstanceCal(eval)} produces.
 *
 * @param eval evaluator holding the dataset
 * @return the initial instance of a DataSet in DATE
 */
private static Date getInitialInstance(ELEvaluator eval) {
    final Calendar initialCal = getInitialInstanceCal(eval);
    return initialCal.getTime();
}
/**
 * Looks up the dataset's initial instance as a calendar using the current evaluator.
 *
 * @return the initial instance of a DataSet in Calendar
 */
private static Calendar getInitialInstanceCal() {
    return getInitialInstanceCal(ELEvaluator.getCurrent());
}
/**
 * Builds a calendar in the dataset's time zone that is set to the dataset's initial instance and then adjusted by
 * {@code DateUtils.moveToEnd} according to the dataset's end-of-duration flag (EOD/EOM adjustment).
 *
 * @param eval evaluator holding the dataset under {@code DATASET}
 * @return the initial instance of a DataSet in Calendar
 * @throws RuntimeException if no dataset is registered
 */
private static Calendar getInitialInstanceCal(ELEvaluator eval) {
    final SyncCoordDataset dataset = (SyncCoordDataset) eval.getVariable(DATASET);
    if (dataset == null) {
        throw new RuntimeException("Associated Dataset should be defined with key " + DATASET);
    }
    final Calendar initialCal = new GregorianCalendar(dataset.getTimeZone());
    initialCal.setTime(dataset.getInitInstance());
    // To adjust EOD/EOM
    final TimeUnit endOfFlag = getDSEndOfFlag(eval);
    DateUtils.moveToEnd(initialCal, endOfFlag);
    return initialCal;
}
/**
 * Looks up the nominal time of the coordinator action using the current evaluator.
 *
 * @return Nominal or action creation Time when all the dependencies of an application instance are met.
 */
private static Date getActionCreationtime() {
    return getActionCreationtime(ELEvaluator.getCurrent());
}
/**
 * Reads the nominal time of the coordinator action registered under {@code COORD_ACTION} in the given evaluator.
 *
 * @param eval evaluator holding the coordinator action
 * @return Nominal or action creation Time when all the dependencies of an application instance are met.
 * @throws RuntimeException if no coordinator action is registered
 */
private static Date getActionCreationtime(ELEvaluator eval) {
    final SyncCoordAction action = (SyncCoordAction) eval.getVariable(COORD_ACTION);
    if (action != null) {
        return action.getNominalTime();
    }
    throw new RuntimeException("Associated Application instance should be defined with key " + COORD_ACTION);
}
/**
 * Reads the actual time of the coordinator action registered under {@code COORD_ACTION} in the current evaluator.
 *
 * @return Actual Time when all the dependencies of an application instance are met.
 * @throws RuntimeException if no coordinator action is registered
 */
private static Date getActualTime() {
    final SyncCoordAction action = (SyncCoordAction) ELEvaluator.getCurrent().getVariable(COORD_ACTION);
    if (action != null) {
        return action.getActualTime();
    }
    throw new RuntimeException("Associated Application instance should be defined with key " + COORD_ACTION);
}
/**
 * Reads the time zone of the coordinator action registered under {@code COORD_ACTION} in the current evaluator.
 *
 * @return TimeZone for the application or job.
 * @throws RuntimeException if no coordinator action is registered
 */
private static TimeZone getJobTZ() {
    final SyncCoordAction action = (SyncCoordAction) ELEvaluator.getCurrent().getVariable(COORD_ACTION);
    if (action != null) {
        return action.getTimeZone();
    }
    throw new RuntimeException("Associated Application instance should be defined with key " + COORD_ACTION);
}
/**
 * Find the current instance based on effectiveTime (i.e Action_Creation_Time or Action_Start_Time), using the
 * current evaluator.
 *
 * @param effectiveTime effective time
 * @param instanceCount instance count; receives the index of the returned instance, may be null
 * @return current instance i.e. current(0) returns null if effectiveTime is earlier than Initial Instance time of
 * the dataset.
 */
public static Calendar getCurrentInstance(Date effectiveTime, AtomicInteger instanceCount) {
    return getCurrentInstance(effectiveTime, instanceCount, ELEvaluator.getCurrent());
}
/**
 * Find the current instance based on effectiveTime (i.e Action_Creation_Time or Action_Start_Time).
 * <p>
 * A first estimate of the number of elapsed time units is used to jump close to the effective time; the calendar
 * is then advanced one frequency step at a time until it passes the effective time, and finally moved one step
 * back.
 *
 * @param effectiveTime effective time
 * @param instanceCount receives the number of frequency steps between the initial instance and the returned
 * instance; may be null if the caller does not need it
 * @param eval evaluator holding the dataset
 * @return current instance i.e. current(0) returns null if effectiveTime is earlier than Initial Instance time of
 * the dataset.
 * @throws IllegalArgumentException if the dataset time unit is not handled
 */
private static Calendar getCurrentInstance(Date effectiveTime, AtomicInteger instanceCount, ELEvaluator eval) {
    final Date initialInstance = getInitialInstance(eval);
    final TimeUnit unit = getDSTimeUnit(eval);
    final TimeZone zone = getDatasetTZ(eval);
    final int frequency = getDSFrequency(eval);

    // Convert Date to Calendar for corresponding TZ
    final Calendar cursor = Calendar.getInstance(zone);
    cursor.setTime(initialInstance);
    final Calendar effectiveCal = new GregorianCalendar(zone);
    effectiveCal.setTime(effectiveTime);

    // A throw-away counter is used when the caller doesn't care about this value.
    final AtomicInteger counter = (instanceCount != null) ? instanceCount : new AtomicInteger(0);
    counter.set(0);
    if (cursor.compareTo(effectiveCal) > 0) {
        return null;
    }

    // Rough number of elapsed time units between the initial instance and the effective time.
    final long elapsedMillis = effectiveTime.getTime() - initialInstance.getTime();
    final int estimate;
    switch (unit) {
        case MINUTE:
            estimate = (int) (elapsedMillis / MINUTE_MSEC);
            break;
        case HOUR:
            estimate = (int) (elapsedMillis / HOUR_MSEC);
            break;
        case DAY:
        case END_OF_DAY:
            estimate = (int) (elapsedMillis / DAY_MSEC);
            break;
        case WEEK:
        case END_OF_WEEK:
            estimate = (int) (elapsedMillis / WEEK_MSEC);
            break;
        case MONTH:
        case END_OF_MONTH:
            estimate = (effectiveCal.get(Calendar.YEAR) - cursor.get(Calendar.YEAR)) * 12
                    + effectiveCal.get(Calendar.MONTH) - cursor.get(Calendar.MONTH);
            break;
        case YEAR:
            estimate = effectiveCal.get(Calendar.YEAR) - cursor.get(Calendar.YEAR);
            break;
        default:
            throw new IllegalArgumentException("Unhandled dataset time unit " + unit);
    }
    counter.set(estimate);

    if (estimate > 2) {
        // Jump ahead by the whole number of frequency steps contained in the estimate.
        final int wholeSteps = estimate / frequency;
        counter.set(wholeSteps);
        cursor.add(unit.getCalendarUnit(), wholeSteps * frequency);
    }
    else {
        counter.set(0);
    }

    // Step forward until the cursor is past the effective time ...
    while (!cursor.getTime().after(effectiveTime)) {
        cursor.add(unit.getCalendarUnit(), frequency);
        counter.incrementAndGet();
    }
    // ... then take one step back to land on the last instance not after it.
    cursor.add(unit.getCalendarUnit(), -frequency);
    counter.decrementAndGet();
    return cursor;
}
/**
 * Returns the action creation (nominal) time as a calendar in the dataset's time zone.
 *
 * @return the nominal time calendar, or null when the dataset's initial instance is later than the nominal time
 */
public static Calendar getEffectiveNominalTime() {
    final Date initialInstance = getInitialInstance();
    final TimeZone zone = getDatasetTZ();

    // Convert Date to Calendar for corresponding TZ
    final Calendar initialCal = Calendar.getInstance();
    initialCal.setTime(initialInstance);
    initialCal.setTimeZone(zone);

    final Calendar nominalCal = Calendar.getInstance();
    nominalCal.setTime(getActionCreationtime());
    nominalCal.setTimeZone(zone);

    // Nominal Time < initial Instance: there is no effective nominal time.
    final boolean createdBeforeInitialInstance = initialCal.compareTo(nominalCal) > 0;
    return createdBeforeInitialInstance ? null : nominalCal;
}
/**
 * Reads the dataset frequency using the current evaluator.
 *
 * @return dataset frequency, as reported by the dataset
 */
private static int getDSFrequency() {
    return getDSFrequency(ELEvaluator.getCurrent());
}
/**
 * Reads the frequency of the dataset registered under {@code DATASET} in the given evaluator. Other methods in
 * this class apply the value in units of the dataset's time unit.
 *
 * @param eval evaluator holding the dataset
 * @return dataset frequency, as reported by the dataset
 * @throws RuntimeException if no dataset is registered
 */
private static int getDSFrequency(ELEvaluator eval) {
    final SyncCoordDataset dataset = (SyncCoordDataset) eval.getVariable(DATASET);
    if (dataset != null) {
        return dataset.getFrequency();
    }
    throw new RuntimeException("Associated Dataset should be defined with key " + DATASET);
}
/**
 * Reads the dataset time unit using the current evaluator.
 *
 * @return dataset TimeUnit
 */
private static TimeUnit getDSTimeUnit() {
    return getDSTimeUnit(ELEvaluator.getCurrent());
}
/**
 * Reads the time unit of the dataset registered under {@code DATASET} in the given evaluator.
 *
 * @param eval evaluator
 * @return dataset TimeUnit
 * @throws RuntimeException if no dataset is registered
 */
public static TimeUnit getDSTimeUnit(ELEvaluator eval) {
    final SyncCoordDataset dataset = (SyncCoordDataset) eval.getVariable(DATASET);
    if (dataset != null) {
        return dataset.getTimeUnit();
    }
    throw new RuntimeException("Associated Dataset should be defined with key " + DATASET);
}
/**
 * Reads the dataset time zone using the current evaluator.
 *
 * @return dataset TimeZone
 */
public static TimeZone getDatasetTZ() {
    return getDatasetTZ(ELEvaluator.getCurrent());
}
/**
 * Reads the time zone of the dataset registered under {@code DATASET} in the given evaluator.
 *
 * @param eval evaluator
 * @return dataset TimeZone
 * @throws RuntimeException if no dataset is registered
 */
public static TimeZone getDatasetTZ(ELEvaluator eval) {
    final SyncCoordDataset dataset = (SyncCoordDataset) eval.getVariable(DATASET);
    if (dataset != null) {
        return dataset.getTimeZone();
    }
    throw new RuntimeException("Associated Dataset should be defined with key " + DATASET);
}
/**
 * Reads the dataset's end-of-duration flag using the current evaluator.
 *
 * @return the dataset's end-of-duration value, expressed as a TimeUnit
 */
private static TimeUnit getDSEndOfFlag() {
    return getDSEndOfFlag(ELEvaluator.getCurrent());
}
/**
 * Reads the end-of-duration flag of the dataset registered under {@code DATASET} in the given evaluator.
 *
 * @param eval evaluator holding the dataset
 * @return the dataset's end-of-duration value, expressed as a TimeUnit
 * @throws RuntimeException if no dataset is registered
 */
private static TimeUnit getDSEndOfFlag(ELEvaluator eval) {
    final SyncCoordDataset dataset = (SyncCoordDataset) eval.getVariable(DATASET);
    if (dataset != null) {
        return dataset.getEndOfDuration();
    }
    throw new RuntimeException("Associated Dataset should be defined with key " + DATASET);
}
/**
 * Return a job configuration property for the coordinator, read as a variable of the current evaluator.
 *
 * @param property property name.
 * @return the value of the property, <code>null</code> if the property is undefined.
 */
public static String coord_conf(String property) {
    final Object value = ELEvaluator.getCurrent().getVariable(property);
    return (String) value;
}
/**
 * Return the user that submitted the coordinator job, read from the current evaluator's
 * {@code OozieClient.USER_NAME} variable.
 *
 * @return the user that submitted the coordinator job.
 */
public static String coord_user() {
    final Object userName = ELEvaluator.getCurrent().getVariable(OozieClient.USER_NAME);
    return (String) userName;
}
/**
 * Takes two offset times and returns a list of multiples of the frequency offset from the effective nominal time that occur
 * between them. The caller should make sure that startCal is earlier than endCal.
 * <p>
 * As a simple example, assume it's the same day: startCal is 1:00, endCal is 2:00, frequency is 20min, and the current
 * instance is 1:20 -- then this method walks 1:00, 1:20, 1:40, 2:00 and returns a list containing: -20, 0, 20, 40
 * (the last value is included only when the walked calendar is before or equal to endCal).
 * <p>
 * NOTE(review): if no current instance exists for the action creation time, the walk below would dereference
 * null; callers are presumably expected to rule that case out first -- confirm.
 *
 * @param startCal The earlier offset time
 * @param endCal The later offset time
 * @param eval The ELEvaluator to use; cannot be null
 * @return A list of multiple of the frequency offset from the effective nominal time that occur between the startCal and endCal
 */
public static List<Integer> expandOffsetTimes(Calendar startCal, Calendar endCal, ELEvaluator eval) {
    final List<Integer> offsets = new ArrayList<Integer>();
    // Use eval because the "current" eval isn't set
    final int step = getDSFrequency(eval);
    final TimeUnit stepUnit = getDSTimeUnit(eval);
    final Calendar walker = getCurrentInstance(getActionCreationtime(eval), null, eval);
    int offset = 0;
    if (startCal.before(walker)) {
        // Walk backwards until the walker is no longer after startCal ...
        while (walker.after(startCal)) {
            walker.add(stepUnit.getCalendarUnit(), -step);
            offset -= step;
        }
        // ... and undo one step if that overshot startCal.
        if (walker.before(startCal)) {
            walker.add(stepUnit.getCalendarUnit(), step);
            offset += step;
        }
    }
    else if (startCal.after(walker)) {
        while (walker.before(startCal)) {
            walker.add(stepUnit.getCalendarUnit(), step);
            offset += step;
        }
    }
    // The walker now sits on the first instance that is >= startCal. Collect every instance up to and
    // including endCal.
    while (walker.before(endCal) || walker.equals(endCal)) {
        offsets.add(offset);
        walker.add(stepUnit.getCalendarUnit(), step);
        offset += step;
    }
    return offsets;
}
/**
 * Resolve the offset time from the effective nominal time: the current dataset instance is looked up and
 * {@code n} units of {@code timeUnit} are added to it.
 *
 * @param n offset amount (integer)
 * @param timeUnit TimeUnit for offset n ("MINUTE", "HOUR", "DAY", "MONTH", "YEAR")
 * @param eval The ELEvaluator to use; or null to use the "current" eval
 * @return A Calendar of the offset time, or null (after logging a warning) when there is no current instance
 */
public static Calendar resolveOffsetRawTime(int n, TimeUnit timeUnit, ELEvaluator eval) {
    // Use eval if given (for when the "current" eval isn't set)
    final Calendar offsetCal = (eval == null)
            ? getCurrentInstance(getActionCreationtime(), null)
            : getCurrentInstance(getActionCreationtime(eval), null, eval);
    if (offsetCal != null) {
        offsetCal.add(timeUnit.getCalendarUnit(), n);
        return offsetCal;
    }
    XLog.getLog(CoordELFunctions.class).warn("If the initial instance of the dataset is later than the nominal time, an"
            + " empty string is returned. This means that no data is available at the offset instance specified by the user"
            + " and the user could try modifying his or her initial-instance to an earlier time.");
    return null;
}
/**
 * Evaluating {@code coord:future()} and {@code coord:futureRange()} data input dependencies.
 * <p>
 * Scanning starts at the current instance for the action creation time and moves forward; it stops once more
 * than {@code instance} instances have been checked.
 */
static final class FutureEvaluator extends RangeEvaluator {
    /** Limit on the number of instances to check, as passed to the constructor. */
    private final int instance;
    /** Number of instances checked so far in the running evaluation. */
    private int checkedInstance = 0;

    FutureEvaluator(final int startOffset, final int endOffset, final int instance) {
        super("future", startOffset, endOffset);
        this.instance = instance;
    }

    /** Starts from the current instance derived from the action creation time. */
    @Override
    protected Calendar getNominalInstance() {
        final Date creationTime = getActionCreationtime();
        return getCurrentInstance(creationTime, instCount);
    }

    /** Also clears the checked-instance counter. */
    @Override
    protected void reset() {
        super.reset();
        checkedInstance = 0;
    }

    /** Keeps scanning while the number of checked instances has not exceeded the limit. */
    @Override
    protected boolean isAvailable(final Calendar nominalInstance, final Calendar initInstance) {
        return checkedInstance <= instance;
    }

    /** The evaluation completes when the match counter reaches the end offset. */
    @Override
    protected boolean isFirst() {
        return endOffset == available;
    }

    /** Matches at or beyond the start offset belong to the requested range. */
    @Override
    protected boolean isInBetween() {
        return startOffset <= available;
    }

    /** Counts matches upwards. */
    @Override
    protected void stepAvailable() {
        available += 1;
    }

    /** Moves one instance forward and records that an instance was checked. */
    @Override
    protected void stepInstanceCount() {
        instCount.incrementAndGet();
        checkedInstance += 1;
    }

    @Override
    protected String from() {
        return startOffset + ", " + instance;
    }

    @Override
    protected String fromTo() {
        return startOffset + ", " + endOffset + ", " + instance;
    }
}
/**
 * Evaluating {@code coord:latest()} and {@code coord:latestRange()} data input dependencies.
 * <p>
 * Scanning starts at the current instance for either the wall-clock time or the action's actual time (depending
 * on {@code LATEST_EL_USE_CURRENT_TIME}) and moves backwards until the dataset's initial instance is passed.
 */
static final class LatestEvaluator extends RangeEvaluator {
    LatestEvaluator(final int startOffset, final int endOffset) {
        super("latest", startOffset, endOffset);
    }

    /** Chooses the reference time according to the {@code LATEST_EL_USE_CURRENT_TIME} setting (default false). */
    @Override
    protected Calendar getNominalInstance() {
        if (ConfigurationService.getBoolean(LATEST_EL_USE_CURRENT_TIME, false)) {
            return getCurrentInstance(new Date(), instCount);
        }
        return getCurrentInstance(getActualTime(), instCount);
    }

    /** Keeps scanning while the candidate instance is not earlier than the initial instance. */
    @Override
    protected boolean isAvailable(final Calendar nominalInstance, final Calendar initInstance) {
        final int order = nominalInstance.compareTo(initInstance);
        return order >= 0;
    }

    /** The evaluation completes when the match counter reaches the start offset. */
    @Override
    protected boolean isFirst() {
        return startOffset == available;
    }

    /** Matches at or below the end offset belong to the requested range. */
    @Override
    protected boolean isInBetween() {
        return endOffset >= available;
    }

    /** Counts matches downwards. */
    @Override
    protected void stepAvailable() {
        available -= 1;
    }

    /** Moves one instance backwards. */
    @Override
    protected void stepInstanceCount() {
        instCount.decrementAndGet();
    }

    @Override
    protected String from() {
        return String.valueOf(startOffset);
    }

    @Override
    protected String fromTo() {
        return startOffset + ", " + endOffset;
    }
}
/**
 * Common scanning logic for the {@code future} and {@code latest} evaluators: walks dataset instances one
 * frequency step at a time, checks each instance's done-flag URI for existence, and collects the matches that
 * fall inside the requested offset range. Subclasses decide where the walk starts, in which direction it moves,
 * and when it ends.
 */
private static abstract class RangeEvaluator {
    /**
     * What is the use case: evaluating {@code future} or {@code latest} dataset occurrences.
     */
    private final String type;
    /**
     * Start offset passed to the constructor; subclasses compare {@link #available} against it.
     */
    final int startOffset;
    /**
     * End offset passed to the constructor; subclasses compare {@link #available} against it.
     */
    final int endOffset;
    /**
     * Index of the instance being checked, counted in frequency steps from the dataset's initial instance.
     */
    final AtomicInteger instCount = new AtomicInteger(0);
    /**
     * Running match counter, stepped by {@link #stepAvailable()} each time an existing dataset is found.
     */
    int available = 0;

    RangeEvaluator(final String type, final int startOffset, final int endOffset) {
        this.type = type;
        this.startOffset = startOffset;
        this.endOffset = endOffset;
    }

    /**
     * Evaluate the input data dependency's EL function based on HDFS checks for URI existence, or leave it unevaluated.
     * <p>
     * Based on following:
     * <ul>
     * <li>{@code startOffset}</li>
     * <li>{@code endOffset}</li>
     * <li>internal state like {@code instCount} and {@code available}</li>
     * </ul>
     * The evaluator variable {@code CoordELConstants.IS_RESOLVED} is set to reflect the outcome, and
     * {@code CoordELConstants.RESOLVED_PATH} receives the matched URI paths.
     *
     * @return timestamp of the {@code future} / {@code latest} HDFS URI based on parameters, and HDFS URI presence, or the
     * unmodified input EL expression, if not present; the empty string if there is no feasible nominal instance
     * @throws Exception in case of wrong arguments, or HDFS access errors
     */
    String evaluate() throws Exception {
        final Thread currentThread = Thread.currentThread();
        final ELEvaluator eval = ELEvaluator.getCurrent();
        String retVal = "";
        // The frequency is applied below in units of the dataset's time unit.
        final int datasetFrequency = getDSFrequency();
        final TimeUnit dsTimeUnit = getDSTimeUnit();
        instCount.set(0);
        Calendar nominalInstance = getNominalInstance();
        final StringBuilder resolvedInstances = new StringBuilder();
        final StringBuilder resolvedURIPaths = new StringBuilder();
        if (nominalInstance != null) {
            final Calendar initInstance = getInitialInstanceCal();
            SyncCoordDataset ds = (SyncCoordDataset) eval.getVariable(DATASET);
            if (ds == null) {
                throw new RuntimeException("Associated Dataset should be defined with key " + DATASET);
            }
            final String uriTemplate = ds.getUriTemplate();
            final Configuration conf = (Configuration) eval.getVariable(CONFIGURATION);
            if (conf == null) {
                throw new RuntimeException("Associated Configuration should be defined with key " + CONFIGURATION);
            }
            reset();
            boolean resolved = false;
            final String user = ParamChecker
                    .notEmpty((String) eval.getVariable(OozieClient.USER_NAME), OozieClient.USER_NAME);
            final String doneFlag = ds.getDoneFlag();
            final URIHandlerService uriService = Services.get().get(URIHandlerService.class);
            // Handler and context are created lazily from the first URI and reused for all later checks.
            URIHandler uriHandler = null;
            Context uriContext = null;
            try {
                int retries = 0;
                final DateFormat dMYHMS = new SimpleDateFormat("yyyy.MM.dd HH:mm:ss");
                // Only used for the debug message below; it does not bound the loop.
                final long maxRetries = new OozieTimeUnitConverter().convertMillis(
                        (nominalInstance.getTime().getTime() - initInstance.getTime().getTime()) / ds.getFrequency(),
                        dsTimeUnit);
                if (maxRetries > 0) {
                    LOG.debug("Approximately [{0}] maximal retries going back till [{1}] for checking latest " +
                                    "dataset existence. [name={2}]",
                            maxRetries,
                            dMYHMS.format(initInstance.getTime()),
                            ds.getName());
                }
                while (isAvailable(nominalInstance, initInstance) && !currentThread.isInterrupted()) {
                    final ELEvaluator uriEval = getUriEvaluator(nominalInstance);
                    final String uriPath = uriEval.evaluate(uriTemplate, String.class);
                    if (uriHandler == null) {
                        URI uri = new URI(uriPath);
                        uriHandler = uriService.getURIHandler(uri);
                        uriContext = uriHandler.getContext(uri, conf, user, true);
                    }
                    final String uriWithDoneFlag = uriHandler.getURIWithDoneFlag(uriPath, doneFlag);
                    LOG.trace("Checking dataset existence. [name={0};uriWithDoneFlag={1};retries={2};nominalTime={3}]",
                            ds.getName(),
                            uriWithDoneFlag,
                            retries,
                            dMYHMS.format(nominalInstance.getTime()));
                    final Date now = new Date();
                    final boolean uriWithDoneFlagExists = uriHandler.exists(new URI(uriWithDoneFlag), uriContext);
                    final Date later = new Date();
                    LOG.trace("[{0}] ms elapsed while checking for dataset existence. [name={1};uriWithDoneFlag={2}]",
                            later.getTime() - now.getTime(),
                            ds.getName(),
                            uriWithDoneFlag);
                    if (uriWithDoneFlagExists) {
                        LOG.debug("Found current dataset for {0}({1}). [name={2};uriWithDoneFlag={3}]",
                                type,
                                available,
                                ds.getName(),
                                uriWithDoneFlag);
                        if (isFirst()) {
                            // Last match needed: close the result and stop scanning.
                            LOG.debug("Matched first dataset for {0}({1}), resolving. [name={2};uriWithDoneFlag={3}]",
                                    type,
                                    available,
                                    ds.getName(),
                                    uriWithDoneFlag);
                            resolved = true;
                            resolvedInstances.append(DateUtils.formatDateOozieTZ(nominalInstance));
                            resolvedURIPaths.append(uriPath);
                            retVal = resolvedInstances.toString();
                            eval.setVariable(CoordELConstants.RESOLVED_PATH, resolvedURIPaths.toString());
                            break;
                        }
                        else if (isInBetween()) {
                            // Match inside the range: remember it and keep scanning.
                            LOG.debug("Matched dataset in between for {0}({1}), continuing. [name={2};uriWithDoneFlag={3}]",
                                    type,
                                    available,
                                    ds.getName(),
                                    uriWithDoneFlag);
                            resolvedInstances.append(DateUtils.formatDateOozieTZ(nominalInstance)).append(
                                    INSTANCE_SEPARATOR);
                            resolvedURIPaths.append(uriPath).append(INSTANCE_SEPARATOR);
                        }
                        else {
                            LOG.debug("Not matching dataset for {0}({1}), continuing. [name={2};uriWithDoneFlag={3}]",
                                    type,
                                    available,
                                    ds.getName(),
                                    uriWithDoneFlag);
                        }
                        stepAvailable();
                    }
                    else {
                        LOG.trace("Could not find dataset. [name={0};uriWithDoneFlag={1}]",
                                ds.getName(),
                                uriWithDoneFlag);
                    }
                    // Move to the next candidate, always recomputed from the initial instance.
                    stepInstanceCount();
                    nominalInstance = (Calendar) initInstance.clone();
                    nominalInstance.add(dsTimeUnit.getCalendarUnit(), instCount.get() * datasetFrequency);
                    retries++;
                }
                // Publish partial matches when the scan ended without completing the range.
                if (!StringUtils.isEmpty(resolvedURIPaths.toString())
                        && eval.getVariable(CoordELConstants.RESOLVED_PATH) == null) {
                    eval.setVariable(CoordELConstants.RESOLVED_PATH, resolvedURIPaths.toString());
                }
            }
            finally {
                if (uriContext != null) {
                    uriContext.destroy();
                }
            }
            if (!resolved) {
                // return unchanged function with variable 'is_resolved'
                // to 'false'
                eval.setVariable(CoordELConstants.IS_RESOLVED, Boolean.FALSE);
                if (startOffset == endOffset) {
                    retVal = String.format("${coord:%s(%s)}", type, from());
                }
                else {
                    retVal = String.format("${coord:%sRange(%s)}", type, fromTo());
                }
            }
            else {
                eval.setVariable(CoordELConstants.IS_RESOLVED, Boolean.TRUE);
            }
        }
        else {// No feasible nominal time
            eval.setVariable(CoordELConstants.IS_RESOLVED, Boolean.FALSE);
        }
        return retVal;
    }

    /**
     * Get the nominal {@link Calendar} instance, that is, as base of the next evaluated occurrence.
     * @return a {@link Calendar} instance based on {@link #type}
     */
    protected abstract Calendar getNominalInstance();

    /**
     * Reset the evaluator's internal state between two {@link #evaluate()} calls.
     */
    protected void reset() {
        available = 0;
    }

    /**
     * Checks whether a dataset is available.
     * @param nominalInstance the nominal instance
     * @param initInstance the initial instance
     * @return true if dataset is available
     */
    protected abstract boolean isAvailable(Calendar nominalInstance, Calendar initInstance);

    /**
     * Checks whether it's the first matching for the given evaluation.
     * @return true if it is the first matching
     */
    protected abstract boolean isFirst();

    /**
     * Checks whether it's not the first but a valid matching for the given evaluation.
     * @return true if the match is not the first one but still lies inside the requested range
     */
    protected abstract boolean isInBetween();

    /**
     * Modify the internal state in case of a match.
     */
    protected abstract void stepAvailable();

    /**
     * Modify the internal state in case an instance was checked.
     */
    protected abstract void stepInstanceCount();

    /**
     * Substitution of the range parameters for an open range with only a starting point.
     * @return the argument text placed inside {@code ${coord:<type>(...)}} when the call is left unresolved
     */
    protected abstract String from();

    /**
     * Substitution of the range parameters for a closed range with a starting and an ending point.
     * @return the argument text placed inside {@code ${coord:<type>Range(...)}} when the call is left unresolved
     */
    protected abstract String fromTo();
}
@VisibleForTesting
static class OozieTimeUnitConverter {
    /**
     * Estimates how many {@code source} units fit into {@code millis} milliseconds. A year is counted as 365 days
     * and a month as 31 days; days, hours and minutes are converted with {@link java.util.concurrent.TimeUnit}.
     *
     * @param millis milliseconds
     * @param source source time unit
     * @return -1 if no correct {@code source} was given, else the estimated occurrence count of a dataset
     * @throws NullPointerException if {@code source} is null
     */
    long convertMillis(final long millis, final TimeUnit source) {
        Objects.requireNonNull(source, "source has to be filled");
        final java.util.concurrent.TimeUnit milliseconds = java.util.concurrent.TimeUnit.MILLISECONDS;
        final long estimate;
        switch (source) {
            case YEAR:
                estimate = milliseconds.toDays(millis) / 365;
                break;
            case MONTH:
                estimate = milliseconds.toDays(millis) / 31;
                break;
            case DAY:
                estimate = milliseconds.toDays(millis);
                break;
            case HOUR:
                estimate = milliseconds.toHours(millis);
                break;
            case MINUTE:
                estimate = milliseconds.toMinutes(millis);
                break;
            default:
                estimate = -1;
                break;
        }
        return estimate;
    }
}
}