blob: 24766ced2d3fcc0f25414fe055069b34d246c30c [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.oozie.service;
import java.io.File;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.net.URI;
import java.net.URLDecoder;
import java.nio.charset.StandardCharsets;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Calendar;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import com.google.common.io.Files;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.oozie.WorkflowActionBean;
import org.apache.oozie.WorkflowJobBean;
import org.apache.oozie.action.hadoop.ActionExecutorTestCase.Context;
import org.apache.oozie.action.hadoop.HiveActionExecutor;
import org.apache.oozie.action.hadoop.JavaActionExecutor;
import org.apache.oozie.action.hadoop.PigActionExecutor;
import org.apache.oozie.action.hadoop.TestJavaActionExecutor;
import org.apache.oozie.client.OozieClient;
import org.apache.oozie.test.XFsTestCase;
import org.apache.oozie.util.FSUtils;
import org.apache.oozie.util.IOUtils;
import org.apache.oozie.util.XConfiguration;
import org.apache.oozie.util.XmlUtils;
import org.jdom.Element;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;
public class TestShareLibService extends XFsTestCase {
private static final String HDFS_SCHEME_PREFIX = "hdfs";
private static final String TEST_HDFS_HOME = "/user/test/";
private static final String TEST_MAPPING_FILENAME = "config.properties";
private static final String TEST_HDFS_MAPPING_FILE_PATH = TEST_HDFS_HOME + TEST_MAPPING_FILENAME;
private static final String SHARELIB_PATH = "shareLibPath/";
Services services;
private static String testCaseDirPath;
SimpleDateFormat dt = new SimpleDateFormat("yyyyMMddHHmmss");
@Override
protected void setUp() throws Exception {
super.setUp();
testCaseDirPath = getTestCaseDir();
services = new Services();
setSystemProps();
}
@Override
protected void tearDown() throws Exception {
super.tearDown();
services.destroy();
}
private void setSystemProps() throws IOException {
IOUtils.createJar(new File(getTestCaseDir()), MyOozie.class.getName() + ".jar", MyOozie.class);
IOUtils.createJar(new File(getTestCaseDir()), MyPig.class.getName() + ".jar", MyPig.class);
IOUtils.createJar(new File(getTestCaseDir()), TestHive.class.getName() + ".jar", TestHive.class);
Configuration conf = getOozieConfig();
conf.set(WorkflowAppService.SYSTEM_LIB_PATH, getFsTestCaseDir() + "/share/lib");
conf.set(Services.CONF_SERVICE_CLASSES, conf.get(Services.CONF_SERVICE_CLASSES) + ","
+ DummyShareLibService.class.getName());
conf.setStrings(ActionService.CONF_ACTION_EXECUTOR_CLASSES, DummyPigActionExecutor.class.getName(),
DummyHiveActionExecutor.class.getName());
}
public static class DummyShareLibService extends ShareLibService {
@Override
public String findContainingJar(Class<?> clazz) {
if (JavaActionExecutor.getCommonLauncherClasses().contains(clazz)) {
return testCaseDirPath + Path.SEPARATOR + MyOozie.class.getName() + ".jar";
}
return testCaseDirPath + Path.SEPARATOR + clazz.getName() + ".jar";
}
}
public static class DummyPigActionExecutor extends PigActionExecutor {
public DummyPigActionExecutor() {
}
@Override
public List<Class<?>> getLauncherClasses() {
return Arrays.asList(MyPig.class);
}
}
public static class DummyHiveActionExecutor extends HiveActionExecutor {
public DummyHiveActionExecutor() {
}
@Override
public List<Class<?>> getLauncherClasses() {
return Arrays.asList(TestHive.class);
}
}
static class MyOozie {
}
static class MyPig {
}
static class TestHive {
}
@Test
public void testfailFast() throws Exception {
Configuration conf = getOozieConfig();
conf.set(ShareLibService.FAIL_FAST_ON_STARTUP, "true");
// Set dummyfile as metafile which doesn't exist.
conf.set(ShareLibService.SHARELIB_MAPPING_FILE, String.valueOf(new Date().getTime()));
try {
services.init();
fail("Should throw exception");
}
catch (Throwable e) {
assertTrue(e.getMessage().contains("E0104: Could not fully initialize service"));
}
}
@Test
public void testCreateLauncherLibPath() throws Exception {
setShipLauncherInOozieConfig();
services.init();
ShareLibService shareLibService = Services.get().get(ShareLibService.class);
List<Path> launcherPath = shareLibService.getSystemLibJars(JavaActionExecutor.OOZIE_COMMON_LIBDIR);
assertNotNull(launcherPath);
assertTrue(getFileSystem().exists(launcherPath.get(0)));
List<Path> pigLauncherPath = shareLibService.getSystemLibJars("pig");
assertTrue(getFileSystem().exists(pigLauncherPath.get(0)));
}
@Test
public void testAddShareLibDistributedCache() throws Exception {
setShipLauncherInOozieConfig();
services.init();
String actionXml = "<java>" + "<job-tracker>" + getJobTrackerUri() + "</job-tracker>" + "<name-node>"
+ getNameNodeUri() + "</name-node>" + "</java>";
Element eActionXml = XmlUtils.parseXml(actionXml);
XConfiguration protoConf = new XConfiguration();
protoConf.set(WorkflowAppService.HADOOP_USER, getTestUser());
WorkflowJobBean wfj = new WorkflowJobBean();
wfj.setProtoActionConf(XmlUtils.prettyPrint(protoConf).toString());
wfj.setConf(XmlUtils.prettyPrint(new XConfiguration()).toString());
Context context = new TestJavaActionExecutor().new Context(wfj, new WorkflowActionBean());
PigActionExecutor ae = new PigActionExecutor();
Configuration jobConf = ae.createBaseHadoopConf(context, eActionXml);
ae.setLibFilesArchives(context, eActionXml, new Path("hdfs://dummyAppPath"), jobConf);
verifyFilesInDistributedCache(DistributedCache.getCacheFiles(jobConf), MyPig.class.getName() + ".jar",
MyOozie.class.getName() + ".jar");
}
@Test
public void testAddShareLib_pig() throws Exception {
setShipLauncherInOozieConfig();
services.init();
String actionXml = "<pig>" + "<job-tracker>" + getJobTrackerUri() + "</job-tracker>" + "<name-node>"
+ getNameNodeUri() + "</name-node>" + "</pig>";
Element eActionXml = XmlUtils.parseXml(actionXml);
XConfiguration protoConf = new XConfiguration();
protoConf.set(WorkflowAppService.HADOOP_USER, getTestUser());
WorkflowJobBean wfj = new WorkflowJobBean();
wfj.setProtoActionConf(XmlUtils.prettyPrint(protoConf).toString());
wfj.setConf(XmlUtils.prettyPrint(new XConfiguration()).toString());
Context context = new TestJavaActionExecutor().new Context(wfj, new WorkflowActionBean());
PigActionExecutor ae = new PigActionExecutor();
Configuration jobConf = ae.createBaseHadoopConf(context, eActionXml);
ae.setLibFilesArchives(context, eActionXml, new Path("hdfs://dummyAppPath"), jobConf);
verifyFilesInDistributedCache(DistributedCache.getCacheFiles(jobConf), "MyPig.jar", "MyOozie.jar");
}
@Test
public void testAddShareLib_pig_withVersion() throws Exception {
setShipLauncherInOozieConfig();
FileSystem fs = getFileSystem();
Date time = new Date(System.currentTimeMillis());
Path basePath = new Path(getOozieConfig()
.get(WorkflowAppService.SYSTEM_LIB_PATH));
Path libpath = new Path(basePath, ShareLibService.SHARE_LIB_PREFIX + ShareLibService.dt.get().format(time));
fs.mkdirs(libpath);
Path pigPath = new Path(libpath.toString() + Path.SEPARATOR + "pig");
Path pigPath1 = new Path(libpath.toString() + Path.SEPARATOR + "pig_9");
Path pigPath2 = new Path(libpath.toString() + Path.SEPARATOR + "pig_10");
fs.mkdirs(pigPath);
fs.mkdirs(pigPath1);
fs.mkdirs(pigPath2);
createFiles(libpath.toString() + Path.SEPARATOR + "pig_10" + Path.SEPARATOR + "pig-10.jar");
services.init();
String actionXml = "<pig>" + "<job-tracker>" + getJobTrackerUri() + "</job-tracker>" + "<name-node>"
+ getNameNodeUri() + "</name-node>"
+ "<property><name>oozie.action.sharelib.for.pig</name><value>pig_10</value></property>" + "</pig>";
Element eActionXml = XmlUtils.parseXml(actionXml);
XConfiguration protoConf = new XConfiguration();
protoConf.set(WorkflowAppService.HADOOP_USER, getTestUser());
WorkflowJobBean wfj = new WorkflowJobBean();
protoConf.setBoolean(OozieClient.USE_SYSTEM_LIBPATH, true);
wfj.setProtoActionConf(XmlUtils.prettyPrint(protoConf).toString());
wfj.setConf(XmlUtils.prettyPrint(protoConf).toString());
Context context = new TestJavaActionExecutor().new Context(wfj, new WorkflowActionBean());
PigActionExecutor ae = new PigActionExecutor();
Configuration jobConf = ae.createBaseHadoopConf(context, eActionXml);
jobConf.set("oozie.action.sharelib.for.pig", "pig_10");
ae.setLibFilesArchives(context, eActionXml, new Path("hdfs://dummyAppPath"), jobConf);
verifyFilesInDistributedCache(DistributedCache.getCacheFiles(jobConf), "MyPig.jar", "MyOozie.jar",
"pig-10.jar");
}
// retentionTime overflows to negative before OOZIE-3142
@Test
public void testRetentionOverflow() throws Exception {
getOozieConfig().set(ShareLibService.LAUNCHERJAR_LIB_RETENTION, "25");
services.init();
ShareLibService shareLibService = services.get(ShareLibService.class);
assertTrue(shareLibService.retentionTime > 0);
}
@Test
public void testPurgeShareLib() throws Exception {
setShipLauncherInOozieConfig();
FileSystem fs = getFileSystem();
long expiryTime = System.currentTimeMillis()
- TimeUnit.MILLISECONDS.convert(
getOozieConfig()
.getInt(ShareLibService.LAUNCHERJAR_LIB_RETENTION, 7), TimeUnit.DAYS);
// for directory created 8 days back to be deleted
String expireTs = dt.format(new Date(expiryTime - TimeUnit.MILLISECONDS.convert(1, TimeUnit.DAYS)));
// for directory created 6 days back NOT to be deleted
String noexpireTs = dt.format(new Date(expiryTime + TimeUnit.MILLISECONDS.convert(1, TimeUnit.DAYS)));
// for directory created 5 days back NOT to be deleted
String noexpireTs1 = dt.format(new Date(expiryTime + TimeUnit.MILLISECONDS.convert(2, TimeUnit.DAYS)));
Path basePath = new Path(getOozieConfig()
.get(WorkflowAppService.SYSTEM_LIB_PATH));
Path expirePath = new Path(basePath, ShareLibService.SHARE_LIB_PREFIX + expireTs);
Path noexpirePath = new Path(basePath, ShareLibService.SHARE_LIB_PREFIX + noexpireTs);
Path noexpirePath1 = new Path(basePath, ShareLibService.SHARE_LIB_PREFIX + noexpireTs1);
createDirs(fs, expirePath, noexpirePath, noexpirePath1);
services.init();
assertEquals(4, fs.listStatus(basePath).length);
assertTrue(fs.exists(noexpirePath));
assertTrue(fs.exists(noexpirePath1));
assertTrue(fs.exists(expirePath));
}
@Test
public void testPurgeLauncherJar() throws Exception {
setShipLauncherInOozieConfig();
FileSystem fs = getFileSystem();
long expiryTime = System.currentTimeMillis()
- TimeUnit.MILLISECONDS.convert(
getOozieConfig()
.getInt(ShareLibService.LAUNCHERJAR_LIB_RETENTION, 7), TimeUnit.DAYS);
// for directory created 8 days back to be deleted
String expireTs = dt.format(new Date(expiryTime - TimeUnit.MILLISECONDS.convert(1, TimeUnit.DAYS)));
// for directory created 6 days back NOT to be deleted
String noexpireTs = dt.format(new Date(expiryTime + TimeUnit.MILLISECONDS.convert(1, TimeUnit.DAYS)));
// for directory created 5 days back NOT to be deleted
String noexpireTs1 = dt.format(new Date(expiryTime + TimeUnit.MILLISECONDS.convert(2, TimeUnit.DAYS)));
Path basePath = new Path(getOozieConfig()
.get(WorkflowAppService.SYSTEM_LIB_PATH));
Path expirePath = new Path(basePath, ShareLibService.LAUNCHER_LIB_PREFIX + expireTs);
Path noexpirePath = new Path(basePath, ShareLibService.LAUNCHER_LIB_PREFIX + noexpireTs);
Path noexpirePath1 = new Path(basePath, ShareLibService.LAUNCHER_LIB_PREFIX + noexpireTs1);
createDirs(fs, expirePath, noexpirePath, noexpirePath1);
services.init();
assertEquals(4, fs.listStatus(basePath).length);
assertTrue(fs.exists(noexpirePath));
assertTrue(fs.exists(noexpirePath1));
assertTrue(fs.exists(expirePath));
}
// Logic is to keep all share-lib between current timestamp and 7days old + 1 latest sharelib older than 7 days.
// refer OOZIE-1761
@Test
public void testPurgeJar() throws Exception {
setShipLauncherInOozieConfig();
final FileSystem fs = getFileSystem();
// for directory created 8 days back to be deleted
long expiryTime = System.currentTimeMillis()
- TimeUnit.MILLISECONDS.convert(
getOozieConfig()
.getInt(ShareLibService.LAUNCHERJAR_LIB_RETENTION, 7), TimeUnit.DAYS);
String expireTs = dt.format(new Date(expiryTime - TimeUnit.MILLISECONDS.convert(1, TimeUnit.DAYS)));
String expireTs1 = dt.format(new Date(expiryTime - TimeUnit.MILLISECONDS.convert(2, TimeUnit.DAYS)));
String noexpireTs = dt.format(new Date(expiryTime + TimeUnit.MILLISECONDS.convert(1, TimeUnit.DAYS)));
String noexpireTs1 = dt.format(new Date(expiryTime + TimeUnit.MILLISECONDS.convert(2, TimeUnit.DAYS)));
final Path basePath = new Path(getOozieConfig()
.get(WorkflowAppService.SYSTEM_LIB_PATH));
Path expirePath = new Path(basePath, ShareLibService.LAUNCHER_LIB_PREFIX + expireTs);
Path expirePath1 = new Path(basePath, ShareLibService.LAUNCHER_LIB_PREFIX + expireTs1);
Path noexpirePath = new Path(basePath, ShareLibService.LAUNCHER_LIB_PREFIX + noexpireTs);
Path noexpirePath1 = new Path(basePath, ShareLibService.LAUNCHER_LIB_PREFIX + noexpireTs1);
createDirs(fs, expirePath, expirePath1, noexpirePath, noexpirePath1);
services.init();
// Wait for the scheduled purge runnable to complete
waitFor(20 * 1000, new Predicate() {
@Override
public boolean evaluate() throws Exception {
return (fs.listStatus(basePath).length == 4);
}
});
assertEquals(4, fs.listStatus(basePath).length);
assertTrue(fs.exists(noexpirePath));
assertTrue(fs.exists(noexpirePath1));
assertTrue(fs.exists(expirePath));
assertFalse(fs.exists(expirePath1));
}
@Test
public void testGetShareLibCompatible() throws Exception {
FileSystem fs = getFileSystem();
Path basePath = new Path(getOozieConfig()
.get(WorkflowAppService.SYSTEM_LIB_PATH));
// Use basepath if there is no timestamped directory
fs.mkdirs(basePath);
Path pigPath = new Path(basePath.toString() + Path.SEPARATOR + "pig");
fs.mkdirs(pigPath);
services.init();
ShareLibService shareLibService = Services.get().get(ShareLibService.class);
assertNotNull(shareLibService.getShareLibJars("pig"));
}
@Test
public void testGetShareLibPath() throws Exception {
FileSystem fs = getFileSystem();
Path basePath = new Path(getOozieConfig()
.get(WorkflowAppService.SYSTEM_LIB_PATH));
// Use timedstamped directory if available
Date time = new Date(System.currentTimeMillis());
Path libpath = new Path(basePath, ShareLibService.SHARE_LIB_PREFIX + ShareLibService.dt.get().format(time));
fs.mkdirs(libpath);
Path pigPath = new Path(libpath.toString() + Path.SEPARATOR + "pig");
Path pigPath1 = new Path(libpath.toString() + Path.SEPARATOR + "pig_9");
Path pigPath2 = new Path(libpath.toString() + Path.SEPARATOR + "pig_10");
fs.mkdirs(pigPath);
fs.mkdirs(pigPath1);
fs.mkdirs(pigPath2);
services.init();
ShareLibService shareLibService = Services.get().get(ShareLibService.class);
assertNotNull(shareLibService.getShareLibJars("pig"));
assertNotNull(shareLibService.getShareLibJars("pig_9"));
assertNotNull(shareLibService.getShareLibJars("pig_10"));
assertNull(shareLibService.getShareLibJars("pig_11"));
}
@Test
public void testShareLib() throws Exception {
setShipLauncherInOozieConfig();
FileSystem fs = getFileSystem();
String dir1 = dt.format(new Date(System.currentTimeMillis() - TimeUnit.MILLISECONDS.convert(1, TimeUnit.DAYS)));
String dir2 = dt.format(new Date(System.currentTimeMillis() - TimeUnit.MILLISECONDS.convert(2, TimeUnit.DAYS)));
String dir3 = dt.format(new Date(System.currentTimeMillis() - TimeUnit.MILLISECONDS.convert(3, TimeUnit.DAYS)));
Path basePath = new Path(getOozieConfig()
.get(WorkflowAppService.SYSTEM_LIB_PATH));
Path path1 = new Path(basePath, ShareLibService.SHARE_LIB_PREFIX + dir1);
Path path2 = new Path(basePath, ShareLibService.SHARE_LIB_PREFIX + dir2);
Path path3 = new Path(basePath, ShareLibService.SHARE_LIB_PREFIX + dir3);
createDirs(fs, path1, path2, path3);
createFiles(path1.toString() + Path.SEPARATOR + "pig" + Path.SEPARATOR + "pig.jar");
services.init();
ShareLibService shareLibService = Services.get().get(ShareLibService.class);
assertTrue(shareLibService.getShareLibJars("pig").get(0).getName().endsWith("pig.jar"));
}
private void validateShareLibLoadFiles(final FileSystem fs, final String schema) throws Exception {
try {
services.init();
ShareLibService shareLibService = Services.get().get(ShareLibService.class);
verifyShareLibFromMappingFileContent(schema, shareLibService);
}
finally {
if (schema.startsWith(HDFS_SCHEME_PREFIX)) {
fs.delete(new Path(SHARELIB_PATH), true);
fs.delete(new Path("linkFile.xml"), true);
}
}
}
private void verifyShareLibFromMappingFileContent(String schema, ShareLibService shareLibService) throws IOException
{
assertTrue(shareLibService.getShareLibJars("something_new").get(0).getName().endsWith("somethingNew.jar"));
assertTrue(shareLibService.getShareLibJars("pig").get(0).getName().endsWith("pig.jar"));
assertTrue(shareLibService.getShareLibJars("directjar").get(0).getName().endsWith("direct.jar"));
assertTrue(shareLibService.getShareLibJars("linkFile").get(0).getName().endsWith("targetOfLinkFile.xml"));
List<Path> listOfPaths = shareLibService.getShareLibJars("directjar");
for (Path p : listOfPaths) {
assertTrue(p.toString().startsWith(schema));
}
}
private void setupShareLibLoadFiles(FileSystem fs, String testUserHome) throws ServiceException, IOException {
createShareLibMetaFileTestResources(fs, testUserHome);
setShipLauncherInOozieConfig();
Configuration conf = getOozieConfig();
conf.set(ShareLibService.SHARELIB_MAPPING_FILE, fs.getUri() + testUserHome + TEST_MAPPING_FILENAME);
}
@Test
public void testShareLibLoadFilesFromLocalFs() throws Exception {
final FileSystem localFs = newLocalFileSystem();
final String testUserHome = Files.createTempDir().toString() + Path.SEPARATOR;
try {
setupShareLibLoadFiles(localFs, testUserHome);
validateShareLibLoadFiles(localFs, FSUtils.FILE_SCHEME_PREFIX);
}
finally {
localFs.delete(new Path(testUserHome), true);
}
}
@Test
public void testShareLibLoadFilesFromHDFS() throws Exception {
FileSystem fs = getFileSystem();
setupShareLibLoadFiles(fs, TEST_HDFS_HOME);
validateShareLibLoadFiles(fs, HDFS_SCHEME_PREFIX);
}
@Test
public void testLoadfromDFS() throws Exception {
services.init();
FileSystem fs = getFileSystem();
Date time = new Date(System.currentTimeMillis());
Path basePath = new Path(getOozieConfig()
.get(WorkflowAppService.SYSTEM_LIB_PATH));
Path libpath = new Path(basePath, ShareLibService.SHARE_LIB_PREFIX
+ ShareLibService.dt.get().format(time));
fs.mkdirs(libpath);
Path pigPath = new Path(libpath.toString() + Path.SEPARATOR + "pig");
Path ooziePath = new Path(libpath.toString() + Path.SEPARATOR + "oozie");
Path pigPath1 = new Path(libpath.toString() + Path.SEPARATOR + "pig_9");
Path pigPath2 = new Path(libpath.toString() + Path.SEPARATOR + "pig_10");
fs.mkdirs(pigPath);
fs.mkdirs(ooziePath);
fs.mkdirs(pigPath1);
fs.mkdirs(pigPath2);
createFiles(libpath.toString() + Path.SEPARATOR + "pig_10" + Path.SEPARATOR + "pig-10.jar");
createFiles(libpath.toString() + Path.SEPARATOR + "oozie" + Path.SEPARATOR + "oozie_luncher.jar");
String actionXml = "<pig>" + "<job-tracker>" + getJobTrackerUri() + "</job-tracker>" + "<name-node>"
+ getNameNodeUri() + "</name-node>"
+ "<property><name>oozie.action.sharelib.for.pig</name><value>pig_10</value></property>" + "</pig>";
Element eActionXml = XmlUtils.parseXml(actionXml);
XConfiguration protoConf = new XConfiguration();
protoConf.set(WorkflowAppService.HADOOP_USER, getTestUser());
WorkflowJobBean wfj = new WorkflowJobBean();
protoConf.setBoolean(OozieClient.USE_SYSTEM_LIBPATH, true);
wfj.setProtoActionConf(XmlUtils.prettyPrint(protoConf).toString());
wfj.setConf(XmlUtils.prettyPrint(protoConf).toString());
Context context = new TestJavaActionExecutor().new Context(wfj, new WorkflowActionBean());
PigActionExecutor ae = new PigActionExecutor();
Configuration jobConf = ae.createBaseHadoopConf(context, eActionXml);
jobConf.set("oozie.action.sharelib.for.pig", "pig_10");
ae.setLibFilesArchives(context, eActionXml, new Path("hdfs://dummyAppPath"), jobConf);
verifyFilesInDistributedCache(DistributedCache.getCacheFiles(jobConf), "pig-10.jar", "oozie_luncher.jar");
}
@Test
public void testShareLibLoadFileMultipleFile() throws Exception {
FileSystem fs = getFileSystem();
createTestShareLibMetaFile_multipleFile(fs);
Configuration conf = getOozieConfig();
setShipLauncherInOozieConfig();
setShareLibMappingFileInOozieConfig(fs, conf);
services.init();
ShareLibService shareLibService = Services.get().get(ShareLibService.class);
assertNull(shareLibService.getShareLibJars("something_new"));
assertEquals(shareLibService.getShareLibJars("pig").size(), 2);
fs.delete(new Path(SHARELIB_PATH), true);
}
private void setShareLibMappingFileInOozieConfig(FileSystem fs, Configuration conf) {
conf.set(ShareLibService.SHARELIB_MAPPING_FILE, fs.getUri() + TEST_HDFS_HOME + TEST_MAPPING_FILENAME);
}
@Test
public void testMultipleLauncherCall() throws Exception {
setShipLauncherInOozieConfig();
FileSystem fs = getFileSystem();
Date time = new Date(System.currentTimeMillis());
Path basePath = new Path(getOozieConfig()
.get(WorkflowAppService.SYSTEM_LIB_PATH));
Path libpath = new Path(basePath, ShareLibService.SHARE_LIB_PREFIX
+ ShareLibService.dt.get().format(time));
fs.mkdirs(libpath);
Path ooziePath = new Path(libpath.toString() + Path.SEPARATOR + "oozie");
fs.mkdirs(ooziePath);
createFiles(libpath.toString() + Path.SEPARATOR + "oozie" + Path.SEPARATOR + "oozie_luncher.jar");
services.init();
ShareLibService shareLibService = Services.get().get(ShareLibService.class);
List<Path> launcherPath = shareLibService.getSystemLibJars(JavaActionExecutor.OOZIE_COMMON_LIBDIR);
assertEquals(launcherPath.size(), 2);
launcherPath = shareLibService.getSystemLibJars(JavaActionExecutor.OOZIE_COMMON_LIBDIR);
assertEquals(launcherPath.size(), 2);
}
@Test
public void testMetafileSymlink() throws ServiceException, IOException {
Configuration conf = getOozieConfig();
setShipLauncherInOozieConfig();
services.init();
FileSystem fs = getFileSystem();
Properties prop = new Properties();
try {
Path basePath = new Path(SHARELIB_PATH + "testPath");
Path basePath1 = new Path(SHARELIB_PATH + "testPath1");
Path hive_site = new Path(basePath.toString() + Path.SEPARATOR + "hive_conf" + Path.SEPARATOR
+ "hive-site.xml");
Path hive_site1 = new Path(basePath.toString() + Path.SEPARATOR + "hive_conf" + Path.SEPARATOR
+ "hive-site1.xml");
Path symlink = new Path("symlink/");
Path symlink_hive_site = new Path("symlink/hive_conf" + Path.SEPARATOR + "hive-site.xml");
fs.mkdirs(basePath);
createFiles(basePath.toString() + Path.SEPARATOR + "pig" + Path.SEPARATOR + "pig.jar");
createFiles(basePath.toString() + Path.SEPARATOR + "pig" + Path.SEPARATOR + "pig_1.jar");
createFiles(basePath1.toString() + Path.SEPARATOR + "pig" + Path.SEPARATOR + "pig_2.jar");
createFiles(basePath1.toString() + Path.SEPARATOR + "pig" + Path.SEPARATOR + "pig_3.jar");
createFiles(basePath1.toString() + Path.SEPARATOR + "pig" + Path.SEPARATOR + "pig_4.jar");
createFiles(hive_site.toString());
FSUtils.createSymlink(fs, basePath, symlink, true);
FSUtils.createSymlink(fs, hive_site, symlink_hive_site, true);
prop.put(ShareLibService.SHARE_LIB_CONF_PREFIX + ".pig", TEST_HDFS_HOME + symlink.toString());
prop.put(ShareLibService.SHARE_LIB_CONF_PREFIX + ".hive_conf", TEST_HDFS_HOME + symlink_hive_site.toString()
+ "#hive-site.xml");
createTestShareLibMappingFile(fs, prop);
assertEquals(FSUtils.isSymlink(fs, symlink), true);
setShareLibMappingFileInOozieConfig(fs, conf);
setShipLauncherInOozieConfig();
try {
ShareLibService shareLibService = Services.get().get(ShareLibService.class);
assertEquals(shareLibService.getShareLibJars("pig").size(), 2);
assertEquals(shareLibService.getShareLibJars("hive_conf").size(), 1);
FSUtils.createSymlink(fs, basePath1, symlink, true);
FSUtils.createSymlink(fs, hive_site1, symlink_hive_site, true);
assertEquals(FSUtils.getSymLinkTarget(fs, shareLibService.getShareLibJars("hive_conf").get(0)),
hive_site1);
assertEquals(shareLibService.getShareLibJars("pig").size(), 3);
}
finally {
fs.delete(symlink, true);
}
}
catch (IOException ex) {
ex.printStackTrace();
}
finally {
fs.delete(new Path(SHARELIB_PATH), true);
fs.delete(new Path(TEST_HDFS_MAPPING_FILE_PATH), true);
}
}
@Test
public void testDuplicateJarsInDistributedCache() throws Exception {
FileSystem fs = getFileSystem();
Path basePath = new Path(getOozieConfig()
.get(WorkflowAppService.SYSTEM_LIB_PATH));
setShipLauncherInOozieConfig();
// Use timedstamped directory if available
Date time = new Date(System.currentTimeMillis());
Path libpath = new Path(basePath, ShareLibService.SHARE_LIB_PREFIX + ShareLibService.dt.get().format(time));
Path pigPath = new Path(libpath.toString() + Path.SEPARATOR + "pig");
createDirs(fs, pigPath, new Path(pigPath, "temp"));
createFiles(new Path(pigPath, "pig.jar"));
createFiles(new Path(pigPath, "hive.jar"));
createFiles(new Path(new Path(pigPath, "temp"), "pig.jar#pig.jar"));
// DistributedCache should have only one pig jar
verifyFilesInDistributedCache(setUpPigJob(true), "pig.jar", "hive.jar", "MyOozie.jar", "MyPig.jar");
ShareLibService shareLibService = services.get(ShareLibService.class);
// sharelib service should have two jars
List<Path> shareLib = shareLibService.getShareLibJars("pig");
assertEquals(shareLib.size(), 3);
assertTrue(shareLib.toString().contains("pig.jar#pig.jar"));
assertTrue(shareLib.toString().contains("hive.jar"));
}
private URI[] setUpPigJob(boolean useSystemSharelib) throws Exception {
services.init();
String actionXml = "<pig>" + "<job-tracker>" + getJobTrackerUri() + "</job-tracker>" + "<name-node>"
+ getNameNodeUri() + "</name-node></pig>";
Element eActionXml = XmlUtils.parseXml(actionXml);
XConfiguration protoConf = new XConfiguration();
protoConf.set(WorkflowAppService.HADOOP_USER, getTestUser());
WorkflowJobBean wfj = new WorkflowJobBean();
protoConf.setBoolean(OozieClient.USE_SYSTEM_LIBPATH, useSystemSharelib);
wfj.setProtoActionConf(XmlUtils.prettyPrint(protoConf).toString());
wfj.setConf(XmlUtils.prettyPrint(protoConf).toString());
Context context = new TestJavaActionExecutor().new Context(wfj, new WorkflowActionBean());
PigActionExecutor ae = new PigActionExecutor();
Configuration jobConf = ae.createBaseHadoopConf(context, eActionXml);
jobConf.set("oozie.action.sharelib.for.pig", "pig");
ae.setLibFilesArchives(context, eActionXml, new Path("hdfs://dummyAppPath"), jobConf);
return DistributedCache.getCacheFiles(jobConf);
}
private static void createFiles(FileSystem fs, Path... paths) throws IOException {
for (Path path : paths) {
FSDataOutputStream out = fs.create(path);
out.close();
}
}
private static void createFiles(FileSystem fs, String... filenames) throws IOException {
Path[] paths = new Path[filenames.length];
for (int i = 0; i != filenames.length; ++i) {
paths[i] = new Path(filenames[i]);
}
createFiles(fs, paths);
}
private void createFiles(String... filenames) throws IOException {
createFiles(getFileSystem(), filenames);
}
private void createFiles(Path... paths) throws IOException {
createFiles(getFileSystem(), paths);
}
private void createShareLibMetaFileTestResources(final FileSystem fs, final String testUserHome)
throws IOException {
final String testPath = testUserHome + SHARELIB_PATH;
final Path basePath = new Path(testPath + "testPath");
final Path somethingNew = new Path(testPath + "something_new");
final Path directJarDir = new Path(testPath + "directJarDir");
final String directJarPath = directJarDir.toString() + Path.SEPARATOR + "direct.jar";
final String[] testFiles = {
basePath.toString() + Path.SEPARATOR + "pig" + Path.SEPARATOR + "pig.jar" ,
somethingNew.toString() + Path.SEPARATOR + "somethingNew" + Path.SEPARATOR + "somethingNew.jar",
directJarDir.toString() + Path.SEPARATOR + "direct.jar"};
final Map<String, String> symlinks = new HashMap<>();
symlinks.put(testPath + Path.SEPARATOR + "linkDir" + Path.SEPARATOR + "targetOfLinkFile.xml",
testUserHome + "linkFile.xml");
final Properties mappingFileConfig = new Properties();
mappingFileConfig.put(ShareLibService.SHARE_LIB_CONF_PREFIX + ".pig",
TestShareLibMappingFileInput.getLocalizedShareLibPath(fs, basePath.toString()));
mappingFileConfig.put(ShareLibService.SHARE_LIB_CONF_PREFIX + ".something_new",
TestShareLibMappingFileInput.getLocalizedShareLibPath(fs, somethingNew.toString()));
mappingFileConfig.put(ShareLibService.SHARE_LIB_CONF_PREFIX + ".directjar",
TestShareLibMappingFileInput.getLocalizedShareLibPath(fs, directJarPath.toString()));
String symlink = testUserHome + "linkFile.xml";
mappingFileConfig.put(ShareLibService.SHARE_LIB_CONF_PREFIX + ".linkFile",
TestShareLibMappingFileInput.getLocalizedShareLibPath(fs, symlink + "#targetOfLinkFile.xml"));
createTestShareLibMappingFile(fs, testUserHome, Arrays.asList(testFiles), symlinks, mappingFileConfig);
}
@Test
public void testLoadMappingFilesFromDFSandLocalFs() throws IOException, ServiceException {
final String testUserHome = Files.createTempDir().toString() + Path.SEPARATOR;
final String testPath = testUserHome + SHARELIB_PATH;
final Path basePath = new Path(testPath + "testPath");
final Path somethingNew = new Path(testPath + "something_new");
final Path directHdfsJarDir = new Path(TEST_HDFS_HOME + SHARELIB_PATH + "directJarDir");
final String directHdfsJarPath = directHdfsJarDir.toString() + Path.SEPARATOR + "direct.jar";
final String directjarShareLibName = "directjar";
final FileSystem localFs = newLocalFileSystem();
final FileSystem hdFs = getFileSystem();
final TestShareLibMappingFileInput[] testShareLibMappingFileInputs = {
new TestShareLibMappingFileInput(localFs, "pig", basePath.toString() + Path.SEPARATOR + "pig"
+ Path.SEPARATOR + "pig.jar"),
new TestShareLibMappingFileInput(localFs, "something_new", somethingNew.toString()
+ Path.SEPARATOR + "something_new.jar"),
new TestShareLibMappingFileInput(hdFs, directjarShareLibName, directHdfsJarPath),
new TestShareLibMappingSymlinkInput(hdFs, "linkFile",
TEST_HDFS_HOME + "symlinkTargetDir" + Path.SEPARATOR + "targetOfLinkFile.xml",
TEST_HDFS_HOME + "linkFile.xml")
};
final Properties mappingFileConfig = new Properties();
for (final TestShareLibMappingFileInput shmfInput : testShareLibMappingFileInputs) {
shmfInput.materialize();
mappingFileConfig.put(shmfInput.sharelibNameWithMappingFilePrefix, shmfInput.getFullShareLibPathDir());
}
createTestShareLibMappingFile(testUserHome + TEST_MAPPING_FILENAME, localFs, mappingFileConfig);
setShipLauncherInOozieConfig();
final Configuration oozieConfig = getOozieConfig();
oozieConfig.set(ShareLibService.SHARELIB_MAPPING_FILE, localFs.getUri() + testUserHome + TEST_MAPPING_FILENAME);
try {
services.init();
final ShareLibService shareLibService = Services.get().get(ShareLibService.class);
for (final TestShareLibMappingFileInput sh : testShareLibMappingFileInputs) {
final String firstShareLibPath = shareLibService.getShareLibJars(sh.sharelibName).get(0).toString();
assertTrue(firstShareLibPath.endsWith(sh.baseName));
}
final List<Path> listOfPaths = shareLibService.getShareLibJars(directjarShareLibName);
for (final Path p : listOfPaths) {
assertTrue(p.toString().startsWith(HDFS_SCHEME_PREFIX));
}
}
finally {
hdFs.delete(new Path(SHARELIB_PATH), true);
hdFs.delete(new Path("linkFile.xml"), true);
localFs.delete(new Path(testUserHome), true);
}
}
private FileSystem newLocalFileSystem() throws IOException {
final Configuration emptyConfig = new Configuration(false);
return LocalFileSystem.get(emptyConfig);
}
private void setShipLauncherInOozieConfig() {
Configuration oozieConfig = getOozieConfig();
oozieConfig.set(ShareLibService.SHIP_LAUNCHER_JAR, "true");
}
private Configuration getOozieConfig() {
return services.get(ConfigurationService.class).getConf();
}
private void createTestShareLibMappingFile(final FileSystem fs,
final String testUserHome,
final Iterable<String> testFiles,
final Map<String, String> testSymlinks,
final Properties mappingConf) throws IOException {
createTestShareLibDirsAndFiles(fs, testFiles);
createTestShareLibDirsAndSymlinks(fs, testSymlinks);
createTestShareLibMappingFile(testUserHome + TEST_MAPPING_FILENAME, fs, mappingConf);
}
private void createTestShareLibDirsAndFiles(final FileSystem fs, final Iterable<String> testFiles)
throws IOException {
for (final String f : testFiles) {
createFileWithDirectoryPath(fs, f);
}
}
static void createFileWithDirectoryPath(final FileSystem fs, final String f) throws IOException {
if (!fs.exists(new Path(f))) {
String dirName = f.substring(0, f.toString().lastIndexOf(Path.SEPARATOR));
fs.mkdirs(new Path(dirName));
}
createFiles(fs, f);
}
void createTestShareLibDirsAndSymlinks(final FileSystem fs, final Map<String, String> symlinks) throws IOException {
createTestShareLibDirsAndFiles(fs, symlinks.keySet()); // create symlink targets
for (Map.Entry<String, String> symlinkEntry : symlinks.entrySet()) {
String symlinkTarget = symlinkEntry.getKey();
String symlink = symlinkEntry.getValue();
FSUtils.createSymlink(fs, new Path(symlinkTarget), new Path(symlink), true);
}
}
private void createTestShareLibMappingFile(final FileSystem fs, final Properties prop) {
createTestShareLibMappingFile(TEST_HDFS_MAPPING_FILE_PATH, fs, prop);
}
private void createTestShareLibMappingFile(final String metaFile, final FileSystem fs, final Properties prop) {
try (final FSDataOutputStream out = fs.create(new Path(metaFile))) {
prop.store(out, null);
}
catch (IOException ex) {
ex.printStackTrace();
}
}
public void createTestShareLibMetaFile_multipleFile(FileSystem fs) {
Properties prop = new Properties();
try {
Path basePath = new Path(SHARELIB_PATH + "testPath");
Path somethingNew = new Path(SHARELIB_PATH + "something_new");
fs.mkdirs(basePath);
fs.mkdirs(somethingNew);
createFiles(basePath.toString() + Path.SEPARATOR + "pig" + Path.SEPARATOR + "pig.jar");
createFiles(somethingNew.toString() + Path.SEPARATOR + "somethingNew" + Path.SEPARATOR + "somethingNew.jar");
prop.put(ShareLibService.SHARE_LIB_CONF_PREFIX + ".pig", TEST_HDFS_HOME + basePath.toString()
+ Path.SEPARATOR + "pig" + Path.SEPARATOR + "pig.jar," + TEST_HDFS_HOME + somethingNew.toString()
+ Path.SEPARATOR + "somethingNew" + Path.SEPARATOR + "somethingNew.jar");
FSDataOutputStream out = fs.create(new Path(TEST_HDFS_MAPPING_FILE_PATH));
prop.store(out, null);
out.close();
}
catch (IOException ex) {
ex.printStackTrace();
}
}
private void createDirs(FileSystem fs, Path... paths) throws IOException {
for (Path path : paths) {
fs.mkdirs(path);
}
}
@Test
public void testConfFileAddedToActionConf() throws Exception {
try {
XConfiguration protoConf = new XConfiguration();
protoConf.set(WorkflowAppService.HADOOP_USER, getTestUser());
WorkflowJobBean wfj = new WorkflowJobBean();
protoConf.setBoolean(OozieClient.USE_SYSTEM_LIBPATH, true);
wfj.setProtoActionConf(XmlUtils.prettyPrint(protoConf).toString());
wfj.setConf(XmlUtils.prettyPrint(protoConf).toString());
Context context = new TestJavaActionExecutor().new Context(wfj, new WorkflowActionBean());
// Test hive-site.xml in sharelib cache
setupSharelibConf("hive-site.xml", "oozie.hive_conf");
ShareLibService shareLibService = services.get(ShareLibService.class);
assertEquals(shareLibService.getShareLibConfigMap().get("hive_conf").values().size(), 1);
assertEquals(
shareLibService.getShareLibConfigMap().get("hive_conf").keySet().toArray(new Path[] {})[0]
.getName(),
"hive-site.xml");
// Test hive-site.xml not in distributed cache
setupSharelibConf("hive-site.xml", "oozie.hive_conf");
String actionXml = "<hive>" + "<job-tracker>" + getJobTrackerUri() + "</job-tracker>" + "<name-node>"
+ getNameNodeUri() + "</name-node>" + "<script>test</script>" + "</hive>";
Element eActionXml = XmlUtils.parseXml(actionXml);
HiveActionExecutor ae = new HiveActionExecutor();
Configuration jobConf = ae.createBaseHadoopConf(context, eActionXml);
Configuration actionConf = ae.createBaseHadoopConf(context, eActionXml);
jobConf.set("oozie.action.sharelib.for.hive", "hive_conf");
ae.setLibFilesArchives(context, eActionXml, new Path("hdfs://dummyAppPath"), jobConf);
URI[] cacheFiles = DistributedCache.getCacheFiles(actionConf);
String cacheFilesStr = Arrays.toString(cacheFiles);
assertFalse(cacheFilesStr.contains("hive-site.xml"));
// Test hive-site.xml property in jobconf with linkname
jobConf = ae.createBaseHadoopConf(context, eActionXml);
Properties prop = new Properties();
actionConf = ae.createBaseHadoopConf(context, eActionXml);
prop.put("oozie.hive_conf", TEST_HDFS_HOME + SHARELIB_PATH + "hive-site.xml#hive-site.xml");
setupSharelibConf("hive-site.xml", "oozie.hive_conf", prop);
jobConf.set("oozie.action.sharelib.for.hive", "hive_conf");
ae.setLibFilesArchives(context, eActionXml, new Path("hdfs://dummyAppPath"), jobConf);
assertEquals(jobConf.get("oozie.hive_conf-sharelib-test"), "test");
// Test hive-site.xml property in jobconf with linkname
// and with hdfs path
prop = new Properties();
jobConf = ae.createBaseHadoopConf(context, eActionXml);
actionConf = ae.createBaseHadoopConf(context, eActionXml);
prop.put("oozie.hive_conf", "hdfs://" + TEST_HDFS_HOME + SHARELIB_PATH + "hive-site.xml#hive-site.xml");
setupSharelibConf("hive-site.xml", "oozie.hive_conf", prop);
jobConf.set("oozie.action.sharelib.for.hive", "hive_conf");
ae.setLibFilesArchives(context, eActionXml, new Path("hdfs://dummyAppPath"), jobConf);
assertEquals(jobConf.get("oozie.hive_conf-sharelib-test"), "test");
cacheFiles = DistributedCache.getCacheFiles(actionConf);
cacheFilesStr = Arrays.toString(cacheFiles);
assertFalse(cacheFilesStr.contains("hive-site.xml"));
// Test hive-site.xml property in jobconf with non hdfs path
prop = new Properties();
jobConf = ae.createBaseHadoopConf(context, eActionXml);
actionConf = ae.createBaseHadoopConf(context, eActionXml);
prop.put("oozie.hive_conf", TEST_HDFS_HOME + SHARELIB_PATH + "hive-site.xml");
setupSharelibConf("hive-site.xml", "oozie.hive_conf", prop);
jobConf.set("oozie.action.sharelib.for.hive", "hive_conf");
ae.setLibFilesArchives(context, eActionXml, new Path("hdfs://dummyAppPath"), jobConf);
assertEquals(jobConf.get("oozie.hive_conf-sharelib-test"), "test");
cacheFiles = DistributedCache.getCacheFiles(actionConf);
cacheFilesStr = Arrays.toString(cacheFiles);
assertFalse(cacheFilesStr.contains("hive-site.xml"));
// Test hive-site.xml property in jobconf with non hdfs path with
// link name
prop = new Properties();
jobConf = ae.createBaseHadoopConf(context, eActionXml);
actionConf = ae.createBaseHadoopConf(context, eActionXml);
prop.put("oozie.hive_conf", TEST_HDFS_HOME + SHARELIB_PATH + "hive-site.xml#hive-site.xml");
setupSharelibConf("hive-site.xml", "oozie.hive_conf", prop);
jobConf.set("oozie.action.sharelib.for.hive", "hive_conf");
ae.setLibFilesArchives(context, eActionXml, new Path("hdfs://dummyAppPath"), jobConf);
assertEquals(jobConf.get("oozie.hive_conf-sharelib-test"), "test");
cacheFiles = DistributedCache.getCacheFiles(actionConf);
cacheFilesStr = Arrays.toString(cacheFiles);
assertFalse(cacheFilesStr.contains("hive-site.xml"));
}
finally {
getFileSystem().delete(new Path(SHARELIB_PATH), true);
}
}
@Test
public void testConfFileAddedToDistributedCache() throws Exception {
try {
Properties prop = new Properties();
prop.put("oozie.hive_conf", TEST_HDFS_HOME + SHARELIB_PATH + "hive-site.xml#hive-site.xml");
setupSharelibConf("hive-site.xml", "oozie.hive_conf", prop);
String actionXml = "<pig>" + "<job-tracker>" + getJobTrackerUri() + "</job-tracker>" + "<name-node>"
+ getNameNodeUri() + "</name-node>" + "<script>test</script>" + "</pig>";
Element eActionXml = XmlUtils.parseXml(actionXml);
XConfiguration protoConf = new XConfiguration();
protoConf.set(WorkflowAppService.HADOOP_USER, getTestUser());
WorkflowJobBean wfj = new WorkflowJobBean();
protoConf.setBoolean(OozieClient.USE_SYSTEM_LIBPATH, true);
wfj.setProtoActionConf(XmlUtils.prettyPrint(protoConf).toString());
wfj.setConf(XmlUtils.prettyPrint(protoConf).toString());
Context context = new TestJavaActionExecutor().new Context(wfj, new WorkflowActionBean());
PigActionExecutor ae = new PigActionExecutor();
Configuration jobConf = ae.createBaseHadoopConf(context, eActionXml);
jobConf.set("oozie.action.sharelib.for.pig", "hive_conf");
ae.setLibFilesArchives(context, eActionXml, new Path("hdfs://dummyAppPath"), jobConf);
URI[] cacheFiles = DistributedCache.getCacheFiles(jobConf);
String cacheFilesStr = Arrays.toString(cacheFiles);
assertEquals(jobConf.get("oozie.hive_conf-sharelib-test"), null);
assertTrue(URLDecoder.decode(cacheFilesStr).contains("hive-site.xml#hive-site.xml"));
setupSharelibConf("hbase-site.xml", "oozie.hbase_conf");
jobConf = ae.createBaseHadoopConf(context, eActionXml);
jobConf.set("oozie.action.sharelib.for.pig", "hbase_conf");
ae.setLibFilesArchives(context, eActionXml, new Path("hdfs://dummyAppPath"), jobConf);
cacheFiles = DistributedCache.getCacheFiles(jobConf);
cacheFilesStr = Arrays.toString(cacheFiles);
assertTrue(cacheFilesStr.contains("hbase-site.xml"));
}
finally {
getFileSystem().delete(new Path(SHARELIB_PATH), true);
}
}
@Test
public void testParsingALotOfShareLibsParallel() throws ServiceException, IOException {
setShipLauncherInOozieConfig();
services.init();
// destroying, as we dont want the sharelib dirs purge to be scheduled
services.get(SchedulerService.class).destroy();
final List<FileStatus> fileStatuses = new ArrayList<>();
final Path rootDir = Mockito.mock(Path.class);
final FileSystem fs = Mockito.mock(FileSystem.class);
final int NUMBER_OF_FILESTATUSES = 100;
for (int i = 0; i < NUMBER_OF_FILESTATUSES; ++i) {
createAndAddMockedFileStatus(fileStatuses, 2018, 8, 1, 0, 0, 1);
}
final FileStatus[] statuses = fileStatuses.toArray(new FileStatus[1]);
Mockito.when(fs.listStatus(Mockito.any(Path.class), Mockito.any(PathFilter.class))).thenReturn(statuses);
final ShareLibService shareLibService = services.get(ShareLibService.class);
shareLibService.fs = fs;
runGivenCallableOnThreads(() -> {
try {
shareLibService.getLatestLibPath(rootDir, "lib_");
} catch (final IOException | NumberFormatException e) {
log.error(e.getMessage());
Thread.currentThread().interrupt();
return false;
}
return true;
}, 10, 10);
}
@Test
public void testDeterminingLatestSharelibPathOn1Thread() throws IOException, ServiceException {
testDeterminingLatestSharelibPath(1);
}
@Test
public void testDeterminingLatestSharelibPathOn5Threads() throws IOException, ServiceException {
testDeterminingLatestSharelibPath(5);
}
@Test
public void testDeterminingLatestSharelibPathOn10Threads() throws IOException, ServiceException {
testDeterminingLatestSharelibPath(10);
}
private void testDeterminingLatestSharelibPath(final int numberOfThreads) throws ServiceException, IOException {
setShipLauncherInOozieConfig();
services.init();
// destroying, as we dont want the sharelib dirs purge to be scheduled
services.get(SchedulerService.class).destroy();
final List<FileStatus> fileStatuses = new ArrayList<>();
createAndAddMockedFileStatus(fileStatuses, 2018, 8, 1, 0, 0, 1);
createAndAddMockedFileStatus(fileStatuses, 2018, 8, 1, 0, 1, 1);
final Path filePath3 = createAndAddMockedFileStatus(fileStatuses, 2018, 8, 1, 1, 0, 1);
final Path rootDir = Mockito.mock(Path.class);
final FileSystem fs = Mockito.mock(FileSystem.class);
final FileStatus[] statuses = fileStatuses.toArray(new FileStatus[1]);
Mockito.when(fs.listStatus(Mockito.any(Path.class), Mockito.any(PathFilter.class))).thenReturn(statuses);
final ShareLibService shareLibService = services.get(ShareLibService.class);
shareLibService.fs = fs;
runGivenCallableOnThreads(() -> {
try {
final Path path = shareLibService.getLatestLibPath(rootDir, "lib_");
Assert.assertEquals(filePath3, path);
} catch (final IOException | NumberFormatException e) {
log.error(e.getMessage());
Thread.currentThread().interrupt();
return false;
}
return true;
}, 100, numberOfThreads);
}
private void runGivenCallableOnThreads(
final Callable<Boolean> callable, final int numberOfCallables, final int numberOfThreads) {
final ExecutorService executor = Executors.newFixedThreadPool(numberOfThreads);
final List<Callable<Boolean>> callableTasks = new ArrayList<>();
for (int i = 0; i < numberOfCallables; ++i) {
callableTasks.add(callable);
}
// Start 10 thread to do parallel time parsing. Issue is experienced with old code.
List<Future<Boolean>> futures = new ArrayList<>();
try {
futures = executor.invokeAll(callableTasks);
} catch (final InterruptedException e) {
log.error(e.getMessage());
Assert.fail("Determining timestamp of a share lib name is failed with: " + e.getMessage());
}
// Shut down the executor service to have all tasks finished, then collect the results
awaitTerminationAfterShutdown(executor);
Assert.assertFalse(futures.isEmpty());
for (final Future<Boolean> f : futures) {
try {
final Boolean result = f.get(5, TimeUnit.SECONDS);
Assert.assertTrue("Parsed share lib name shall be a valid timestamp", result);
} catch (final InterruptedException | ExecutionException | TimeoutException e) {
log.error(e.getMessage());
Assert.fail("Determining timestamp of a share lib name is failed with: " + e.getMessage());
}
}
}
private Path createAndAddMockedFileStatus(final List<FileStatus> fileStatuses, int y, int m, int d, int h, int min, int s) {
final String date = new SimpleDateFormat("yyyyMMddHHmmss").format(
new Calendar.Builder().setDate(y, m, d).setTimeOfDay(h, min, s).build().getTime());
final Path filePath = Mockito.mock(Path.class);
final String libName = ShareLibService.SHARE_LIB_PREFIX + date;
Mockito.when(filePath.getName()).thenReturn(libName);
final FileStatus fileStatus = Mockito.mock(FileStatus.class);
Mockito.when(fileStatus.getPath()).thenReturn(filePath);
fileStatuses.add(fileStatus);
return filePath;
}
private void awaitTerminationAfterShutdown(ExecutorService threadPool) {
threadPool.shutdown();
try {
if (!threadPool.awaitTermination(10, TimeUnit.SECONDS)) {
threadPool.shutdownNow();
}
} catch (InterruptedException ex) {
threadPool.shutdownNow();
Thread.currentThread().interrupt();
}
}
private void setupSharelibConf(final String file, final String tag) throws ServiceException, IOException {
Properties prop = new Properties();
prop.put(tag, TEST_HDFS_HOME + SHARELIB_PATH);
setupSharelibConf(file, tag, prop);
}
private void setupSharelibConf(final String file, final String tag, Properties prop) throws IOException,
ServiceException {
setupSharelibConf(getFileSystem(), file, tag, prop);
}
private void setupSharelibConf(final FileSystem fs, final String file, final String tag, Properties prop)
throws IOException, ServiceException {
Configuration conf = getOozieConfig();
setShipLauncherInOozieConfig();
setShareLibMappingFileInOozieConfig(fs, conf);
XConfiguration hiveConf = new XConfiguration();
hiveConf.set(tag + "-sharelib-test", "test");
createDirs(getFileSystem(), new Path(SHARELIB_PATH));
FSDataOutputStream out = getFileSystem().create(new Path(SHARELIB_PATH, file), true);
PrintWriter bufOut = new PrintWriter(new OutputStreamWriter(out, StandardCharsets.UTF_8));
bufOut.write(hiveConf.toXmlString(false));
bufOut.close();
createTestShareLibMappingFile(getFileSystem(), prop);
services.init();
}
private void verifyFilesInDistributedCache(URI[] cacheFiles, String... files) {
String cacheFilesStr = Arrays.toString(cacheFiles);
// Hadoop 2 has the following jars too: MRAppJar.jar and hadoop-mapreduce-client-jobclient-
assertEquals(cacheFiles.length, files.length + 2);
assertTrue(cacheFilesStr.contains("MRAppJar.jar"));
assertTrue(cacheFilesStr.contains("hadoop-mapreduce-client-jobclient-"));
for (String file : files) {
assertTrue(cacheFilesStr.contains(file));
}
}
}