blob: af298b99d34cbcb7d03939fdb9c2e4e73c96ea43 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.oozie.coord.input.dependency;
import java.io.IOException;
import java.io.StringReader;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.AccessControlException;
import org.apache.oozie.CoordinatorActionBean;
import org.apache.oozie.ErrorCode;
import org.apache.oozie.client.OozieClient;
import org.apache.oozie.command.CommandException;
import org.apache.oozie.command.coord.CoordCommandUtils;
import org.apache.oozie.coord.CoordELConstants;
import org.apache.oozie.coord.CoordELEvaluator;
import org.apache.oozie.coord.CoordELFunctions;
import org.apache.oozie.coord.CoordUtils;
import org.apache.oozie.dependency.ActionDependency;
import org.apache.oozie.dependency.DependencyChecker;
import org.apache.oozie.dependency.URIHandler;
import org.apache.oozie.dependency.URIHandlerException;
import org.apache.oozie.service.Services;
import org.apache.oozie.service.URIHandlerService;
import org.apache.oozie.util.DateUtils;
import org.apache.oozie.util.ELEvaluator;
import org.apache.oozie.util.ParamChecker;
import org.apache.oozie.util.XConfiguration;
import org.apache.oozie.util.XLog;
import org.apache.oozie.util.XmlUtils;
import org.jdom.Element;
import org.jdom.JDOMException;
/**
* Old approach where dependencies are stored as String.
*/
public class CoordOldInputDependency implements CoordInputDependency {
private XLog log = XLog.getLog(getClass());
protected transient String missingDependencies = "";
public CoordOldInputDependency(String missingDependencies) {
this.missingDependencies = missingDependencies;
}
public CoordOldInputDependency() {
}
@Override
public void addInputInstanceList(String inputEventName, List<CoordInputInstance> inputInstanceList) {
appendToDependencies(inputInstanceList);
}
@Override
public String getMissingDependencies() {
return missingDependencies;
}
@Override
public boolean isDependencyMet() {
return StringUtils.isEmpty(missingDependencies);
}
@Override
public boolean isUnResolvedDependencyMet() {
return false;
}
@Override
public void setDependencyMet(boolean isDependencyMeet) {
if (isDependencyMeet) {
missingDependencies = "";
}
}
@Override
public String serialize() throws IOException {
return missingDependencies;
}
@Override
public List<String> getMissingDependenciesAsList() {
return Arrays.asList(DependencyChecker.dependenciesAsArray(missingDependencies));
}
@Override
public List<String> getAvailableDependenciesAsList() {
return new ArrayList<String>();
}
@Override
public void setMissingDependencies(String missingDependencies) {
this.missingDependencies = missingDependencies;
}
public void appendToDependencies(List<CoordInputInstance> inputInstanceList) {
StringBuilder sb = new StringBuilder(missingDependencies);
boolean isFirst = true;
for (CoordInputInstance coordInputInstance : inputInstanceList) {
if (isFirst) {
if (!StringUtils.isEmpty(sb.toString())) {
sb.append(CoordELFunctions.INSTANCE_SEPARATOR);
}
}
else {
sb.append(CoordELFunctions.INSTANCE_SEPARATOR);
}
sb.append(coordInputInstance.getInputDataInstance());
isFirst = false;
}
missingDependencies = sb.toString();
}
@Override
public void addUnResolvedList(String name, String unresolvedDependencies) {
StringBuilder sb = new StringBuilder(missingDependencies);
sb.append(CoordCommandUtils.RESOLVED_UNRESOLVED_SEPARATOR).append(unresolvedDependencies);
missingDependencies = sb.toString();
}
@Override
public List<String> getAvailableDependencies(String dataSet) {
return null;
}
@Override
public void addToAvailableDependencies(Collection<String> availableList) {
if (StringUtils.isEmpty(missingDependencies)) {
return;
}
List<String> missingDependenciesList = new ArrayList<String>(
Arrays.asList((DependencyChecker.dependenciesAsArray(missingDependencies))));
missingDependenciesList.removeAll(availableList);
missingDependencies = DependencyChecker.dependenciesAsString(missingDependenciesList);
}
@Override
public boolean checkPullMissingDependencies(CoordinatorActionBean coordAction, StringBuilder existList,
StringBuilder nonExistList) throws IOException, JDOMException {
Configuration actionConf = new XConfiguration(new StringReader(coordAction.getRunConf()));
Element eAction = XmlUtils.parseXml(coordAction.getActionXml());
Element inputList = eAction.getChild("input-events", eAction.getNamespace());
if (inputList != null) {
if (nonExistList.length() > 0) {
checkListOfPaths(coordAction, existList, nonExistList, actionConf);
}
return nonExistList.length() == 0;
}
return true;
}
public ActionDependency checkPushMissingDependencies(CoordinatorActionBean coordAction,
boolean registerForNotification) throws CommandException, IOException {
return DependencyChecker.checkForAvailability(getMissingDependenciesAsList(),
new XConfiguration(new StringReader(coordAction.getRunConf())), !registerForNotification);
}
private boolean checkListOfPaths(CoordinatorActionBean coordAction, StringBuilder existList,
StringBuilder nonExistList, Configuration conf) throws IOException {
String[] uriList = nonExistList.toString().split(CoordELFunctions.INSTANCE_SEPARATOR);
if (uriList[0] != null) {
log.info("[" + coordAction.getId() + "]::ActionInputCheck:: In checkListOfPaths: " + uriList[0]
+ " is Missing.");
}
nonExistList.delete(0, nonExistList.length());
boolean allExists = true;
String existSeparator = "", nonExistSeparator = "";
String user = ParamChecker.notEmpty(conf.get(OozieClient.USER_NAME), OozieClient.USER_NAME);
for (int i = 0; i < uriList.length; i++) {
if (allExists) {
allExists = pathExists(coordAction, uriList[i], conf, user);
log.info("[" + coordAction.getId() + "]::ActionInputCheck:: File:" + uriList[i] + ", Exists? :"
+ allExists);
}
if (allExists) {
existList.append(existSeparator).append(uriList[i]);
existSeparator = CoordELFunctions.INSTANCE_SEPARATOR;
}
else {
nonExistList.append(nonExistSeparator).append(uriList[i]);
nonExistSeparator = CoordELFunctions.INSTANCE_SEPARATOR;
}
}
return allExists;
}
public boolean pathExists(CoordinatorActionBean coordAction, String sPath, Configuration actionConf, String user)
throws IOException {
log.debug("checking for the file " + sPath);
try {
return CoordCommandUtils.pathExists(sPath, actionConf, user);
}
catch (URIHandlerException e) {
if (coordAction != null) {
coordAction.setErrorCode(e.getErrorCode().toString());
coordAction.setErrorMessage(e.getMessage());
}
if (e.getCause() != null && e.getCause() instanceof AccessControlException) {
throw (AccessControlException) e.getCause();
}
else {
log.error(e);
throw new IOException(e);
}
}
catch (URISyntaxException e) {
if (coordAction != null) {
coordAction.setErrorCode(ErrorCode.E0906.toString());
coordAction.setErrorMessage(e.getMessage());
}
log.error(e);
throw new IOException(e);
}
}
public boolean isChangeInDependency(StringBuilder nonExistList, String missingDependencies,
StringBuilder nonResolvedList, boolean status) {
if ((!nonExistList.toString().equals(missingDependencies) || missingDependencies.isEmpty())) {
setMissingDependencies(nonExistList.toString());
return true;
}
return false;
}
@SuppressWarnings("unchecked")
public boolean checkUnresolved(CoordinatorActionBean coordAction, Element eAction) throws Exception {
Date nominalTime = DateUtils.parseDateOozieTZ(eAction.getAttributeValue("action-nominal-time"));
String actualTimeStr = eAction.getAttributeValue("action-actual-time");
Element inputList = eAction.getChild("input-events", eAction.getNamespace());
if (inputList == null) {
return true;
}
List<Element> eDataEvents = inputList.getChildren("data-in", eAction.getNamespace());
Configuration actionConf = new XConfiguration(new StringReader(coordAction.getRunConf()));
if (eDataEvents != null) {
Date actualTime = null;
if (actualTimeStr == null) {
actualTime = new Date();
}
else {
actualTime = DateUtils.parseDateOozieTZ(actualTimeStr);
}
for (Element dEvent : eDataEvents) {
if (dEvent.getChild(CoordCommandUtils.UNRESOLVED_INSTANCES_TAG, dEvent.getNamespace()) == null) {
continue;
}
ELEvaluator eval = CoordELEvaluator.createLazyEvaluator(actualTime, nominalTime, dEvent, actionConf);
String unResolvedInstance = dEvent
.getChild(CoordCommandUtils.UNRESOLVED_INSTANCES_TAG, dEvent.getNamespace()).getTextTrim();
String unresolvedList[] = unResolvedInstance.split(CoordELFunctions.INSTANCE_SEPARATOR);
StringBuffer resolvedTmp = new StringBuffer();
for (int i = 0; i < unresolvedList.length; i++) {
String returnData = CoordELFunctions.evalAndWrap(eval, unresolvedList[i]);
Boolean isResolved = (Boolean) eval.getVariable(CoordELConstants.IS_RESOLVED);
if (isResolved == false) {
log.info("[" + coordAction.getId() + "] :: Cannot resolve : " + returnData);
return false;
}
if (resolvedTmp.length() > 0) {
resolvedTmp.append(CoordELFunctions.INSTANCE_SEPARATOR);
}
resolvedTmp.append((String) eval.getVariable(CoordELConstants.RESOLVED_PATH));
}
if (resolvedTmp.length() > 0) {
if (dEvent.getChild("uris", dEvent.getNamespace()) != null) {
resolvedTmp.append(CoordELFunctions.INSTANCE_SEPARATOR)
.append(dEvent.getChild("uris", dEvent.getNamespace()).getTextTrim());
dEvent.removeChild("uris", dEvent.getNamespace());
}
Element uriInstance = new Element("uris", dEvent.getNamespace());
uriInstance.addContent(resolvedTmp.toString());
dEvent.getContent().add(1, uriInstance);
}
dEvent.removeChild(CoordCommandUtils.UNRESOLVED_INSTANCES_TAG, dEvent.getNamespace());
}
}
return true;
}
public Map<String, ActionDependency> getMissingDependencies(CoordinatorActionBean coordAction)
throws CommandException, IOException, JDOMException {
Map<String, ActionDependency> dependenciesMap = null;
try {
dependenciesMap = getDependency(coordAction);
}
catch (URIHandlerException e) {
throw new IOException(e);
}
StringBuilder nonExistList = new StringBuilder();
StringBuilder nonResolvedList = new StringBuilder();
CoordCommandUtils.getResolvedList(getMissingDependencies(), nonExistList, nonResolvedList);
Set<String> missingSets = new HashSet<String>(
Arrays.asList(nonExistList.toString().split(CoordELFunctions.INSTANCE_SEPARATOR)));
missingSets.addAll(
Arrays.asList(nonResolvedList.toString().split(CoordCommandUtils.RESOLVED_UNRESOLVED_SEPARATOR)));
for (Iterator<Map.Entry<String, ActionDependency>> it = dependenciesMap.entrySet().iterator(); it.hasNext();) {
Map.Entry<String, ActionDependency> entry = it.next();
ActionDependency dependency = entry.getValue();
dependency.getMissingDependencies().retainAll(missingSets);
if (dependency.getUriTemplate() != null) {
for (int i = 0; i < dependency.getMissingDependencies().size(); i++) {
if (dependency.getMissingDependencies().get(i).trim().startsWith("${coord:")) {
dependency.getMissingDependencies().set(i,
dependency.getMissingDependencies().get(i) + " -> " + dependency.getUriTemplate());
}
}
}
if (dependenciesMap.get(entry.getKey()).getMissingDependencies().isEmpty()) {
it.remove();
}
}
return dependenciesMap;
}
@SuppressWarnings("unchecked")
private Map<String, ActionDependency> getDependency(CoordinatorActionBean coordAction)
throws JDOMException, URIHandlerException {
Map<String, ActionDependency> dependenciesMap = new HashMap<String, ActionDependency>();
URIHandlerService uriService = Services.get().get(URIHandlerService.class);
Element eAction = XmlUtils.parseXml(coordAction.getActionXml());
Element inputList = eAction.getChild("input-events", eAction.getNamespace());
if (inputList == null || inputList.getChildren().isEmpty()) {
return dependenciesMap;
}
List<Element> eDataEvents = inputList.getChildren("data-in", eAction.getNamespace());
for (Element event : eDataEvents) {
Element uri = event.getChild("uris", event.getNamespace());
ActionDependency dependency = new ActionDependency();
if (uri != null) {
Element doneFlagElement = event.getChild("dataset", event.getNamespace()).getChild("done-flag",
event.getNamespace());
String[] dataSets = uri.getText().split(CoordELFunctions.INSTANCE_SEPARATOR);
String doneFlag = CoordUtils.getDoneFlag(doneFlagElement);
for (String dataSet : dataSets) {
URIHandler uriHandler;
uriHandler = uriService.getURIHandler(dataSet);
dependency.getMissingDependencies().add(uriHandler.getURIWithDoneFlag(dataSet, doneFlag));
}
}
if (event.getChildTextTrim(CoordCommandUtils.UNRESOLVED_INSTANCES_TAG, event.getNamespace()) != null) {
ActionDependency unResolvedDependency = getUnResolvedDependency(coordAction, event);
dependency.getMissingDependencies()
.addAll(unResolvedDependency.getMissingDependencies());
dependency.setUriTemplate(unResolvedDependency.getUriTemplate());
}
dependenciesMap.put(event.getAttributeValue("name"), dependency);
}
return dependenciesMap;
}
private ActionDependency getUnResolvedDependency(CoordinatorActionBean coordAction, Element event)
throws JDOMException, URIHandlerException {
String tmpUnresolved = event.getChildTextTrim(CoordCommandUtils.UNRESOLVED_INSTANCES_TAG, event.getNamespace());
StringBuilder nonResolvedList = new StringBuilder();
CoordCommandUtils.getResolvedList(getMissingDependencies(), new StringBuilder(), nonResolvedList);
ActionDependency dependency = new ActionDependency();
if (nonResolvedList.length() > 0) {
Element eDataset = event.getChild("dataset", event.getNamespace());
dependency.setUriTemplate(eDataset.getChild("uri-template", eDataset.getNamespace()).getTextTrim());
dependency.getMissingDependencies().add(tmpUnresolved);
}
return dependency;
}
@Override
public String getFirstMissingDependency() {
StringBuilder nonExistList = new StringBuilder();
String missingDependencies = getMissingDependencies();
StringBuilder nonResolvedList = new StringBuilder();
CoordCommandUtils.getResolvedList(missingDependencies, nonExistList, nonResolvedList);
String firstMissingDependency = "";
if (nonExistList.length() > 0) {
firstMissingDependency = nonExistList.toString().split(CoordELFunctions.INSTANCE_SEPARATOR)[0];
}
else {
if (nonResolvedList.length() > 0) {
firstMissingDependency = nonResolvedList.toString().split(CoordCommandUtils.RESOLVED_UNRESOLVED_SEPARATOR)[0];
}
}
return firstMissingDependency;
}
}