| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.oozie.extensions; |
| |
| import org.apache.commons.lang3.StringUtils; |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.oozie.client.OozieClient; |
| import org.apache.oozie.command.coord.CoordCommandUtils; |
| import org.apache.oozie.coord.CoordELEvaluator; |
| import org.apache.oozie.coord.CoordELFunctions; |
| import org.apache.oozie.coord.SyncCoordAction; |
| import org.apache.oozie.coord.SyncCoordDataset; |
| import org.apache.oozie.dependency.ActionDependency; |
| import org.apache.oozie.dependency.DependencyChecker; |
| import org.apache.oozie.util.ELEvaluator; |
| import org.apache.oozie.util.ParamChecker; |
| import org.apache.oozie.util.XLog; |
| import org.jdom.Attribute; |
| import org.jdom.Element; |
| import org.jdom.Text; |
| |
| import java.text.DateFormat; |
| import java.text.SimpleDateFormat; |
| import java.util.ArrayList; |
| import java.util.Calendar; |
| import java.util.Date; |
| import java.util.List; |
| import java.util.TimeZone; |
| |
| /** |
| * Oozie EL Extensions for falcon. |
| */ |
| @SuppressWarnings("unchecked") |
| //SUSPEND CHECKSTYLE CHECK MethodName |
| public final class OozieELExtensions { |
| |
| public static final String DATE_FORMAT = "yyyy-MM-dd'T'HH:mm'Z'"; |
| public static final TimeZone UTC = TimeZone.getTimeZone("UTC"); |
| |
| private enum TruncateBoundary { |
| NONE, DAY, MONTH, QUARTER, YEAR |
| } |
| |
| private enum DayOfWeek { |
| SUN, MON, TUE, WED, THU, FRI, SAT |
| } |
| |
| public static final String COORD_CURRENT = "coord:current"; |
| |
| private OozieELExtensions() { |
| } |
| |
| public static String ph1_dataIn_echo(String dataInName, String part) { |
| return "dataIn('" + dataInName + "', '" + part + "')"; |
| } |
| |
| public static String ph3_dataIn(String dataInName, String part) { |
| ELEvaluator eval = ELEvaluator.getCurrent(); |
| String uristr = (String) eval.getVariable(".datain." + dataInName); |
| |
| //optional input |
| if (uristr == null) { |
| Element dsEle = getDSElement(eval, dataInName); |
| SyncCoordAction appInst = (SyncCoordAction) eval.getVariable(CoordELFunctions.COORD_ACTION); |
| |
| Configuration conf = new Configuration(); |
| conf.set(OozieClient.USER_NAME, (String)eval.getVariable(OozieClient.USER_NAME)); |
| try { |
| ELEvaluator instEval = CoordELEvaluator.createInstancesELEvaluator(dsEle, appInst, conf); |
| StringBuilder instances = new StringBuilder(); |
| StringBuilder urisWithDoneFlag = new StringBuilder(); |
| CoordCommandUtils.resolveInstanceRange(dsEle, instances, appInst, conf, instEval); |
| CoordCommandUtils.createEarlyURIs(dsEle, instances.toString(), |
| new StringBuilder(), urisWithDoneFlag); |
| XLog.getLog(OozieELExtensions.class).debug("Resolved instances for " + dataInName + " : " |
| + urisWithDoneFlag.toString()); |
| // Check if availability flags are present for each instance. |
| ActionDependency actionDep = DependencyChecker.checkForAvailability(urisWithDoneFlag.toString(), |
| conf, false); |
| String doneFlag = (String) eval.getVariable(dataInName + ".done-flag"); |
| uristr = StringUtils.join(stripDoneFlag(actionDep.getAvailableDependencies(), doneFlag), ","); |
| // If no instances are present, point the optional input to empty dir. |
| if (StringUtils.isEmpty(uristr)) { |
| String emptyDir = (String) eval.getVariable(dataInName + ".empty-dir"); |
| XLog.getLog(OozieELExtensions.class).debug("No instances could be resolved. Passing empty dir : " |
| + emptyDir); |
| return emptyDir; |
| } |
| } catch (Exception e) { |
| throw new RuntimeException("Failed to resolve instance range for " + dataInName, e); |
| } |
| } else { |
| Boolean unresolved = (Boolean) eval.getVariable(".datain." + dataInName + ".unresolved"); |
| if (unresolved != null && unresolved) { |
| throw new RuntimeException("There are unresolved instances in " + uristr); |
| } |
| } |
| |
| if (StringUtils.isNotEmpty(uristr) && StringUtils.isNotEmpty(part) && !part.equals("null")) { |
| String[] uris = uristr.split(","); |
| StringBuilder mappedUris = new StringBuilder(); |
| for (String uri : uris) { |
| if (uri.trim().length() == 0) { |
| continue; |
| } |
| if (mappedUris.length() > 0) { |
| mappedUris.append(","); |
| } |
| mappedUris.append(uri).append("/").append(part); |
| } |
| return mappedUris.toString(); |
| } |
| return uristr; |
| } |
| |
| private static List<String> stripDoneFlag(List<String> availableDependencies, String doneFlag) { |
| if (StringUtils.isEmpty(doneFlag)) { |
| return availableDependencies; |
| } |
| List<String> strippedAvailableDeps = new ArrayList<>(); |
| for (String availableDep : availableDependencies) { |
| strippedAvailableDeps.add(StringUtils.stripEnd(availableDep, "/" + doneFlag)); |
| } |
| return strippedAvailableDeps; |
| } |
| |
| private static Element getDSElement(ELEvaluator eval, String dataInName) { |
| Element ele = new Element("datain"); |
| Element dsEle = new Element("dataset"); |
| ele.getChildren().add(dsEle); |
| |
| String[] attrs = {"initial-instance", "frequency", "freq_timeunit", "timezone", "end_of_duration"}; |
| for (String attr : attrs) { |
| dsEle.getAttributes().add(new Attribute(attr, (String) eval.getVariable(dataInName + "." + attr))); |
| } |
| |
| String[] children = {"done-flag", "uri-template"}; |
| for (String child : children) { |
| Element childEle = new Element(child); |
| String text = (String) eval.getVariable(dataInName + "." + child); |
| if (text != null) { |
| childEle.setContent(new Text(text.replace('%', '$'))); |
| } |
| dsEle.getChildren().add(childEle); |
| } |
| |
| String[] eleChildren = {"start-instance", "end-instance"}; |
| for (String child : eleChildren) { |
| Element childEle = new Element(child); |
| childEle.setContent(new Text("${" + ((String) eval.getVariable(dataInName + "." + child)) + "}")); |
| ele.getChildren().add(childEle); |
| } |
| |
| return ele; |
| } |
| |
| public static String ph1_now_echo(int hr, int min) { |
| ELEvaluator eval = ELEvaluator.getCurrent(); |
| eval.setVariable(".wrap", "true"); |
| return "now(" + hr + "," + min + ")"; // Unresolved |
| } |
| |
| public static String ph1_today_echo(int hr, int min) { |
| ELEvaluator eval = ELEvaluator.getCurrent(); |
| eval.setVariable(".wrap", "true"); |
| return "today(" + hr + ", " + min + ")"; // Unresolved |
| } |
| |
| public static String ph1_yesterday_echo(int hr, int min) { |
| ELEvaluator eval = ELEvaluator.getCurrent(); |
| eval.setVariable(".wrap", "true"); |
| return "yesterday(" + hr + ", " + min + ")"; // Unresolved |
| } |
| |
| public static String ph1_currentWeek_echo(String weekDayName, int hr, int min) { |
| ELEvaluator eval = ELEvaluator.getCurrent(); |
| eval.setVariable(".wrap", "true"); |
| return "currentWeek('" + weekDayName + "', " + hr + ", " + min + ")"; // Unresolved |
| } |
| |
| public static String ph1_lastWeek_echo(String weekDayName, int hr, int min) { |
| ELEvaluator eval = ELEvaluator.getCurrent(); |
| eval.setVariable(".wrap", "true"); |
| return "lastWeek('" + weekDayName + "', " + hr + ", " + min + ")"; // Unresolved |
| } |
| |
| public static String ph1_currentMonth_echo(int day, int hr, int min) { |
| ELEvaluator eval = ELEvaluator.getCurrent(); |
| eval.setVariable(".wrap", "true"); |
| return "currentMonth(" + day + ", " + hr + ", " + min + ")"; // Unresolved |
| } |
| |
| public static String ph1_lastMonth_echo(int day, int hr, int min) { |
| ELEvaluator eval = ELEvaluator.getCurrent(); |
| eval.setVariable(".wrap", "true"); |
| return "lastMonth(" + day + ", " + hr + ", " + min + ")"; // Unresolved |
| } |
| |
| public static String ph1_currentYear_echo(int month, int day, int hr, int min) { |
| ELEvaluator eval = ELEvaluator.getCurrent(); |
| eval.setVariable(".wrap", "true"); |
| return "currentYear(" + month + ", " + day + ", " + hr + ", " + min + ")"; // Unresolved |
| } |
| |
| public static String ph1_lastYear_echo(int month, int day, int hr, int min) { |
| ELEvaluator eval = ELEvaluator.getCurrent(); |
| eval.setVariable(".wrap", "true"); |
| return "lastYear(" + month + ", " + day + ", " + hr + ", " + min + ")"; // Unresolved |
| } |
| |
| public static String ph2_now_inst(int hr, int min) { |
| return mapToCurrentInstance(TruncateBoundary.NONE, 0, 0, 0, hr, min); |
| } |
| |
| public static String ph2_today_inst(int hr, int min) { |
| return mapToCurrentInstance(TruncateBoundary.DAY, 0, 0, 0, hr, min); |
| } |
| |
| public static String ph2_yesterday_inst(int hr, int min) { |
| return mapToCurrentInstance(TruncateBoundary.DAY, 0, 0, -1, hr, min); |
| } |
| |
| public static String ph2_currentWeek_inst(String weekDayName, int hr, int min) { |
| int day = getDayOffset(weekDayName); |
| return mapToCurrentInstance(TruncateBoundary.DAY, 0, 0, day, hr, min); |
| } |
| |
| public static String ph2_lastWeek_inst(String weekDayName, int hr, int min) { |
| int day = getDayOffset(weekDayName) - 7; |
| return mapToCurrentInstance(TruncateBoundary.DAY, 0, 0, day, hr, min); |
| } |
| |
| public static String ph2_currentMonth_inst(int day, int hr, int min) { |
| return mapToCurrentInstance(TruncateBoundary.MONTH, 0, 0, day, hr, min); |
| } |
| |
| public static String ph2_lastMonth_inst(int day, int hr, int min) { |
| return mapToCurrentInstance(TruncateBoundary.MONTH, 0, -1, day, hr, min); |
| } |
| |
| public static String ph2_currentYear_inst(int month, int day, int hr, int min) { |
| return mapToCurrentInstance(TruncateBoundary.YEAR, 0, month, day, hr, min); |
| } |
| |
| public static String ph2_lastYear_inst(int month, int day, int hr, int min) { |
| return mapToCurrentInstance(TruncateBoundary.YEAR, -1, month, day, hr, min); |
| } |
| |
| private static String evaluateCurrent(String curExpr) throws Exception { |
| if (curExpr.equals("")) { |
| return curExpr; |
| } |
| |
| int inst = CoordCommandUtils.parseOneArg(curExpr); |
| return CoordELFunctions.ph2_coord_current(inst); |
| } |
| |
| public static String ph2_now(int hr, int min) throws Exception { |
| if (isDatasetContext()) { |
| String inst = ph2_now_inst(hr, min); |
| return evaluateCurrent(inst); |
| } else { |
| return getEffectiveTimeStr(TruncateBoundary.NONE, 0, 0, 0, hr, min); |
| } |
| } |
| |
| private static boolean isActionContext() { |
| return !isDatasetContext(); |
| } |
| |
| private static boolean isDatasetContext() { |
| ELEvaluator eval = ELEvaluator.getCurrent(); |
| SyncCoordDataset ds = (SyncCoordDataset) eval.getVariable(CoordELFunctions.DATASET); |
| return ds != null; |
| } |
| |
| private static String getEffectiveTimeStr(TruncateBoundary trunc, int yr, int mon, |
| int day, int hr, int min) throws Exception { |
| Calendar time = getEffectiveTime(trunc, yr, mon, day, hr, min); |
| return formatDateUTC(time); |
| } |
| |
| private static DateFormat getISO8601DateFormat(TimeZone tz) { |
| DateFormat dateFormat = new SimpleDateFormat(DATE_FORMAT); |
| // Stricter parsing to prevent dates such as 2011-12-50T01:00Z (December 50th) from matching |
| dateFormat.setLenient(false); |
| dateFormat.setTimeZone(tz); |
| return dateFormat; |
| } |
| |
| public static String formatDateUTC(Date d) throws Exception { |
| return (d != null) ? getISO8601DateFormat(UTC).format(d) : "NULL"; |
| } |
| |
| public static String formatDateUTC(Calendar c) throws Exception { |
| return (c != null) ? formatDateUTC(c.getTime()) : "NULL"; |
| } |
| |
| @edu.umd.cs.findbugs.annotations.SuppressWarnings({"SF_SWITCH_FALLTHROUGH"}) |
| private static Calendar getEffectiveTime(TruncateBoundary trunc, int yr, int mon, int day, int hr, int min) { |
| Calendar cal; |
| if (isActionContext()) { |
| ELEvaluator eval = ELEvaluator.getCurrent(); |
| SyncCoordAction action = ParamChecker.notNull((SyncCoordAction) |
| eval.getVariable(CoordELFunctions.COORD_ACTION), |
| "Coordinator Action"); |
| cal = Calendar.getInstance(action.getTimeZone()); |
| cal.setTime(action.getNominalTime()); |
| } else { |
| Calendar tmp = CoordELFunctions.getEffectiveNominalTime(); |
| if (tmp == null) { |
| return null; |
| } |
| cal = Calendar.getInstance(CoordELFunctions.getDatasetTZ()); |
| cal.setTimeInMillis(tmp.getTimeInMillis()); |
| } |
| |
| // truncate |
| switch (trunc) { |
| case YEAR: |
| cal.set(Calendar.MONTH, 0); |
| |
| case MONTH: |
| cal.set(Calendar.DAY_OF_MONTH, 1); |
| |
| case DAY: |
| cal.set(Calendar.HOUR_OF_DAY, 0); |
| cal.set(Calendar.MINUTE, 0); |
| cal.set(Calendar.SECOND, 0); |
| cal.set(Calendar.MILLISECOND, 0); |
| break; |
| |
| case NONE: // don't truncate |
| break; |
| |
| default: |
| throw new IllegalArgumentException("Truncation boundary " + trunc + " is not supported"); |
| } |
| |
| // add |
| cal.add(Calendar.YEAR, yr); |
| cal.add(Calendar.MONTH, mon); |
| cal.add(Calendar.DAY_OF_MONTH, day); |
| cal.add(Calendar.HOUR_OF_DAY, hr); |
| cal.add(Calendar.MINUTE, min); |
| return cal; |
| } |
| |
| public static String ph2_today(int hr, int min) throws Exception { |
| if (isDatasetContext()) { |
| String inst = ph2_today_inst(hr, min); |
| return evaluateCurrent(inst); |
| } else { |
| return getEffectiveTimeStr(TruncateBoundary.DAY, 0, 0, 0, hr, min); |
| } |
| } |
| |
| public static String ph2_yesterday(int hr, int min) throws Exception { |
| if (isDatasetContext()) { |
| String inst = ph2_yesterday_inst(hr, min); |
| return evaluateCurrent(inst); |
| } else { |
| return getEffectiveTimeStr(TruncateBoundary.DAY, 0, 0, -1, hr, min); |
| } |
| } |
| |
| public static String ph2_currentMonth(int day, int hr, int min) throws Exception { |
| if (isDatasetContext()) { |
| String inst = ph2_currentMonth_inst(day, hr, min); |
| return evaluateCurrent(inst); |
| } else { |
| return getEffectiveTimeStr(TruncateBoundary.MONTH, 0, 0, day, hr, min); |
| } |
| } |
| |
| public static String ph2_lastMonth(int day, int hr, int min) throws Exception { |
| if (isDatasetContext()) { |
| String inst = ph2_lastMonth_inst(day, hr, min); |
| return evaluateCurrent(inst); |
| } else { |
| return getEffectiveTimeStr(TruncateBoundary.MONTH, 0, -1, day, hr, min); |
| } |
| } |
| |
| private static int getDayOffset(String weekDayName) { |
| int day; |
| Calendar effectiveTime; |
| if (isDatasetContext()) { |
| effectiveTime = CoordELFunctions.getEffectiveNominalTime(); |
| } else { |
| effectiveTime = getEffectiveTime(TruncateBoundary.DAY, 0, 0, 0, 0, 0); |
| } |
| int currentWeekDay = effectiveTime.get(Calendar.DAY_OF_WEEK); |
| int weekDay = DayOfWeek.valueOf(weekDayName).ordinal() + 1; //to map to Calendar.SUNDAY ... |
| day = weekDay - currentWeekDay; |
| if (weekDay > currentWeekDay) { |
| day = day - 7; |
| } |
| return day; |
| } |
| |
| public static String ph2_currentWeek(String weekDayName, int hr, int min) throws Exception { |
| int day = getDayOffset(weekDayName); |
| if (isDatasetContext()) { |
| String inst = ph2_currentMonth_inst(day, hr, min); |
| return evaluateCurrent(inst); |
| } else { |
| return getEffectiveTimeStr(TruncateBoundary.DAY, 0, 0, day, hr, min); |
| } |
| } |
| |
| public static String ph2_lastWeek(String weekDayName, int hr, int min) throws Exception { |
| int day = getDayOffset(weekDayName) - 7; |
| if (isDatasetContext()) { |
| String inst = ph2_lastMonth_inst(day, hr, min); |
| return evaluateCurrent(inst); |
| } else { |
| return getEffectiveTimeStr(TruncateBoundary.DAY, 0, 0, day, hr, min); |
| } |
| } |
| |
| public static String ph2_currentYear(int month, int day, int hr, int min) throws Exception { |
| if (isDatasetContext()) { |
| String inst = ph2_currentYear_inst(month, day, hr, min); |
| return evaluateCurrent(inst); |
| } else { |
| return getEffectiveTimeStr(TruncateBoundary.YEAR, 0, month, day, hr, min); |
| } |
| } |
| |
| public static String ph2_lastYear(int month, int day, int hr, int min) throws Exception { |
| if (isDatasetContext()) { |
| String inst = ph2_lastYear_inst(month, day, hr, min); |
| return evaluateCurrent(inst); |
| } else { |
| return getEffectiveTimeStr(TruncateBoundary.YEAR, -1, month, day, hr, min); |
| } |
| } |
| |
| /** |
| * Maps the dataset time to coord:current(n) with respect to action's |
| * nominal time dataset time = truncate(nominal time) + yr + day + month + |
| * hr + min. |
| * |
| * @param trunc |
| * : Truncate resolution |
| * @param yr |
| * : Year to add (can be -ve) |
| * @param month |
| * : month to add (can be -ve) |
| * @param day |
| * : day to add (can be -ve) |
| * @param hr |
| * : hr to add (can be -ve) |
| * @param min |
| * : min to add (can be -ve) |
| * @return coord:current(n) |
| * @throws Exception |
| * : If encountered an exception while evaluating |
| */ |
| private static String mapToCurrentInstance(TruncateBoundary trunc, int yr, int month, int day, int hr, int min) { |
| Calendar nominalInstanceCal = CoordELFunctions.getEffectiveNominalTime(); |
| if (nominalInstanceCal == null) { |
| XLog.getLog(OozieELExtensions.class).warn( |
| "If the initial instance of the dataset is later than the nominal time, " |
| + "an empty string is returned. This means that no data is available " |
| + "at the current-instance specified by the user and the user could " |
| + "try modifying his initial-instance to an earlier time."); |
| return ""; |
| } |
| |
| Calendar dsInstanceCal = getEffectiveTime(trunc, yr, month, day, hr, min); |
| |
| int[] instCnt = new int[1]; |
| Calendar compInstCal = CoordELFunctions.getCurrentInstance(dsInstanceCal.getTime(), instCnt); |
| if (compInstCal == null) { |
| return ""; |
| } |
| int dsInstanceCnt = instCnt[0]; |
| |
| compInstCal = CoordELFunctions.getCurrentInstance(nominalInstanceCal.getTime(), instCnt); |
| if (compInstCal == null) { |
| return ""; |
| } |
| int nominalInstanceCnt = instCnt[0]; |
| |
| return COORD_CURRENT + "(" + (dsInstanceCnt - nominalInstanceCnt) + ")"; |
| } |
| } |
| //RESUME CHECKSTYLE CHECK MethodName |