blob: 00e777783029ae42307e9e696fe0399b8e75f061 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.oozie.action.hadoop;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.oozie.WorkflowJobBean;
import org.apache.oozie.action.ActionExecutorException;
import org.apache.oozie.client.OozieClient;
import org.apache.oozie.service.ConfigurationService;
import org.apache.oozie.service.Services;
import org.apache.oozie.service.ShareLibService;
import org.apache.oozie.service.WorkflowAppService;
import org.apache.oozie.util.IOUtils;
import org.apache.oozie.util.XConfiguration;
import org.apache.oozie.util.XmlUtils;
import org.jdom.Element;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URI;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import static org.apache.oozie.action.hadoop.JavaActionExecutor.ACTION_SHARELIB_FOR;
import static org.apache.oozie.action.hadoop.JavaActionExecutor.SHARELIB_EXCLUDE_SUFFIX;
public class TestJavaActionExecutorLibAddition extends ActionExecutorTestCase {
/**
 * File names created under the {@code oozie} directory of the system lib path by
 * {@code setupTestShareLibExcludeTestJars}. Entries 0 and 1 are the "jackson" jars the exclude tests target.
 */
private static final String[] TEST_SHARELIB_OOZIE_FILES = {
"jackson-core-2.3.jar",
"jackson-databind-2.3.jar",
"other-lib.jar",
"oozie-library.jar",
};
/**
 * File names created under the {@code java} directory of the system lib path by
 * {@code setupTestShareLibExcludeTestJars}. Entries 0 and 1 are the "jackson" jars the exclude tests target.
 */
private static final String[] TEST_SHARELIB_JAVA_FILES = {
"jackson-core-2.6.5.jar",
"jackson-databind-2.6.5.jar",
"some-lib.jar",
"another-lib.jar",
};
/**
 * File names created under the {@code pig/lib} directory of the system lib path by
 * {@code setupTestShareLibExcludeTestJars}. Entries 0 and 1 are the "jackson" jars the exclude tests target.
 */
private static final String[] TEST_SHARELIB_PIG_FILES = {
"jackson-pig-0.3.3.jar",
"jackson-datapig-0.3.5.jar",
"pig_data.txt",
};
/**
 * File names created under the {@code lib} directory of the application path (not under the system lib path)
 * by {@code setupTestShareLibExcludeTestJars}. Entries 0, 1 and 3 have names starting with "jackson".
 */
private static final String[] TEST_SHARELIB_USER_FILES = {
"jackson-user-3.3.jar",
"jackson-workflow-app.jar",
"user-job-utils.jar",
"jackson-files.zip"
};
/**
 * File names registered directly under the application path by {@code createAndGetSharelibTestFiles} and
 * referenced through {@code <file>} elements. {@code verifyFilesInDistributedCache} expects only entry 3
 * (the {@code .jar}) among the class path entries.
 */
private static final String[] TEST_SHARELIB_FILES = {
"soFile.so",
"soFile.so.1",
"file",
"jar.jar"
};
/**
 * File names registered under the {@code root} directory of the test case file system directory by
 * {@code createAndGetSharelibTestFiles}. {@code verifyFilesInDistributedCache} expects only entry 3
 * (the {@code .jar}) among the class path entries.
 */
private static final String[] TEST_SHARELIB_ROOT_FILES = {
"rootSoFile.so",
"rootSoFile.so.1",
"rootFile",
"rootJar.jar"
};
/**
 * Archive names referenced through {@code <archive>} elements: entry 0 is registered under the application
 * path and entry 1 under the {@code root} directory by {@code createAndGetSharelibTestFiles}.
 */
private static final String[] TEST_SHARELIB_ARCHIVES = {
"archive.tar",
"rootArchive.tar",
};
/**
 * Extends the inherited system property setup with the Hadoop-related system properties and the
 * action configuration file used by the tests in this class.
 *
 * @throws Exception if the superclass setup or the creation of the action configuration file fails
 */
@Override
protected void setSystemProps() throws Exception {
// The inherited setup runs first; the class-specific additions follow.
super.setSystemProps();
setHadoopSystemProps();
createActionConfDirFiles();
}
/**
 * Creates the {@code action-conf} directory under the test case configuration directory and copies the
 * {@code test-action-config.xml} classpath resource into it as {@code java.xml}.
 *
 * @throws IOException if the resource cannot be found on the classpath, or if the target file cannot be
 *         opened or written
 */
private void createActionConfDirFiles() throws IOException {
    final File actionConfDir = new File(getTestCaseConfDir(), "action-conf");
    // The return value is ignored as before; if the directory is unusable, opening the target file below fails.
    actionConfDir.mkdir();

    final ClassLoader loader = Thread.currentThread().getContextClassLoader();
    // try-with-resources guarantees both streams are released even when the copy throws. Closing again is
    // harmless should copyStream close them itself, because Closeable.close() has no effect on a closed stream.
    try (InputStream is = loader.getResourceAsStream("test-action-config.xml")) {
        if (is == null) {
            // Fail with a descriptive message instead of a NullPointerException deep inside the copy.
            throw new IOException("Classpath resource not found: test-action-config.xml");
        }
        try (OutputStream os = new FileOutputStream(new File(actionConfDir, "java.xml"))) {
            IOUtils.copyStream(is, os);
        }
    }
}
/**
 * Sets the system properties these tests rely on: the action executor class list, the per-host action
 * configuration directories, and the system lib path located under the test case file system directory.
 */
private void setHadoopSystemProps() {
    final String executorClasses = JavaActionExecutor.class.getName();
    setSystemProperty("oozie.service.ActionService.executor.classes", executorClasses);

    final String actionConfigurations = "*=hadoop-conf," + getJobTrackerUri() + "=action-conf";
    setSystemProperty("oozie.service.HadoopAccessorService.action.configurations", actionConfigurations);

    final String systemLibDir = getFsTestCaseDir().toUri().getPath() + "/systemlib";
    setSystemProperty(WorkflowAppService.SYSTEM_LIB_PATH, systemLibDir);
}
/**
 * Creates the sharelib directories ({@code oozie}, {@code java}, {@code pig/lib}) below the given system lib
 * path plus the {@code lib} directory of the application path, and creates every fixture file in them.
 *
 * @param systemLibPath root directory below which the sharelib directories are created
 * @return the created files keyed by file name, in insertion order
 * @throws Exception if creating a directory or file fails
 */
private Map<String, Path> setupTestShareLibExcludeTestJars(final Path systemLibPath) throws Exception {
    final Path oozieDir = new Path(systemLibPath, "oozie");
    final Path javaDir = new Path(systemLibPath, "java");
    final Path pigLibDir = new Path(systemLibPath, new Path("pig", "lib"));
    final Path appLibDir = new Path(getAppPath(), "lib");
    makeDirs(oozieDir, javaDir, pigLibDir, appLibDir);

    final Map<String, Path> pathsByFileName = new LinkedHashMap<>();
    loadLibPathsToMap(pathsByFileName, oozieDir, TEST_SHARELIB_OOZIE_FILES);
    loadLibPathsToMap(pathsByFileName, javaDir, TEST_SHARELIB_JAVA_FILES);
    loadLibPathsToMap(pathsByFileName, pigLibDir, TEST_SHARELIB_PIG_FILES);
    loadLibPathsToMap(pathsByFileName, appLibDir, TEST_SHARELIB_USER_FILES);

    createFiles(pathsByFileName.values());
    return pathsByFileName;
}
/**
 * Registers each file name in the map, mapped to its location directly below the given root.
 *
 * @param libMap map receiving the {@code file name -> path} entries
 * @param root directory the file names are resolved against
 * @param files file names to register
 */
private void loadLibPathsToMap(Map<String, Path> libMap, Path root, String... files) {
    for (int i = 0; i < files.length; i++) {
        final String fileName = files[i];
        final Path location = new Path(root, fileName);
        libMap.put(fileName, location);
    }
}
/**
 * Sets up the exclude fixture files, builds the launcher configuration with the {@code java,pig} sharelibs,
 * and asserts that none of the given libraries appears among the distributed cache files and that the
 * number of cache files matches the expectation.
 *
 * @param libsToBeExcluded fixture file names expected to be missing from the distributed cache
 * @throws Exception if the fixture setup or the configuration creation fails
 */
private void checkLibExclude(final String... libsToBeExcluded) throws Exception {
    // Number of cache entries expected on top of the fixture files; presumably contributed by the test
    // environment itself (the original constant was named LIBS_ADDED_BY_TESTS).
    final int extraCacheEntries = 6;

    final Map<String, Path> fixtureLibs = setupTestShareLibExcludeTestJars(getNewSystemLibPath());
    final Context context = createContextUsingSharelib(new Path(getAppPath(), "lib"));
    createWorkflowJobUsingSharelib(context);
    setSharelibForActionInConfiguration("java,pig");

    final Configuration launcherConf = createActionExecutorAndSetupServices(context);
    final URI[] cachedUris = DistributedCache.getCacheFiles(launcherConf);
    final String cachedUrisText = getDistributedCacheFilesStr(launcherConf);

    for (final String excludedName : libsToBeExcluded) {
        final String excludedLocation = fixtureLibs.get(excludedName).toString();
        final boolean stillCached = cachedUrisText.contains(excludedLocation);
        assertFalse(excludedName + " should have been excluded from distributed cache", stillCached);
    }

    final int expectedCount = fixtureLibs.size() - libsToBeExcluded.length + extraCacheEntries;
    assertEquals("The number of files on distributed cache is not what expected.",
            expectedCount, cachedUris.length);
}
/**
 * Creates a context for a {@code <java>} action whose {@code oozie.launcher.oozie.libpath} property is set
 * to the given path.
 *
 * @param actionLibPath value written into the {@code oozie.launcher.oozie.libpath} property
 * @return the context created from the generated action XML
 * @throws Exception if the context cannot be created
 */
private Context createContextUsingSharelib(final Path actionLibPath) throws Exception {
    final String actionXmlTemplate = "<java>"
            + "<job-tracker>%s</job-tracker>"
            + "<name-node>%s</name-node>"
            + "<configuration>"
            + "<property>"
            + "<name>oozie.launcher.oozie.libpath</name>"
            + "<value>%s</value>"
            + "</property>"
            + "</configuration>"
            + "<main-class>MAIN-CLASS</main-class>"
            + "</java>";
    final String actionXml = String.format(actionXmlTemplate, getJobTrackerUri(), getNameNodeUri(), actionLibPath);
    return createContext(actionXml, null);
}
/**
 * Completes the given workflow configuration with the test user, the application path of
 * {@code workflow.xml} and the "use system lib path" flag, then stores it on the context's workflow job.
 *
 * @param context context whose workflow job receives the configuration
 * @param wfConf workflow configuration to complete; it is modified in place
 */
private void createWorkflowJobUsingSharelib(final Context context, final XConfiguration wfConf) {
    final WorkflowJobBean workflowBean = (WorkflowJobBean) context.getWorkflow();

    wfConf.set(WorkflowAppService.HADOOP_USER, getTestUser());
    final String workflowXmlLocation = new Path(getAppPath(), "workflow.xml").toString();
    wfConf.set(OozieClient.APP_PATH, workflowXmlLocation);
    wfConf.setBoolean(OozieClient.USE_SYSTEM_LIBPATH, true);

    final String serializedConf = XmlUtils.prettyPrint(wfConf).toString();
    workflowBean.setConf(serializedConf);
}
/**
 * Convenience overload that starts from an empty workflow configuration.
 *
 * @param context context whose workflow job receives the configuration
 */
private void createWorkflowJobUsingSharelib(final Context context) {
    final XConfiguration emptyWfConf = new XConfiguration();
    createWorkflowJobUsingSharelib(context, emptyWfConf);
}
/**
 * Registers the {@link ShareLibService}, builds the launcher configuration for the context's action with a
 * fresh {@link JavaActionExecutor}, refreshes the sharelib, and finally adds the lib files and archives.
 *
 * @param context context providing the action configuration
 * @return the configuration after the lib files and archives have been set
 * @throws Exception if any of the steps fails
 */
private Configuration createActionExecutorAndSetupServices(final Context context) throws Exception {
    Services.get().setService(ShareLibService.class);

    final Element actionElement = XmlUtils.parseXml(context.getAction().getConf());
    final JavaActionExecutor executor = new JavaActionExecutor();
    final Configuration launcherConf = executor.createBaseHadoopConf(context, actionElement);
    executor.setupLauncherConf(launcherConf, actionElement, getAppPath(), context);

    // The sharelib is refreshed right before the lib files are resolved.
    final ShareLibService shareLibService = Services.get().get(ShareLibService.class);
    shareLibService.updateShareLib();

    executor.setLibFilesArchives(context, actionElement, getAppPath(), launcherConf);
    return launcherConf;
}
/**
 * Builds the launcher configuration for the context's action with a fresh {@link JavaActionExecutor} and
 * adds the lib files and archives to it, without touching the sharelib service.
 *
 * @param context context providing the action configuration
 * @return the configuration after the lib files and archives have been set
 * @throws Exception if any of the steps fails
 */
private Configuration createActionExecutorAndSetLibFilesArchives(final Context context) throws Exception {
    final Element actionElement = XmlUtils.parseXml(context.getAction().getConf());
    final JavaActionExecutor executor = new JavaActionExecutor();

    final Configuration launcherConf = executor.createBaseHadoopConf(context, actionElement);
    executor.setupLauncherConf(launcherConf, actionElement, getAppPath(), context);
    executor.setLibFilesArchives(context, actionElement, getAppPath(), launcherConf);

    return launcherConf;
}
/**
 * Stores the given pattern under the sharelib exclude key of the {@code java} action.
 *
 * @param pattern exclude pattern to store
 */
private void setExcludePatternInConfiguration(String pattern) {
    final String excludeKey = ACTION_SHARELIB_FOR + "java" + SHARELIB_EXCLUDE_SUFFIX;
    ConfigurationService.set(excludeKey, pattern);
}
/**
 * Stores the given comma separated sharelib names under the sharelib key of the {@code java} action.
 *
 * @param libs sharelib names to store
 */
private void setSharelibForActionInConfiguration(String libs) {
    final String sharelibKey = ACTION_SHARELIB_FOR + "java";
    ConfigurationService.set(sharelibKey, libs);
}
/**
 * With the pattern {@code .*jackson.*}, every fixture file whose name starts with "jackson" is expected to
 * be excluded, regardless of the directory it lives in.
 */
public void testExcludeFilesFromAllSharelibLocation() throws Exception {
    setExcludePatternInConfiguration(".*jackson.*");

    final String[] expectedExclusions = {
            TEST_SHARELIB_OOZIE_FILES[0], TEST_SHARELIB_OOZIE_FILES[1],
            TEST_SHARELIB_JAVA_FILES[0], TEST_SHARELIB_JAVA_FILES[1],
            TEST_SHARELIB_PIG_FILES[0], TEST_SHARELIB_PIG_FILES[1],
            TEST_SHARELIB_USER_FILES[0], TEST_SHARELIB_USER_FILES[1], TEST_SHARELIB_USER_FILES[3]
    };
    checkLibExclude(expectedExclusions);
}
/**
 * With the pattern {@code oozie/jackson.*}, only the two "jackson" jars of the {@code oozie} sharelib
 * directory are expected to be excluded.
 */
public void testExcludeFilesFromAllOozieSharelibFolder() throws Exception {
    setExcludePatternInConfiguration("oozie/jackson.*");

    final String[] expectedExclusions = {
            TEST_SHARELIB_OOZIE_FILES[0],
            TEST_SHARELIB_OOZIE_FILES[1]
    };
    checkLibExclude(expectedExclusions);
}
/**
 * With an alternation of three directory-specific patterns, the "jackson" jars of the {@code oozie},
 * {@code java} and {@code pig/lib} sharelib directories are expected to be excluded.
 */
public void testExcludeFilesFromMultipleLocations() throws Exception {
    setExcludePatternInConfiguration("pig/lib/jackson.*|java/jackson.*|oozie/jackson.*");

    final String[] expectedExclusions = {
            TEST_SHARELIB_OOZIE_FILES[0], TEST_SHARELIB_OOZIE_FILES[1],
            TEST_SHARELIB_JAVA_FILES[0], TEST_SHARELIB_JAVA_FILES[1],
            TEST_SHARELIB_PIG_FILES[0], TEST_SHARELIB_PIG_FILES[1]
    };
    checkLibExclude(expectedExclusions);
}
/**
 * With the pattern {@code .*&#47;app/lib/jackson.*}, the "jackson" files placed in the application's
 * {@code lib} directory are expected to be excluded.
 */
public void testExcludeUserProvidedFiles() throws Exception {
    setExcludePatternInConfiguration(".*/app/lib/jackson.*");

    final String[] expectedExclusions = {
            TEST_SHARELIB_USER_FILES[0],
            TEST_SHARELIB_USER_FILES[1],
            TEST_SHARELIB_USER_FILES[3]
    };
    checkLibExclude(expectedExclusions);
}
/**
 * Exercises sharelib resolution for the {@code java} action in three phases:
 * <ol>
 * <li>with only the {@code java}, {@code hcat} and {@code other} sharelib directories present, building the
 * launcher configuration is expected to fail with error code {@code EJ001};</li>
 * <li>after the {@code oozie} directory is created, the jars of {@code java}, {@code hcat} and {@code oozie}
 * are expected in the distributed cache, and the jar of {@code other} is not;</li>
 * <li>after the workflow configuration asks for {@code other,hcat} while the server setting is {@code java},
 * the jars of {@code other}, {@code hcat} and {@code oozie} are expected, and the jars of {@code java} are not.</li>
 * </ol>
 */
public void testAddActionShareLib() throws Exception {
// Fixture: jar1/jar2 in "java", jar3/jar4 in "hcat", jar5 in "other"; no "oozie" directory yet.
final Path systemLibPath = getNewSystemLibPath();
final Path actionLibPath = new Path(getAppPath(), "lib");
final Path javaShareLibPath = new Path(systemLibPath, "java");
final Path jar1Path = new Path(javaShareLibPath, "jar1.jar");
final Path jar2Path = new Path(javaShareLibPath, "jar2.jar");
final Path hcatShareLibPath = new Path(systemLibPath, "hcat");
final Path jar3Path = new Path(hcatShareLibPath, "jar3.jar");
final Path jar4Path = new Path(hcatShareLibPath, "jar4.jar");
final Path otherShareLibPath = new Path(systemLibPath, "other");
final Path jar5Path = new Path(otherShareLibPath, "jar5.jar");
makeDirs(javaShareLibPath, hcatShareLibPath, otherShareLibPath);
createFiles(Arrays.asList(jar1Path, jar2Path, jar3Path, jar4Path, jar5Path));
final Context context = createContextUsingSharelib(actionLibPath);
createWorkflowJobUsingSharelib(context);
setSharelibForActionInConfiguration("java,hcat");
// Phase 1: expected to fail, presumably because the "oozie" sharelib directory does not exist yet.
try {
createActionExecutorAndSetupServices(context);
fail("Expected ActionExecutorException to be thrown, but got nothing.");
}
catch (ActionExecutorException aee) {
assertEquals("Unexpected error code. Message: " + aee.getMessage(), "EJ001", aee.getErrorCode());
assertEquals("Unexpected error message","Could not locate Oozie sharelib", aee.getMessage());
}
// Phase 2: create the "oozie" directory with jar6 and retry with the same settings.
final Path launcherPath = new Path(systemLibPath, "oozie");
final Path jar6Path = new Path(launcherPath, "jar6.jar");
makeDirs(launcherPath);
getFileSystem().create(jar6Path).close();
Configuration jobConf = createActionExecutorAndSetupServices(context);
String cacheFilesStr = getDistributedCacheFilesStr(jobConf);
assertContainsJars( cacheFilesStr, Arrays.asList(jar1Path, jar2Path, jar3Path, jar4Path, jar6Path));
assertNotContainsJars( cacheFilesStr, Collections.singletonList(jar5Path));
// Phase 3: the workflow configuration requests "other,hcat" while the server setting is changed to "java".
final XConfiguration wfConf = new XConfiguration();
wfConf.set(ACTION_SHARELIB_FOR + "java", "other,hcat");
createWorkflowJobUsingSharelib(context, wfConf);
setSharelibForActionInConfiguration("java");
jobConf = createActionExecutorAndSetupServices(context);
// The oozie server setting should have been overridden by workflow setting
cacheFilesStr = getDistributedCacheFilesStr(jobConf);
assertContainsJars(cacheFilesStr, Arrays.asList(jar3Path, jar4Path, jar5Path, jar6Path));
assertNotContainsJars(cacheFilesStr, Arrays.asList(jar1Path, jar2Path));
}
/**
 * Returns the {@code actionlibs} directory below the test case file system directory, creating it on
 * every call.
 *
 * @return path of the {@code actionlibs} directory
 * @throws Exception if the directory cannot be created
 */
private Path getActionLibPath() throws Exception {
    final Path actionLibsDir = new Path(getFsTestCaseDir(), "actionlibs");
    makeDirs(actionLibsDir);
    return actionLibsDir;
}
/**
 * Ensures the {@code actionlibs} directory exists below the test case file system directory and creates
 * a file at each of the given paths.
 *
 * @param paths files to create
 * @return the given paths as a list, in the same order
 * @throws Exception if the directory or one of the files cannot be created
 */
private List<Path> createTestActionLibPaths(Path... paths) throws Exception{
    final Path actionLibsDir = new Path(getFsTestCaseDir(), "actionlibs");
    makeDirs(actionLibsDir);

    final List<Path> requestedPaths = Arrays.asList(paths);
    createFiles(requestedPaths);
    return requestedPaths;
}
/**
 * When {@code oozie.launcher.oozie.libpath} points at a directory, the jars inside that directory are
 * expected among the distributed cache files.
 */
public void testAddingActionLibDir() throws Exception{
    makeDirs(getActionLibPath());

    final Path firstJar = new Path(getActionLibPath(), "jar1.jar");
    final Path secondJar = new Path(getActionLibPath(), "jar2.jar");
    final List<Path> expectedJars = createTestActionLibPaths(firstJar, secondJar);

    final Context context = createContextUsingSharelib(getActionLibPath());
    final Configuration launcherConf = createActionExecutorAndSetLibFilesArchives(context);

    final String cacheFiles = getDistributedCacheFilesStr(launcherConf);
    assertContainsJars(cacheFiles, expectedJars);
}
/**
 * When {@code oozie.launcher.oozie.libpath} points at a single jar file instead of a directory, that jar
 * is expected among the distributed cache files.
 */
public void testAddingActionLibFile() throws Exception{
    final List<Path> expectedJars = createTestActionLibPaths(new Path(getFsTestCaseDir(), "jar3.jar"));

    // The shared helper produces exactly the action XML this test used to concatenate by hand
    // (same elements, same order, libpath value taken from the path's string form).
    final Context context = createContextUsingSharelib(expectedJars.get(0));
    final Configuration jobConf = createActionExecutorAndSetLibFilesArchives(context);

    assertContainsJars(getDistributedCacheFilesStr(jobConf), expectedJars);
}
/**
 * When {@code oozie.launcher.oozie.libpath} lists a directory and a single jar file separated by a comma,
 * the jars from the directory and the single jar are all expected among the distributed cache files.
 */
public void testActionLibFileAndDir() throws Exception {
    makeDirs(getActionLibPath());

    final Path dirJar1 = new Path(getActionLibPath(), "jar1.jar");
    final Path dirJar2 = new Path(getActionLibPath(), "jar2.jar");
    final Path standaloneJar = new Path(getFsTestCaseDir(), "jar3.jar");
    final List<Path> expectedJars = createTestActionLibPaths(dirJar1, dirJar2, standaloneJar);

    final StringBuilder actionXml = new StringBuilder();
    actionXml.append("<java>");
    actionXml.append("<job-tracker>").append(getJobTrackerUri()).append("</job-tracker>");
    actionXml.append("<name-node>").append(getNameNodeUri()).append("</name-node>");
    actionXml.append("<configuration>");
    actionXml.append("<property><name>oozie.launcher.oozie.libpath</name><value>");
    // libpath value: the directory first, then the stand-alone jar.
    actionXml.append(getActionLibPath()).append(",").append(expectedJars.get(2));
    actionXml.append("</value></property>");
    actionXml.append("</configuration>");
    actionXml.append("<main-class>MAIN-CLASS</main-class>");
    actionXml.append("</java>");

    final Context context = createContext(actionXml.toString(), null);
    final Configuration launcherConf = createActionExecutorAndSetLibFilesArchives(context);

    final String cacheFiles = getDistributedCacheFilesStr(launcherConf);
    assertContainsJars(cacheFiles, expectedJars);
}
/**
 * Renders the distributed cache files of the given configuration as a single string.
 *
 * @param jobConf configuration to read the cache files from
 * @return the {@link Arrays#toString(Object[])} form of the cache file URIs
 * @throws IOException if the cache files cannot be read from the configuration
 */
private String getDistributedCacheFilesStr(final Configuration jobConf) throws IOException {
    final URI[] cacheFileUris = DistributedCache.getCacheFiles(jobConf);
    return Arrays.toString(cacheFileUris);
}
/**
 * Creates the file and archive fixtures used by the {@code <file>}/{@code <archive>} tests: one set
 * directly under the application path and one set under the {@code root} directory of the test case
 * file system directory.
 *
 * @return the created files keyed by file name, in insertion order
 * @throws Exception if one of the files cannot be created
 */
private Map<String, Path> createAndGetSharelibTestFiles() throws Exception {
    final Path rootDir = new Path(getFsTestCaseDir(), "root");
    final Map<String, Path> pathsByFileName = new LinkedHashMap<>();

    // Plain files first, then the two archives; each group has an app-path and a root-dir variant.
    loadLibPathsToMap(pathsByFileName, getAppPath(), TEST_SHARELIB_FILES);
    loadLibPathsToMap(pathsByFileName, rootDir, TEST_SHARELIB_ROOT_FILES);
    loadLibPathsToMap(pathsByFileName, getAppPath(), TEST_SHARELIB_ARCHIVES[0]);
    loadLibPathsToMap(pathsByFileName, rootDir, TEST_SHARELIB_ARCHIVES[1]);

    createFiles(pathsByFileName.values());
    return pathsByFileName;
}
/**
 * Declares every fixture file in its own {@code <file>} element and every fixture archive in its own
 * {@code <archive>} element, then verifies the resulting distributed cache contents.
 */
public void testLibFileArchives() throws Exception {
    final Map<String, Path> libs = createAndGetSharelibTestFiles();

    final StringBuilder xml = new StringBuilder("<java>");
    xml.append(" <job-tracker>").append(getJobTrackerUri()).append("</job-tracker>");
    xml.append(" <name-node>").append(getNameNodeUri()).append("</name-node>");
    xml.append(" <main-class>CLASS</main-class>");
    // One element per fixture, in declaration order: app-path files, root files, then the archives.
    for (final String fileName : TEST_SHARELIB_FILES) {
        xml.append(" <file>").append(libs.get(fileName).toString()).append("</file>\n");
    }
    for (final String fileName : TEST_SHARELIB_ROOT_FILES) {
        xml.append(" <file>").append(libs.get(fileName).toString()).append("</file>\n");
    }
    for (final String archiveName : TEST_SHARELIB_ARCHIVES) {
        xml.append(" <archive>").append(libs.get(archiveName).toString()).append("</archive>\n");
    }
    xml.append("</java>");

    final Context context = createContext(xml.toString(), null);
    final Configuration jobConf = createActionExecutorAndSetLibFilesArchives(context);
    verifyFilesInDistributedCache(libs, jobConf);
}
/**
 * Declares all fixture files in a single comma separated {@code <file>} element and both archives in a
 * single comma separated {@code <archive>} element (some entries padded with spaces), then verifies the
 * resulting distributed cache contents.
 */
public void testCommaSeparatedFilesAndArchives() throws Exception {
    final Map<String, Path> libs = createAndGetSharelibTestFiles();

    final String jobTrackerElement = " <job-tracker>" + getJobTrackerUri() + "</job-tracker>";
    final String nameNodeElement = " <name-node>" + getNameNodeUri() + "</name-node>";

    // The entry for TEST_SHARELIB_ROOT_FILES[2] is deliberately surrounded by spaces.
    final String fileList = libs.get(TEST_SHARELIB_FILES[3]).toString()
            + "," + libs.get(TEST_SHARELIB_ROOT_FILES[3]).toString()
            + "," + libs.get(TEST_SHARELIB_FILES[2]).toString()
            + ", " + libs.get(TEST_SHARELIB_ROOT_FILES[2]).toString()
            + " ," + libs.get(TEST_SHARELIB_FILES[0]).toString()
            + "," + libs.get(TEST_SHARELIB_ROOT_FILES[0]).toString()
            + "," + libs.get(TEST_SHARELIB_FILES[1]).toString()
            + "," + libs.get(TEST_SHARELIB_ROOT_FILES[1]).toString();

    // The second archive is deliberately surrounded by spaces.
    final String archiveList = libs.get(TEST_SHARELIB_ARCHIVES[0]).toString()
            + ", " + libs.get(TEST_SHARELIB_ARCHIVES[1]).toString() + " ";

    final String actionXml = "<java>"
            + jobTrackerElement
            + nameNodeElement
            + " <main-class>CLASS</main-class>"
            + " <file>" + fileList + "</file>\n"
            + " <archive>" + archiveList + "</archive>\n"
            + "</java>";

    final Context context = createContext(actionXml, null);
    final Configuration jobConf = createActionExecutorAndSetLibFilesArchives(context);
    verifyFilesInDistributedCache(libs, jobConf);
}
/**
 * Verifies the distributed cache settings of the given configuration against the fixtures:
 * the symlink flag is set, only the two {@code .jar} fixtures are class path entries, all eight fixture
 * files are cache files, and both fixture archives are cache archives.
 *
 * @param libs fixture files keyed by file name
 * @param jobConf configuration to inspect
 * @throws Exception if reading the distributed cache settings fails
 */
private void verifyFilesInDistributedCache(Map<String, Path> libs, final Configuration jobConf) throws Exception {
    assertTrue(DistributedCache.getSymlink(jobConf));

    final Path appSoFile = libs.get(TEST_SHARELIB_FILES[0]);
    final Path appSoFileVersioned = libs.get(TEST_SHARELIB_FILES[1]);
    final Path appPlainFile = libs.get(TEST_SHARELIB_FILES[2]);
    final Path appJar = libs.get(TEST_SHARELIB_FILES[3]);
    final Path rootSoFile = libs.get(TEST_SHARELIB_ROOT_FILES[0]);
    final Path rootSoFileVersioned = libs.get(TEST_SHARELIB_ROOT_FILES[1]);
    final Path rootPlainFile = libs.get(TEST_SHARELIB_ROOT_FILES[2]);
    final Path rootJar = libs.get(TEST_SHARELIB_ROOT_FILES[3]);

    // Class path: only the jars.
    final String classPathEntries = Arrays.toString(DistributedCache.getFileClassPaths(jobConf));
    assertContainsJars(classPathEntries, Arrays.asList(appJar, rootJar));
    assertNotContainsJars(classPathEntries, Arrays.asList(
            appSoFile, appSoFileVersioned, appPlainFile,
            rootSoFile, rootSoFileVersioned, rootPlainFile));

    // Cache files: every declared file, jar or not.
    final String cacheFileEntries = Arrays.toString(DistributedCache.getCacheFiles(jobConf));
    assertContainsJars(cacheFileEntries, Arrays.asList(
            appSoFile, appSoFileVersioned, appPlainFile, appJar,
            rootSoFile, rootSoFileVersioned, rootPlainFile, rootJar));

    // Cache archives: both declared archives.
    final String cacheArchiveEntries = Arrays.toString(DistributedCache.getCacheArchives(jobConf));
    assertContainsJars(cacheArchiveEntries, Arrays.asList(
            libs.get(TEST_SHARELIB_ARCHIVES[0]),
            libs.get(TEST_SHARELIB_ARCHIVES[1])));
}
}