blob: 667842790a9ecb746f878029eaac18c69bc31ad9 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.oozie.service;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.oozie.client.OozieClient;
import org.apache.oozie.workflow.WorkflowApp;
import org.apache.oozie.workflow.WorkflowException;
import org.apache.oozie.util.IOUtils;
import org.apache.oozie.util.XConfiguration;
import org.apache.oozie.util.XLog;
import org.apache.oozie.ErrorCode;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.Reader;
import java.io.StringWriter;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
/**
* Service that provides application workflow definition reading from the path and creation of the proto configuration.
*/
public abstract class WorkflowAppService implements Service {
public static final String CONF_PREFIX = Service.CONF_PREFIX + "WorkflowAppService.";
public static final String SYSTEM_LIB_PATH = CONF_PREFIX + "system.libpath";
public static final String APP_LIB_PATH_LIST = "oozie.wf.application.lib";
public static final String HADOOP_USER = "user.name";
public static final String CONFG_MAX_WF_LENGTH = CONF_PREFIX + "WorkflowDefinitionMaxLength";
public static final String OOZIE_SUBWORKFLOW_CLASSPATH_INHERITANCE = "oozie.subworkflow.classpath.inheritance";
public static final String OOZIE_WF_SUBWORKFLOW_CLASSPATH_INHERITANCE = "oozie.wf.subworkflow.classpath.inheritance";
private Path systemLibPath;
private long maxWFLength;
private boolean oozieSubWfCPInheritance;
/**
* Initialize the workflow application service.
*
* @param services services instance.
*/
public void init(Services services) {
Configuration conf = services.getConf();
String path = ConfigurationService.get(conf, SYSTEM_LIB_PATH);
if (path.trim().length() > 0) {
systemLibPath = new Path(path.trim());
}
maxWFLength = conf.getInt(CONFG_MAX_WF_LENGTH, 100000);
oozieSubWfCPInheritance = conf.getBoolean(OOZIE_SUBWORKFLOW_CLASSPATH_INHERITANCE, false);
}
/**
* Destroy the workflow application service.
*/
public void destroy() {
}
/**
* Return the public interface for workflow application service.
*
* @return {@link WorkflowAppService}.
*/
public Class<? extends Service> getInterface() {
return WorkflowAppService.class;
}
/**
* Read workflow definition.
*
* @param appPath application path.
* @param user user name.
* @param conf configuration
* @return workflow definition.
* @throws WorkflowException thrown if the definition could not be read.
*/
protected String readDefinition(String appPath, String user, Configuration conf)
throws WorkflowException {
try {
URI uri = new URI(appPath);
HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
Configuration appConf = has.createConfiguration(uri.getAuthority());
FileSystem fs = has.createFileSystem(user, uri, appConf);
// app path could be a directory
Path path = new Path(uri.getPath());
if (!fs.isFile(path)) {
path = new Path(path, "workflow.xml");
}
FileStatus fsStatus = fs.getFileStatus(path);
if (fsStatus.getLen() > this.maxWFLength) {
throw new WorkflowException(ErrorCode.E0736, fsStatus.getLen(), this.maxWFLength);
}
Reader reader = new InputStreamReader(fs.open(path), StandardCharsets.UTF_8);
StringWriter writer = new StringWriter();
IOUtils.copyCharStream(reader, writer);
return writer.toString();
}
catch (WorkflowException wfe) {
throw wfe;
}
catch (IOException ex) {
throw new WorkflowException(ErrorCode.E0710, ex.getMessage(), ex);
}
catch (URISyntaxException ex) {
throw new WorkflowException(ErrorCode.E0711, appPath, ex.getMessage(), ex);
}
catch (HadoopAccessorException ex) {
throw new WorkflowException(ex);
}
catch (Exception ex) {
throw new WorkflowException(ErrorCode.E0710, ex.getMessage(), ex);
}
}
/**
* Create proto configuration. <p> The proto configuration includes the user,group and the paths which need to be
* added to distributed cache. These paths include .jar,.so and the resource file paths.
*
* @param jobConf job configuration.
* @param isWorkflowJob indicates if the job is a workflow job or not.
* @return proto configuration.
* @throws WorkflowException thrown if the proto action configuration could not be created.
*/
public XConfiguration createProtoActionConf(Configuration jobConf, boolean isWorkflowJob)
throws WorkflowException {
try {
HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
URI uri = new URI(jobConf.get(OozieClient.APP_PATH));
Configuration conf = has.createConfiguration(uri.getAuthority());
XConfiguration protoConf = new XConfiguration();
String user = jobConf.get(OozieClient.USER_NAME);
conf.set(OozieClient.USER_NAME, user);
protoConf.set(OozieClient.USER_NAME, user);
FileSystem fs = has.createFileSystem(user, uri, conf);
Path appPath = new Path(uri);
XLog.getLog(getClass()).debug("jobConf.libPath = " + jobConf.get(OozieClient.LIBPATH));
XLog.getLog(getClass()).debug("jobConf.appPath = " + appPath);
Collection<String> filePaths;
if (isWorkflowJob) {
// app path could be a directory
Path path = new Path(uri.getPath());
if (!fs.isFile(path)) {
filePaths = getLibFiles(fs, new Path(appPath + "/lib"));
} else {
filePaths = getLibFiles(fs, new Path(appPath.getParent(), "lib"));
}
}
else {
filePaths = new LinkedHashSet<>();
}
String[] libPaths = jobConf.getStrings(OozieClient.LIBPATH);
if (libPaths != null && libPaths.length > 0) {
for (String libPath : libPaths) {
if (libPath.trim().length() > 0) {
Path path = new Path(libPath.trim());
Collection<String> libFilePaths = getLibFiles(fs, path);
filePaths.addAll(libFilePaths);
}
}
}
// Check if a subworkflow should inherit the libs from the parent WF
// OOZIE_WF_SUBWORKFLOW_CLASSPATH_INHERITANCE has priority over OOZIE_SUBWORKFLOW_CLASSPATH_INHERITANCE from oozie-site
// If OOZIE_WF_SUBWORKFLOW_CLASSPATH_INHERITANCE isn't specified, we use OOZIE_SUBWORKFLOW_CLASSPATH_INHERITANCE
if (jobConf.getBoolean(OOZIE_WF_SUBWORKFLOW_CLASSPATH_INHERITANCE, oozieSubWfCPInheritance)) {
// Keep any libs from a parent workflow that might already be in APP_LIB_PATH_LIST and also remove duplicates
String[] parentFilePaths = jobConf.getStrings(APP_LIB_PATH_LIST);
if (parentFilePaths != null && parentFilePaths.length > 0) {
String[] filePathsNames = filePaths.toArray(new String[filePaths.size()]);
for (int i = 0; i < filePathsNames.length; i++) {
Path p = new Path(filePathsNames[i]);
filePathsNames[i] = p.getName();
}
Arrays.sort(filePathsNames);
List<String> nonDuplicateParentFilePaths = new ArrayList<>();
for (String parentFilePath : parentFilePaths) {
Path p = new Path(parentFilePath);
if (Arrays.binarySearch(filePathsNames, p.getName()) < 0) {
nonDuplicateParentFilePaths.add(parentFilePath);
}
}
filePaths.addAll(nonDuplicateParentFilePaths);
}
}
protoConf.setStrings(APP_LIB_PATH_LIST, filePaths.toArray(new String[filePaths.size()]));
//Add all properties start with 'oozie.'
for (Map.Entry<String, String> entry : jobConf) {
if (entry.getKey().startsWith("oozie.")) {
String name = entry.getKey();
String value = entry.getValue();
// if property already exists, should not overwrite
if(protoConf.get(name) == null) {
protoConf.set(name, value);
}
}
}
return protoConf;
}
catch (IOException ex) {
throw new WorkflowException(ErrorCode.E0712, jobConf.get(OozieClient.APP_PATH), ex.getMessage(), ex);
}
catch (URISyntaxException ex) {
throw new WorkflowException(ErrorCode.E0711, jobConf.get(OozieClient.APP_PATH), ex.getMessage(), ex);
}
catch (HadoopAccessorException ex) {
throw new WorkflowException(ex);
}
catch (Exception ex) {
throw new WorkflowException(ErrorCode.E0712, jobConf.get(OozieClient.APP_PATH),
ex.getMessage(), ex);
}
}
/**
* Parse workflow definition.
*
* @param jobConf job configuration
* @return WorkflowApp
* @throws WorkflowException thrown if the workflow application could not be parsed.
*/
public abstract WorkflowApp parseDef(Configuration jobConf) throws WorkflowException;
/**
* Parse workflow definition along with config-default.xml config
*
* @param jobConf job configuration
* @param configDefault config from config-default.xml
* @return workflow application thrown if the workflow application could not
* be parsed
* @throws WorkflowException thrown if the workflow application could not be parsed.
*/
public abstract WorkflowApp parseDef(Configuration jobConf, Configuration configDefault) throws WorkflowException;
/**
* Parse workflow definition.
* @param wfXml workflow.
* @param jobConf job configuration
* @return workflow application.
* @throws WorkflowException thrown if the workflow application could not be parsed.
*/
public abstract WorkflowApp parseDef(String wfXml, Configuration jobConf) throws WorkflowException;
/**
* Get all library paths.
*
* @param fs file system object.
* @param libPath hdfs library path.
* @return list of paths.
* @throws IOException thrown if the lib paths could not be obtained.
*/
private Collection<String> getLibFiles(FileSystem fs, Path libPath) throws IOException {
Set<String> libPaths = new LinkedHashSet<>();
if (fs.exists(libPath)) {
FileStatus[] files = fs.listStatus(libPath, new NoPathFilter());
for (FileStatus file : files) {
libPaths.add(file.getPath().toUri().toString());
}
}
else {
XLog.getLog(getClass()).warn("libpath [{0}] does not exist", libPath);
}
return libPaths;
}
/*
* Filter class doing no filtering.
* We dont need define this class, but seems fs.listStatus() is not working properly without this.
* So providing this dummy no filtering Filter class.
*/
private class NoPathFilter implements PathFilter {
@Override
public boolean accept(Path path) {
return true;
}
}
/**
* Returns Oozie system libpath.
*
* @return Oozie system libpath (sharelib) in HDFS if present, otherwise it returns <code>NULL</code>.
*/
public Path getSystemLibPath() {
return systemLibPath;
}
}