blob: b5e29d3be606213150d3e09b4b4393007bc04629 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.oozie.service;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URL;
import java.net.URLDecoder;
import java.nio.charset.StandardCharsets;
import java.text.MessageFormat;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Calendar;
import java.util.Comparator;
import java.util.Date;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.TimeZone;
import java.util.Map.Entry;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.io.IOUtils;
import org.apache.oozie.action.ActionExecutor;
import org.apache.oozie.action.hadoop.JavaActionExecutor;
import org.apache.oozie.client.rest.JsonUtils;
import com.google.common.annotations.VisibleForTesting;
import org.apache.oozie.ErrorCode;
import org.apache.oozie.util.Instrumentable;
import org.apache.oozie.util.Instrumentation;
import org.apache.oozie.util.FSUtils;
import org.apache.oozie.util.XConfiguration;
import org.apache.oozie.util.XLog;
import org.jdom.JDOMException;
import static org.apache.oozie.util.FSUtils.isLocalFile;
public class ShareLibService implements Service, Instrumentable {
public static final String LAUNCHERJAR_LIB_RETENTION = CONF_PREFIX + "ShareLibService.temp.sharelib.retention.days";
public static final String SHARELIB_MAPPING_FILE = CONF_PREFIX + "ShareLibService.mapping.file";
public static final String SHIP_LAUNCHER_JAR = "oozie.action.ship.launcher.jar";
public static final String PURGE_INTERVAL = CONF_PREFIX + "ShareLibService.purge.interval";
public static final String FAIL_FAST_ON_STARTUP = CONF_PREFIX + "ShareLibService.fail.fast.on.startup";
private static final String PERMISSION_STRING = "-rwxr-xr-x";
public static final String LAUNCHER_LIB_PREFIX = "launcher_";
public static final String SHARE_LIB_PREFIX = "lib_";
private Services services;
private Map<String, List<Path>> shareLibMap = new HashMap<String, List<Path>>();
private Map<String, Map<Path, Configuration>> shareLibConfigMap = new HashMap<String, Map<Path, Configuration>>();
private Map<String, List<Path>> launcherLibMap = new HashMap<String, List<Path>>();
private Set<String> actionConfSet = new HashSet<String>();
// symlink mapping. Oozie keeps on checking symlink path and if changes, Oozie reloads the sharelib
private Map<String, Map<Path, Path>> symlinkMapping = new HashMap<String, Map<Path, Path>>();
private static XLog LOG = XLog.getLog(ShareLibService.class);
private String sharelibMappingFile;
private boolean isShipLauncherEnabled = false;
public static String SHARE_LIB_CONF_PREFIX = "oozie";
private boolean shareLibLoadAttempted = false;
private String sharelibMetaFileOldTimeStamp;
private String sharelibDirOld;
FileSystem fs;
FileSystem localFs;
final long retentionTime = 1000L * 60 * 60 * 24 * ConfigurationService.getInt(LAUNCHERJAR_LIB_RETENTION);
@VisibleForTesting
protected static final ThreadLocal<SimpleDateFormat> dt = new ThreadLocal<SimpleDateFormat>() {
@Override
protected SimpleDateFormat initialValue() {
return new SimpleDateFormat("yyyyMMddHHmmss");
}
};
@Override
public void init(Services services) throws ServiceException {
this.services = services;
sharelibMappingFile = ConfigurationService.get(services.getConf(), SHARELIB_MAPPING_FILE);
isShipLauncherEnabled = ConfigurationService.getBoolean(services.getConf(), SHIP_LAUNCHER_JAR);
boolean failOnfailure = ConfigurationService.getBoolean(services.getConf(), FAIL_FAST_ON_STARTUP);
Path launcherlibPath = getLauncherlibPath();
HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
URI uri = launcherlibPath.toUri();
try {
fs = FileSystem.get(has.createConfiguration(uri.getAuthority()));
localFs = LocalFileSystem.get(new Configuration(false));
//cache action key sharelib conf list
cacheActionKeySharelibConfList();
updateLauncherLib();
updateShareLib();
}
catch (Throwable e) {
if (failOnfailure) {
LOG.error("Sharelib initialization fails", e);
throw new ServiceException(ErrorCode.E0104, getClass().getName(), "Sharelib initialization fails. ", e);
}
else {
// We don't want to actually fail init by throwing an Exception, so only create the ServiceException and
// log it
ServiceException se = new ServiceException(ErrorCode.E0104, getClass().getName(),
"Not able to cache sharelib. An Admin needs to install the sharelib with oozie-setup.sh and issue the "
+ "'oozie admin' CLI command to update the sharelib", e);
LOG.error(se);
}
}
Runnable purgeLibsRunnable = new Runnable() {
@Override
public void run() {
System.out.flush();
try {
// Only one server should purge sharelib
if (Services.get().get(JobsConcurrencyService.class).isLeader()) {
final Date current = Calendar.getInstance(TimeZone.getTimeZone("GMT")).getTime();
purgeLibs(fs, LAUNCHER_LIB_PREFIX, current);
purgeLibs(fs, SHARE_LIB_PREFIX, current);
}
}
catch (IOException e) {
LOG.error("There was an issue purging the sharelib", e);
}
}
};
services.get(SchedulerService.class).schedule(purgeLibsRunnable, 10,
ConfigurationService.getInt(services.getConf(), PURGE_INTERVAL) * 60 * 60 * 24,
SchedulerService.Unit.SEC);
}
/**
* Recursively change permissions.
*
* @throws IOException Signals that an I/O exception has occurred.
*/
private void updateLauncherLib() throws IOException {
if (isShipLauncherEnabled) {
if (fs == null) {
Path launcherlibPath = getLauncherlibPath();
HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
URI uri = launcherlibPath.toUri();
fs = FileSystem.get(has.createConfiguration(uri.getAuthority()));
}
Path launcherlibPath = getLauncherlibPath();
setupLauncherLibPath(fs, launcherlibPath);
recursiveChangePermissions(fs, launcherlibPath, FsPermission.valueOf(PERMISSION_STRING));
}
}
/**
* Copy launcher jars to Temp directory.
*
* @param fs the FileSystem
* @param tmpLauncherLibPath the tmp launcher lib path
* @throws IOException Signals that an I/O exception has occurred.
*/
private void setupLauncherLibPath(FileSystem fs, Path tmpLauncherLibPath) throws IOException {
ActionService actionService = Services.get().get(ActionService.class);
List<Class<?>> classes = JavaActionExecutor.getCommonLauncherClasses();
Path baseDir = new Path(tmpLauncherLibPath, JavaActionExecutor.OOZIE_COMMON_LIBDIR);
copyJarContainingClasses(classes, fs, baseDir, JavaActionExecutor.OOZIE_COMMON_LIBDIR);
Set<String> actionTypes = actionService.getActionTypes();
for (String key : actionTypes) {
ActionExecutor executor = actionService.getExecutor(key);
if (executor instanceof JavaActionExecutor) {
JavaActionExecutor jexecutor = (JavaActionExecutor) executor;
classes = jexecutor.getLauncherClasses();
if (classes != null) {
String type = executor.getType();
Path executorDir = new Path(tmpLauncherLibPath, type);
copyJarContainingClasses(classes, fs, executorDir, type);
}
}
}
}
/**
* Recursive change permissions.
*
* @param fs the FileSystem
* @param path the Path
* @param fsPerm is permission
* @throws IOException Signals that an I/O exception has occurred.
*/
private void recursiveChangePermissions(FileSystem fs, Path path, FsPermission fsPerm) throws IOException {
fs.setPermission(path, fsPerm);
FileStatus[] filesStatus = fs.listStatus(path);
for (int i = 0; i < filesStatus.length; i++) {
Path p = filesStatus[i].getPath();
if (filesStatus[i].isDirectory()) {
recursiveChangePermissions(fs, p, fsPerm);
}
else {
fs.setPermission(p, fsPerm);
}
}
}
/**
* Copy jar containing classes.
*
* @param classes the classes
* @param fs the FileSystem
* @param executorDir is Path
* @param type is sharelib key
* @throws IOException Signals that an I/O exception has occurred.
*/
private void copyJarContainingClasses(List<Class<?>> classes, FileSystem fs, Path executorDir, String type)
throws IOException {
fs.mkdirs(executorDir);
Set<String> localJarSet = new HashSet<String>();
for (Class<?> c : classes) {
String localJar = findContainingJar(c);
if (localJar != null) {
localJarSet.add(localJar);
}
else {
throw new IOException("No jar containing " + c + " found");
}
}
List<Path> listOfPaths = new ArrayList<Path>();
for (String localJarStr : localJarSet) {
File localJar = new File(localJarStr);
copyFromLocalFile(localJar, fs, executorDir);
Path path = new Path(executorDir, localJar.getName());
listOfPaths.add(path);
LOG.info(localJar.getName() + " uploaded to " + executorDir.toString());
}
launcherLibMap.put(type, listOfPaths);
}
private static boolean copyFromLocalFile(File src, FileSystem dstFS, Path dstDir) throws IOException {
Path dst = new Path(dstDir, src.getName());
InputStream in=null;
OutputStream out = null;
try {
in = new FileInputStream(src);
out = dstFS.create(dst, true);
IOUtils.copyBytes(in, out, dstFS.getConf(), true);
} catch (IOException e) {
IOUtils.closeStream(out);
IOUtils.closeStream(in);
throw e;
}
return true;
}
/**
* Gets the path recursively.
*
* @param fs the FileSystem
* @param rootDir the root directory
* @param listOfPaths the list of paths
* @param shareLibKey the share lib key
* @return the path recursively
* @throws IOException Signals that an I/O exception has occurred.
*/
private void getPathRecursively(FileSystem fs, Path rootDir, List<Path> listOfPaths, String shareLibKey,
Map<String, Map<Path, Configuration>> shareLibConfigMap) throws IOException {
if (rootDir == null) {
return;
}
try {
if (fs.isFile(new Path(new URI(rootDir.toString()).getPath()))) {
Path filePath = new Path(new URI(rootDir.toString()).getPath());
Path qualifiedRootDirPath = fs.makeQualified(rootDir);
if (isFilePartOfConfList(rootDir)) {
cachePropertyFile(qualifiedRootDirPath, filePath, shareLibKey, shareLibConfigMap);
}
listOfPaths.add(qualifiedRootDirPath);
return;
}
FileStatus[] status = fs.listStatus(rootDir);
if (status == null) {
LOG.info("Shared lib " + rootDir + " doesn't exist, not adding to cache");
return;
}
for (FileStatus file : status) {
if (file.isDirectory()) {
getPathRecursively(fs, file.getPath(), listOfPaths, shareLibKey, shareLibConfigMap);
}
else {
if (isFilePartOfConfList(file.getPath())) {
cachePropertyFile(file.getPath(), file.getPath(), shareLibKey, shareLibConfigMap);
}
listOfPaths.add(file.getPath());
}
}
}
catch (URISyntaxException e) {
throw new IOException(e);
}
catch (JDOMException e) {
throw new IOException(e);
}
}
public Map<String, List<Path>> getShareLib() {
return shareLibMap;
}
private Map<String, Map<Path, Path>> getSymlinkMapping() {
return symlinkMapping;
}
/**
* Gets the action sharelib lib jars.
*
* @param shareLibKey the sharelib key
* @return List of paths
* @throws IOException Signals that an I/O exception has occurred.
*/
public List<Path> getShareLibJars(String shareLibKey) throws IOException {
// Sharelib map is empty means that on previous or startup attempt of
// caching sharelib has failed.Trying to reload
if (shareLibMap.isEmpty() && !shareLibLoadAttempted) {
synchronized (ShareLibService.class) {
if (shareLibMap.isEmpty()) {
updateShareLib();
shareLibLoadAttempted = true;
}
}
}
checkSymlink(shareLibKey);
return shareLibMap.get(shareLibKey);
}
private void checkSymlink(final String shareLibKey) throws IOException {
if (symlinkMapping.get(shareLibKey) == null || symlinkMapping.get(shareLibKey).isEmpty()) {
return;
}
for (final Path symlinkPath : symlinkMapping.get(shareLibKey).keySet()) {
final FileSystem fileSystem = getHostFileSystem(symlinkPath);
final Path symLinkTarget = FSUtils.getSymLinkTarget(fileSystem, symlinkPath);
final boolean symlinkIsNotTarget = !getSymlinkSharelibPath(shareLibKey, symlinkPath).equals(symLinkTarget);
if (symlinkIsNotTarget) {
synchronized (ShareLibService.class) {
final Map<String, List<Path>> tmpShareLibMap = new HashMap<String, List<Path>>(shareLibMap);
final Map<String, Map<Path, Configuration>> tmpShareLibConfigMap = new HashMap<>(shareLibConfigMap);
final Map<String, Map<Path, Path>> tmpSymlinkMapping = new HashMap<String, Map<Path, Path>>(
symlinkMapping);
LOG.info(MessageFormat.format("Symlink target for [{0}] has changed, was [{1}], now [{2}]",
shareLibKey, symlinkPath, symLinkTarget));
loadShareLibMetaFile(tmpShareLibMap, tmpSymlinkMapping, tmpShareLibConfigMap, sharelibMappingFile,
shareLibKey);
shareLibMap = tmpShareLibMap;
symlinkMapping = tmpSymlinkMapping;
shareLibConfigMap = tmpShareLibConfigMap;
return;
}
}
}
}
private Path getSymlinkSharelibPath(String shareLibKey, Path path) {
return symlinkMapping.get(shareLibKey).get(path);
}
private FileSystem getHostFileSystem(String pathStr) {
FileSystem fileSystem;
if (isLocalFile(pathStr)) {
fileSystem = localFs;
}
else {
fileSystem = fs;
}
return fileSystem;
}
private FileSystem getHostFileSystem(Path path) {
return getHostFileSystem(path.toString());
}
/**
* Gets the launcher jars.
*
* @param shareLibKey the shareLib key
* @return launcher jars paths
* @throws IOException Signals that an I/O exception has occurred.
*/
public List<Path> getSystemLibJars(String shareLibKey) throws IOException {
List<Path> returnList = new ArrayList<Path>();
// Sharelib map is empty means that on previous or startup attempt of
// caching launcher jars has failed.Trying to reload
if (isShipLauncherEnabled) {
if (launcherLibMap.isEmpty()) {
synchronized (ShareLibService.class) {
if (launcherLibMap.isEmpty()) {
updateLauncherLib();
}
}
}
if (launcherLibMap.get(shareLibKey) != null) {
returnList.addAll(launcherLibMap.get(shareLibKey));
}
}
if (shareLibKey.equals(JavaActionExecutor.OOZIE_COMMON_LIBDIR)) {
List<Path> sharelibList = getShareLibJars(shareLibKey);
if (sharelibList != null) {
returnList.addAll(sharelibList);
}
}
return returnList;
}
/**
* Find containing jar containing.
*
* @param clazz the clazz
* @return the string
*/
@VisibleForTesting
protected String findContainingJar(Class<?> clazz) {
ClassLoader loader = clazz.getClassLoader();
String classFile = clazz.getName().replaceAll("\\.", "/") + ".class";
try {
for (Enumeration<URL> itr = loader.getResources(classFile); itr.hasMoreElements();) {
URL url = itr.nextElement();
if ("jar".equals(url.getProtocol())) {
String toReturn = url.getPath();
if (toReturn.startsWith("file:")) {
toReturn = toReturn.substring("file:".length());
// URLDecoder is a misnamed class, since it actually
// decodes
// x-www-form-urlencoded MIME type rather than actual
// URL encoding (which the file path has). Therefore it
// would
// decode +s to ' 's which is incorrect (spaces are
// actually
// either unencoded or encoded as "%20"). Replace +s
// first, so
// that they are kept sacred during the decoding
// process.
toReturn = toReturn.replaceAll("\\+", "%2B");
toReturn = URLDecoder.decode(toReturn, StandardCharsets.UTF_8.name());
toReturn = toReturn.replaceAll("!.*$", "");
return toReturn;
}
}
}
}
catch (IOException ioe) {
throw new RuntimeException(ioe);
}
return null;
}
/**
* Purge libs.
*
* @param fs the fs
* @param prefix the prefix
* @param current the current time
* @throws IOException Signals that an I/O exception has occurred.
*/
private void purgeLibs(FileSystem fs, final String prefix, final Date current) throws IOException {
Path executorLibBasePath = services.get(WorkflowAppService.class).getSystemLibPath();
PathFilter directoryFilter = new PathFilter() {
@Override
public boolean accept(Path path) {
if (path.getName().startsWith(prefix)) {
String name = path.getName();
String time = name.substring(prefix.length());
Date d = null;
try {
d = dt.get().parse(time);
}
catch (ParseException e) {
return false;
}
return (current.getTime() - d.getTime()) > retentionTime;
}
else {
return false;
}
}
};
FileStatus[] dirList = fs.listStatus(executorLibBasePath, directoryFilter);
Arrays.sort(dirList, new Comparator<FileStatus>() {
// sort in desc order
@Override
public int compare(FileStatus o1, FileStatus o2) {
return o2.getPath().getName().compareTo(o1.getPath().getName());
}
});
// Logic is to keep all share-lib between current timestamp and 7days old + 1 latest sharelib older than 7 days.
// refer OOZIE-1761
for (int i = 1; i < dirList.length; i++) {
Path dirPath = dirList[i].getPath();
fs.delete(dirPath, true);
LOG.info("Deleted old launcher jar lib directory {0}", dirPath.getName());
}
}
@Override
public void destroy() {
shareLibMap.clear();
launcherLibMap.clear();
}
@Override
public Class<? extends Service> getInterface() {
return ShareLibService.class;
}
/**
* Update share lib cache.
*
* @return the map
* @throws IOException Signals that an I/O exception has occurred.
*/
public Map<String, String> updateShareLib() throws IOException {
Map<String, String> status = new HashMap<String, String>();
if (fs == null) {
Path launcherlibPath = getLauncherlibPath();
HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
URI uri = launcherlibPath.toUri();
fs = FileSystem.get(has.createConfiguration(uri.getAuthority()));
}
Map<String, List<Path>> tempShareLibMap = new HashMap<String, List<Path>>();
Map<String, Map<Path, Path>> tmpSymlinkMapping = new HashMap<String, Map<Path, Path>>();
Map<String, Map<Path, Configuration>> tmpShareLibConfigMap = new HashMap<String, Map<Path, Configuration>>();
String trimmedSharelibMappingFile = sharelibMappingFile.trim();
if (!StringUtils.isEmpty(trimmedSharelibMappingFile)) {
FileSystem fileSystem = getHostFileSystem(trimmedSharelibMappingFile);
String sharelibMetaFileNewTimeStamp = JsonUtils.formatDateRfc822(
new Date(fileSystem.getFileStatus(new Path(sharelibMappingFile)).getModificationTime()), "GMT");
loadShareLibMetaFile(tempShareLibMap, tmpSymlinkMapping, tmpShareLibConfigMap, sharelibMappingFile, null);
status.put("sharelibMetaFile", sharelibMappingFile);
status.put("sharelibMetaFileNewTimeStamp", sharelibMetaFileNewTimeStamp);
status.put("sharelibMetaFileOldTimeStamp", sharelibMetaFileOldTimeStamp);
sharelibMetaFileOldTimeStamp = sharelibMetaFileNewTimeStamp;
}
else {
Path shareLibpath = getLatestLibPath(services.get(WorkflowAppService.class).getSystemLibPath(),
SHARE_LIB_PREFIX);
loadShareLibfromDFS(tempShareLibMap, shareLibpath, tmpShareLibConfigMap);
if (shareLibpath != null) {
status.put("sharelibDirNew", shareLibpath.toString());
status.put("sharelibDirOld", sharelibDirOld);
sharelibDirOld = shareLibpath.toString();
}
}
shareLibMap = tempShareLibMap;
symlinkMapping = tmpSymlinkMapping;
shareLibConfigMap = tmpShareLibConfigMap;
return status;
}
/**
* Get the latest share lib root path
*
* @return share lib root Path
* @throws IOException Signals that the Oozie share lib root path could not be reached.
*/
public Path getShareLibRootPath() throws IOException {
Path shareLibpath = getLatestLibPath(Services.get().get(WorkflowAppService.class).getSystemLibPath(), SHARE_LIB_PREFIX);
if (shareLibpath == null){
LOG.info("No share lib directory found");
}
return shareLibpath;
}
/**
* Update share lib cache. Parse the share lib directory and each sub directory is a action key
*
* @param shareLibMap the share lib jar map
* @param shareLibpath the share libpath
* @throws IOException Signals that an I/O exception has occurred.
*/
private void loadShareLibfromDFS(Map<String, List<Path>> shareLibMap, Path shareLibpath,
Map<String, Map<Path, Configuration>> shareLibConfigMap) throws IOException {
if (shareLibpath == null) {
LOG.info("No share lib directory found");
return;
}
FileStatus[] dirList = fs.listStatus(shareLibpath);
if (dirList == null) {
return;
}
for (FileStatus dir : dirList) {
if (!dir.isDirectory()) {
continue;
}
List<Path> listOfPaths = new ArrayList<Path>();
getPathRecursively(fs, dir.getPath(), listOfPaths, dir.getPath().getName(), shareLibConfigMap);
shareLibMap.put(dir.getPath().getName(), listOfPaths);
LOG.info("Share lib for " + dir.getPath().getName() + ":" + listOfPaths);
}
}
/**
* Load share lib text file. Sharelib mapping files contains list of key=value. where key is the action key and
* value is the DFS location of sharelib files.
*
* @param shareLibMap the share lib jar map
* @param symlinkMapping the symlink mapping
* @param sharelibFileMapping the sharelib file mapping
* @param shareLibKey the share lib key
* @throws IOException Signals that an I/O exception has occurred.
* @parm shareLibKey the sharelib key
*/
private void loadShareLibMetaFile(Map<String, List<Path>> shareLibMap, Map<String, Map<Path, Path>> symlinkMapping,
Map<String, Map<Path, Configuration>> shareLibConfigMap, String sharelibFileMapping, String shareLibKey)
throws IOException {
Path shareFileMappingPath = new Path(sharelibFileMapping);
FileSystem filesystem = getHostFileSystem(shareFileMappingPath);
Properties prop = new Properties();
prop.load(filesystem.open(new Path(sharelibFileMapping)));
for (Object keyObject : prop.keySet()) {
String key = (String) keyObject;
String mapKey = key.substring(SHARE_LIB_CONF_PREFIX.length() + 1);
if (key.toLowerCase().startsWith(SHARE_LIB_CONF_PREFIX)
&& (shareLibKey == null || shareLibKey.equals(mapKey))) {
loadSharelib(shareLibMap, symlinkMapping, shareLibConfigMap, mapKey,
((String) prop.get(key)).split(","));
}
}
}
private void loadSharelib(Map<String, List<Path>> tmpShareLibMap, Map<String, Map<Path, Path>> tmpSymlinkMapping,
Map<String, Map<Path, Configuration>> shareLibConfigMap, String shareLibKey, String pathList[])
throws IOException {
List<Path> listOfPaths = new ArrayList<Path>();
Map<Path, Path> symlinkMappingforAction = new HashMap<Path, Path>();
for (String pathStr : pathList) {
Path path = new Path(pathStr);
final FileSystem fileSystem = getHostFileSystem(pathStr);
getPathRecursively(fileSystem, path, listOfPaths, shareLibKey, shareLibConfigMap);
if (FSUtils.isSymlink(fileSystem, path)) {
symlinkMappingforAction.put(path, FSUtils.getSymLinkTarget(fileSystem, path));
}
}
LOG.info("symlink for " + shareLibKey + ":" + symlinkMappingforAction);
tmpSymlinkMapping.put(shareLibKey, symlinkMappingforAction);
tmpShareLibMap.put(shareLibKey, listOfPaths);
LOG.info("Share lib for " + shareLibKey + ":" + listOfPaths);
}
/**
* Gets the launcherlib path.
*
* @return the launcherlib path
*/
private Path getLauncherlibPath() {
String formattedDate = dt.get().format(Calendar.getInstance(TimeZone.getTimeZone("GMT")).getTime());
Path tmpLauncherLibPath = new Path(services.get(WorkflowAppService.class).getSystemLibPath(), LAUNCHER_LIB_PREFIX
+ formattedDate);
return tmpLauncherLibPath;
}
/**
* Gets the Latest lib path.
*
* @param rootDir the root dir
* @param prefix the prefix
* @return latest lib path
* @throws IOException Signals that an I/O exception has occurred.
*/
public Path getLatestLibPath(Path rootDir, final String prefix) throws IOException {
Date max = new Date(0L);
Path path = null;
PathFilter directoryFilter = new PathFilter() {
@Override
public boolean accept(Path path) {
return path.getName().startsWith(prefix);
}
};
FileStatus[] files = fs.listStatus(rootDir, directoryFilter);
for (FileStatus file : files) {
String name = file.getPath().getName();
String time = name.substring(prefix.length());
Date d = null;
try {
d = dt.get().parse(time);
}
catch (ParseException e) {
continue;
}
if (d.compareTo(max) > 0) {
path = file.getPath();
max = d;
}
}
// If there are no timestamped directories, fall back to root directory
if (path == null) {
path = rootDir;
}
return path;
}
/**
* Instruments the log service.
* <p>
* It sets instrumentation variables indicating the location of the sharelib and launcherlib
*
* @param instr instrumentation to use.
*/
@Override
public void instrument(Instrumentation instr) {
instr.addVariable("libs", "sharelib.source", new Instrumentation.Variable<String>() {
@Override
public String getValue() {
if (!StringUtils.isEmpty(sharelibMappingFile.trim())) {
return SHARELIB_MAPPING_FILE;
}
return WorkflowAppService.SYSTEM_LIB_PATH;
}
});
instr.addVariable("libs", "sharelib.mapping.file", new Instrumentation.Variable<String>() {
@Override
public String getValue() {
if (!StringUtils.isEmpty(sharelibMappingFile.trim())) {
return sharelibMappingFile;
}
return "(none)";
}
});
instr.addVariable("libs", "sharelib.system.libpath", new Instrumentation.Variable<String>() {
@Override
public String getValue() {
String sharelibPath = "(unavailable)";
try {
Path libPath = getLatestLibPath(services.get(WorkflowAppService.class).getSystemLibPath(),
SHARE_LIB_PREFIX);
if (libPath != null) {
sharelibPath = libPath.toUri().toString();
}
}
catch (IOException ioe) {
// ignore exception because we're just doing instrumentation
}
return sharelibPath;
}
});
instr.addVariable("libs", "sharelib.mapping.file.timestamp", new Instrumentation.Variable<String>() {
@Override
public String getValue() {
if (!StringUtils.isEmpty(sharelibMetaFileOldTimeStamp)) {
return sharelibMetaFileOldTimeStamp;
}
return "(none)";
}
});
instr.addVariable("libs", "sharelib.keys", new Instrumentation.Variable<String>() {
@Override
public String getValue() {
Map<String, List<Path>> shareLib = getShareLib();
if (shareLib != null && !shareLib.isEmpty()) {
Set<String> keySet = shareLib.keySet();
return keySet.toString();
}
return "(unavailable)";
}
});
instr.addVariable("libs", "launcherlib.system.libpath", new Instrumentation.Variable<String>() {
@Override
public String getValue() {
return getLauncherlibPath().toUri().toString();
}
});
instr.addVariable("libs", "sharelib.symlink.mapping", new Instrumentation.Variable<String>() {
@Override
public String getValue() {
Map<String, Map<Path, Path>> shareLibSymlinkMapping = getSymlinkMapping();
if (shareLibSymlinkMapping != null && !shareLibSymlinkMapping.isEmpty()
&& shareLibSymlinkMapping.values() != null && !shareLibSymlinkMapping.values().isEmpty()) {
StringBuilder bf = new StringBuilder();
for (Entry<String, Map<Path, Path>> entry : shareLibSymlinkMapping.entrySet()) {
if (entry.getKey() != null && !entry.getValue().isEmpty()) {
for (Path path : entry.getValue().keySet()) {
bf.append(path).append("(").append(entry.getKey()).append(")").append("=>")
.append(shareLibSymlinkMapping.get(entry.getKey()) != null ? shareLibSymlinkMapping
.get(entry.getKey()).get(path) : "").append(",");
}
}
}
return bf.toString();
}
return "(none)";
}
});
instr.addVariable("libs", "sharelib.cached.config.file", new Instrumentation.Variable<String>() {
@Override
public String getValue() {
Map<String, Map<Path, Configuration>> shareLibConfigMap = getShareLibConfigMap();
if (shareLibConfigMap != null && !shareLibConfigMap.isEmpty()) {
StringBuilder bf = new StringBuilder();
for (String path : shareLibConfigMap.keySet()) {
bf.append(path).append(";");
}
return bf.toString();
}
return "(none)";
}
});
}
/**
* Returns file system for shared libraries.
* <p>
* If WorkflowAppService#getSystemLibPath doesn't have authority then a default one assumed
*
* @return file system for shared libraries
*/
public FileSystem getFileSystem() {
return fs;
}
/**
* Cache XML conf file
*
* @param propertyFilePath the path of the property file
* @param shareLibKey the share lib key
* @throws IOException Signals that an I/O exception has occurred.
* @throws JDOMException
*/
private void cachePropertyFile(Path qualifiedHdfsPath, Path propertyFilePath, String shareLibKey,
Map<String, Map<Path, Configuration>> shareLibConfigMap) throws IOException, JDOMException {
Map<Path, Configuration> confMap = shareLibConfigMap.get(shareLibKey);
if (confMap == null) {
confMap = new HashMap<Path, Configuration>();
shareLibConfigMap.put(shareLibKey, confMap);
}
FileSystem fileSystem = getHostFileSystem(propertyFilePath);
Configuration xmlConf = new XConfiguration(fileSystem.open(propertyFilePath));
confMap.put(qualifiedHdfsPath, xmlConf);
}
private void cacheActionKeySharelibConfList() {
ActionService actionService = Services.get().get(ActionService.class);
Set<String> actionTypes = actionService.getActionTypes();
for (String key : actionTypes) {
ActionExecutor executor = actionService.getExecutor(key);
if (executor instanceof JavaActionExecutor) {
JavaActionExecutor jexecutor = (JavaActionExecutor) executor;
actionConfSet.addAll(
new HashSet<String>(Arrays.asList(jexecutor.getShareLibFilesForActionConf() == null ? new String[0]
: jexecutor.getShareLibFilesForActionConf())));
}
}
}
public Configuration getShareLibConf(String inputKey, Path path) {
if (shareLibConfigMap.containsKey(inputKey)) {
return shareLibConfigMap.get(inputKey).get(path);
}
return null;
}
@VisibleForTesting
public Map<String, Map<Path, Configuration>> getShareLibConfigMap() {
return shareLibConfigMap;
}
private boolean isFilePartOfConfList(Path path) throws URISyntaxException {
String fragmentName = new URI(path.toString()).getFragment();
String fileName = fragmentName == null ? path.getName() : fragmentName;
return actionConfSet.contains(fileName);
}
}