| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.slider.common.tools; |
| |
| import com.google.common.base.Preconditions; |
| |
| import org.apache.commons.compress.archivers.zip.ZipArchiveEntry; |
| import org.apache.commons.compress.archivers.zip.ZipArchiveInputStream; |
| import org.apache.commons.io.output.ByteArrayOutputStream; |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.fs.CommonConfigurationKeysPublic; |
| import org.apache.hadoop.fs.FSDataInputStream; |
| import org.apache.hadoop.fs.FileStatus; |
| import org.apache.hadoop.fs.FileSystem; |
| import org.apache.hadoop.fs.FileUtil; |
| import org.apache.hadoop.fs.GlobFilter; |
| import org.apache.hadoop.fs.Path; |
| import org.apache.hadoop.fs.permission.FsPermission; |
| import org.apache.hadoop.hdfs.DFSConfigKeys; |
| import org.apache.hadoop.io.IOUtils; |
| import org.apache.hadoop.net.NetUtils; |
| import org.apache.hadoop.security.SecurityUtil; |
| import org.apache.hadoop.security.UserGroupInformation; |
| import org.apache.hadoop.util.ExitUtil; |
| import org.apache.hadoop.util.Shell; |
| import org.apache.hadoop.util.VersionInfo; |
| import org.apache.hadoop.yarn.api.ApplicationConstants; |
| import org.apache.hadoop.yarn.api.records.ApplicationReport; |
| import org.apache.hadoop.yarn.api.records.Container; |
| import org.apache.hadoop.yarn.api.records.LocalResource; |
| import org.apache.hadoop.yarn.api.records.YarnApplicationState; |
| import org.apache.hadoop.yarn.conf.YarnConfiguration; |
| import org.apache.slider.Slider; |
| import org.apache.slider.api.InternalKeys; |
| import org.apache.slider.api.RoleKeys; |
| import org.apache.slider.common.SliderKeys; |
| import org.apache.slider.common.SliderXmlConfKeys; |
| import org.apache.slider.core.conf.MapOperations; |
| import org.apache.slider.core.exceptions.BadClusterStateException; |
| import org.apache.slider.core.exceptions.BadCommandArgumentsException; |
| import org.apache.slider.core.exceptions.BadConfigException; |
| import org.apache.slider.core.exceptions.ErrorStrings; |
| import org.apache.slider.core.exceptions.SliderException; |
| import org.apache.slider.core.launch.ClasspathConstructor; |
| import org.apache.slider.core.main.LauncherExitCodes; |
| import org.apache.slider.server.services.utility.EndOfServiceWaiter; |
| import org.apache.slider.server.services.utility.PatternValidator; |
| import org.apache.slider.server.services.workflow.ForkedProcessService; |
| import org.apache.zookeeper.server.util.KerberosUtil; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import java.io.ByteArrayInputStream; |
| import java.io.File; |
| import java.io.FileNotFoundException; |
| import java.io.FileOutputStream; |
| import java.io.FileReader; |
| import java.io.FilenameFilter; |
| import java.io.IOException; |
| import java.io.InputStream; |
| import java.io.InterruptedIOException; |
| import java.io.PrintWriter; |
| import java.io.StringWriter; |
| import java.net.InetSocketAddress; |
| import java.net.ServerSocket; |
| import java.net.Socket; |
| import java.net.URI; |
| import java.net.URISyntaxException; |
| import java.net.URL; |
| import java.net.URLDecoder; |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.Collection; |
| import java.util.Collections; |
| import java.util.Comparator; |
| import java.util.Date; |
| import java.util.Enumeration; |
| import java.util.HashMap; |
| import java.util.List; |
| import java.util.Locale; |
| import java.util.Map; |
| import java.util.Properties; |
| import java.util.Set; |
| import java.util.Timer; |
| import java.util.TimerTask; |
| import java.util.TreeMap; |
| import java.util.TreeSet; |
| import java.util.concurrent.TimeoutException; |
| import java.util.concurrent.atomic.AtomicBoolean; |
| |
| /** |
| * These are slider-specific Util methods |
| */ |
| public final class SliderUtils { |
| |
| private static final Logger log = LoggerFactory.getLogger(SliderUtils.class); |
| |
| /** |
| * Atomic bool to track whether or not process security has already been |
| * turned on (prevents re-entrancy) |
| */ |
| private static final AtomicBoolean processSecurityAlreadyInitialized = |
| new AtomicBoolean(false); |
| public static final String JAVA_SECURITY_KRB5_REALM = |
| "java.security.krb5.realm"; |
| public static final String JAVA_SECURITY_KRB5_KDC = "java.security.krb5.kdc"; |
| |
| /** |
| * Winutils |
| */ |
| public static final String WINUTILS = "WINUTILS.EXE"; |
| /** |
| * name of openssl program |
| */ |
| public static final String OPENSSL = "openssl"; |
| |
| /** |
| * name of python program |
| */ |
| public static final String PYTHON = "python"; |
| |
| private SliderUtils() { |
| } |
| |
| /** |
| * Implementation of set-ness, groovy definition of true/false for a string |
| * @param s string |
| * @return true iff the string is neither null nor empty |
| */ |
| public static boolean isUnset(String s) { |
| return s == null || s.isEmpty(); |
| } |
| |
| public static boolean isSet(String s) { |
| return !isUnset(s); |
| } |
| |
| /* |
| * Validates whether num is an integer |
| * @param num |
| * @param msg the message to be shown in exception |
| */ |
| @SuppressWarnings("ResultOfMethodCallIgnored") |
| private static void validateNumber(String num, String msg) throws |
| BadConfigException { |
| try { |
| Integer.parseInt(num); |
| } catch (NumberFormatException nfe) { |
| throw new BadConfigException(msg + num); |
| } |
| } |
| |
| /* |
| * Translates the trailing JVM heapsize unit: g, G, m, M |
| * This assumes designated unit of 'm' |
| * @param heapsize |
| * @return heapsize in MB |
| */ |
| public static String translateTrailingHeapUnit(String heapsize) throws |
| BadConfigException { |
| String errMsg = "Bad heapsize: "; |
| if (heapsize.endsWith("m") || heapsize.endsWith("M")) { |
| String num = heapsize.substring(0, heapsize.length() - 1); |
| validateNumber(num, errMsg); |
| return num; |
| } |
| if (heapsize.endsWith("g") || heapsize.endsWith("G")) { |
| String num = heapsize.substring(0, heapsize.length() - 1) + "000"; |
| validateNumber(num, errMsg); |
| return num; |
| } |
| // check if specified heap size is a number |
| validateNumber(heapsize, errMsg); |
| return heapsize; |
| } |
| |
| /** |
| * recursive directory delete |
| * @param dir dir to delete |
| * @throws IOException on any problem |
| */ |
| public static void deleteDirectoryTree(File dir) throws IOException { |
| if (dir.exists()) { |
| if (dir.isDirectory()) { |
| log.info("Cleaning up {}", dir); |
| //delete the children |
| File[] files = dir.listFiles(); |
| if (files == null) { |
| throw new IOException("listfiles() failed for " + dir); |
| } |
| for (File file : files) { |
| log.info("deleting {}", file); |
| if (!file.delete()) { |
| log.warn("Unable to delete " + file); |
| } |
| } |
| if (!dir.delete()) { |
| log.warn("Unable to delete " + dir); |
| } |
| } else { |
| throw new IOException("Not a directory " + dir); |
| } |
| } else { |
| //not found, do nothing |
| log.debug("No output dir yet"); |
| } |
| } |
| |
| /** |
| * Find a containing JAR |
| * @param clazz class to find |
| * @return the file |
| * @throws IOException any IO problem, including the class not having a |
| * classloader |
| * @throws FileNotFoundException if the class did not resolve to a file |
| */ |
| public static File findContainingJarOrFail(Class clazz) throws IOException { |
| File localFile = SliderUtils.findContainingJar(clazz); |
| if (null == localFile) { |
| throw new FileNotFoundException("Could not find JAR containing " + clazz); |
| } |
| return localFile; |
| } |
| |
| |
| /** |
| * Find a containing JAR |
| * @param my_class class to find |
| * @return the file or null if it is not found |
| * @throws IOException any IO problem, including the class not having a |
| * classloader |
| */ |
| public static File findContainingJar(Class my_class) throws IOException { |
| ClassLoader loader = my_class.getClassLoader(); |
| if (loader == null) { |
| throw new IOException( |
| "Class " + my_class + " does not have a classloader!"); |
| } |
| String class_file = my_class.getName().replaceAll("\\.", "/") + ".class"; |
| Enumeration<URL> urlEnumeration = loader.getResources(class_file); |
| if (urlEnumeration == null) { |
| throw new IOException("Unable to find resources for class " + my_class); |
| } |
| |
| for (; urlEnumeration.hasMoreElements(); ) { |
| URL url = urlEnumeration.nextElement(); |
| if ("jar".equals(url.getProtocol())) { |
| String toReturn = url.getPath(); |
| if (toReturn.startsWith("file:")) { |
| toReturn = toReturn.substring("file:".length()); |
| } |
| // URLDecoder is a misnamed class, since it actually decodes |
| // x-www-form-urlencoded MIME type rather than actual |
| // URL encoding (which the file path has). Therefore it would |
| // decode +s to ' 's which is incorrect (spaces are actually |
| // either unencoded or encoded as "%20"). Replace +s first, so |
| // that they are kept sacred during the decoding process. |
| toReturn = toReturn.replaceAll("\\+", "%2B"); |
| toReturn = URLDecoder.decode(toReturn, "UTF-8"); |
| String jarFilePath = toReturn.replaceAll("!.*$", ""); |
| return new File(jarFilePath); |
| } else { |
| log.info("could not locate JAR containing {} URL={}", my_class, url); |
| } |
| } |
| return null; |
| } |
| |
| public static void checkPort(String hostname, int port, int connectTimeout) |
| throws IOException { |
| InetSocketAddress addr = new InetSocketAddress(hostname, port); |
| checkPort(hostname, addr, connectTimeout); |
| } |
| |
| @SuppressWarnings("SocketOpenedButNotSafelyClosed") |
| public static void checkPort(String name, |
| InetSocketAddress address, |
| int connectTimeout) |
| throws IOException { |
| Socket socket = null; |
| try { |
| socket = new Socket(); |
| socket.connect(address, connectTimeout); |
| } catch (Exception e) { |
| throw new IOException("Failed to connect to " + name |
| + " at " + address |
| + " after " + connectTimeout + "millisconds" |
| + ": " + e, |
| e); |
| } finally { |
| IOUtils.closeSocket(socket); |
| } |
| } |
| |
| public static void checkURL(String name, String url, int timeout) throws |
| IOException { |
| InetSocketAddress address = NetUtils.createSocketAddr(url); |
| checkPort(name, address, timeout); |
| } |
| |
| /** |
| * A required file |
| * @param role role of the file (for errors) |
| * @param filename the filename |
| * @throws ExitUtil.ExitException if the file is missing |
| * @return the file |
| */ |
| public static File requiredFile(String filename, String role) throws |
| IOException { |
| if (filename.isEmpty()) { |
| throw new ExitUtil.ExitException(-1, role + " file not defined"); |
| } |
| File file = new File(filename); |
| if (!file.exists()) { |
| throw new ExitUtil.ExitException(-1, |
| role + " file not found: " + |
| file.getCanonicalPath()); |
| } |
| return file; |
| } |
| |
| private static final PatternValidator clusternamePattern |
| = new PatternValidator("[a-z][a-z0-9_-]*"); |
| |
| /** |
| * Normalize a cluster name then verify that it is valid |
| * @param name proposed cluster name |
| * @return true iff it is valid |
| */ |
| public static boolean isClusternameValid(String name) { |
| return name != null && clusternamePattern.matches(name); |
| } |
| |
| public static boolean oldIsClusternameValid(String name) { |
| if (name == null || name.isEmpty()) { |
| return false; |
| } |
| int first = name.charAt(0); |
| if (0 == (Character.getType(first) & Character.LOWERCASE_LETTER)) { |
| return false; |
| } |
| |
| for (int i = 0; i < name.length(); i++) { |
| int elt = (int) name.charAt(i); |
| int t = Character.getType(elt); |
| if (0 == (t & Character.LOWERCASE_LETTER) |
| && 0 == (t & Character.DECIMAL_DIGIT_NUMBER) |
| && elt != '-' |
| && elt != '_') { |
| return false; |
| } |
| if (!Character.isLetterOrDigit(elt) && elt != '-' && elt != '_') { |
| return false; |
| } |
| } |
| return true; |
| } |
| |
| /** |
| * Copy a directory to a new FS -both paths must be qualified. If |
| * a directory needs to be created, supplied permissions can override |
| * the default values. Existing directories are not touched |
| * @param conf conf file |
| * @param srcDirPath src dir |
| * @param destDirPath dest dir |
| * @param permission permission for the dest directory; null means "default" |
| * @return # of files copies |
| */ |
| public static int copyDirectory(Configuration conf, |
| Path srcDirPath, |
| Path destDirPath, |
| FsPermission permission) throws |
| IOException, |
| BadClusterStateException { |
| FileSystem srcFS = FileSystem.get(srcDirPath.toUri(), conf); |
| FileSystem destFS = FileSystem.get(destDirPath.toUri(), conf); |
| //list all paths in the src. |
| if (!srcFS.exists(srcDirPath)) { |
| throw new FileNotFoundException("Source dir not found " + srcDirPath); |
| } |
| if (!srcFS.isDirectory(srcDirPath)) { |
| throw new FileNotFoundException( |
| "Source dir not a directory " + srcDirPath); |
| } |
| GlobFilter dotFilter = new GlobFilter("[!.]*"); |
| FileStatus[] entries = srcFS.listStatus(srcDirPath, dotFilter); |
| int srcFileCount = entries.length; |
| if (srcFileCount == 0) { |
| return 0; |
| } |
| if (permission == null) { |
| permission = FsPermission.getDirDefault(); |
| } |
| if (!destFS.exists(destDirPath)) { |
| new SliderFileSystem(destFS, conf).createWithPermissions(destDirPath, |
| permission); |
| } |
| Path[] sourcePaths = new Path[srcFileCount]; |
| for (int i = 0; i < srcFileCount; i++) { |
| FileStatus e = entries[i]; |
| Path srcFile = e.getPath(); |
| if (srcFS.isDirectory(srcFile)) { |
| String msg = "Configuration dir " + srcDirPath |
| + " contains a directory " + srcFile; |
| log.warn(msg); |
| throw new IOException(msg); |
| } |
| log.debug("copying src conf file {}", srcFile); |
| sourcePaths[i] = srcFile; |
| } |
| log.debug("Copying {} files from {} to dest {}", srcFileCount, |
| srcDirPath, |
| destDirPath); |
| FileUtil.copy(srcFS, sourcePaths, destFS, destDirPath, false, true, conf); |
| return srcFileCount; |
| } |
| |
| |
| public static String stringify(Throwable t) { |
| StringWriter sw = new StringWriter(); |
| sw.append(t.toString()).append('\n'); |
| t.printStackTrace(new PrintWriter(sw)); |
| return sw.toString(); |
| } |
| |
| /** |
| * Create a configuration with Slider-specific tuning. |
| * This is done rather than doing custom configs. |
| * @return the config |
| */ |
| public static YarnConfiguration createConfiguration() { |
| YarnConfiguration conf = new YarnConfiguration(); |
| patchConfiguration(conf); |
| return conf; |
| } |
| |
| /** |
| * Take an existing conf and patch it for Slider's needs. Useful |
| * in Service.init & RunService methods where a shared config is being |
| * passed in |
| * @param conf configuration |
| * @return the patched configuration |
| */ |
| public static Configuration patchConfiguration(Configuration conf) { |
| |
| //if the fallback option is NOT set, enable it. |
| //if it is explicitly set to anything -leave alone |
| if (conf.get(SliderXmlConfKeys.IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH) == |
| null) { |
| conf.set(SliderXmlConfKeys.IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH, "true"); |
| } |
| return conf; |
| } |
| |
| /** |
| * Take a collection, return a list containing the string value of every |
| * element in the collection. |
| * @param c collection |
| * @return a stringified list |
| */ |
| public static List<String> collectionToStringList(Collection c) { |
| List<String> l = new ArrayList<String>(c.size()); |
| for (Object o : c) { |
| l.add(o.toString()); |
| } |
| return l; |
| } |
| |
| /** |
| * Join an collection of objects with a separator that appears after every |
| * instance in the list -including at the end |
| * @param collection collection to call toString() on each element |
| * @param separator separator string |
| * @return the joined entries |
| */ |
| public static String join(Collection collection, String separator) { |
| return join(collection, separator, true); |
| } |
| |
| /** |
| * Join an collection of objects with a separator that appears after every |
| * instance in the list -optionally at the end |
| * @param collection collection to call toString() on each element |
| * @param separator separator string |
| * @param trailing add a trailing entry or not |
| * @return the joined entries |
| */ |
| public static String join(Collection collection, |
| String separator, |
| boolean trailing) { |
| StringBuilder b = new StringBuilder(); |
| // fast return on empty collection |
| if (collection.isEmpty()) { |
| return trailing ? separator : ""; |
| } |
| for (Object o : collection) { |
| b.append(o); |
| b.append(separator); |
| } |
| int length = separator.length(); |
| String s = b.toString(); |
| return (trailing || s.isEmpty()) ? |
| s : (b.substring(0, b.length() - length)); |
| } |
| |
| /** |
| * Join an array of strings with a separator that appears after every |
| * instance in the list -including at the end |
| * @param collection strings |
| * @param separator separator string |
| * @return the joined entries |
| */ |
| public static String join(String[] collection, String separator) { |
| return join(collection, separator, true); |
| |
| |
| } |
| |
| /** |
| * Join an array of strings with a separator that appears after every |
| * instance in the list -optionally at the end |
| * @param collection strings |
| * @param separator separator string |
| * @param trailing add a trailing entry or not |
| * @return the joined entries |
| */ |
| public static String join(String[] collection, String separator, |
| boolean trailing) { |
| return join(Arrays.asList(collection), separator, trailing); |
| } |
| |
| /** |
| * Join an array of strings with a separator that appears after every |
| * instance in the list -except at the end |
| * @param collection strings |
| * @param separator separator string |
| * @return the list |
| */ |
| public static String joinWithInnerSeparator(String separator, |
| Object... collection) { |
| StringBuilder b = new StringBuilder(); |
| boolean first = true; |
| |
| for (Object o : collection) { |
| if (first) { |
| first = false; |
| } else { |
| b.append(separator); |
| } |
| b.append(o.toString()); |
| b.append(separator); |
| } |
| return b.toString(); |
| } |
| |
| /** |
| * Resolve a mandatory environment variable |
| * @param key env var |
| * @return the resolved value |
| * @throws BadClusterStateException |
| */ |
| public static String mandatoryEnvVariable(String key) throws |
| BadClusterStateException { |
| String v = System.getenv(key); |
| if (v == null) { |
| throw new BadClusterStateException("Missing Environment variable " + key); |
| } |
| return v; |
| } |
| |
| public static String appReportToString(ApplicationReport r, |
| String separator) { |
| StringBuilder builder = new StringBuilder(512); |
| builder.append("application ") |
| .append( |
| r.getName()) |
| .append("/") |
| .append(r.getApplicationType()) |
| .append(separator); |
| Set<String> tags = r.getApplicationTags(); |
| if (!tags.isEmpty()) { |
| for (String tag : tags) { |
| builder.append(tag).append(separator); |
| } |
| } |
| builder.append("state: ").append(r.getYarnApplicationState()); |
| builder.append(separator).append("URL: ").append(r.getTrackingUrl()); |
| builder.append(separator) |
| .append("Started ") |
| .append(new Date(r.getStartTime()).toGMTString()); |
| long finishTime = r.getFinishTime(); |
| if (finishTime > 0) { |
| builder.append(separator) |
| .append("Finished ") |
| .append(new Date(finishTime).toGMTString()); |
| } |
| builder.append(separator) |
| .append("RPC :") |
| .append(r.getHost()) |
| .append(':') |
| .append(r.getRpcPort()); |
| String diagnostics = r.getDiagnostics(); |
| if (!diagnostics.isEmpty()) { |
| builder.append(separator).append("Diagnostics :").append(diagnostics); |
| } |
| return builder.toString(); |
| } |
| |
| /** |
| * Sorts the given list of application reports |
| * Finished instances are ordered by finished time and running/accepted instances are |
| * ordered by start time |
| * Finally Instance are order by finished instances coming after running instances |
| * |
| * @param instances list of intances |
| */ |
| public static void sortApplicationReport(List<ApplicationReport> instances) { |
| if (instances.size() <= 1) { |
| return; |
| } |
| List<ApplicationReport> nonLiveInstance = |
| new ArrayList<ApplicationReport>(instances.size()); |
| List<ApplicationReport> liveInstance = |
| new ArrayList<ApplicationReport>(instances.size()); |
| |
| for (ApplicationReport report : instances) { |
| if (report.getYarnApplicationState() == YarnApplicationState.RUNNING |
| || |
| report.getYarnApplicationState() == YarnApplicationState.ACCEPTED) { |
| liveInstance.add(report); |
| } else { |
| nonLiveInstance.add(report); |
| } |
| } |
| |
| if (liveInstance.size() > 1) { |
| Comparator<ApplicationReport> liveInstanceComparator = |
| new Comparator<ApplicationReport>() { |
| @Override |
| public int compare(ApplicationReport r1, ApplicationReport r2) { |
| long x = r1.getStartTime(); |
| long y = r2.getStartTime(); |
| return (x < y) ? -1 : ((x == y) ? 0 : 1); |
| } |
| }; |
| Collections.sort(liveInstance, liveInstanceComparator); |
| } |
| if (nonLiveInstance.size() > 1) { |
| Comparator<ApplicationReport> nonLiveInstanceComparator = |
| new Comparator<ApplicationReport>() { |
| @Override |
| public int compare(ApplicationReport r1, ApplicationReport r2) { |
| long x = r1.getFinishTime(); |
| long y = r2.getFinishTime(); |
| return (x < y) ? -1 : ((x == y) ? 0 : 1); |
| } |
| }; |
| Collections.sort(nonLiveInstance, nonLiveInstanceComparator); |
| } |
| instances.clear(); |
| instances.addAll(liveInstance); |
| instances.addAll(nonLiveInstance); |
| } |
| |
| /** |
| * Built a (sorted) map of application reports, mapped to the instance name |
| * The list is sorted, and the addition process does not add a report |
| * if there is already one that exists. If the list handed in is sorted, |
| * those that are listed first form the entries returned |
| * @param instances list of intances |
| * @param minState minimum YARN state to be included |
| * @param maxState maximum YARN state to be included |
| * @return all reports in the list whose state >= minimum and <= maximum |
| */ |
| public static Map<String, ApplicationReport> buildApplicationReportMap( |
| List<ApplicationReport> instances, |
| YarnApplicationState minState, YarnApplicationState maxState) { |
| TreeMap<String, ApplicationReport> map = new TreeMap<String, ApplicationReport>(); |
| for (ApplicationReport report : instances) { |
| YarnApplicationState state = report.getYarnApplicationState(); |
| if (state.ordinal() >= minState.ordinal() && |
| state.ordinal() <= maxState.ordinal() && |
| map.get(report.getName()) == null) { |
| map.put(report.getName(), report); |
| } |
| } |
| return map; |
| } |
| |
| /** |
| * Take a map and produce a sorted equivalent |
| * @param source source map |
| * @return a map whose iterator returns the string-sorted ordering of entries |
| */ |
| public static Map<String, String> sortedMap(Map<String, String> source) { |
| Map<String, String> out = new TreeMap<String, String>(source); |
| return out; |
| } |
| |
| /** |
| * Convert a properties instance to a string map. |
| * @param properties source property object |
| * @return a string map |
| */ |
| public static Map<String, String> toMap(Properties properties) { |
| Map<String, String> out = new HashMap<String, String>(properties.size()); |
| for (Map.Entry<Object, Object> entry : properties.entrySet()) { |
| out.put(entry.getKey().toString(), entry.getValue().toString()); |
| } |
| return out; |
| } |
| |
| /** |
| * Merge in one map to another -all entries in the second map are |
| * merged into the first -overwriting any duplicate keys. |
| * @param first first map -the updated one. |
| * @param second the map that is merged in |
| * @return the first map |
| */ |
| public static Map<String, String> mergeMap(Map<String, String> first, |
| Map<String, String> second) { |
| first.putAll(second); |
| return first; |
| } |
| |
| /** |
| * Merge a set of entries into a map. This will take the entryset of |
| * a map, or a Hadoop collection itself |
| * @param dest destination |
| * @param entries entries |
| * @return dest -with the entries merged in |
| */ |
| public static Map<String, String> mergeEntries(Map<String, String> dest, |
| Iterable<Map.Entry<String, String>> entries) { |
| for (Map.Entry<String, String> entry : entries) { |
| dest.put(entry.getKey(), entry.getValue()); |
| } |
| return dest; |
| } |
| |
| /** |
| * Generic map merge logic |
| * @param first first map |
| * @param second second map |
| * @param <T1> key type |
| * @param <T2> value type |
| * @return 'first' merged with the second |
| */ |
| public static <T1, T2> Map<T1, T2> mergeMaps(Map<T1, T2> first, |
| Map<T1, T2> second) { |
| first.putAll(second); |
| return first; |
| } |
| |
| /** |
| * Generic map merge logic |
| * @param first first map |
| * @param second second map |
| * @param <T1> key type |
| * @param <T2> value type |
| * @return 'first' merged with the second |
| */ |
| public static <T1, T2> Map<T1, T2> mergeMapsIgnoreDuplicateKeys(Map<T1, T2> first, |
| Map<T1, T2> second) { |
| Preconditions.checkArgument(first != null, "Null 'first' value"); |
| Preconditions.checkArgument(second != null, "Null 'second' value"); |
| for (Map.Entry<T1, T2> entry : second.entrySet()) { |
| T1 key = entry.getKey(); |
| if (!first.containsKey(key)) { |
| first.put(key, entry.getValue()); |
| } |
| } |
| return first; |
| } |
| |
| /** |
| * Convert a map to a multi-line string for printing |
| * @param map map to stringify |
| * @return a string representation of the map |
| */ |
| public static String stringifyMap(Map<String, String> map) { |
| StringBuilder builder = new StringBuilder(); |
| for (Map.Entry<String, String> entry : map.entrySet()) { |
| builder.append(entry.getKey()) |
| .append("=\"") |
| .append(entry.getValue()) |
| .append("\"\n"); |
| |
| } |
| return builder.toString(); |
| } |
| |
| /** |
| * Get the int value of a role |
| * @param roleMap map of role key->val entries |
| * @param key key the key to look for |
| * @param defVal default value to use if the key is not in the map |
| * @param min min value or -1 for do not check |
| * @param max max value or -1 for do not check |
| * @return the int value the integer value |
| * @throws BadConfigException if the value could not be parsed |
| */ |
| public static int getIntValue(Map<String, String> roleMap, |
| String key, |
| int defVal, |
| int min, |
| int max |
| ) throws BadConfigException { |
| String valS = roleMap.get(key); |
| return parseAndValidate(key, valS, defVal, min, max); |
| |
| } |
| |
| /** |
| * Parse an int value, replacing it with defval if undefined; |
| * @param errorKey key to use in exceptions |
| * @param defVal default value to use if the key is not in the map |
| * @param min min value or -1 for do not check |
| * @param max max value or -1 for do not check |
| * @return the int value the integer value |
| * @throws BadConfigException if the value could not be parsed |
| */ |
| public static int parseAndValidate(String errorKey, |
| String valS, |
| int defVal, |
| int min, int max) throws |
| BadConfigException { |
| if (valS == null) { |
| valS = Integer.toString(defVal); |
| } |
| String trim = valS.trim(); |
| int val; |
| try { |
| val = Integer.decode(trim); |
| } catch (NumberFormatException e) { |
| throw new BadConfigException("Failed to parse value of " |
| + errorKey + ": \"" + trim + "\""); |
| } |
| if (min >= 0 && val < min) { |
| throw new BadConfigException("Value of " |
| + errorKey + ": " + val + "" |
| + "is less than the minimum of " + min); |
| } |
| if (max >= 0 && val > max) { |
| throw new BadConfigException("Value of " |
| + errorKey + ": " + val + "" |
| + "is more than the maximum of " + max); |
| } |
| return val; |
| } |
| |
| public static InetSocketAddress getRmAddress(Configuration conf) { |
| return conf.getSocketAddr(YarnConfiguration.RM_ADDRESS, |
| YarnConfiguration.DEFAULT_RM_ADDRESS, |
| YarnConfiguration.DEFAULT_RM_PORT); |
| } |
| |
| public static InetSocketAddress getRmSchedulerAddress(Configuration conf) { |
| return conf.getSocketAddr(YarnConfiguration.RM_SCHEDULER_ADDRESS, |
| YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS, |
| YarnConfiguration.DEFAULT_RM_SCHEDULER_PORT); |
| } |
| |
| /** |
| * probe to see if the RM scheduler is defined |
| * @param conf config |
| * @return true if the RM scheduler address is set to |
| * something other than 0.0.0.0 |
| */ |
| public static boolean isRmSchedulerAddressDefined(Configuration conf) { |
| InetSocketAddress address = getRmSchedulerAddress(conf); |
| return isAddressDefined(address); |
| } |
| |
| /** |
| * probe to see if the address |
| * @param address network address |
| * @return true if the scheduler address is set to |
| * something other than 0.0.0.0 |
| */ |
| public static boolean isAddressDefined(InetSocketAddress address) { |
| return !(address.getHostName().equals("0.0.0.0")); |
| } |
| |
| public static void setRmAddress(Configuration conf, String rmAddr) { |
| conf.set(YarnConfiguration.RM_ADDRESS, rmAddr); |
| } |
| |
| public static void setRmSchedulerAddress(Configuration conf, String rmAddr) { |
| conf.set(YarnConfiguration.RM_SCHEDULER_ADDRESS, rmAddr); |
| } |
| |
| public static boolean hasAppFinished(ApplicationReport report) { |
| return report == null || |
| report.getYarnApplicationState().ordinal() >= |
| YarnApplicationState.FINISHED.ordinal(); |
| } |
| |
| public static String containerToString(Container container) { |
| if (container == null) { |
| return "null container"; |
| } |
| return String.format(Locale.ENGLISH, |
| "ContainerID=%s nodeID=%s http=%s priority=%s", |
| container.getId(), |
| container.getNodeId(), |
| container.getNodeHttpAddress(), |
| container.getPriority()); |
| } |
| |
| /** |
| * convert an AM report to a string for diagnostics |
| * @param report the report |
| * @return the string value |
| */ |
| public static String reportToString(ApplicationReport report) { |
| if (report == null) { |
| return "Null application report"; |
| } |
| |
| return "App " + report.getName() + "/" + report.getApplicationType() + |
| "# " + |
| report.getApplicationId() + " user " + report.getUser() + |
| " is in state " + report.getYarnApplicationState() + |
| " RPC: " + report.getHost() + ":" + report.getRpcPort() + |
| " URL" + report.getOriginalTrackingUrl(); |
| } |
| |
| /** |
| * Convert a YARN URL into a string value of a normal URL |
| * @param url URL |
| * @return string representatin |
| */ |
| public static String stringify(org.apache.hadoop.yarn.api.records.URL url) { |
| StringBuilder builder = new StringBuilder(); |
| builder.append(url.getScheme()).append("://"); |
| if (url.getHost() != null) { |
| builder.append(url.getHost()).append(":").append(url.getPort()); |
| } |
| builder.append(url.getFile()); |
| return builder.toString(); |
| } |
| |
| public static int findFreePort(int start, int limit) { |
| if (start == 0) { |
| //bail out if the default is "dont care" |
| return 0; |
| } |
| int found = 0; |
| int port = start; |
| int finish = start + limit; |
| while (found == 0 && port < finish) { |
| if (isPortAvailable(port)) { |
| found = port; |
| } else { |
| port++; |
| } |
| } |
| return found; |
| } |
| |
| /** |
| * See if a port is available for listening on by trying to listen |
| * on it and seeing if that works or fails. |
| * @param port port to listen to |
| * @return true if the port was available for listening on |
| */ |
| public static boolean isPortAvailable(int port) { |
| try { |
| ServerSocket socket = new ServerSocket(port); |
| socket.close(); |
| return true; |
| } catch (IOException e) { |
| return false; |
| } |
| } |
| |
| /** |
| * Build the environment map from a role option map, finding all entries |
| * beginning with "env.", adding them to a map of (prefix-removed) |
| * env vars |
| * @param roleOpts role options. This can be null, meaning the |
| * role is undefined |
| * @return a possibly empty map of environment variables. |
| */ |
| public static Map<String, String> buildEnvMap(Map<String, String> roleOpts) { |
| Map<String, String> env = new HashMap<String, String>(); |
| if (roleOpts != null) { |
| for (Map.Entry<String, String> entry : roleOpts.entrySet()) { |
| String key = entry.getKey(); |
| if (key.startsWith(RoleKeys.ENV_PREFIX)) { |
| String envName = key.substring(RoleKeys.ENV_PREFIX.length()); |
| if (!envName.isEmpty()) { |
| env.put(envName, entry.getValue()); |
| } |
| } |
| } |
| } |
| return env; |
| } |
| |
| /** |
| * Apply a set of command line options to a cluster role map |
| * @param clusterRoleMap cluster role map to merge onto |
| * @param commandOptions command opts |
| */ |
| public static void applyCommandLineRoleOptsToRoleMap(Map<String, Map<String, String>> clusterRoleMap, |
| Map<String, Map<String, String>> commandOptions) { |
| for (Map.Entry<String, Map<String, String>> entry : commandOptions.entrySet()) { |
| String key = entry.getKey(); |
| Map<String, String> optionMap = entry.getValue(); |
| Map<String, String> existingMap = clusterRoleMap.get(key); |
| if (existingMap == null) { |
| existingMap = new HashMap<String, String>(); |
| } |
| log.debug("Overwriting role options with command line values {}", |
| stringifyMap(optionMap)); |
| mergeMap(existingMap, optionMap); |
| //set or overwrite the role |
| clusterRoleMap.put(key, existingMap); |
| } |
| } |
| |
| /** |
| * verify that the supplied cluster name is valid |
| * @param clustername cluster name |
| * @throws BadCommandArgumentsException if it is invalid |
| */ |
| public static void validateClusterName(String clustername) throws |
| BadCommandArgumentsException { |
| if (!isClusternameValid(clustername)) { |
| throw new BadCommandArgumentsException( |
| "Illegal cluster name: " + clustername); |
| } |
| } |
| |
| /** |
| * Verify that a Kerberos principal has been set -if not fail |
| * with an error message that actually tells you what is missing |
| * @param conf configuration to look at |
| * @param principal key of principal |
| * @throws BadConfigException if the key is not set |
| */ |
| public static void verifyPrincipalSet(Configuration conf, |
| String principal) throws |
| BadConfigException { |
| String principalName = conf.get(principal); |
| if (principalName == null) { |
| throw new BadConfigException("Unset Kerberos principal : %s", |
| principal); |
| } |
| log.debug("Kerberos princial {}={}", principal, principalName); |
| } |
| |
| /** |
| * Flag to indicate whether the cluster is in secure mode |
| * @param conf configuration to look at |
| * @return true if the slider client/service should be in secure mode |
| */ |
| public static boolean isHadoopClusterSecure(Configuration conf) { |
| return SecurityUtil.getAuthenticationMethod(conf) != |
| UserGroupInformation.AuthenticationMethod.SIMPLE; |
| } |
| |
| /** |
| * Init security if the cluster configuration declares the cluster is secure |
| * @param conf configuration to look at |
| * @return true if the cluster is secure |
| * @throws IOException cluster is secure |
| * @throws BadConfigException the configuration/process is invalid |
| */ |
| public static boolean maybeInitSecurity(Configuration conf) throws |
| IOException, |
| BadConfigException { |
| boolean clusterSecure = isHadoopClusterSecure(conf); |
| if (clusterSecure) { |
| log.debug("Enabling security"); |
| initProcessSecurity(conf); |
| } |
| return clusterSecure; |
| } |
| |
| /** |
| * Turn on security. This is setup to only run once. |
| * @param conf configuration to build up security |
| * @return true if security was initialized in this call |
| * @throws IOException IO/Net problems |
| * @throws BadConfigException the configuration and system state are inconsistent |
| */ |
| public static boolean initProcessSecurity(Configuration conf) throws |
| IOException, |
| BadConfigException { |
| |
| if (processSecurityAlreadyInitialized.compareAndSet(true, true)) { |
| //security is already inited |
| return false; |
| } |
| |
| log.info("JVM initialized into secure mode with kerberos realm {}", |
| SliderUtils.getKerberosRealm()); |
| //this gets UGI to reset its previous world view (i.e simple auth) |
| //security |
| log.debug("java.security.krb5.realm={}", |
| System.getProperty(JAVA_SECURITY_KRB5_REALM, "")); |
| log.debug("java.security.krb5.kdc={}", |
| System.getProperty(JAVA_SECURITY_KRB5_KDC, "")); |
| log.debug("hadoop.security.authentication={}", |
| conf.get(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION)); |
| log.debug("hadoop.security.authorization={}", |
| conf.get(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION)); |
| /* SecurityUtil.setAuthenticationMethod( |
| UserGroupInformation.AuthenticationMethod.KERBEROS, conf);*/ |
| UserGroupInformation.setConfiguration(conf); |
| UserGroupInformation authUser = UserGroupInformation.getCurrentUser(); |
| log.debug("Authenticating as " + authUser.toString()); |
| log.debug("Login user is {}", UserGroupInformation.getLoginUser()); |
| if (!UserGroupInformation.isSecurityEnabled()) { |
| throw new BadConfigException("Although secure mode is enabled," + |
| "the application has already set up its user as an insecure entity %s", |
| authUser); |
| } |
| if (authUser.getAuthenticationMethod() == |
| UserGroupInformation.AuthenticationMethod.SIMPLE) { |
| throw new BadConfigException("Auth User is not Kerberized %s" + |
| " -security has already been set up with the wrong authentication method. " |
| + |
| "This can occur if a file system has already been created prior to the loading of " |
| + "the security configuration.", |
| authUser); |
| |
| } |
| |
| SliderUtils.verifyPrincipalSet(conf, YarnConfiguration.RM_PRINCIPAL); |
| SliderUtils.verifyPrincipalSet(conf, |
| DFSConfigKeys.DFS_NAMENODE_KERBEROS_PRINCIPAL_KEY); |
| return true; |
| } |
| |
| /** |
| * Force an early login: This catches any auth problems early rather than |
| * in RPC operations |
| * @throws IOException if the login fails |
| */ |
| public static void forceLogin() throws IOException { |
| if (UserGroupInformation.isSecurityEnabled()) { |
| if (UserGroupInformation.isLoginKeytabBased()) { |
| UserGroupInformation.getLoginUser().reloginFromKeytab(); |
| } else { |
| UserGroupInformation.getLoginUser().reloginFromTicketCache(); |
| } |
| } |
| } |
| |
| /** |
| * Submit a JAR containing a specific class and map it |
| * @param providerResources provider map to build up |
| * @param sliderFileSystem remote fs |
| * @param clazz class to look for |
| * @param libdir lib directory |
| * @param jarName <i>At the destination</i> |
| * @return the local resource ref |
| * @throws IOException trouble copying to HDFS |
| */ |
| public static LocalResource putJar(Map<String, LocalResource> providerResources, |
| SliderFileSystem sliderFileSystem, |
| Class clazz, |
| Path tempPath, |
| String libdir, |
| String jarName |
| ) |
| throws IOException, SliderException { |
| LocalResource res = sliderFileSystem.submitJarWithClass( |
| clazz, |
| tempPath, |
| libdir, |
| jarName); |
| providerResources.put(libdir + "/" + jarName, res); |
| return res; |
| } |
| |
| /** |
| * Submit a JAR containing and map it |
| * @param providerResources provider map to build up |
| * @param sliderFileSystem remote fs |
| * @param libDir lib directory |
| * @param srcPath copy jars from |
| * @throws IOException, SliderException trouble copying to HDFS |
| */ |
| public static void putAllJars(Map<String, LocalResource> providerResources, |
| SliderFileSystem sliderFileSystem, |
| Path tempPath, |
| String libDir, |
| String srcPath |
| ) |
| throws IOException, SliderException { |
| log.info("Loading all dependencies from {}", srcPath); |
| if (SliderUtils.isSet(srcPath)) { |
| File srcFolder = new File(srcPath); |
| FilenameFilter jarFilter = new FilenameFilter() { |
| public boolean accept(File dir, String name) { |
| String lowercaseName = name.toLowerCase(); |
| if (lowercaseName.endsWith(".jar")) { |
| return true; |
| } else { |
| return false; |
| } |
| } |
| }; |
| File[] listOfJars = srcFolder.listFiles(jarFilter); |
| for (File jarFile : listOfJars) { |
| LocalResource res = sliderFileSystem.submitFile(jarFile, tempPath, libDir, jarFile.getName()); |
| providerResources.put(libDir + "/" + jarFile.getName(), res); |
| } |
| } |
| } |
| |
| public static Map<String, Map<String, String>> deepClone(Map<String, Map<String, String>> src) { |
| Map<String, Map<String, String>> dest = |
| new HashMap<String, Map<String, String>>(); |
| for (Map.Entry<String, Map<String, String>> entry : src.entrySet()) { |
| dest.put(entry.getKey(), stringMapClone(entry.getValue())); |
| } |
| return dest; |
| } |
| |
| public static Map<String, String> stringMapClone(Map<String, String> src) { |
| Map<String, String> dest = new HashMap<String, String>(); |
| return mergeEntries(dest, src.entrySet()); |
| } |
| |
| /** |
| * List a directory in the local filesystem |
| * @param dir directory |
| * @return a listing, one to a line |
| */ |
| public static String listDir(File dir) { |
| if (dir == null) { |
| return ""; |
| } |
| StringBuilder builder = new StringBuilder(); |
| String[] confDirEntries = dir.list(); |
| for (String entry : confDirEntries) { |
| builder.append(entry).append("\n"); |
| } |
| return builder.toString(); |
| } |
| |
| /** |
| * Create a file:// path from a local file |
| * @param file file to point the path |
| * @return a new Path |
| */ |
| public static Path createLocalPath(File file) { |
| return new Path(file.toURI()); |
| } |
| |
| /** |
| * Get the current user -relays to |
| * {@link UserGroupInformation#getCurrentUser()} |
| * with any Slider-specific post processing and exception handling |
| * @return user info |
| * @throws IOException on a failure to get the credentials |
| */ |
| public static UserGroupInformation getCurrentUser() throws IOException { |
| |
| try { |
| UserGroupInformation currentUser = UserGroupInformation.getCurrentUser(); |
| return currentUser; |
| } catch (IOException e) { |
| log.info("Failed to get user info", e); |
| throw e; |
| } |
| } |
| |
| public static String getKerberosRealm() { |
| try { |
| return KerberosUtil.getDefaultRealm(); |
| } catch (Exception e) { |
| log.debug("introspection into JVM internals failed", e); |
| return "(unknown)"; |
| |
| } |
| } |
| |
| /** |
| * Register the client resource in |
| * {@link SliderKeys#CLIENT_RESOURCE} |
| * for Configuration instances. |
| * |
| * @return true if the resource could be loaded |
| */ |
| public static URL registerClientResource() { |
| return ConfigHelper.registerDefaultResource(SliderKeys.CLIENT_RESOURCE); |
| } |
| |
| |
| /** |
| * Attempt to load the slider client resource. If the |
| * resource is not on the CP an empty config is returned. |
| * @return a config |
| */ |
| public static Configuration loadClientConfigurationResource() { |
| return ConfigHelper.loadFromResource(SliderKeys.CLIENT_RESOURCE); |
| } |
| |
| /** |
| * Convert a char sequence to a string. |
| * This ensures that comparisions work |
| * @param charSequence source |
| * @return the string equivalent |
| */ |
| public static String sequenceToString(CharSequence charSequence) { |
| StringBuilder stringBuilder = new StringBuilder(charSequence); |
| return stringBuilder.toString(); |
| } |
| |
| /** |
| * Build up the classpath for execution |
| * -behaves very differently on a mini test cluster vs a production |
| * production one. |
| * |
| * @param sliderConfDir relative path to the dir containing slider config |
| * options to put on the classpath -or null |
| * @param libdir directory containing the JAR files |
| * @param config the configuration |
| * @param usingMiniMRCluster flag to indicate the MiniMR cluster is in use |
| * (and hence the current classpath should be used, not anything built up) |
| * @return a classpath |
| */ |
| public static ClasspathConstructor buildClasspath(String sliderConfDir, |
| String libdir, |
| Configuration config, |
| boolean usingMiniMRCluster) { |
| |
| ClasspathConstructor classpath = new ClasspathConstructor(); |
| |
| // add the runtime classpath needed for tests to work |
| if (usingMiniMRCluster) { |
| // for mini cluster we pass down the java CP properties |
| // and nothing else |
| classpath.appendAll(classpath.localJVMClasspath()); |
| } else { |
| if (sliderConfDir != null) { |
| classpath.addClassDirectory(sliderConfDir); |
| } |
| classpath.addLibDir(libdir); |
| classpath.addRemoteClasspathEnvVar(); |
| classpath.append(ApplicationConstants.Environment.HADOOP_CONF_DIR.$()); |
| } |
| return classpath; |
| } |
| |
| /** |
| * Verify that a path refers to a directory. If not |
| * logs the parent dir then throws an exception |
| * @param dir the directory |
| * @param errorlog log for output on an error |
| * @throws FileNotFoundException if it is not a directory |
| */ |
| public static void verifyIsDir(File dir, Logger errorlog) throws |
| FileNotFoundException { |
| if (!dir.exists()) { |
| errorlog.warn("contents of {}: {}", dir, |
| listDir(dir.getParentFile())); |
| throw new FileNotFoundException(dir.toString()); |
| } |
| if (!dir.isDirectory()) { |
| errorlog.info("contents of {}: {}", dir, |
| listDir(dir.getParentFile())); |
| throw new FileNotFoundException( |
| "Not a directory: " + dir); |
| } |
| } |
| |
| /** |
| * Verify that a file exists |
| * @param file file |
| * @param errorlog log for output on an error |
| * @throws FileNotFoundException |
| */ |
| public static void verifyFileExists(File file, Logger errorlog) throws |
| FileNotFoundException { |
| if (!file.exists()) { |
| errorlog.warn("contents of {}: {}", file, |
| listDir(file.getParentFile())); |
| throw new FileNotFoundException(file.toString()); |
| } |
| if (!file.isFile()) { |
| throw new FileNotFoundException("Not a file: " + file.toString()); |
| } |
| } |
| |
| /** |
| * verify that a config option is set |
| * @param configuration config |
| * @param key key |
| * @return the value, in case it needs to be verified too |
| * @throws BadConfigException if the key is missing |
| */ |
| public static String verifyOptionSet(Configuration configuration, String key, |
| boolean allowEmpty) throws BadConfigException { |
| String val = configuration.get(key); |
| if (val == null) { |
| throw new BadConfigException( |
| "Required configuration option \"%s\" not defined ", key); |
| } |
| if (!allowEmpty && val.isEmpty()) { |
| throw new BadConfigException( |
| "Configuration option \"%s\" must not be empty", key); |
| } |
| return val; |
| } |
| |
| /** |
| * Verify that a keytab property is defined and refers to a non-empty file |
| * |
| * @param siteConf configuration |
| * @param prop property to look for |
| * @return the file referenced |
| * @throws BadConfigException on a failure |
| */ |
| public static File verifyKeytabExists(Configuration siteConf, |
| String prop) throws |
| BadConfigException { |
| String keytab = siteConf.get(prop); |
| if (keytab == null) { |
| throw new BadConfigException("Missing keytab property %s", |
| prop); |
| |
| } |
| File keytabFile = new File(keytab); |
| if (!keytabFile.exists()) { |
| throw new BadConfigException("Missing keytab file %s defined in %s", |
| keytabFile, |
| prop); |
| } |
| if (keytabFile.length() == 0 || !keytabFile.isFile()) { |
| throw new BadConfigException("Invalid keytab file %s defined in %s", |
| keytabFile, |
| prop); |
| } |
| return keytabFile; |
| } |
| |
| /** |
| * Convert an epoch time to a GMT time. This |
| * uses the deprecated Date.toString() operation, |
| * so is in one place to reduce the number of deprecation warnings. |
| * @param time timestamp |
| * @return string value as ISO-9601 |
| */ |
| @SuppressWarnings({"CallToDateToString", "deprecation"}) |
| public static String toGMTString(long time) { |
| return new Date(time).toGMTString(); |
| } |
| |
| /** |
| * Add the cluster build information; this will include Hadoop details too |
| * @param info cluster info |
| * @param prefix prefix for the build info |
| */ |
| public static void addBuildInfo(Map<String, String> info, String prefix) { |
| |
| Properties props = SliderVersionInfo.loadVersionProperties(); |
| info.put(prefix + "." + SliderVersionInfo.APP_BUILD_INFO, props.getProperty( |
| SliderVersionInfo.APP_BUILD_INFO)); |
| info.put(prefix + "." + SliderVersionInfo.HADOOP_BUILD_INFO, |
| props.getProperty(SliderVersionInfo.HADOOP_BUILD_INFO)); |
| |
| info.put(prefix + "." + SliderVersionInfo.HADOOP_DEPLOYED_INFO, |
| VersionInfo.getBranch() + " @" + VersionInfo.getSrcChecksum()); |
| } |
| |
| /** |
| * Set the time for an information (human, machine) timestamp pair of fields. |
| * The human time is the time in millis converted via the {@link Date} class. |
| * @param info info fields |
| * @param keyHumanTime name of human time key |
| * @param keyMachineTime name of machine time |
| * @param time timestamp |
| */ |
| public static void setInfoTime(Map info, |
| String keyHumanTime, |
| String keyMachineTime, |
| long time) { |
| info.put(keyHumanTime, SliderUtils.toGMTString(time)); |
| info.put(keyMachineTime, Long.toString(time)); |
| } |
| |
| public static Path extractImagePath(CoreFileSystem fs, |
| MapOperations internalOptions) throws |
| SliderException, IOException { |
| Path imagePath; |
| String imagePathOption = |
| internalOptions.get(InternalKeys.INTERNAL_APPLICATION_IMAGE_PATH); |
| String appHomeOption = |
| internalOptions.get(InternalKeys.INTERNAL_APPLICATION_HOME); |
| if (!isUnset(imagePathOption)) { |
| if (!isUnset(appHomeOption)) { |
| throw new BadClusterStateException( |
| ErrorStrings.E_BOTH_IMAGE_AND_HOME_DIR_SPECIFIED); |
| } |
| imagePath = fs.createPathThatMustExist(imagePathOption); |
| } else { |
| imagePath = null; |
| if (isUnset(appHomeOption)) { |
| throw new BadClusterStateException( |
| ErrorStrings.E_NO_IMAGE_OR_HOME_DIR_SPECIFIED); |
| } |
| } |
| return imagePath; |
| } |
| |
| /** |
| * trigger a JVM halt with no clean shutdown at all |
| * @param status status code for exit |
| * @param text text message |
| * @param delay delay in millis |
| * @return the timer (assuming the JVM hasn't halted yet) |
| * |
| */ |
| public static Timer haltAM(int status, String text, int delay) { |
| |
| Timer timer = new Timer("halt timer", false); |
| timer.schedule(new DelayedHalt(status, text), delay); |
| return timer; |
| } |
| |
| public static String propertiesToString(Properties props) { |
| TreeSet<String> keys = new TreeSet<String>(props.stringPropertyNames()); |
| StringBuilder builder = new StringBuilder(); |
| for (String key : keys) { |
| builder.append(key) |
| .append("=") |
| .append(props.getProperty(key)) |
| .append("\n"); |
| } |
| return builder.toString(); |
| } |
| |
| /** |
| * Add a subpath to an existing URL. This extends |
| * the path, inserting a / between all entries |
| * if needed. |
| * @param base base path/URL |
| * @param path subpath |
| * @return base+"/"+subpath |
| */ |
| public static String appendToURL(String base, String path) { |
| StringBuilder fullpath = new StringBuilder(base); |
| if (!base.endsWith("/")) { |
| fullpath.append("/"); |
| } |
| if (path.startsWith("/")) { |
| fullpath.append(path.substring(1)); |
| } else { |
| fullpath.append(path); |
| } |
| return fullpath.toString(); |
| } |
| |
| /** |
| * Append a list of paths, inserting "/" signs as appropriate |
| * @param base base path/URL |
| * @param paths subpaths |
| * @return base+"/"+paths[0]+"/"+paths[1]... |
| */ |
| public static String appendToURL(String base, String... paths) { |
| String result = base; |
| for (String path : paths) { |
| result = appendToURL(result, path); |
| } |
| return result; |
| } |
| |
| |
| /** |
| * Truncate the given string to a maximum length provided |
| * with a pad (...) added to the end if expected size if more than 10. |
| * @param toTruncate |
| * @param maxSize |
| * @return |
| */ |
| public static String truncate(String toTruncate, int maxSize) { |
| if (toTruncate == null || maxSize < 1 |
| || toTruncate.length() <= maxSize) { |
| return toTruncate; |
| } |
| |
| String pad = "..."; |
| if (maxSize < 10) { |
| pad = ""; |
| } |
| return toTruncate.substring(0, maxSize - pad.length()).concat(pad); |
| } |
| |
| |
| /** |
| * Callable for async/scheduled halt |
| */ |
| public static class DelayedHalt extends TimerTask { |
| private final int status; |
| private final String text; |
| |
| public DelayedHalt(int status, String text) { |
| this.status = status; |
| this.text = text; |
| } |
| |
| @Override |
| public void run() { |
| try { |
| ExitUtil.halt(status, text); |
| //this should never be reached |
| } catch (ExitUtil.HaltException e) { |
| log.info("Halt failed"); |
| } |
| } |
| } |
| |
| /** |
| * A compareTo function that converts the result of a long |
| * comparision into the integer that <code>Comparable</code> |
| * expects. |
| * @param left left side |
| * @param right right side |
| * @return -1, 0, 1 depending on the diff |
| */ |
| public static int compareTo(long left, long right) { |
| long diff = left - right; |
| if (diff < 0) { |
| return -1; |
| } |
| if (diff > 0) { |
| return 1; |
| } |
| return 0; |
| } |
| |
| /** |
| * This wrapps ApplicationReports and generates a string version |
| * iff the toString() operator is invoked |
| */ |
| public static class OnDemandReportStringifier { |
| private final ApplicationReport report; |
| |
| public OnDemandReportStringifier(ApplicationReport report) { |
| this.report = report; |
| } |
| |
| @Override |
| public String toString() { |
| return appReportToString(report, "\n"); |
| } |
| } |
| |
| public static InputStream getApplicationResourceInputStream(FileSystem fs, |
| Path appPath, |
| String entry) |
| throws IOException { |
| InputStream is = null; |
| FSDataInputStream appStream = null; |
| try { |
| appStream = fs.open(appPath); |
| ZipArchiveInputStream zis = new ZipArchiveInputStream(appStream); |
| ZipArchiveEntry zipEntry; |
| boolean done = false; |
| while (!done && (zipEntry = zis.getNextZipEntry()) != null) { |
| if (entry.equals(zipEntry.getName())) { |
| int size = (int) zipEntry.getSize(); |
| if (size != -1) { |
| log.info("Reading {} of size {}", zipEntry.getName(), |
| zipEntry.getSize()); |
| byte[] content = new byte[size]; |
| int offset = 0; |
| while (offset < size) { |
| offset += zis.read(content, offset, size - offset); |
| } |
| is = new ByteArrayInputStream(content); |
| } else { |
| log.debug("Size unknown. Reading {}", zipEntry.getName()); |
| ByteArrayOutputStream baos = new ByteArrayOutputStream(); |
| while (true) { |
| int byteRead = zis.read(); |
| if (byteRead == -1) { |
| break; |
| } |
| baos.write(byteRead); |
| } |
| is = new ByteArrayInputStream(baos.toByteArray()); |
| } |
| done = true; |
| } |
| } |
| } finally { |
| IOUtils.closeStream(appStream); |
| } |
| |
| return is; |
| } |
| |
| |
| /** |
| * Strictly verify that windows utils is present. |
| * Checks go as far as opening the file and looking for |
| * the headers. |
| * @throws IOException on any problem reading the file |
| * @throws FileNotFoundException if the file is not considered valid |
| * @param logger |
| */ |
| public static void maybeVerifyWinUtilsValid(Logger logger) throws |
| IOException, |
| SliderException { |
| if (!Shell.WINDOWS) { |
| return; |
| } |
| String exePath = Shell.getWinUtilsPath(); |
| String program = WINUTILS; |
| if (exePath == null) { |
| throw new FileNotFoundException(program + " not found on Path : " + |
| System.getenv("Path")); |
| } |
| File exe = new File(exePath); |
| |
| verifyWindowsExe(program, exe); |
| execCommand(WINUTILS, 0, 5000, log, null, exePath, "systeminfo"); |
| |
| } |
| |
| public static void verifyIsFile(String program, File exe) throws |
| FileNotFoundException { |
| if (!exe.isFile()) { |
| throw new FileNotFoundException(program |
| + " at " + exe |
| + " is not a file"); |
| |
| } |
| } |
| |
| public static void verifyFileSize(String program, |
| File exe, |
| int minFileSize) throws FileNotFoundException { |
| if (exe.length() < minFileSize) { |
| throw new FileNotFoundException(program |
| + " at " + exe |
| + " is too short to be an executable"); |
| } |
| } |
| |
| /** |
| * Look for the windows executable and check it has the right headers. |
| * <code>File.canRead()</code> doesn't work on windows, so the reading |
| * is mandatory. |
| * |
| * @param program program name for errors |
| * @param exe executable |
| * @throws IOException IOE |
| */ |
| public static void verifyWindowsExe(String program, File exe) |
| throws IOException { |
| verifyIsFile(program, exe); |
| |
| verifyFileSize(program, exe, 0x100); |
| |
| // now read two bytes and verify the header. |
| |
| FileReader reader = null; |
| try { |
| int[] header = new int[2]; |
| reader = new FileReader(exe); |
| header[0] = reader.read(); |
| header[1] = reader.read(); |
| if ((header[0] != 'M' || header[1] != 'Z')) { |
| throw new FileNotFoundException(program |
| + " at " + exe |
| + " is not a windows executable file"); |
| } |
| } finally { |
| IOUtils.closeStream(reader); |
| } |
| } |
| |
| /** |
| * Verify that a Unix exe works |
| * @param program program name for errors |
| * @param exe executable |
| * @throws IOException IOE |
| |
| */ |
| public static void verifyUnixExe(String program, File exe) |
| throws IOException { |
| verifyIsFile(program, exe); |
| |
| // read flag |
| if (!exe.canRead()) { |
| throw new IOException("Cannot read " + program + " at " + exe); |
| } |
| // exe flag |
| if (!exe.canExecute()) { |
| throw new IOException("Cannot execute " + program + " at " + exe); |
| } |
| } |
| |
| /** |
| * Validate an executable |
| * @param program program name for errors |
| * @param exe program to look at |
| * @throws IOException |
| */ |
| public static void validateExe(String program, File exe) throws IOException { |
| if (!Shell.WINDOWS) { |
| verifyWindowsExe(program, exe); |
| } else { |
| verifyUnixExe(program, exe); |
| } |
| } |
| |
| /** |
| * Write bytes to a file |
| * @param outfile output file |
| * @param data data to write |
| * @param createParent flag to indicate that the parent dir should |
| * be created |
| * @throws IOException on any IO problem |
| */ |
| public static void write(File outfile, byte[] data, boolean createParent) |
| throws IOException { |
| File parentDir = outfile.getParentFile(); |
| if (createParent) { |
| parentDir.mkdirs(); |
| } |
| SliderUtils.verifyIsDir(parentDir, log); |
| FileOutputStream out = new FileOutputStream(outfile); |
| try { |
| out.write(data); |
| } finally { |
| IOUtils.closeStream(out); |
| } |
| |
| } |
| |
| /** |
| * Execute a command for a test operation |
| * @param name name in error |
| * @param status status code expected |
| * @param timeoutMillis timeout in millis for process to finish |
| * @param logger |
| * @param outputString optional string to grep for (must not span a line) |
| * @param commands commands @return the process |
| * @throws IOException on any failure. |
| */ |
| public static ForkedProcessService execCommand(String name, |
| int status, |
| long timeoutMillis, |
| Logger logger, |
| String outputString, |
| String... commands) throws IOException, SliderException { |
| Preconditions.checkArgument(isSet(name), "no name"); |
| Preconditions.checkArgument(commands.length > 0, "no commands"); |
| Preconditions.checkArgument(isSet(commands[0]), "empty command"); |
| |
| ForkedProcessService process; |
| |
| |
| process = new ForkedProcessService( |
| name, |
| new HashMap<String, String>(), |
| Arrays.asList(commands)); |
| process.setProcessLog(logger); |
| process.init(new Configuration()); |
| String errorText = null; |
| EndOfServiceWaiter waiter = new EndOfServiceWaiter(process); |
| process.start(); |
| try { |
| waiter.waitForServiceToStop(timeoutMillis); |
| int exitCode = process.getExitCode(); |
| List<String> recentOutput = process.getRecentOutput(); |
| if (status != exitCode) { |
| // error condition |
| errorText = "Expected exit code={" + status + "}, " |
| + "actual exit code={" + exitCode + "}"; |
| } else { |
| if (isSet(outputString)) { |
| boolean found = false; |
| for (String line : recentOutput) { |
| if (line.contains(outputString)) { |
| found = true; |
| break; |
| } |
| } |
| if (!found) { |
| errorText = "Did not find \"" + outputString + "\"" |
| + " in output"; |
| } |
| } |
| } |
| if (errorText == null) { |
| return process; |
| } |
| |
| } catch (InterruptedException e) { |
| throw new InterruptedIOException(e.toString()); |
| } catch (TimeoutException e) { |
| errorText = e.toString(); |
| } |
| // error text: non null ==> operation failed |
| log.warn(errorText); |
| List<String> recentOutput = process.getRecentOutput(); |
| for (String line : recentOutput) { |
| log.info(line); |
| } |
| throw new SliderException(LauncherExitCodes.EXIT_OTHER_FAILURE, |
| "Process %s failed: %s", name, errorText); |
| |
| } |
| |
| |
| /** |
| * Validate the slider client-side execution environment. |
| * This looks for everything felt to be critical for execution, including |
| * native binaries and other essential dependencies. |
| * @param logger logger to log to on normal execution |
| * @throws IOException on IO failures |
| * @throws SliderException on validation failures |
| */ |
| public static void validateSliderClientEnvironment(Logger logger) throws |
| IOException, |
| SliderException { |
| maybeVerifyWinUtilsValid(logger); |
| } |
| |
| /** |
| * Validate the slider server-side execution environment. |
| * This looks for everything felt to be critical for execution, including |
| * native binaries and other essential dependencies. |
| * @param logger logger to log to on normal execution |
| * @throws IOException on IO failures |
| * @throws SliderException on validation failures |
| */ |
| public static void validateSliderServerEnvironment(Logger logger) throws |
| IOException, |
| SliderException { |
| maybeVerifyWinUtilsValid(logger); |
| execCommand(OPENSSL, 0, 5000, logger, "OpenSSL", OPENSSL, "version"); |
| execCommand(PYTHON, 0, 5000, logger, "Python", PYTHON, "--version"); |
| } |
| |
| /** |
| * return the path to the currently running slider command |
| * |
| * @throws NullPointerException |
| * - If the pathname argument is null |
| * @throws SecurityException |
| * - if a security manager exists and its checkPermission method |
| * doesn't allow getting the ProtectionDomain |
| */ |
| public static String getCurrentCommandPath() { |
| File f = new File(Slider.class.getProtectionDomain().getCodeSource() |
| .getLocation().getPath()); |
| return f.getAbsolutePath(); |
| } |
| |
| /** |
| * return the path to the slider-client.xml used by the current running |
| * slider command |
| * |
| * @throws SecurityException |
| * - if a security manager exists and its checkPermission method |
| * denies access to the class loader for the class |
| */ |
| public static String getClientConfigPath() { |
| URL path = ConfigHelper.class.getClassLoader().getResource( |
| SliderKeys.CLIENT_RESOURCE); |
| return path.toString(); |
| } |
| |
| /** |
| * validate if slider-client.xml under the path can be opened |
| * |
| * @throws IOException |
| * : the file can't be found or open |
| */ |
| public static void validateClientConfigFile() throws IOException { |
| URL resURL = SliderVersionInfo.class.getClassLoader().getResource( |
| SliderKeys.CLIENT_RESOURCE); |
| if (resURL == null) { |
| throw new IOException( |
| "slider-client.xml doesn't exist on the path: " |
| + getClientConfigPath()); |
| } |
| |
| try { |
| InputStream inStream = resURL.openStream(); |
| if (inStream == null) { |
| throw new IOException("slider-client.xml can't be opened"); |
| } |
| } catch (IOException e) { |
| throw new IOException("slider-client.xml can't be opened: " |
| + e.toString()); |
| } |
| } |
| |
| /** |
| * validate if a file on HDFS can be open |
| * |
| * @throws IOException the file can't be found or opened |
| * @throws URISyntaxException |
| */ |
| public static void validateHDFSFile(SliderFileSystem sliderFileSystem, |
| String pathStr) |
| throws IOException, URISyntaxException { |
| URI pathURI = new URI(pathStr); |
| InputStream inputStream = |
| sliderFileSystem.getFileSystem().open(new Path(pathURI)); |
| if (inputStream == null) { |
| throw new IOException("HDFS file " + pathStr + " can't be opened"); |
| } |
| } |
| |
| /** |
| * return the version and path of the JDK invoking the current running |
| * slider command |
| * |
| * @throws SecurityException |
| * - if a security manager exists and its checkPropertyAccess |
| * method doesn't allow access to the specified system property. |
| */ |
| public static String getJDKInfo() { |
| String version = System.getProperty("java.version"); |
| String javaHome = System.getProperty("java.home"); |
| return |
| "The version of the JDK invoking the current running slider command: " |
| + version + "; The path to it is: " + javaHome; |
| } |
| |
| /** |
| * return a description of whether the current user has created credential |
| * cache files from kerberos servers |
| * |
| * @throws IOException |
| * @throws BadConfigException |
| * @throws SecurityException |
| * - if a security manager exists and its checkPropertyAccess |
| * method doesn't allow access to the specified system property. |
| */ |
| public static String checkCredentialCacheFile() throws IOException, |
| BadConfigException { |
| String result = null; |
| if (!Shell.WINDOWS) { |
| result = Shell.execCommand("klist"); |
| } |
| return result; |
| } |
| } |