blob: 8c15099fb9b7c4beff6e6238f91da49d46b4a336 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.oozie.coord;
import java.text.ParseException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.Map;
import java.util.HashMap;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.oozie.CoordinatorActionBean;
import org.apache.oozie.CoordinatorEngine;
import org.apache.oozie.ErrorCode;
import org.apache.oozie.XException;
import org.apache.oozie.client.OozieClient;
import org.apache.oozie.client.rest.RestConstants;
import org.apache.oozie.command.CommandException;
import org.apache.oozie.coord.input.logic.CoordInputLogicEvaluator;
import org.apache.oozie.coord.input.logic.InputLogicParser;
import org.apache.oozie.executor.jpa.CoordActionGetJPAExecutor;
import org.apache.oozie.executor.jpa.CoordJobGetActionForNominalTimeJPAExecutor;
import org.apache.oozie.executor.jpa.JPAExecutorException;
import org.apache.oozie.service.ConfigurationService;
import org.apache.oozie.service.JPAService;
import org.apache.oozie.service.Services;
import org.apache.oozie.service.XLogService;
import org.apache.oozie.sla.SLAOperations;
import org.apache.oozie.util.CoordActionsInDateRange;
import org.apache.oozie.util.DateUtils;
import org.apache.oozie.util.Pair;
import org.apache.oozie.util.ParamChecker;
import org.apache.oozie.util.XLog;
import org.apache.oozie.util.XmlUtils;
import org.jdom.Element;
import org.jdom.JDOMException;
import com.google.common.annotations.VisibleForTesting;
public class CoordUtils {
public static final String HADOOP_USER = "user.name";
public static String getDoneFlag(Element doneFlagElement) {
if (doneFlagElement != null) {
return doneFlagElement.getTextTrim();
}
else {
return CoordELConstants.DEFAULT_DONE_FLAG;
}
}
public static Configuration getHadoopConf(Configuration jobConf) {
Configuration conf = new Configuration();
Objects.requireNonNull(jobConf, "Configuration to be used for hadoop setup cannot be null");
String user = ParamChecker.notEmpty(jobConf.get(OozieClient.USER_NAME), OozieClient.USER_NAME);
conf.set(HADOOP_USER, user);
return conf;
}
/**
* Get the list of actions for a given coordinator job
* @param rangeType the rerun type (date, action)
* @param jobId the coordinator job id
* @param scope the date scope or action id scope
* @param active set to true if non-terminated
* @return the list of Coordinator actions
* @throws CommandException thrown if failed to get coordinator actions by given date range
*/
public static List<CoordinatorActionBean> getCoordActions(String rangeType, String jobId, String scope,
boolean active) throws CommandException {
List<CoordinatorActionBean> coordActions = null;
if (rangeType.equals(RestConstants.JOB_COORD_SCOPE_DATE)) {
coordActions = CoordUtils.getCoordActionsFromDates(jobId, scope, active);
}
else if (rangeType.equals(RestConstants.JOB_COORD_SCOPE_ACTION)) {
coordActions = CoordUtils.getCoordActionsFromIds(jobId, scope);
}
return coordActions;
}
public static List<String> getActionListForScopeAndDate(String id, String scope, String dates) throws CommandException {
List<String> actionIds = new ArrayList<String>();
List<String> parsed = new ArrayList<String>();
if (scope == null && dates == null) {
parsed.add(id);
return parsed;
}
if (dates != null) {
List<CoordinatorActionBean> actionSet = CoordUtils.getCoordActionsFromDates(id, dates, true);
for (CoordinatorActionBean action : actionSet) {
actionIds.add(action.getId());
}
parsed.addAll(actionIds);
}
if (scope != null) {
parsed.addAll(CoordUtils.getActionsIds(id, scope));
}
return parsed;
}
/**
* Get the list of actions for given date ranges
*
* @param jobId coordinator job id
* @param scope a comma-separated list of date ranges. Each date range element is specified with two dates separated by '::'
* @param active set to true if non-terminated
* @return the list of Coordinator actions for the date range
* @throws CommandException thrown if failed to get coordinator actions by given date range
*/
@VisibleForTesting
public static List<CoordinatorActionBean> getCoordActionsFromDates(String jobId, String scope, boolean active)
throws CommandException {
JPAService jpaService = Services.get().get(JPAService.class);
ParamChecker.notEmpty(jobId, "jobId");
ParamChecker.notEmpty(scope, "scope");
Set<CoordinatorActionBean> actionSet = new LinkedHashSet<CoordinatorActionBean>();
String[] list = scope.split(",");
for (String s : list) {
s = s.trim();
// A date range is specified with two dates separated by '::'
if (s.contains("::")) {
List<CoordinatorActionBean> listOfActions;
try {
// Get list of actions within the range of date
listOfActions = CoordActionsInDateRange.getCoordActionsFromDateRange(jobId, s, active);
}
catch (XException e) {
throw new CommandException(e);
}
actionSet.addAll(listOfActions);
}
else {
try {
// Get action for the nominal time
Date date = DateUtils.parseDateOozieTZ(s.trim());
CoordinatorActionBean coordAction = jpaService
.execute(new CoordJobGetActionForNominalTimeJPAExecutor(jobId, date));
if (coordAction != null) {
actionSet.add(coordAction);
}
else {
throw new RuntimeException("This should never happen, Coordinator Action shouldn't be null");
}
}
catch (ParseException e) {
throw new CommandException(ErrorCode.E0302, s.trim(), e);
}
catch (JPAExecutorException e) {
if (e.getErrorCode() == ErrorCode.E0605) {
XLog.getLog(CoordUtils.class).info("No action for nominal time:" + s + ". Skipping over");
}
throw new CommandException(e);
}
}
}
List<CoordinatorActionBean> coordActions = new ArrayList<CoordinatorActionBean>();
for (CoordinatorActionBean coordAction : actionSet) {
coordActions.add(coordAction);
}
return coordActions;
}
public static Set<String> getActionsIds(String jobId, String scope) throws CommandException {
ParamChecker.notEmpty(jobId, "jobId");
ParamChecker.notEmpty(scope, "scope");
Set<String> actions = new LinkedHashSet<String>();
String[] list = scope.split(",");
for (String s : list) {
s = s.trim();
// An action range is specified with two actions separated by '-'
if (s.contains("-")) {
String[] range = s.split("-");
// Check the format for action's range
if (range.length != 2) {
throw new CommandException(ErrorCode.E0302, "format is wrong for action's range '" + s + "', an example of"
+ " correct format is 1-5");
}
int start;
int end;
//Get the starting and ending action numbers
try {
start = Integer.parseInt(range[0].trim());
} catch (NumberFormatException ne) {
throw new CommandException(ErrorCode.E0302, "could not parse " + range[0].trim() + "into an integer", ne);
}
try {
end = Integer.parseInt(range[1].trim());
} catch (NumberFormatException ne) {
throw new CommandException(ErrorCode.E0302, "could not parse " + range[1].trim() + "into an integer", ne);
}
if (start > end) {
throw new CommandException(ErrorCode.E0302, "format is wrong for action's range '" + s + "', starting action"
+ "number of the range should be less than ending action number, an example will be 1-4");
}
// Add the actionIds
for (int i = start; i <= end; i++) {
actions.add(jobId + "@" + i);
}
}
else {
try {
Integer.parseInt(s);
}
catch (NumberFormatException ne) {
throw new CommandException(ErrorCode.E0302, "format is wrong for action id'" + s
+ "'. Integer only.");
}
actions.add(jobId + "@" + s);
}
}
return actions;
}
/**
* Get the list of actions for given id ranges
*
* @param jobId coordinator job id
* @param scope a comma-separated list of action ranges. The action range is specified with two action numbers separated by '-'
* @return the list of all Coordinator actions for action range
* @throws CommandException thrown if failed to get coordinator actions by given id range
*/
@VisibleForTesting
public static List<CoordinatorActionBean> getCoordActionsFromIds(String jobId, String scope) throws CommandException {
JPAService jpaService = Services.get().get(JPAService.class);
Set<String> actions = getActionsIds(jobId, scope);
// Retrieve the actions using the corresponding actionIds
List<CoordinatorActionBean> coordActions = new ArrayList<CoordinatorActionBean>();
for (String id : actions) {
CoordinatorActionBean coordAction = null;
try {
coordAction = jpaService.execute(new CoordActionGetJPAExecutor(id));
}
catch (JPAExecutorException je) {
if (je.getErrorCode().equals(ErrorCode.E0605)) { //ignore retrieval of non-existent actions in range
XLog.getLog(XLogService.class).warn(
"Coord action ID num [{0}] not yet materialized. Hence skipping over it for Kill action",
id.substring(id.indexOf("@") + 1));
continue;
}
else {
throw new CommandException(je);
}
}
coordActions.add(coordAction);
}
return coordActions;
}
/**
* Check if sla alert is disabled for action.
* @param actionBean the action bean
* @param coordName the coordinator name
* @param jobConf the job configuration
* @return true if SLA alert is disabled for action
* @throws ParseException if date parse fails
*/
public static boolean isSlaAlertDisabled(CoordinatorActionBean actionBean, String coordName, Configuration jobConf)
throws ParseException {
int disableSlaNotificationOlderThan = jobConf.getInt(OozieClient.SLA_DISABLE_ALERT_OLDER_THAN,
ConfigurationService.getInt(OozieClient.SLA_DISABLE_ALERT_OLDER_THAN));
if (disableSlaNotificationOlderThan > 0) {
// Disable alert for catchup jobs
long timeDiffinHrs = TimeUnit.MILLISECONDS.toHours(new Date().getTime()
- actionBean.getNominalTime().getTime());
if (timeDiffinHrs > jobConf.getLong(OozieClient.SLA_DISABLE_ALERT_OLDER_THAN,
ConfigurationService.getLong(OozieClient.SLA_DISABLE_ALERT_OLDER_THAN))) {
return true;
}
}
boolean disableAlert = false;
if (jobConf.get(OozieClient.SLA_DISABLE_ALERT_COORD) != null) {
String coords = jobConf.get(OozieClient.SLA_DISABLE_ALERT_COORD);
Set<String> coordsToDisableFor = new HashSet<String>(Arrays.asList(coords.split(",")));
if (coordsToDisableFor.contains(coordName)) {
return true;
}
if (coordsToDisableFor.contains(actionBean.getJobId())) {
return true;
}
}
// Check if sla alert is disabled for that action
if (!StringUtils.isEmpty(jobConf.get(OozieClient.SLA_DISABLE_ALERT))
&& getCoordActionSLAAlertStatus(actionBean, coordName, jobConf, OozieClient.SLA_DISABLE_ALERT)) {
return true;
}
// Check if sla alert is enabled for that action
if (!StringUtils.isEmpty(jobConf.get(OozieClient.SLA_ENABLE_ALERT))
&& getCoordActionSLAAlertStatus(actionBean, coordName, jobConf, OozieClient.SLA_ENABLE_ALERT)) {
return false;
}
return disableAlert;
}
/**
* Get coord action SLA alert status.
* @param actionBean
* @param coordName
* @param jobConf
* @param slaAlertType
* @return status of coord action SLA alert
* @throws ParseException if parsing date is not possible
*/
private static boolean getCoordActionSLAAlertStatus(CoordinatorActionBean actionBean, String coordName,
Configuration jobConf, String slaAlertType) throws ParseException {
String slaAlertList;
if (!StringUtils.isEmpty(jobConf.get(slaAlertType))) {
slaAlertList = jobConf.get(slaAlertType);
// check if ALL or date/action-num range
if (slaAlertList.equalsIgnoreCase(SLAOperations.ALL_VALUE)) {
return true;
}
String[] values = slaAlertList.split(",");
for (String value : values) {
value = value.trim();
if (value.contains("::")) {
String[] datesInRange = value.split("::");
Date start = DateUtils.parseDateOozieTZ(datesInRange[0].trim());
Date end = DateUtils.parseDateOozieTZ(datesInRange[1].trim());
// check if nominal time in this range
if (actionBean.getNominalTime().compareTo(start) >= 0
|| actionBean.getNominalTime().compareTo(end) <= 0) {
return true;
}
}
else if (value.contains("-")) {
String[] actionsInRange = value.split("-");
int start = Integer.parseInt(actionsInRange[0].trim());
int end = Integer.parseInt(actionsInRange[1].trim());
// check if action number in this range
if (actionBean.getActionNumber() >= start || actionBean.getActionNumber() <= end) {
return true;
}
}
else {
int actionNumber = Integer.parseInt(value.trim());
if (actionBean.getActionNumber() == actionNumber) {
return true;
}
}
}
}
return false;
}
// Form the where clause to filter by status values
public static Map<String, Object> getWhereClause(StringBuilder sb, Map<Pair<String, CoordinatorEngine.FILTER_COMPARATORS>,
List<Object>> filterMap) {
Map<String, Object> params = new HashMap<String, Object>();
int pcnt= 1;
for (Map.Entry<Pair<String, CoordinatorEngine.FILTER_COMPARATORS>, List<Object>> filter : filterMap.entrySet()) {
String field = filter.getKey().getFirst();
CoordinatorEngine.FILTER_COMPARATORS comp = filter.getKey().getSecond();
String sqlField;
if (field.equals(OozieClient.FILTER_STATUS)) {
sqlField = "a.statusStr";
} else if (field.equals(OozieClient.FILTER_NOMINAL_TIME)) {
sqlField = "a.nominalTimestamp";
} else {
throw new IllegalArgumentException("Invalid filter key " + field);
}
sb.append(" and ").append(sqlField).append(" ");
switch (comp) {
case EQUALS:
sb.append("IN (");
params.putAll(appendParams(sb, filter.getValue(), pcnt));
sb.append(")");
break;
case NOT_EQUALS:
sb.append("NOT IN (");
params.putAll(appendParams(sb, filter.getValue(), pcnt));
sb.append(")");
break;
case GREATER:
case GREATER_EQUAL:
case LESSTHAN:
case LESSTHAN_EQUAL:
if (filter.getValue().size() != 1) {
throw new IllegalArgumentException(field + comp.getSign() + " can't have more than 1 values");
}
sb.append(comp.getSign()).append(" ");
params.putAll(appendParams(sb, filter.getValue(), pcnt));
break;
}
pcnt += filter.getValue().size();
}
sb.append(" ");
return params;
}
private static Map<String, Object> appendParams(StringBuilder sb, List<Object> value, int sindex) {
Map<String, Object> params = new HashMap<String, Object>();
boolean first = true;
for (Object val : value) {
String pname = "p" + sindex++;
params.put(pname, val);
if (!first) {
sb.append(", ");
}
sb.append(':').append(pname);
first = false;
}
return params;
}
public static boolean isInputLogicSpecified(String actionXml) throws JDOMException {
return isInputLogicSpecified(XmlUtils.parseXml(actionXml));
}
public static boolean isInputLogicSpecified(Element eAction) throws JDOMException {
return eAction.getChild(CoordInputLogicEvaluator.INPUT_LOGIC, eAction.getNamespace()) != null;
}
public static String getInputLogic(String actionXml) throws JDOMException {
return getInputLogic(XmlUtils.parseXml(actionXml));
}
public static String getInputLogic(Element actionXml) throws JDOMException {
return new InputLogicParser().parse(actionXml.getChild(CoordInputLogicEvaluator.INPUT_LOGIC,
actionXml.getNamespace()));
}
}