blob: dce648bbd821342b8bf57924f8d987d46b12982c [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package com.datatorrent.stram;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang.text.StrSubstitutor;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.client.api.async.NMClientAsync;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.util.Records;
import com.google.common.collect.Lists;
import com.datatorrent.api.Context;
import com.datatorrent.api.DAG;
import com.datatorrent.api.StreamingApplication;
import com.datatorrent.stram.client.StramClientUtils;
import com.datatorrent.stram.engine.StreamingContainer;
import com.datatorrent.stram.plan.logical.LogicalPlan;
import com.datatorrent.stram.plan.physical.PTOperator;
import com.datatorrent.stram.security.StramDelegationTokenIdentifier;
import com.datatorrent.stram.security.StramDelegationTokenManager;
/**
 * Runnable that prepares the launch context (environment, security tokens, local resources
 * and command line) for an allocated container and submits the start request through the
 * node manager client. The launched JVM runs {@link StreamingContainer}.
 * <br>
 *
 * @since 0.3.2
 */
public class LaunchContainerRunnable implements Runnable
{
  private static final Logger LOG = LoggerFactory.getLogger(LaunchContainerRunnable.class);
  private static final String JAVA_REMOTE_DEBUG_OPTS = "-agentlib:jdwp=transport=dt_socket,server=y,suspend=n";
  private static final int MB_TO_B = 1024 * 1024;
  private final Map<String, String> containerEnv = new HashMap<>();
  private final LogicalPlan dag;
  private final ByteBuffer tokens;
  private final Container container;
  private final NMClientAsync nmClient;
  private final StreamingContainerAgent sca;

  /**
   * @param lcontainer Allocated container
   * @param nmClient client used to read the configuration and to start the container
   * @param sca agent of the container to launch; also supplies the logical plan
   * @param tokens serialized credentials set on the launch context
   */
  public LaunchContainerRunnable(Container lcontainer, NMClientAsync nmClient, StreamingContainerAgent sca, ByteBuffer tokens)
  {
    this.container = lcontainer;
    this.nmClient = nmClient;
    this.dag = sca.getContainer().getPlan().getLogicalPlan();
    this.tokens = tokens;
    this.sca = sca;
  }

  /**
   * Builds the CLASSPATH value for the child process and stores it in the given environment.
   *
   * @param env environment map that receives the "CLASSPATH" entry
   */
  private void setClasspath(Map<String, String> env)
  {
    // add localized application jar files to classpath
    // At some point we should not be required to add
    // the hadoop specific classpaths to the env.
    // It should be provided out of the box.
    // For now setting all required classpaths including
    // the classpath to "." for the application jar
    StringBuilder classPathEnv = new StringBuilder("./*");
    String classpath = nmClient.getConfig().get(YarnConfiguration.YARN_APPLICATION_CLASSPATH);
    String[] entries = StringUtils.isBlank(classpath) ? YarnConfiguration.DEFAULT_YARN_APPLICATION_CLASSPATH : classpath.split(",");
    for (String entry : entries) {
      // trim before comparing so that an entry surrounded by whitespace is skipped as well
      String trimmed = entry.trim();
      if (trimmed.equals("$HADOOP_CLIENT_CONF_DIR")) {
        // SPOI-2501
        continue;
      }
      classPathEnv.append(':');
      classPathEnv.append(trimmed);
    }
    classPathEnv.append(":."); // include log4j.properties, if any
    env.put("CLASSPATH", classPathEnv.toString());
    LOG.info("CLASSPATH: {}", classPathEnv);
  }

  /**
   * Registers a single file as an application-visible local resource.
   *
   * @param name key under which the resource is stored in the map
   * @param fileStatus status of the file; supplies path, length and modification time
   * @param type resource type
   * @param localResources map that receives the new entry
   */
  public static void addFileToLocalResources(final String name, final FileStatus fileStatus, final LocalResourceType type, final Map<String, LocalResource> localResources)
  {
    final LocalResource localResource = LocalResource.newInstance(ConverterUtils.getYarnUrlFromPath(fileStatus.getPath()),
        type, LocalResourceVisibility.APPLICATION, fileStatus.getLen(), fileStatus.getModificationTime());
    localResources.put(name, localResource);
  }

  /**
   * Registers every file of a separated list as a local resource, keyed by the file name.
   *
   * @param type resource type applied to all files
   * @param commaSeparatedFileNames file paths separated by {@code StramClient.LIB_JARS_SEP};
   *        {@code null} is treated as an empty list
   * @param localResources map that receives the new entries
   * @param fs file system used to look up the file status
   * @throws IOException if the status of a file cannot be retrieved
   */
  public static void addFilesToLocalResources(LocalResourceType type, String commaSeparatedFileNames, Map<String, LocalResource> localResources, FileSystem fs) throws IOException
  {
    String[] files = StringUtils.splitByWholeSeparator(commaSeparatedFileNames, StramClient.LIB_JARS_SEP);
    if (files == null) {
      // splitByWholeSeparator returns null for null input; nothing to add
      return;
    }
    for (String file : files) {
      final Path dst = new Path(file);
      addFileToLocalResources(dst.getName(), fs.getFileStatus(dst), type, localResources);
    }
  }

  /**
   * Sets up the container launch context and dispatches the asynchronous container start
   * request. If the local resources cannot be prepared the error is logged and no start
   * request is sent.
   */
  @Override
  public void run()
  {
    LOG.info("Setting up container launch context for containerid={}", container.getId());
    ContainerLaunchContext ctx = Records.newRecord(ContainerLaunchContext.class);

    setClasspath(containerEnv);
    try {
      // propagate to replace node managers user name (effective in non-secure mode)
      containerEnv.put("HADOOP_USER_NAME", UserGroupInformation.getLoginUser().getUserName());
    } catch (Exception e) {
      LOG.error("Failed to retrieve principal name", e);
    }
    // Set the environment
    ctx.setEnvironment(containerEnv);
    ctx.setTokens(tokens);

    // Set the local resources
    Map<String, LocalResource> localResources = new HashMap<>();

    // add resources for child VM
    try {
      // child VM dependencies
      try (FileSystem fs = StramClientUtils.newFileSystemInstance(nmClient.getConfig())) {
        addFilesToLocalResources(LocalResourceType.FILE, dag.getAttributes().get(Context.DAGContext.LIBRARY_JARS), localResources, fs);
        String archives = dag.getAttributes().get(LogicalPlan.ARCHIVES);
        if (archives != null) {
          addFilesToLocalResources(LocalResourceType.ARCHIVE, archives, localResources, fs);
        }
        ctx.setLocalResources(localResources);
      }
    } catch (IOException e) {
      LOG.error("Failed to prepare local resources.", e);
      return;
    }

    // Set the necessary command to execute on the allocated container
    List<CharSequence> vargs = getChildVMCommand(container.getId().toString());

    // Get final command
    StringBuilder command = new StringBuilder(1024);
    for (CharSequence str : vargs) {
      command.append(str).append(" ");
    }
    LOG.info("Launching on node: {} command: {}", container.getNodeId(), command);

    List<String> commands = new ArrayList<>();
    commands.add(command.toString());
    ctx.setCommands(commands);

    nmClient.startContainerAsync(container, ctx);
  }

  /**
   * Build the command to launch the child VM in the container
   *
   * @param jvmID value passed to the child as the {@code cid} system property
   * @return list with a single element holding the complete, space separated command line
   */
  public List<CharSequence> getChildVMCommand(String jvmID)
  {
    List<CharSequence> vargs = new ArrayList<>(8);

    if (!StringUtils.isBlank(System.getenv(Environment.JAVA_HOME.key()))) {
      // node manager provides JAVA_HOME
      vargs.add(Environment.JAVA_HOME.$() + "/bin/java");
    } else {
      vargs.add("java");
    }

    String jvmOpts = dag.getAttributes().get(LogicalPlan.CONTAINER_JVM_OPTIONS);
    if (jvmOpts == null) {
      if (dag.isDebug()) {
        vargs.add(JAVA_REMOTE_DEBUG_OPTS);
      }
    } else {
      // expand %(applicationId) and %(containerId) placeholders in the configured options
      Map<String, String> params = new HashMap<>();
      params.put("applicationId", Integer.toString(container.getId().getApplicationAttemptId().getApplicationId().getId()));
      params.put("containerId", Integer.toString(container.getId().getId()));
      StrSubstitutor sub = new StrSubstitutor(params, "%(", ")");
      vargs.add(sub.replace(jvmOpts));
      // do not add a second debug agent when the configured options already carry one
      if (dag.isDebug() && !jvmOpts.contains("-agentlib:jdwp=")) {
        vargs.add(JAVA_REMOTE_DEBUG_OPTS);
      }
    }

    String loggerAppender = dag.getValue(LogicalPlan.LOGGER_APPENDER);
    if (loggerAppender != null) {
      vargs.add(String.format("-D%s=\"%s\"", LogicalPlan.LOGGER_APPENDER.getLongName(), loggerAppender));
    }

    // collect the operators deployed in this container and the buffer server memory they request
    List<DAG.OperatorMeta> operatorMetaList = Lists.newArrayList();
    int bufferServerMemory = 0;
    for (PTOperator operator : sca.getContainer().getOperators()) {
      bufferServerMemory += operator.getBufferServerMemory();
      operatorMetaList.add(operator.getOperatorMeta());
    }
    Context.ContainerOptConfigurator containerOptConfigurator = dag.getAttributes().get(LogicalPlan.CONTAINER_OPTS_CONFIGURATOR);
    jvmOpts = containerOptConfigurator.getJVMOptions(operatorMetaList);
    // widen to long before multiplying to avoid int overflow
    jvmOpts = parseJvmOpts(jvmOpts, ((long)bufferServerMemory) * MB_TO_B);
    LOG.info("Jvm opts {} for container {}", jvmOpts, container.getId());
    vargs.add(jvmOpts);

    Path childTmpDir = new Path(Environment.PWD.$(), YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR);
    vargs.add(String.format("-D%s=%s", StreamingContainer.PROP_APP_PATH, dag.assertAppPath()));
    vargs.add("-Djava.io.tmpdir=" + childTmpDir);
    vargs.add(String.format("-D%scid=%s", StreamingApplication.DT_PREFIX, jvmID));
    vargs.add("-Dhadoop.root.logger=" + (dag.isDebug() ? "DEBUG" : "INFO") + ",RFA");
    vargs.add("-Dhadoop.log.dir=" + ApplicationConstants.LOG_DIR_EXPANSION_VAR);

    String loggersLevel = System.getProperty(StramUtils.DT_LOGGERS_LEVEL);
    if (loggersLevel != null) {
      vargs.add(String.format("-D%s=%s", StramUtils.DT_LOGGERS_LEVEL, loggersLevel));
    }

    // Add main class and its arguments
    vargs.add(StreamingContainer.class.getName()); // main of Child

    vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout");
    vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr");

    // Final command
    StringBuilder mergedCommand = new StringBuilder(256);
    for (CharSequence str : vargs) {
      mergedCommand.append(str).append(" ");
    }

    List<CharSequence> vargsFinal = new ArrayList<>(1);
    vargsFinal.add(mergedCommand.toString());
    return vargsFinal;
  }

  /**
   * Adds the given amount of memory to the {@code -Xmx} setting of a JVM option string.
   * Every {@code -Xmx} token is rewritten as a plain byte count increased by {@code memory};
   * if no such token is present, {@code -Xmx<memory>} is appended. All other tokens are
   * passed through unchanged.
   *
   * @param jvmOpts whitespace separated JVM options, may be {@code null}
   * @param memory number of bytes to add to the maximum heap size
   * @return the adjusted options, or an empty string when {@code jvmOpts} is {@code null}
   *         or shorter than two characters
   * @throws NumberFormatException if a {@code -Xmx} value is not a valid size
   */
  private String parseJvmOpts(String jvmOpts, long memory)
  {
    final String xmx = "-Xmx";
    StringBuilder builder = new StringBuilder();
    if (jvmOpts != null && jvmOpts.length() > 1) {
      boolean foundProperty = false;
      for (String split : jvmOpts.split("(\\s+)")) {
        if (split.startsWith(xmx)) {
          foundProperty = true;
          long heapSize = parseMemorySize(split.substring(xmx.length())) + memory;
          builder.append(xmx).append(heapSize).append(" ");
        } else {
          builder.append(split).append(" ");
        }
      }
      if (!foundProperty) {
        builder.append(xmx).append(memory);
      }
    }
    return builder.toString();
  }

  /**
   * Converts a JVM style memory size to bytes. A trailing k, m, g or t (either case) is
   * interpreted as a power-of-1024 multiplier; a value without suffix is taken as bytes.
   *
   * @param size size value, for example {@code 1073741824}, {@code 512m} or {@code 2G}
   * @return the size in bytes
   * @throws NumberFormatException if the numeric part cannot be parsed
   */
  private static long parseMemorySize(String size)
  {
    long multiplier = 1L;
    if (!size.isEmpty()) {
      switch (size.charAt(size.length() - 1)) {
        case 'k':
        case 'K':
          multiplier = 1024L;
          break;
        case 'm':
        case 'M':
          multiplier = 1024L * 1024L;
          break;
        case 'g':
        case 'G':
          multiplier = 1024L * 1024L * 1024L;
          break;
        case 't':
        case 'T':
          multiplier = 1024L * 1024L * 1024L * 1024L;
          break;
        default:
          break;
      }
    }
    String digits = multiplier == 1L ? size : size.substring(0, size.length() - 1);
    return Long.parseLong(digits) * multiplier;
  }

  /**
   * Creates the serialized credentials for a container when security is enabled. A delegation
   * token for the login user is created with the heartbeat address as its service and combined
   * with the login user's existing tokens.
   *
   * @param delegationTokenManager secret manager used to create the delegation token
   * @param heartbeatAddress address used as the service of the delegation token
   * @return the serialized credentials, or {@code null} when security is not enabled
   * @throws IOException if the login user cannot be determined
   */
  public static ByteBuffer getTokens(StramDelegationTokenManager delegationTokenManager, InetSocketAddress heartbeatAddress) throws IOException
  {
    if (UserGroupInformation.isSecurityEnabled()) {
      UserGroupInformation ugi = UserGroupInformation.getLoginUser();
      StramDelegationTokenIdentifier identifier = new StramDelegationTokenIdentifier(new Text(ugi.getUserName()), new Text(""), new Text(""));
      String service = heartbeatAddress.getAddress().getHostAddress() + ":" + heartbeatAddress.getPort();
      Token<StramDelegationTokenIdentifier> stramToken = new Token<>(identifier, delegationTokenManager);
      stramToken.setService(new Text(service));
      return getTokens(ugi, stramToken);
    }
    return null;
  }

  /**
   * Serializes the user's tokens, except those of the AMRM token kind, together with the
   * given delegation token.
   *
   * @param ugi user whose credentials are copied
   * @param delegationToken additional token added under its own service name
   * @return buffer containing exactly the serialized credentials
   * @throws RuntimeException if the credentials cannot be serialized
   */
  public static ByteBuffer getTokens(UserGroupInformation ugi, Token<StramDelegationTokenIdentifier> delegationToken)
  {
    try {
      Collection<Token<? extends TokenIdentifier>> tokens = ugi.getCredentials().getAllTokens();
      Credentials credentials = new Credentials();
      for (Token<? extends TokenIdentifier> token : tokens) {
        if (!token.getKind().equals(AMRMTokenIdentifier.KIND_NAME)) {
          credentials.addToken(token.getService(), token);
          LOG.debug("Passing container token {}", token);
        }
      }
      credentials.addToken(delegationToken.getService(), delegationToken);
      DataOutputBuffer dataOutput = new DataOutputBuffer();
      credentials.writeTokenStorageToStream(dataOutput);
      // getData() exposes the whole backing array; only the first getLength() bytes are valid
      return ByteBuffer.wrap(dataOutput.getData(), 0, dataOutput.getLength());
    } catch (IOException e) {
      throw new RuntimeException("Error generating delegation token", e);
    }
  }
}