blob: c869b8727768552109b3d2ef24cfb083a1e0307f [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.oozie.coord;
import java.util.Calendar;
import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.oozie.CoordinatorActionBean;
import org.apache.oozie.command.coord.CoordCommandUtils;
import org.apache.oozie.coord.input.dependency.CoordInputDependency;
import org.apache.oozie.coord.input.logic.CoordInputLogicEvaluator;
import org.apache.oozie.service.ELService;
import org.apache.oozie.service.Services;
import org.apache.oozie.util.DateUtils;
import org.apache.oozie.util.ELEvaluator;
import org.apache.oozie.util.XmlUtils;
import org.jdom.Element;
/**
* This class provides the different EL evaluators required at different stages
*/
public class CoordELEvaluator {
/** Base time constant with value 1; presumably one minute expressed in minutes (TODO confirm against callers). */
public static final Integer MINUTE = 1;
/** Sixty times {@link #MINUTE}. */
public static final Integer HOUR = 60 * MINUTE;
/**
 * Build an evaluator for the named EL group and load every property of the configuration into it as a variable.
 * Used to resolve configuration variables and frequency constants/functions (Stage 1).
 *
 * @param conf configuration whose properties become evaluator variables
 * @param group name of the EL group handed to {@code ELService.createEvaluator}
 * @return the configured ELEvaluator
 */
public static ELEvaluator createELEvaluatorForGroup(Configuration conf, String group) {
    final ELService elService = Services.get().get(ELService.class);
    final ELEvaluator evaluator = elService.createEvaluator(group);
    setConfigToEval(evaluator, conf);
    return evaluator;
}
/**
 * Create an evaluator that resolves EL functions and variables using the action creation time (Phase 2).
 * Delegates to the four-argument overload with the {@code "coord-action-create"} evaluator group.
 *
 * @param event XML element of the data event, usually enclosed by a &lt;data-in&gt; or &lt;data-out&gt; tag
 * @param appInst application instance information such as the action creation time
 * @param conf configuration used to substitute variables
 * @return the configured ELEvaluator
 * @throws Exception if the evaluator cannot be built, e.g. a date-time string is in the wrong format
 */
public static ELEvaluator createInstancesELEvaluator(Element event, SyncCoordAction appInst, Configuration conf)
        throws Exception {
    final String group = "coord-action-create";
    return createInstancesELEvaluator(group, event, appInst, conf);
}
/**
 * Create an evaluator from the named EL group, load the configuration properties into it and configure it with
 * the dataset described by {@code event} together with the given application instance.
 *
 * @param tag name of the EL group used to create the evaluator
 * @param event XML element whose {@code dataset} child describes the dataset
 * @param appInst application instance information handed to {@code CoordELFunctions.configureEvaluator}
 * @param conf configuration whose properties become evaluator variables
 * @return the configured ELEvaluator
 * @throws Exception if the dataset object cannot be built from {@code event}
 */
public static ELEvaluator createInstancesELEvaluator(String tag, Element event, SyncCoordAction appInst,
        Configuration conf) throws Exception {
    final ELService elService = Services.get().get(ELService.class);
    final ELEvaluator evaluator = elService.createEvaluator(tag);
    setConfigToEval(evaluator, conf);
    CoordELFunctions.configureEvaluator(evaluator, getDSObject(event), appInst);
    return evaluator;
}
/**
 * Create an evaluator for the given group and expose every entry of {@code dataNameList} as a variable named
 * {@code oozie.dataname.&lt;key&gt;} holding the entry's value.
 *
 * @param conf configuration whose properties become evaluator variables
 * @param group name of the EL group used to create the evaluator
 * @param dataNameList data names to publish; must not be {@code null}
 * @return the configured ELEvaluator
 */
public static ELEvaluator createELEvaluatorForDataEcho(Configuration conf, String group,
        HashMap<String, String> dataNameList) {
    final ELEvaluator evaluator = createELEvaluatorForGroup(conf, group);
    for (Map.Entry<String, String> dataName : dataNameList.entrySet()) {
        evaluator.setVariable("oozie.dataname." + dataName.getKey(), dataName.getValue());
    }
    return evaluator;
}
/**
 * Create an evaluator for lazy resolution (phase 3). Functions such as coord_latest(n) and coord_actualTime()
 * should only be resolved once all other data dependencies are met.
 * <p>
 * The evaluator is created from the {@code "coord-action-start"} group, receives the configuration properties,
 * is configured with the dataset of {@code dEvent} and additionally carries the configuration object itself
 * under {@code CoordELFunctions.CONFIGURATION}.
 *
 * @param actualTime action start time, stored as the actual time of the application instance
 * @param nominalTime action creation time, stored as the nominal time of the application instance
 * @param dEvent XML element of the data event, usually enclosed by a &lt;data-in&gt; or &lt;data-out&gt; tag
 * @param conf configuration used to substitute variables
 * @return the configured ELEvaluator
 * @throws Exception if the evaluator cannot be built, e.g. a date-time string is in the wrong format
 */
public static ELEvaluator createLazyEvaluator(Date actualTime, Date nominalTime, Element dEvent, Configuration conf)
        throws Exception {
    final ELService elService = Services.get().get(ELService.class);
    final ELEvaluator evaluator = elService.createEvaluator("coord-action-start");
    setConfigToEval(evaluator, conf);
    final SyncCoordDataset dataset = getDSObject(dEvent);

    // Only the two timestamps are set on the application instance here.
    final SyncCoordAction action = new SyncCoordAction();
    action.setNominalTime(nominalTime);
    action.setActualTime(actualTime);

    CoordELFunctions.configureEvaluator(evaluator, dataset, action);
    evaluator.setVariable(CoordELFunctions.CONFIGURATION, conf);
    return evaluator;
}
/**
 * Create the SLA evaluator used during materialization.
 * <p>
 * The evaluator comes from the {@code "coord-sla-create"} group and is configured with an application instance
 * built from the coordinator action (nominal time, created time as actual time, id) and the {@code name}
 * attribute of {@code eAction}. For every {@code data-out} child of {@code output-events} it sets
 * {@code .dataout.&lt;name&gt;} to the trimmed {@code uris} text with instance separators replaced by directory
 * separators, and {@code .dataout.&lt;name&gt;.unresolved} to {@code "true"} when unresolved instances exist.
 *
 * @param eAction the action
 * @param coordAction the coordinator action
 * @param conf the configuration
 * @return the SLA evaluator to be used during materialization
 */
public static ELEvaluator createSLAEvaluator(Element eAction, CoordinatorActionBean coordAction,
        Configuration conf) {
    final ELService elService = Services.get().get(ELService.class);
    final ELEvaluator evaluator = elService.createEvaluator("coord-sla-create");
    setConfigToEval(evaluator, conf);

    final SyncCoordAction action = new SyncCoordAction();
    action.setNominalTime(coordAction.getNominalTime());
    action.setActualTime(coordAction.getCreatedTime());
    action.setActionId(coordAction.getId());
    action.setName(eAction.getAttributeValue("name"));
    CoordELFunctions.configureEvaluator(evaluator, null, action);

    final Element outputEvents = eAction.getChild("output-events", eAction.getNamespace());
    if (outputEvents == null) {
        // No output events declared: nothing more to publish.
        return evaluator;
    }
    for (Object child : outputEvents.getChildren("data-out", eAction.getNamespace())) {
        final Element dataOut = (Element) child;
        final Element urisElement = dataOut.getChild("uris", dataOut.getNamespace());
        if (urisElement != null) {
            // replaceAll interprets INSTANCE_SEPARATOR as a regular expression.
            final String resolvedUris = urisElement.getTextTrim()
                    .replaceAll(CoordELFunctions.INSTANCE_SEPARATOR, CoordELFunctions.DIR_SEPARATOR);
            evaluator.setVariable(".dataout." + dataOut.getAttributeValue("name"), resolvedUris);
        }
        final boolean hasUnresolved =
                dataOut.getChild(CoordCommandUtils.UNRESOLVED_INSTANCES_TAG, dataOut.getNamespace()) != null;
        if (hasUnresolved) {
            evaluator.setVariable(".dataout." + dataOut.getAttributeValue("name") + ".unresolved", "true");
        }
    }
    return evaluator;
}
/**
 * Create an evaluator from the configuration and the input/output data names (used for SLA).
 *
 * @param conf the configuration
 * @param group the group for the EL expression
 * @param dataNameList the name list for the data
 * @return an evaluator carrying the data-name variables and the configuration properties
 */
public static ELEvaluator createELEvaluatorForDataAndConf(Configuration conf, String group,
        HashMap<String, String> dataNameList) {
    final ELEvaluator evaluator = createELEvaluatorForDataEcho(conf, group, dataNameList);
    // The configuration is applied once more after the data-name variables were set; presumably this lets a
    // configuration property win over a data-name variable of the same name -- confirm before removing.
    setConfigToEval(evaluator, conf);
    return evaluator;
}
/**
 * Create an evaluator that resolves the data-in and data-out events of an application instance (used in
 * stage 3). Delegates to the five-argument overload without pull or push dependencies.
 *
 * @param eJob XML element of the application instance
 * @param conf configuration used to substitute variables
 * @param actionId the action id
 * @return the configured ELEvaluator
 * @throws Exception if the evaluator cannot be built, e.g. a date-time string is in the wrong format
 */
public static ELEvaluator createDataEvaluator(Element eJob, Configuration conf, String actionId) throws Exception {
    final CoordInputDependency noPullDependencies = null;
    final CoordInputDependency noPushDependencies = null;
    return createDataEvaluator(eJob, conf, actionId, noPullDependencies, noPushDependencies);
}
/**
 * Create an evaluator that resolves the data-in and data-out events of an application instance (used in
 * stage 3).
 * <p>
 * The evaluator is created from the {@code "coord-action-start"} group and receives the configuration
 * properties. When {@code eJob} carries an {@code action-nominal-time} attribute, the application instance is
 * filled from the job attributes (nominal time, timezone, frequency, time unit, name), the action id and the
 * given dependencies; otherwise those values, including the dependencies, are left unset. For every
 * {@code data-in} and {@code data-out} event the variables {@code .datain.&lt;name&gt;} /
 * {@code .dataout.&lt;name&gt;} and their {@code .unresolved} companions are published; data-in events
 * additionally publish {@code .datain.&lt;name&gt;.doneFlag}.
 *
 * @param eJob XML element of the application instance
 * @param conf configuration used to substitute variables
 * @param actionId the action id
 * @param pullDependencies pull dependencies stored on the application instance, may be {@code null}
 * @param pushDependencies push dependencies stored on the application instance, may be {@code null}
 * @return the configured ELEvaluator
 * @throws Exception if the evaluator cannot be built, e.g. a date-time string is in the wrong format
 */
public static ELEvaluator createDataEvaluator(Element eJob, Configuration conf, String actionId,
        CoordInputDependency pullDependencies, CoordInputDependency pushDependencies) throws Exception {
    ELEvaluator e = Services.get().get(ELService.class).createEvaluator("coord-action-start");
    setConfigToEval(e, conf);
    SyncCoordAction appInst = new SyncCoordAction();
    String strNominalTime = eJob.getAttributeValue("action-nominal-time");
    if (strNominalTime != null) {
        appInst.setNominalTime(DateUtils.parseDateOozieTZ(strNominalTime));
        appInst.setTimeZone(DateUtils.getTimeZone(eJob.getAttributeValue("timezone")));
        appInst.setFrequency(eJob.getAttributeValue("frequency"));
        appInst.setTimeUnit(TimeUnit.valueOf(eJob.getAttributeValue("freq_timeunit")));
        appInst.setActionId(actionId);
        appInst.setName(eJob.getAttributeValue("name"));
        appInst.setPullDependencies(pullDependencies);
        appInst.setPushDependencies(pushDependencies);
        if (CoordUtils.isInputLogicSpecified(eJob)) {
            e.setVariable(".actionInputLogic",
                    XmlUtils.prettyPrint(eJob.getChild(CoordInputLogicEvaluator.INPUT_LOGIC, eJob.getNamespace()))
                            .toString());
        }
    }
    String strActualTime = eJob.getAttributeValue("action-actual-time");
    if (strActualTime != null) {
        appInst.setActualTime(DateUtils.parseDateOozieTZ(strActualTime));
    }
    CoordELFunctions.configureEvaluator(e, null, appInst);

    Element inputEvents = eJob.getChild("input-events", eJob.getNamespace());
    if (inputEvents != null) {
        @SuppressWarnings("unchecked") // JDOM 1.x getChildren() returns a raw List whose members are Elements
        List<Element> dataIns = (List<Element>) inputEvents.getChildren("data-in", eJob.getNamespace());
        for (Element data : dataIns) {
            String prefix = ".datain." + data.getAttributeValue("name");
            setDataEventVariables(e, data, prefix);
            // The done flag is read from the dataset nested in the data-in event; a missing done-flag element
            // is passed on to CoordUtils.getDoneFlag as null.
            Element doneFlagElement = data.getChild("dataset", data.getNamespace()).getChild("done-flag",
                    data.getNamespace());
            String doneFlag = CoordUtils.getDoneFlag(doneFlagElement);
            e.setVariable(prefix + ".doneFlag", doneFlag);
        }
    }

    Element outputEvents = eJob.getChild("output-events", eJob.getNamespace());
    if (outputEvents != null) {
        @SuppressWarnings("unchecked") // JDOM 1.x getChildren() returns a raw List whose members are Elements
        List<Element> dataOuts = (List<Element>) outputEvents.getChildren("data-out", eJob.getNamespace());
        for (Element data : dataOuts) {
            setDataEventVariables(e, data, ".dataout." + data.getAttributeValue("name"));
        }
    }
    return e;
}

/**
 * Publish the variables shared by data-in and data-out events: {@code variablePrefix} holds the trimmed
 * {@code uris} text with instance separators replaced by directory separators (only when a {@code uris} child
 * exists), and {@code variablePrefix + ".unresolved"} is set to {@code "true"} when the event has an
 * unresolved-instances child.
 *
 * @param eval evaluator receiving the variables
 * @param data the data-in or data-out element
 * @param variablePrefix variable name prefix, e.g. {@code .datain.&lt;name&gt;}
 */
private static void setDataEventVariables(ELEvaluator eval, Element data, String variablePrefix) {
    Element urisElement = data.getChild("uris", data.getNamespace());
    if (urisElement != null) {
        // replaceAll interprets INSTANCE_SEPARATOR as a regular expression.
        String uris = urisElement.getTextTrim()
                .replaceAll(CoordELFunctions.INSTANCE_SEPARATOR, CoordELFunctions.DIR_SEPARATOR);
        eval.setVariable(variablePrefix, uris);
    }
    if (data.getChild(CoordCommandUtils.UNRESOLVED_INSTANCES_TAG, data.getNamespace()) != null) {
        eval.setVariable(variablePrefix + ".unresolved", "true");
    }
}
/**
 * Create an evaluator that resolves a URI template with time-specific constants. The date is interpreted in the
 * Oozie processing timezone and published as {@code YEAR} (number) and as the two-digit strings {@code MONTH}
 * (1-based), {@code DAY}, {@code HOUR} (24-hour clock) and {@code MINUTE}.
 *
 * @param strDate date-time string
 * @return the configured ELEvaluator
 * @throws Exception if the date-time string is in the wrong format
 */
public static ELEvaluator createURIELEvaluator(String strDate) throws Exception {
    final ELEvaluator evaluator = new ELEvaluator();
    // NOTE(review): the original author questioned whether the processing timezone is always the right choice.
    final Calendar calendar = Calendar.getInstance(DateUtils.getOozieProcessingTimeZone());
    calendar.setTime(DateUtils.parseDateOozieTZ(strDate));

    final int year = calendar.get(Calendar.YEAR);
    final int month = calendar.get(Calendar.MONTH) + 1; // Calendar months are zero-based
    final int day = calendar.get(Calendar.DAY_OF_MONTH);
    final int hour = calendar.get(Calendar.HOUR_OF_DAY);
    final int minute = calendar.get(Calendar.MINUTE);

    evaluator.setVariable("YEAR", year);
    evaluator.setVariable("MONTH", make2Digits(month));
    evaluator.setVariable("DAY", make2Digits(day));
    evaluator.setVariable("HOUR", make2Digits(hour));
    evaluator.setVariable("MINUTE", make2Digits(minute));
    return evaluator;
}
/**
 * Create a dataset object from the {@code dataset} child of a data event.
 * <p>
 * A dataset with a {@code frequency} attribute is of type {@code "SYNC"} and must also define
 * {@code freq_timeunit}, {@code timezone} and {@code end_of_duration}; a dataset without one is of type
 * {@code "ASYNC"}. In both cases the initial instance, the name and the URI template are copied.
 *
 * @param eData the data event XML
 * @return the dataset object described by the XML
 * @throws RuntimeException if the {@code dataset} element, its {@code uri-template} element or one of the
 *         attributes required for a SYNC dataset is missing
 * @throws Exception if the dataset object can't be created for another reason, e.g. an unparsable date
 */
private static SyncCoordDataset getDSObject(Element eData) throws Exception {
    SyncCoordDataset ds = new SyncCoordDataset();
    Element eDataset = eData.getChild("dataset", eData.getNamespace());
    if (eDataset == null) {
        // Fail with context instead of a bare NullPointerException on the first attribute access.
        throw new RuntimeException("No dataset defined in data event definition\n"
                + XmlUtils.prettyPrint(eData));
    }
    Date initInstance = DateUtils.parseDateOozieTZ(eDataset.getAttributeValue("initial-instance"));
    ds.setInitInstance(initInstance);
    String frequency = eDataset.getAttributeValue("frequency");
    if (frequency != null) {
        ds.setFrequency(Integer.parseInt(frequency));
        ds.setType("SYNC");
        ds.setTimeUnit(TimeUnit.valueOf(requireDatasetAttribute(eDataset, "freq_timeunit")));
        ds.setTimeZone(DateUtils.getTimeZone(requireDatasetAttribute(eDataset, "timezone")));
        ds.setEndOfDuration(TimeUnit.valueOf(requireDatasetAttribute(eDataset, "end_of_duration")));
        Element doneFlagElement = eDataset.getChild("done-flag", eData.getNamespace());
        ds.setDoneFlag(CoordUtils.getDoneFlag(doneFlagElement));
    }
    else {
        ds.setType("ASYNC");
    }
    ds.setName(eDataset.getAttributeValue("name"));
    Element eUriTemplate = eDataset.getChild("uri-template", eData.getNamespace());
    if (eUriTemplate == null) {
        throw new RuntimeException("No uri-template defined in data set definition\n"
                + XmlUtils.prettyPrint(eDataset));
    }
    ds.setUriTemplate(eUriTemplate.getTextTrim());
    return ds;
}

/**
 * Return the value of a mandatory dataset attribute.
 *
 * @param eDataset the dataset element
 * @param attribute name of the attribute that must be present
 * @return the attribute value, never {@code null}
 * @throws RuntimeException if the attribute is not defined on the dataset element
 */
private static String requireDatasetAttribute(Element eDataset, String attribute) {
    String value = eDataset.getAttributeValue(attribute);
    if (value == null) {
        throw new RuntimeException("No " + attribute + " defined in data set definition\n"
                + XmlUtils.prettyPrint(eDataset));
    }
    return value;
}
/**
 * Copy every property of the job configuration into the evaluator as a variable; values are trimmed of leading
 * and trailing whitespace first.
 *
 * @param eval evaluator that receives the variables
 * @param conf configuration whose properties are copied
 */
private static void setConfigToEval(ELEvaluator eval, Configuration conf) {
    final Iterator<Map.Entry<String, String>> properties = conf.iterator();
    while (properties.hasNext()) {
        final Map.Entry<String, String> property = properties.next();
        final String trimmedValue = property.getValue().trim();
        eval.setVariable(property.getKey(), trimmedValue);
    }
}
/**
 * Render a number as a string, prefixing a {@code "0"} whenever the number is 9 or smaller so that one-digit
 * numbers become two-digit strings.
 *
 * @param num number to render
 * @return the decimal string of {@code num}, with a leading {@code "0"} when {@code num <= 9}
 */
private static String make2Digits(int num) {
    final String digits = Integer.toString(num);
    return num > 9 ? digits : "0" + digits;
}
}