blob: d8caa1edf62fb7827beead1705d73dec10ba28f3 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package com.datatorrent.stram.client;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileOutputStream;
import java.io.FileReader;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.MalformedURLException;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URL;
import java.net.UnknownHostException;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import javax.script.ScriptEngine;
import javax.script.ScriptEngineManager;
import javax.script.ScriptException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.StringEscapeUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.client.ClientRMProxy;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
import org.apache.hadoop.yarn.util.ConverterUtils;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.collect.Sets;
import com.datatorrent.api.Attribute;
import com.datatorrent.api.Context;
import com.datatorrent.api.StreamingApplication;
import com.datatorrent.stram.StramClient;
import com.datatorrent.stram.StramUtils;
import com.datatorrent.stram.security.StramUserLogin;
import com.datatorrent.stram.util.ConfigUtils;
import com.datatorrent.stram.util.ConfigValidator;
/**
* Collection of utility classes for command line interface package<p>
* <br>
* List includes<br>
* Yarn Client Helper<br>
* Resource Mgr Client Helper<br>
* <br>
*
* @since 0.3.2
*/
public class StramClientUtils
{
// Property key formed from the StreamingApplication DT prefix and "version".
public static final String DT_VERSION = StreamingApplication.DT_PREFIX + "version";
// Property key read by newFileSystemInstance and getDTDFSRootDir to find the DFS root directory.
public static final String DT_DFS_ROOT_DIR = StreamingApplication.DT_PREFIX + "dfsRootDirectory";
// Property key read by getApexDFSRootDir for the application DFS root directory.
public static final String APEX_APP_DFS_ROOT_DIR = StreamingApplication.APEX_PREFIX + "app.dfsRootDirectory";
// Placeholder that is replaced with a user's short name when it occurs in a DFS root directory value.
public static final String DT_DFS_USER_NAME = "%USER_NAME%";
// Property key checked by configComplete; the value "complete" marks the configuration as complete.
public static final String DT_CONFIG_STATUS = StreamingApplication.DT_PREFIX + "configStatus";
// Names of the sub-directories that are resolved beneath a DFS root directory.
public static final String SUBDIR_APPS = "apps";
public static final String SUBDIR_PROFILES = "profiles";
public static final String SUBDIR_CONF = "conf";
// Upper bound (10 seconds, in milliseconds) that addDTSiteResources applies to the
// ResourceManager connect max-wait setting.
public static final long RESOURCEMANAGER_CONNECT_MAX_WAIT_MS_OVERRIDE = 10 * 1000;
// Property keys for delegation token lifetimes; they are not read anywhere in this class.
public static final String DT_HDFS_TOKEN_MAX_LIFE_TIME = StreamingApplication.DT_PREFIX + "namenode.delegation.token.max-lifetime";
public static final String HDFS_TOKEN_MAX_LIFE_TIME = "dfs.namenode.delegation.token.max-lifetime";
public static final String DT_RM_TOKEN_MAX_LIFE_TIME = StreamingApplication.DT_PREFIX + "resourcemanager.delegation.token.max-lifetime";
// NOTE(review): deprecated key; the replacement is presumably TOKEN_REFRESH_KEYTAB - confirm.
@Deprecated
public static final String KEY_TAB_FILE = StramUserLogin.DT_AUTH_PREFIX + "store.keytab";
// Property keys built on the StramUserLogin auth prefix; they are not read anywhere in this class.
public static final String TOKEN_ANTICIPATORY_REFRESH_FACTOR = StramUserLogin.DT_AUTH_PREFIX + "token.refresh.factor";
// Seven days expressed in milliseconds.
public static final long DELEGATION_TOKEN_MAX_LIFETIME_DEFAULT = 7 * 24 * 60 * 60 * 1000;
public static final String TOKEN_REFRESH_PRINCIPAL = StramUserLogin.DT_AUTH_PREFIX + "token.refresh.principal";
public static final String TOKEN_REFRESH_KEYTAB = StramUserLogin.DT_AUTH_PREFIX + "token.refresh.keytab";
/**
 * Helper that holds a configuration together with a {@link YarnRPC} instance created from it
 * and opens RPC proxies to the ResourceManager endpoints.
 */
public static class YarnClientHelper
{
  private static final Logger LOG = LoggerFactory.getLogger(YarnClientHelper.class);
  // Configuration the helper was created with
  private final Configuration conf;
  // RPC factory used to create the ResourceManager proxies
  private final YarnRPC rpc;

  /**
   * @param conf configuration used to create the RPC instance and to look up RM addresses
   */
  public YarnClientHelper(Configuration conf)
  {
    this.conf = conf;
    this.rpc = YarnRPC.create(conf);
  }

  /**
   * @return the configuration passed to the constructor
   */
  public Configuration getConf()
  {
    return conf;
  }

  /**
   * @return the RPC instance created in the constructor
   */
  public YarnRPC getYarnRPC()
  {
    return this.rpc;
  }

  /**
   * Connect to the Resource Manager/Applications Manager<p>
   * The address is read from {@link YarnConfiguration#RM_ADDRESS} through a
   * {@link YarnConfiguration} wrapped around the helper's configuration.
   *
   * @return Handle to communicate with the ASM
   * @throws IOException
   */
  public ApplicationClientProtocol connectToASM() throws IOException
  {
    final YarnConfiguration yarnConf = new YarnConfiguration(conf);
    final InetSocketAddress asmAddress = yarnConf.getSocketAddr(
        YarnConfiguration.RM_ADDRESS,
        YarnConfiguration.DEFAULT_RM_ADDRESS,
        YarnConfiguration.DEFAULT_RM_PORT);
    return openProxy(ApplicationClientProtocol.class, asmAddress);
  }

  /**
   * Connect to the Resource Manager<p>
   * The address is read from {@link YarnConfiguration#RM_SCHEDULER_ADDRESS}.
   *
   * @return Handle to communicate with the RM
   */
  public ApplicationMasterProtocol connectToRM()
  {
    final InetSocketAddress schedulerAddress = conf.getSocketAddr(
        YarnConfiguration.RM_SCHEDULER_ADDRESS,
        YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS,
        YarnConfiguration.DEFAULT_RM_SCHEDULER_PORT);
    return openProxy(ApplicationMasterProtocol.class, schedulerAddress);
  }

  // Logs the target address and creates a proxy of the requested protocol type for it.
  private <T> T openProxy(Class<T> protocol, InetSocketAddress address)
  {
    LOG.debug("Connecting to ResourceManager at " + address);
    return protocol.cast(rpc.getProxy(protocol, address, conf));
  }
}
/**
* Bunch of utilities that ease repeating interactions with {@link ClientRMProxy}<p>
*/
public static class ClientRMHelper
{
private static final Logger LOG = LoggerFactory.getLogger(ClientRMHelper.class);
private static final String RM_HOSTNAME_PREFIX = YarnConfiguration.RM_PREFIX + "hostname.";
private final YarnClient clientRM;
private final Configuration conf;
public ClientRMHelper(YarnClient yarnClient, Configuration conf) throws IOException
{
this.clientRM = yarnClient;
this.conf = conf;
}
public interface AppStatusCallback
{
boolean exitLoop(ApplicationReport report);
}
/**
* Monitor the submitted application for completion. Kill application if time expires.
*
* @param appId Application Id of application to be monitored
* @param callback
* @param timeoutMillis
* @return true if application completed successfully
* @throws YarnException
* @throws IOException
*/
@SuppressWarnings("SleepWhileInLoop")
public boolean waitForCompletion(ApplicationId appId, AppStatusCallback callback, long timeoutMillis) throws YarnException, IOException
{
long startMillis = System.currentTimeMillis();
while (true) {
// Check app status every 1 second.
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
LOG.debug("Thread sleep in monitoring loop interrupted");
}
ApplicationReport report = clientRM.getApplicationReport(appId);
if (callback.exitLoop(report) == true) {
return true;
}
YarnApplicationState state = report.getYarnApplicationState();
FinalApplicationStatus dsStatus = report.getFinalApplicationStatus();
if (YarnApplicationState.FINISHED == state) {
if (FinalApplicationStatus.SUCCEEDED == dsStatus) {
LOG.info("Application has completed successfully. Breaking monitoring loop");
return true;
} else {
LOG.info("Application finished unsuccessfully."
+ " YarnState=" + state.toString() + ", DSFinalStatus=" + dsStatus.toString()
+ ". Breaking monitoring loop");
return false;
}
} else if (YarnApplicationState.KILLED == state
|| YarnApplicationState.FAILED == state) {
LOG.info("Application did not finish."
+ " YarnState=" + state.toString() + ", DSFinalStatus=" + dsStatus.toString()
+ ". Breaking monitoring loop");
return false;
}
if (System.currentTimeMillis() - startMillis > timeoutMillis) {
LOG.info("Reached specified timeout. Killing application");
clientRM.killApplication(appId);
return false;
}
}
}
// TODO: HADOOP UPGRADE - replace with YarnConfiguration constants
private Token<RMDelegationTokenIdentifier> getRMHAToken(org.apache.hadoop.yarn.api.records.Token rmDelegationToken)
{
// Build a list of service addresses to form the service name
ArrayList<String> services = new ArrayList<>();
for (String rmId : ConfigUtils.getRMHAIds(conf)) {
LOG.info("Yarn Resource Manager id: {}", rmId);
// Set RM_ID to get the corresponding RM_ADDRESS
services.add(SecurityUtil.buildTokenService(getRMHAAddress(rmId)).toString());
}
Text rmTokenService = new Text(Joiner.on(',').join(services));
return new Token<>(
rmDelegationToken.getIdentifier().array(),
rmDelegationToken.getPassword().array(),
new Text(rmDelegationToken.getKind()),
rmTokenService);
}
public void addRMDelegationToken(final String renewer, final Credentials credentials) throws IOException, YarnException
{
// Get the ResourceManager delegation rmToken
final org.apache.hadoop.yarn.api.records.Token rmDelegationToken = clientRM.getRMDelegationToken(new Text(renewer));
Token<RMDelegationTokenIdentifier> token;
// TODO: Use the utility method getRMDelegationTokenService in ClientRMProxy to remove the separate handling of
// TODO: HA and non-HA cases when hadoop dependency is changed to hadoop 2.4 or above
if (ConfigUtils.isRMHAEnabled(conf)) {
LOG.info("Yarn Resource Manager HA is enabled");
token = getRMHAToken(rmDelegationToken);
} else {
LOG.info("Yarn Resource Manager HA is not enabled");
InetSocketAddress rmAddress = conf.getSocketAddr(YarnConfiguration.RM_ADDRESS,
YarnConfiguration.DEFAULT_RM_ADDRESS,
YarnConfiguration.DEFAULT_RM_PORT);
token = ConverterUtils.convertFromYarn(rmDelegationToken, rmAddress);
}
LOG.info("RM dt {}", token);
credentials.addToken(token.getService(), token);
}
public InetSocketAddress getRMHAAddress(String rmId)
{
YarnConfiguration yarnConf = StramClientUtils.getYarnConfiguration(conf);
yarnConf.set(ConfigUtils.RM_HA_ID, rmId);
InetSocketAddress socketAddr = yarnConf.getSocketAddr(YarnConfiguration.RM_ADDRESS, YarnConfiguration.DEFAULT_RM_ADDRESS, YarnConfiguration.DEFAULT_RM_PORT);
yarnConf.unset(ConfigUtils.RM_HA_ID);
return socketAddr;
}
}
// Logger used by the static utility methods of StramClientUtils.
private static final Logger LOG = LoggerFactory.getLogger(StramClientUtils.class);
/**
 * Looks up the host name of the local machine.
 *
 * @return the local host name, or {@code null} when the local host cannot be resolved
 */
public static String getHostName()
{
  final InetAddress localHost;
  try {
    localHost = InetAddress.getLocalHost();
  } catch (UnknownHostException unresolved) {
    return null;
  }
  return localHost.getHostName();
}
/**
 * Returns the {@code .dt} directory of the user. The parent is the directory named by the
 * {@code HOME} environment variable, or the user directory reported by {@link FileUtils}
 * when {@code HOME} is not set or empty.
 *
 * @return the user's {@code .dt} directory (it is not created or checked for existence)
 */
public static File getUserDTDirectory()
{
  final String dirName = ".dt";
  final String home = System.getenv("HOME");
  return StringUtils.isEmpty(home)
      ? new File(FileUtils.getUserDirectory(), dirName)
      : new File(home, dirName);
}
/**
 * Returns the directory containing the {@code dt-env.sh} resource found on the class path,
 * or the user's {@code .dt} directory when that resource is not on the class path.
 *
 * @return the configuration directory
 * @throws RuntimeException if the resource URL cannot be converted to a URI
 */
public static File getConfigDir()
{
  final URL envScript = StramClientUtils.class.getClassLoader().getResource(DT_ENV_SH_FILE);
  if (envScript == null) {
    return getUserDTDirectory();
  }
  try {
    return new File(envScript.toURI()).getParentFile();
  } catch (URISyntaxException ex) {
    throw new RuntimeException(ex);
  }
}
/**
 * Returns the directory two levels above the {@code dt-env.sh} resource found on the
 * class path.
 *
 * @return the installation directory, or {@code null} when the resource is not on the class path
 * @throws RuntimeException if the resource URL cannot be converted to a URI
 */
public static File getInstallationDir()
{
  final URL envScript = StramClientUtils.class.getClassLoader().getResource(DT_ENV_SH_FILE);
  if (envScript == null) {
    return null;
  }
  try {
    final File configDir = new File(envScript.toURI()).getParentFile();
    return configDir.getParentFile();
  } catch (URISyntaxException ex) {
    throw new RuntimeException(ex);
  }
}
/**
 * Development mode means that the configuration directory is the user's {@code .dt}
 * directory.
 *
 * @return true when {@link #getConfigDir()} equals {@link #getUserDTDirectory()}
 */
public static boolean isDevelopmentMode()
{
  final File userDir = getUserDTDirectory();
  final File configDir = getConfigDir();
  return userDir.equals(configDir);
}
/**
 * @return the {@code backups} directory located inside the configuration directory
 */
public static File getBackupsDirectory()
{
  final File configDir = getConfigDir();
  return new File(configDir, BACKUPS_DIRECTORY);
}
// Resource added to configurations by addDTDefaultResources and addDTLocalResources.
public static final String DT_DEFAULT_XML_FILE = "dt-default.xml";
// Site configuration file name looked up in the config directory and the user's .dt directory.
public static final String DT_SITE_XML_FILE = "dt-site.xml";
// Global site configuration file name that addDTSiteResources copies from the DFS config directory.
public static final String DT_SITE_GLOBAL_XML_FILE = "dt-site-global.xml";
// Class path resource whose location determines the config and installation directories.
public static final String DT_ENV_SH_FILE = "dt-env.sh";
// Environment file rewritten by changeDTEnvironment.
public static final String CUSTOM_ENV_SH_FILE = "custom-env.sh";
// Name of the directory returned by getBackupsDirectory, relative to the config directory.
public static final String BACKUPS_DIRECTORY = "backups";
/**
 * Registers the {@code dt-default.xml} resource with the given configuration.
 *
 * @param conf configuration the resource is added to
 * @return the same configuration instance
 */
public static Configuration addDTDefaultResources(Configuration conf)
{
  final String defaultsResource = DT_DEFAULT_XML_FILE;
  conf.addResource(defaultsResource);
  return conf;
}
/**
 * Loads the site configuration into {@code conf}: first the local resources, then the global
 * site file copied from the DFS configuration directory, then the node local and user files
 * again. Afterwards the loggers-level setting is validated, deprecated properties are
 * converted, and the ResourceManager connect timeouts are capped.
 * <p>
 * An {@link IOException} while loading the DFS resources is logged at debug level and
 * otherwise ignored.
 *
 * @param conf configuration to load the resources into
 * @return the same configuration instance
 * @throws IllegalArgumentException if the loggers-level setting is malformed
 */
public static Configuration addDTSiteResources(Configuration conf)
{
  addDTLocalResources(conf);
  try (FileSystem fs = newFileSystemInstance(conf)) {
    // after getting the dfsRootDirectory config parameter, redo the entire process with the global config
    // load global settings from DFS
    File targetGlobalFile = new File(String.format("%s/dt-site-global-%s.xml", System.getProperty("java.io.tmpdir"),
        UserGroupInformation.getLoginUser().getShortUserName()));
    Path hdfsGlobalPath = new Path(getDTDFSConfigDir(fs, conf), DT_SITE_GLOBAL_XML_FILE);
    LOG.debug("Copying global dt-site.xml from {} to {}", hdfsGlobalPath, targetGlobalFile.getAbsolutePath());
    fs.copyToLocalFile(hdfsGlobalPath, new Path(targetGlobalFile.toURI()));
    // The local copy is intentionally not deleted: adding the resource does not read the file
    // right away. The file is actually read later and it needs to exist.
    addDTSiteResources(conf, targetGlobalFile);
    if (!isDevelopmentMode()) {
      // load node local config file
      addDTSiteResources(conf, new File(getConfigDir(), DT_SITE_XML_FILE));
    }
    // load user config file
    addDTSiteResources(conf, new File(getUserDTDirectory(), DT_SITE_XML_FILE));
  } catch (IOException ex) {
    // ignore
    LOG.debug("Caught exception when loading configuration: {}: moving on...", ex.getMessage());
  }
  checkLoggersLevel(conf);
  convertDeprecatedProperties(conf);
  capResourceManagerConnectWait(conf);
  LOG.info(" conf object in stramclient {}", conf);
  return conf;
}

// Validates that the loggers-level setting, when present, is a comma separated list of "name:level" pairs.
private static void checkLoggersLevel(Configuration conf)
{
  String loggersLevel = conf.get(StramUtils.DT_LOGGERS_LEVEL);
  if (loggersLevel == null) {
    return;
  }
  String[] targets = loggersLevel.split(",");
  Preconditions.checkArgument(targets.length > 0, "zero loggers level");
  for (String target : targets) {
    String[] parts = target.split(":");
    Preconditions.checkArgument(parts.length == 2, "incorrect " + target);
    Preconditions.checkArgument(ConfigValidator.validateLoggersLevel(parts[0], parts[1]), "incorrect " + target);
  }
}

// Caps the ResourceManager connect max-wait and, when that was capped, the retry interval as well.
private static void capResourceManagerConnectWait(Configuration conf)
{
  //
  // The default RESOURCEMANAGER_CONNECT_MAX_WAIT_MS from hadoop is 15 minutes, which actually translates to 20 minutes with the connect interval.
  // That means if there is anything wrong with YARN or if YARN is not running, the caller has to wait for up to 20 minutes until it gets an error.
  // We are overriding this to be 10 seconds maximum.
  //
  long rmConnectMaxWait = conf.getLong(YarnConfiguration.RESOURCEMANAGER_CONNECT_MAX_WAIT_MS, YarnConfiguration.DEFAULT_RESOURCEMANAGER_CONNECT_MAX_WAIT_MS);
  if (rmConnectMaxWait <= RESOURCEMANAGER_CONNECT_MAX_WAIT_MS_OVERRIDE) {
    return;
  }
  LOG.info("Overriding {} assigned value of {} to {} because the assigned value is too big.", YarnConfiguration.RESOURCEMANAGER_CONNECT_MAX_WAIT_MS, rmConnectMaxWait, RESOURCEMANAGER_CONNECT_MAX_WAIT_MS_OVERRIDE);
  conf.setLong(YarnConfiguration.RESOURCEMANAGER_CONNECT_MAX_WAIT_MS, RESOURCEMANAGER_CONNECT_MAX_WAIT_MS_OVERRIDE);
  // The fallback must be the retry interval's own default, not the max-wait default.
  long rmConnectRetryInterval = conf.getLong(YarnConfiguration.RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_MS, YarnConfiguration.DEFAULT_RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_MS);
  long defaultRetryInterval = Math.max(500, RESOURCEMANAGER_CONNECT_MAX_WAIT_MS_OVERRIDE / 5);
  if (rmConnectRetryInterval > defaultRetryInterval) {
    LOG.info("Overriding {} assigned value of {} to {} because the assigned value is too big.", YarnConfiguration.RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_MS, rmConnectRetryInterval, defaultRetryInterval);
    conf.setLong(YarnConfiguration.RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_MS, defaultRetryInterval);
  }
}
/**
 * Adds the default resource and the locally available site files to the configuration:
 * the node local {@code dt-site.xml} (skipped in development mode) followed by the user's
 * {@code dt-site.xml}.
 *
 * @param conf configuration to add the resources to
 */
public static void addDTLocalResources(Configuration conf)
{
  conf.addResource(DT_DEFAULT_XML_FILE);
  if (!isDevelopmentMode()) {
    final File nodeLocalSite = new File(getConfigDir(), DT_SITE_XML_FILE);
    addDTSiteResources(conf, nodeLocalSite);
  }
  final File userSite = new File(getUserDTDirectory(), DT_SITE_XML_FILE);
  addDTSiteResources(conf, userSite);
}
/**
 * Adds one configuration file as a resource when the file exists; a missing file is only logged.
 *
 * @param conf configuration to add the resource to
 * @param confFile configuration file to load
 * @return the same configuration instance
 */
private static Configuration addDTSiteResources(Configuration conf, File confFile)
{
  if (!confFile.exists()) {
    LOG.info("Configuration file {} is not found. Skipping...", confFile.toURI());
    return conf;
  }
  LOG.info("Loading settings: " + confFile.toURI());
  conf.addResource(new Path(confFile.toURI()));
  return conf;
}
/**
 * Copies every property whose key starts with {@code stram.} to a key that carries the
 * DT prefix instead, logging a deprecation warning for each of them.
 *
 * @param conf configuration to convert in place
 */
@SuppressWarnings("deprecation")
private static void convertDeprecatedProperties(Configuration conf)
{
  final String legacyPrefix = "stram.";
  final Map<String, String> renamed = new HashMap<>();
  for (Iterator<Map.Entry<String, String>> it = conf.iterator(); it.hasNext(); ) {
    final Map.Entry<String, String> property = it.next();
    final String oldKey = property.getKey();
    if (!oldKey.startsWith(legacyPrefix)) {
      continue;
    }
    final String newKey = StreamingApplication.DT_PREFIX + oldKey.substring(legacyPrefix.length());
    LOG.warn("Configuration property {} is deprecated. Please use {} instead.", oldKey, newKey);
    renamed.put(newKey, property.getValue());
    // NOTE(review): this removes the entry from whatever collection backs the iterator;
    // confirm that Configuration.iterator() is backed by the live properties, otherwise the
    // deprecated key stays in conf.
    it.remove();
  }
  // The new keys are applied after the iteration has finished.
  for (Map.Entry<String, String> replacement : renamed.entrySet()) {
    conf.set(replacement.getKey(), replacement.getValue());
  }
}
/**
 * @return the URL of the {@code dt-site.xml} file inside the configuration directory
 * @throws RuntimeException if the file location cannot be expressed as a URL
 */
public static URL getDTSiteXmlFile()
{
  final File siteFile = new File(getConfigDir(), DT_SITE_XML_FILE);
  final URL siteUrl;
  try {
    siteUrl = siteFile.toURI().toURL();
  } catch (MalformedURLException ex) {
    throw new RuntimeException(ex);
  }
  return siteUrl;
}
/**
 * Creates a new {@link FileSystem} instance. When the DFS root directory property is set,
 * the file system is created for that URI; otherwise, or when the value is not a valid URI,
 * the default file system of the configuration is used.
 * <p>
 * A {@code %USER_NAME%} placeholder in the property is replaced with the login user's short
 * name and the expanded value is written back to the configuration.
 *
 * @param conf configuration to read the DFS root directory from
 * @return a new file system instance that the caller has to close
 * @throws IOException
 */
public static FileSystem newFileSystemInstance(Configuration conf) throws IOException
{
  String rootDir = conf.get(DT_DFS_ROOT_DIR);
  if (StringUtils.isBlank(rootDir)) {
    return FileSystem.newInstance(conf);
  }
  if (rootDir.contains(DT_DFS_USER_NAME)) {
    rootDir = rootDir.replace(DT_DFS_USER_NAME, UserGroupInformation.getLoginUser().getShortUserName());
    conf.set(DT_DFS_ROOT_DIR, rootDir);
  }
  final URI rootUri;
  try {
    rootUri = new URI(rootDir);
  } catch (URISyntaxException ex) {
    LOG.warn("{} is not a valid URI. Returning the default filesystem", rootDir, ex);
    return FileSystem.newInstance(conf);
  }
  return FileSystem.newInstance(rootUri, conf);
}
/**
 * Helper function used by both getApexDFSRootDir and getDTDFSRootDir to turn a configured
 * root directory value into a {@link Path}.
 * <p>
 * An absolute URI is returned as is. Any other value is resolved against the scheme and
 * authority of the given file system; this is also the fallback when the value is not a
 * valid URI.
 *
 * @param fs FileSystem object for HDFS file system
 * @param conf Configuration object
 * @param dfsRootDir value of dt.dfsRootDir or apex.app.dfsRootDir
 * @param userShortName current user short name (either login user or current user depending on impersonation settings)
 * @param prependHomeDir prepend {@code /user/<userShortName>/} if dfsRootDir is relative path
 * @return the evaluated root directory
 */
private static Path evalDFSRootDir(FileSystem fs, Configuration conf, String dfsRootDir, String userShortName,
    boolean prependHomeDir)
{
  final boolean userKnown = userShortName != null;
  String rootDir = dfsRootDir;
  if (userKnown && rootDir.contains(DT_DFS_USER_NAME)) {
    rootDir = rootDir.replace(DT_DFS_USER_NAME, userShortName);
    // NOTE(review): the expanded value is always stored under DT_DFS_ROOT_DIR, also when the
    // value came from the apex.app root directory property - confirm this is intended.
    conf.set(DT_DFS_ROOT_DIR, rootDir);
  }
  URI parsed = null;
  try {
    parsed = new URI(rootDir);
  } catch (URISyntaxException ex) {
    LOG.warn("{} is not a valid URI. Using the default filesystem to construct the path", rootDir, ex);
  }
  if (parsed != null) {
    if (parsed.isAbsolute()) {
      return new Path(parsed);
    }
    if (userKnown && prependHomeDir && !rootDir.startsWith("/")) {
      rootDir = "/user/" + userShortName + "/" + rootDir;
    }
  }
  return new Path(fs.getUri().getScheme(), fs.getUri().getAuthority(), rootDir);
}
/**
 * @return the folder name used when no DFS root directory is configured
 */
private static String getDefaultRootFolder()
{
  final String defaultFolder = "datatorrent";
  return defaultFolder;
}
/**
 * This gets the DFS Root dir to be used at runtime by Apex applications as per the following logic:
 * Value of apex.app.dfsRootDirectory is referred to as Apex-root-dir below.
 *
 * <ul>
 * <li> if the impersonated application path flag is not set to true, or the current user
 * cannot be determined, call getDTDFSRootDir to get the old behavior
 * <li> if Apex-root-dir is blank then the default root folder name is used in its place
 * <li> if Apex-root-dir value has %USER_NAME% in the string then it is replaced with the current user's name
 * <li> if Apex-root-dir is an absolute URI it is used as is; a path without a leading slash
 * is placed under {@code /user/<current user>/}
 * </ul>
 *
 * @param fs FileSystem object for HDFS file system
 * @param conf Configuration object
 * @return the root directory for Apex applications
 */
public static Path getApexDFSRootDir(FileSystem fs, Configuration conf)
{
  final String configuredRoot = conf.get(APEX_APP_DFS_ROOT_DIR);
  if (!conf.getBoolean(StramUserLogin.DT_APP_PATH_IMPERSONATED, false)) {
    return getDTDFSRootDir(fs, conf);
  }
  String currentUser;
  try {
    currentUser = UserGroupInformation.getCurrentUser().getShortUserName();
  } catch (IOException ex) {
    LOG.warn("Error getting current/login user name {}", configuredRoot, ex);
    currentUser = null;
  }
  if (currentUser == null) {
    return getDTDFSRootDir(fs, conf);
  }
  final String rootDir = StringUtils.isBlank(configuredRoot) ? getDefaultRootFolder() : configuredRoot;
  return evalDFSRootDir(fs, conf, rootDir, currentUser, true);
}
/**
 * Returns the DFS root directory. When the DFS root directory property is blank this is the
 * default root folder beneath the file system's home directory; otherwise the configured
 * value is evaluated with the login user's short name.
 *
 * @param fs FileSystem object for HDFS file system
 * @param conf Configuration object
 * @return the DFS root directory
 */
public static Path getDTDFSRootDir(FileSystem fs, Configuration conf)
{
  final String configuredRoot = conf.get(DT_DFS_ROOT_DIR);
  if (StringUtils.isBlank(configuredRoot)) {
    return new Path(fs.getHomeDirectory(), getDefaultRootFolder());
  }
  String loginUser;
  try {
    loginUser = UserGroupInformation.getLoginUser().getShortUserName();
  } catch (IOException ex) {
    LOG.warn("Error getting user login name {}", configuredRoot, ex);
    loginUser = null;
  }
  return evalDFSRootDir(fs, conf, configuredRoot, loginUser, false);
}
/**
 * @param fs FileSystem object for HDFS file system
 * @param conf Configuration object
 * @return the {@code conf} sub-directory of the DFS root directory
 */
public static Path getDTDFSConfigDir(FileSystem fs, Configuration conf)
{
  final Path rootDir = getDTDFSRootDir(fs, conf);
  return new Path(rootDir, SUBDIR_CONF);
}
/**
 * @param fs FileSystem object for HDFS file system
 * @param conf Configuration object
 * @return the {@code profiles} sub-directory of the DFS root directory
 */
public static Path getDTDFSProfilesDir(FileSystem fs, Configuration conf)
{
  final Path rootDir = getDTDFSRootDir(fs, conf);
  return new Path(rootDir, SUBDIR_PROFILES);
}
/**
 * Change DT environment variable in the env file.
 * Calling this will require a restart for the new setting to take place
 * <p>
 * When {@code custom-env.sh} is found on the class path, the file is rewritten: every line is
 * trimmed, an existing {@code export <key>=...} line is replaced, and the export is appended
 * when no such line exists. Lines starting with {@code #} are never replaced. When the file is
 * not on the class path, a {@code custom-env.sh} containing only this export is written to the
 * user's {@code .dt} directory.
 * <p>
 * The key is matched literally. The value is written between double quotes without any escaping.
 *
 * @param key name of the environment variable
 * @param value new value of the environment variable
 * @throws IOException if the env file cannot be read or written
 * @throws IllegalStateException in development mode
 */
public static void changeDTEnvironment(String key, String value) throws IOException
{
  if (isDevelopmentMode()) {
    throw new IllegalStateException("Cannot change DT environment in development mode.");
  }
  final String exportLine = "export " + key + "=\"" + value + "\"";
  URL resource = StramClientUtils.class.getClassLoader().getResource(CUSTOM_ENV_SH_FILE);
  if (resource == null) {
    File envFile = new File(getUserDTDirectory(), CUSTOM_ENV_SH_FILE);
    try (FileOutputStream out = new FileOutputStream(envFile)) {
      out.write((exportLine + "\n").getBytes());
    }
    return;
  }
  final File cfgResource;
  try {
    cfgResource = new File(resource.toURI());
  } catch (URISyntaxException ex) {
    LOG.error("Caught exception when getting env resource:", ex);
    return;
  }
  // Quote the key so that regex meta characters in it are matched literally, and compile the
  // pattern once instead of once per line.
  final Pattern exportPattern = Pattern.compile("export\\s+" + Pattern.quote(key) + "=.*");
  synchronized (StramClientUtils.class) {
    StringBuilder sb = new StringBuilder(1024);
    boolean changed = false;
    try (BufferedReader br = new BufferedReader(new FileReader(cfgResource))) {
      String line;
      while ((line = br.readLine()) != null) {
        line = line.trim();
        // Comment lines are copied through untouched (apart from the trim).
        if (!line.startsWith("#") && exportPattern.matcher(line).matches()) {
          line = exportLine;
          changed = true;
        }
        sb.append(line).append("\n");
      }
    }
    if (!changed) {
      sb.append(exportLine).append("\n");
    }
    try (FileOutputStream out = new FileOutputStream(cfgResource)) {
      out.write(sb.toString().getBytes());
    }
  }
}
/**
 * Copies a local file to the given file system after deleting the hidden
 * {@code .<name>.crc} file that sits next to it.
 * <p>
 * This is to avoid the hadoop FileSystem API performing a checksum on the local file.
 * The local file can be copied from HDFS and modified, and the checksum will fail if the
 * file is again copied to HDFS.
 *
 * @param fs target file system
 * @param fromLocal local file to copy
 * @param toDFS destination path
 * @throws IOException if the copy fails
 */
public static void copyFromLocalFileNoChecksum(FileSystem fs, File fromLocal, Path toDFS) throws IOException
{
  try {
    final String crcName = "." + fromLocal.getName() + ".crc";
    final File crcFile = new File(fromLocal.getParentFile(), crcName);
    crcFile.delete();
  } catch (Exception ignored) {
    // best effort: a failure to remove the checksum file does not stop the copy
  }
  final Path localPath = new Path(fromLocal.toURI());
  fs.copyFromLocalFile(localPath, toDFS);
}
/**
 * @param conf configuration to inspect
 * @return true when the config status property has the value {@code complete}
 */
public static boolean configComplete(Configuration conf)
{
  return "complete".equals(conf.get(DT_CONFIG_STATUS));
}
/**
 * Expands placeholders in the values of {@code target}.
 * <ul>
 * <li>{@code ${name}} is replaced with the value of {@code name} in {@code vars}, or removed
 * when {@code vars} has no such property.
 * <li><code>{% script %}</code> is replaced with the result of evaluating the script with the
 * JavaScript engine, or removed when the evaluation fails or yields {@code null}. The
 * properties of {@code vars} are available to scripts through the {@code _prop} object.
 * </ul>
 * Variable substitution runs first and script evaluation runs on its result, so a value may
 * combine both forms. When no JavaScript engine is installed, a warning is logged and script
 * expressions are left untouched.
 * <p>
 * NOTE: script expressions are executed as code; values must come from trusted configuration.
 *
 * @param target properties whose values are expanded in place
 * @param vars configuration providing the variables
 */
public static void evalProperties(Properties target, Configuration vars)
{
  Pattern substitutionPattern = Pattern.compile("\\$\\{(.+?)\\}");
  Pattern evalPattern = Pattern.compile("\\{% (.+?) %\\}");
  // getEngineByName returns null when the runtime ships no JavaScript engine.
  ScriptEngine engine = new ScriptEngineManager().getEngineByName("javascript");
  if (engine == null) {
    LOG.warn("No JavaScript engine is available, script expressions are not evaluated");
  } else {
    try {
      engine.eval("var _prop = {}");
      for (Map.Entry<String, String> entry : vars) {
        String evalString = String.format("_prop[\"%s\"] = \"%s\"", StringEscapeUtils.escapeJava(entry.getKey()), StringEscapeUtils.escapeJava(entry.getValue()));
        engine.eval(evalString);
      }
    } catch (ScriptException ex) {
      LOG.warn("Javascript error: {}", ex.getMessage());
    }
  }
  for (Map.Entry<Object, Object> entry : target.entrySet()) {
    final String original = entry.getValue().toString();
    String expanded = substituteVariables(original, substitutionPattern, vars);
    if (engine != null) {
      // Evaluate on the substituted text so that the substitution is not lost.
      expanded = evaluateExpressions(expanded, evalPattern, engine);
    }
    if (!expanded.equals(original)) {
      // Replacing the value of an existing key does not disturb the iteration.
      target.put(entry.getKey(), expanded);
    }
  }
}

// Replaces every match of the pattern with the value of the captured property name; unknown names expand to nothing.
private static String substituteVariables(String value, Pattern pattern, Configuration vars)
{
  Matcher matcher = pattern.matcher(value);
  if (!matcher.find()) {
    return value;
  }
  StringBuilder expanded = new StringBuilder();
  int cursor = 0;
  do {
    expanded.append(value, cursor, matcher.start());
    String subst = vars.get(matcher.group(1));
    if (subst != null) {
      expanded.append(subst);
    }
    cursor = matcher.end();
  } while (matcher.find());
  expanded.append(value, cursor, value.length());
  return expanded.toString();
}

// Replaces every match of the pattern with the result of evaluating the captured script; failures expand to nothing.
private static String evaluateExpressions(String value, Pattern pattern, ScriptEngine engine)
{
  Matcher matcher = pattern.matcher(value);
  if (!matcher.find()) {
    return value;
  }
  StringBuilder expanded = new StringBuilder();
  int cursor = 0;
  do {
    expanded.append(value, cursor, matcher.start());
    try {
      Object result = engine.eval(matcher.group(1));
      // A script may legitimately evaluate to null; treat that as an empty expansion.
      if (result != null) {
        expanded.append(result.toString());
      }
    } catch (ScriptException ex) {
      LOG.warn("JavaScript exception {}", ex.getMessage());
    }
    cursor = matcher.end();
  } while (matcher.find());
  expanded.append(value, cursor, value.length());
  return expanded.toString();
}
/**
 * Expands the placeholders in all properties of the configuration by copying them into a
 * {@link Properties} object, running {@link #evalProperties} on it, and setting every
 * resulting value back on the configuration.
 *
 * @param conf configuration that is evaluated in place
 */
public static void evalConfiguration(Configuration conf)
{
  final Properties snapshot = new Properties();
  for (Map.Entry<String, String> property : conf) {
    snapshot.put(property.getKey(), property.getValue());
  }
  evalProperties(snapshot, conf);
  for (Map.Entry<Object, Object> evaluated : snapshot.entrySet()) {
    final String name = (String)evaluated.getKey();
    final String value = (String)evaluated.getValue();
    conf.set(name, value);
  }
}
/**
 * Runs the action as a proxy user on behalf of the login user when {@code userName} is not
 * blank and differs from the login user's short name; otherwise the action is run directly.
 *
 * @param userName name of the user to run the action as
 * @param action the action to run
 * @param <T> result type of the action
 * @return the result of the action
 * @throws Exception whatever the action or the user lookup throws
 */
public static <T> T doAs(String userName, PrivilegedExceptionAction<T> action) throws Exception
{
  if (StringUtils.isBlank(userName) || userName.equals(UserGroupInformation.getLoginUser().getShortUserName())) {
    LOG.info("Executing command as if there is no login info: {}", userName);
    return action.run();
  }
  LOG.info("Executing command as {}", userName);
  final UserGroupInformation proxyUser = UserGroupInformation.createProxyUser(userName, UserGroupInformation.getLoginUser());
  return proxyUser.doAs(action);
}
/**
 * Searches the applications of the Stram application types that are in one of the states
 * RUNNING, ACCEPTED, NEW, NEW_SAVING or SUBMITTED for one with the given name and user.
 *
 * @param clientRMService client used to list the applications
 * @param appName application name to look for
 * @param user user the application must belong to
 * @param excludeAppId application id (as string) that is skipped
 * @return the first matching application report, or {@code null} when there is none
 * @throws YarnException
 * @throws IOException
 */
public static ApplicationReport getStartedAppInstanceByName(YarnClient clientRMService, String appName, String user, String excludeAppId) throws YarnException, IOException
{
  final EnumSet<YarnApplicationState> startedStates = EnumSet.of(YarnApplicationState.RUNNING,
      YarnApplicationState.ACCEPTED,
      YarnApplicationState.NEW,
      YarnApplicationState.NEW_SAVING,
      YarnApplicationState.SUBMITTED);
  final List<ApplicationReport> candidates = clientRMService.getApplications(
      Sets.newHashSet(StramClient.YARN_APPLICATION_TYPE, StramClient.YARN_APPLICATION_TYPE_DEPRECATED), startedStates);
  // see whether there is an app with the app name and user name running
  for (ApplicationReport candidate : candidates) {
    if (candidate.getApplicationId().toString().equals(excludeAppId)) {
      continue;
    }
    if (!candidate.getName().equals(appName)) {
      continue;
    }
    if (candidate.getUser().equals(user)) {
      return candidate;
    }
  }
  return null;
}
/**
 * Return a YarnConfiguration instance from a Configuration instance. The argument itself is
 * returned when it already is a YarnConfiguration; otherwise a new YarnConfiguration is
 * created from it.
 * @param conf The configuration instance
 * @return The YarnConfiguration instance
 */
private static YarnConfiguration getYarnConfiguration(Configuration conf)
{
  return (conf instanceof YarnConfiguration) ? (YarnConfiguration)conf : new YarnConfiguration(conf);
}
/**
 * Get the RM webapp address, reading from the configuration whether SSL is enabled.
 * @param conf The configuration
 * @param rmId If HA is enabled the resource manager id
 * @return The webapp socket address
 */
public static InetSocketAddress getRMWebAddress(Configuration conf, String rmId)
{
  return getRMWebAddress(
      conf,
      conf.getBoolean(CommonConfigurationKeysPublic.HADOOP_SSL_ENABLED_KEY, CommonConfigurationKeysPublic.HADOOP_SSL_ENABLED_DEFAULT),
      rmId);
}
/**
 * Get the RM webapp address. The configuration that is passed in should not be used by other threads while this
 * method is executing: when a resource manager id is given, the HA id property is temporarily
 * set on the Yarn configuration and removed again once the address has been read, also when
 * reading the address fails.
 * <p>
 * When the configured address does not resolve, or resolves to a wildcard or loopback
 * address, an unresolved address made of the local canonical host name and the configured
 * port is returned instead, if the local host can be determined.
 * @param conf The configuration
 * @param sslEnabled Whether SSL is enabled or not
 * @param rmId If HA is enabled the resource manager id
 * @return The webapp socket address
 */
public static InetSocketAddress getRMWebAddress(Configuration conf, boolean sslEnabled, String rmId)
{
  boolean isHA = (rmId != null);
  if (isHA) {
    conf = getYarnConfiguration(conf);
    conf.set(ConfigUtils.RM_HA_ID, rmId);
  }
  InetSocketAddress address;
  try {
    if (sslEnabled) {
      address = conf.getSocketAddr(YarnConfiguration.RM_WEBAPP_HTTPS_ADDRESS, YarnConfiguration.DEFAULT_RM_WEBAPP_HTTPS_ADDRESS, YarnConfiguration.DEFAULT_RM_WEBAPP_HTTPS_PORT);
    } else {
      address = conf.getSocketAddr(YarnConfiguration.RM_WEBAPP_ADDRESS, YarnConfiguration.DEFAULT_RM_WEBAPP_ADDRESS, YarnConfiguration.DEFAULT_RM_WEBAPP_PORT);
    }
  } finally {
    // Undo the temporary HA id even if the address lookup throws.
    if (isHA) {
      conf.unset(ConfigUtils.RM_HA_ID);
    }
  }
  LOG.info("rm webapp address setting {}", address);
  LOG.debug("rm setting sources {}", conf.getPropertySources(YarnConfiguration.RM_WEBAPP_ADDRESS));
  InetSocketAddress resolvedSocketAddress = NetUtils.getConnectAddress(address);
  InetAddress resolved = resolvedSocketAddress.getAddress();
  if (resolved == null || resolved.isAnyLocalAddress() || resolved.isLoopbackAddress()) {
    try {
      resolvedSocketAddress = InetSocketAddress.createUnresolved(InetAddress.getLocalHost().getCanonicalHostName(), address.getPort());
    } catch (UnknownHostException ignored) {
      // Ignore and fall back to the address computed above.
    }
  }
  return resolvedSocketAddress;
}
/**
 * Formats a socket address as {@code host:port}. The host is the host string of an
 * unresolved address, the canonical host name of a wildcard or loopback address, and the
 * host name of any other address.
 *
 * @param socketAddress the address to format
 * @return the connect string
 */
public static String getSocketConnectString(InetSocketAddress socketAddress)
{
  final InetAddress resolved = socketAddress.getAddress();
  final String hostPart;
  if (resolved == null) {
    hostPart = socketAddress.getHostString();
  } else {
    final boolean localOnly = resolved.isAnyLocalAddress() || resolved.isLoopbackAddress();
    hostPart = localOnly ? resolved.getCanonicalHostName() : resolved.getHostName();
  }
  return new StringBuilder(hostPart).append(":").append(socketAddress.getPort()).toString();
}
/**
 * Collects the RM webapp addresses: one per resource manager id when RM HA is enabled,
 * otherwise the single configured address.
 *
 * @param conf The configuration
 * @return the list of webapp socket addresses
 */
public static List<InetSocketAddress> getRMAddresses(Configuration conf)
{
  final List<InetSocketAddress> webAddresses = new ArrayList<>();
  if (!ConfigUtils.isRMHAEnabled(conf)) {
    webAddresses.add(getRMWebAddress(conf, null));
    return webAddresses;
  }
  // HA is enabled get all
  for (String rmId : ConfigUtils.getRMHAIds(conf)) {
    webAddresses.add(getRMWebAddress(conf, rmId));
  }
  return webAddresses;
}
/**
 * Deletes the application directories of FAILED, FINISHED and KILLED applications whose
 * finish time lies before the given time. A failure to delete one directory is logged and
 * does not stop the clean up of the remaining ones.
 *
 * @param clientRMService client used to list the applications
 * @param conf Configuration object
 * @param fs file system holding the application directories
 * @param finishedBefore only applications with a finish time below this value are cleaned
 * @return the reports of the applications whose directory was deleted
 * @throws IOException
 * @throws YarnException
 */
public static List<ApplicationReport> cleanAppDirectories(YarnClient clientRMService, Configuration conf, FileSystem fs, long finishedBefore)
    throws IOException, YarnException
{
  final List<ApplicationReport> cleaned = new ArrayList<>();
  final EnumSet<YarnApplicationState> terminalStates =
      EnumSet.of(YarnApplicationState.FAILED, YarnApplicationState.FINISHED, YarnApplicationState.KILLED);
  final List<ApplicationReport> terminated = clientRMService.getApplications(
      Sets.newHashSet(StramClient.YARN_APPLICATION_TYPE, StramClient.YARN_APPLICATION_TYPE_DEPRECATED), terminalStates);
  final Path appsBasePath = new Path(getApexDFSRootDir(fs, conf), SUBDIR_APPS);
  for (ApplicationReport report : terminated) {
    if (report.getFinishTime() >= finishedBefore) {
      continue;
    }
    try {
      final Path appPath = new Path(appsBasePath, report.getApplicationId().toString());
      if (!fs.isDirectory(appPath)) {
        continue;
      }
      LOG.debug("Deleting finished application data for {}", report.getApplicationId());
      fs.delete(appPath, true);
      cleaned.add(report);
    } catch (Exception ex) {
      LOG.warn("Cannot delete application data for {}", report.getApplicationId(), ex);
    }
  }
  return cleaned;
}
/**
 * Builds the application info for a JSON application file. An error while creating or
 * validating the DAG is recorded on the returned info; any other error is logged.
 *
 * @param file the JSON application file
 * @param config configuration handed to the application launcher
 * @return the application info as far as it could be built, or {@code null} when the
 *         processing failed before the info object was created
 */
public static AppPackage.AppInfo jsonFileToAppInfo(File file, Configuration config)
{
  AppPackage.AppInfo info = null;
  try {
    final StramAppLauncher.AppFactory factory = new StramAppLauncher.JsonFileAppFactory(file);
    final StramAppLauncher launcher = new StramAppLauncher(file.getName(), config);
    launcher.loadDependencies();
    info = new AppPackage.AppInfo(factory.getName(), file.getName(), "json");
    info.displayName = factory.getDisplayName();
    populateDag(info, factory, launcher);
  } catch (Exception ex) {
    LOG.error("Caught exceptions trying to process {}", file.getName(), ex);
  }
  return info;
}

// Creates and validates the DAG; a failure is stored on the info object instead of being thrown.
private static void populateDag(AppPackage.AppInfo info, StramAppLauncher.AppFactory factory, StramAppLauncher launcher)
{
  try {
    info.dag = factory.createApp(launcher.getLogicalPlanConfiguration());
    info.dag.validate();
  } catch (Exception ex) {
    info.error = ex.getMessage();
    info.errorStackTrace = ExceptionUtils.getStackTrace(ex);
  }
}
/**
 * Appends a {@code -D<attribute long name>=$'<value>'} argument for the attribute when the
 * context holds a value for it. In the value every backslash is replaced by four backslashes,
 * and every single quote, double quote and dollar sign is prefixed with a backslash.
 *
 * @param attribute the attribute to pass on
 * @param context context the attribute value is read from
 * @param vargs argument list the argument is appended to
 */
public static void addAttributeToArgs(Attribute<String> attribute, Context context, List<CharSequence> vargs)
{
  final String value = context.getValue(attribute);
  if (value == null) {
    return;
  }
  final String longName = attribute.getLongName();
  final String backslashesEscaped = value.replace("\\", "\\\\\\\\");
  final String escaped = backslashesEscaped.replaceAll("['\"$]", "\\\\$0");
  vargs.add(String.format("-D%s=$'%s'", longName, escaped));
}
}