blob: cdcf2d11c8c7be2e3770522e4429a268ea8f5885 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.samza.job.yarn;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
import org.apache.hadoop.yarn.api.records.*;
import org.apache.hadoop.yarn.api.records.URL;
import org.apache.hadoop.yarn.client.api.NMClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.util.Records;
import org.apache.samza.clustermanager.SamzaContainerLaunchException;
import org.apache.samza.config.Config;
import org.apache.samza.config.JobConfig;
import org.apache.samza.config.ShellCommandConfig;
import org.apache.samza.config.YarnConfig;
import org.apache.samza.job.CommandBuilder;
import org.apache.samza.util.Util;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
/**
* A helper class to run container processes on YARN. This encapsulates quite a bit of the YARN container
* boilerplate.
*/
public class YarnContainerRunner {
  private static final Logger log = LoggerFactory.getLogger(YarnContainerRunner.class);

  /** Job configuration; consulted for the framework path and the localizer resource settings. */
  private final Config config;
  /** Cluster configuration used to initialise the NM client and to resolve the package file system. */
  private final YarnConfiguration yarnConfiguration;
  /** Client through which container start requests are issued. */
  private final NMClient nmClient;
  /** YARN-specific view over {@link #config}; supplies the job package path. */
  private final YarnConfig yarnConfig;

  /**
   * Create a new Runner from a Config.
   *
   * @param config to instantiate the runner with
   * @param yarnConfiguration the yarn config for the cluster to connect to.
   */
  public YarnContainerRunner(Config config,
                             YarnConfiguration yarnConfiguration) {
    this.config = config;
    this.yarnConfiguration = yarnConfiguration;

    this.nmClient = NMClient.createNMClient();
    nmClient.init(this.yarnConfiguration);

    this.yarnConfig = new YarnConfig(config);
  }

  /**
   * Runs a process as specified by the command builder on the container.
   *
   * <p>The command path defaults to {@code ./__package/}. When a framework path is configured, the
   * command is resolved against that path instead and {@code JOB_LIB_DIR} is exported so the process
   * can locate the job's own libraries under {@code ./__package/lib}.
   *
   * @param samzaContainerId id of the samza Container to run (passed as a command line parameter to the process)
   * @param container the samza container to run.
   * @param cmdBuilder the command builder that encapsulates the command, and the context
   *
   * @throws SamzaContainerLaunchException when the job package status cannot be read, the credentials
   *         cannot be serialized, or the start request submitted through the {@link NMClient} fails.
   */
  public void runContainer(String samzaContainerId, Container container, CommandBuilder cmdBuilder) throws SamzaContainerLaunchException {
    String containerIdStr = ConverterUtils.toString(container.getId());
    log.info("Got available container ID ({}) for container: {}", samzaContainerId, container);

    // check if we have framework path specified. If yes - use it, if not use default ./__package/
    String jobLib = ""; // in case of separate framework, this directory will point at the job's libraries
    String cmdPath = "./__package/";

    String fwkPath = JobConfig.getFwkPath(config);
    if (fwkPath != null && !fwkPath.isEmpty()) {
      cmdPath = fwkPath;
      jobLib = "export JOB_LIB_DIR=./__package/lib";
    }
    log.info("In runContainer in util: fwkPath= " + fwkPath + ";cmdPath=" + cmdPath + ";jobLib=" + jobLib);
    cmdBuilder.setCommandPath(cmdPath);

    String command = cmdBuilder.buildCommand();
    log.info("Container ID {} using command {}", samzaContainerId, command);

    Map<String, String> env = getEscapedEnvironmentVariablesMap(cmdBuilder);
    // expose the YARN container id to the launched process
    env.put(ShellCommandConfig.ENV_EXECUTION_ENV_CONTAINER_ID(), Util.envVarEscape(container.getId().toString()));
    printContainerEnvironmentVariables(samzaContainerId, env);

    log.info("Samza FWK path: {}; env={}", command, env);

    Path packagePath = new Path(yarnConfig.getPackagePath());
    log.info("Starting container ID {} using package path {}", samzaContainerId, packagePath);

    String formattedCommand = getFormattedCommand(
        ApplicationConstants.LOG_DIR_EXPANSION_VAR,
        jobLib,
        command,
        ApplicationConstants.STDOUT,
        ApplicationConstants.STDERR);

    startContainer(packagePath, container, env, formattedCommand);

    log.info("Claimed container ID {} for container {} on node {} (http://{}/node/containerlogs/{}).",
        new Object[]{
            samzaContainerId,
            containerIdStr,
            container.getNodeId().getHost(),
            container.getNodeHttpAddress(),
            containerIdStr}
    );

    log.info("Started container ID {}", samzaContainerId);
  }

  /**
   * Runs a command as a process on the container. All binaries needed by the physical process are packaged in the URL
   * specified by packagePath.
   *
   * <p>The package is registered as an {@code ARCHIVE} local resource with {@code APPLICATION}
   * visibility under the name {@code __package}, alongside any resources produced by the
   * {@link LocalizerResourceMapper}. The current user's credentials, minus the AM-to-RM token, are
   * attached to the launch context.
   *
   * @param packagePath location of the job package
   * @param container the YARN container to start the process in
   * @param env environment variables for the launched process
   * @param cmd the shell command to execute in the container
   * @throws SamzaContainerLaunchException if the package status cannot be read, the credentials cannot
   *         be serialized, or the {@link NMClient} rejects the start request
   */
  private void startContainer(Path packagePath,
                              Container container,
                              Map<String, String> env,
                              final String cmd) throws SamzaContainerLaunchException {
    log.info("starting container {} {} {} {}",
        new Object[]{packagePath, container, env, cmd});

    // TODO: SAMZA-1144 remove the customized approach for package resource and use the common one.
    // But keep it now for backward compatibility.
    // set the local package so that the containers and app master are provisioned with it
    LocalResource packageResource = Records.newRecord(LocalResource.class);
    URL packageUrl = ConverterUtils.getYarnUrlFromPath(packagePath);
    FileStatus fileStatus;
    try {
      fileStatus = packagePath.getFileSystem(yarnConfiguration).getFileStatus(packagePath);
    } catch (IOException ioe) {
      log.error("IO Exception when accessing the package status from the filesystem", ioe);
      // keep the cause so callers can see why the lookup failed
      throw new SamzaContainerLaunchException("IO Exception when accessing the package status from the filesystem", ioe);
    }

    packageResource.setResource(packageUrl);
    log.info("set package Resource in YarnContainerRunner for {}", packageUrl);
    packageResource.setSize(fileStatus.getLen());
    packageResource.setTimestamp(fileStatus.getModificationTime());
    packageResource.setType(LocalResourceType.ARCHIVE);
    packageResource.setVisibility(LocalResourceVisibility.APPLICATION);

    ByteBuffer allTokens;
    // copy tokens (adapted from the dist shell example)
    try {
      Credentials credentials = UserGroupInformation.getCurrentUser().getCredentials();

      // Remove the AM->RM token BEFORE serializing, so that containers cannot access it. Serializing
      // first would snapshot the token into the buffer and make the removal below ineffective.
      Iterator<Token<? extends TokenIdentifier>> iter = credentials.getAllTokens().iterator();
      while (iter.hasNext()) {
        TokenIdentifier token = iter.next().decodeIdentifier();
        if (token != null && token.getKind().equals(AMRMTokenIdentifier.KIND_NAME)) {
          iter.remove();
        }
      }

      DataOutputBuffer dob = new DataOutputBuffer();
      credentials.writeTokenStorageToStream(dob);
      allTokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
    } catch (IOException ioe) {
      log.error("IOException when writing credentials.", ioe);
      throw new SamzaContainerLaunchException("IO Exception when writing credentials to output buffer", ioe);
    }

    Map<String, LocalResource> localResourceMap = new HashMap<>();
    localResourceMap.put("__package", packageResource);

    // include the resources from the universal resource configurations
    LocalizerResourceMapper resourceMapper = new LocalizerResourceMapper(new LocalizerResourceConfig(config), yarnConfiguration);
    localResourceMap.putAll(resourceMapper.getResourceMap());

    // a plain list instead of double-brace initialisation, which would create an anonymous subclass
    // holding a reference to this runner
    ArrayList<String> commands = new ArrayList<>();
    commands.add(cmd);

    ContainerLaunchContext context = Records.newRecord(ContainerLaunchContext.class);
    context.setEnvironment(env);
    context.setTokens(allTokens.duplicate());
    context.setCommands(commands);
    context.setLocalResources(localResourceMap);

    log.debug("setting localResourceMap to {}", localResourceMap);
    log.debug("setting context to {}", context);

    // NOTE(review): this request record is populated but never submitted; the NM client below takes
    // the launch context directly. Left in place to avoid touching the file's import list.
    StartContainerRequest startContainerRequest = Records.newRecord(StartContainerRequest.class);
    startContainerRequest.setContainerLaunchContext(context);
    try {
      nmClient.startContainer(container, context);
    } catch (YarnException ye) {
      log.error("Received YarnException when starting container: " + container.getId(), ye);
      throw new SamzaContainerLaunchException("Received YarnException when starting container: " + container.getId(), ye);
    } catch (IOException ioe) {
      log.error("Received IOException when starting container: " + container.getId(), ioe);
      throw new SamzaContainerLaunchException("Received IOException when starting container: " + container.getId(), ioe);
    }
  }

  /**
   * Logs every environment variable of a container, one {@code key=value} pair per line.
   *
   * @param samzaContainerId the Samza container Id for logging purposes.
   * @param env the Map of environment variables to their respective values.
   */
  private void printContainerEnvironmentVariables(String samzaContainerId, Map<String, String> env) {
    StringBuilder sb = new StringBuilder();
    for (Map.Entry<String, String> entry : env.entrySet()) {
      sb.append('\n').append(entry.getKey()).append('=').append(entry.getValue());
    }
    log.info("Container ID {} using environment variables: {}", samzaContainerId, sb.toString());
  }

  /**
   * Gets the environment variables from the specified {@link CommandBuilder} and escapes certain characters.
   *
   * @param cmdBuilder the command builder containing the environment variables.
   * @return a new mutable map containing the escaped environment variables.
   */
  private Map<String, String> getEscapedEnvironmentVariablesMap(CommandBuilder cmdBuilder) {
    Map<String, String> env = new HashMap<String, String>();
    for (Map.Entry<String, String> entry : cmdBuilder.buildEnvironment().entrySet()) {
      env.put(entry.getKey(), Util.envVarEscape(entry.getValue()));
    }
    return env;
  }

  /**
   * Builds the shell command line executed in the container: exports {@code SAMZA_LOG_DIR}, optionally
   * chains the job library export, symlinks {@code logs} to the log directory and finally
   * {@code exec}s the command with stdout/stderr redirected into that directory.
   *
   * @param logDirExpansionVar the log directory placeholder to export and link to
   * @param jobLib shell statement exporting the job library directory, or an empty string if none
   * @param command the command to exec
   * @param stdOut file name (under {@code logs/}) receiving standard output
   * @param stdErr file name (under {@code logs/}) receiving standard error
   * @return the complete shell command line
   */
  private String getFormattedCommand(String logDirExpansionVar,
                                     String jobLib,
                                     String command,
                                     String stdOut,
                                     String stdErr) {
    // add job's libraries exported to an env variable, chained onto the first export
    String jobLibClause = jobLib.isEmpty() ? jobLib : "&& " + jobLib;

    return String
        .format("export SAMZA_LOG_DIR=%s %s && ln -sfn %s logs && exec %s 1>logs/%s 2>logs/%s", logDirExpansionVar,
            jobLibClause, logDirExpansionVar, command, stdOut, stdErr);
  }
}