blob: 7eec786042d9b0de080de08636df3d0bafdaa7a8 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.reef.runtime.yarn.client;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
import org.apache.hadoop.yarn.api.records.*;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.client.api.YarnClientApplication;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
import org.apache.reef.runtime.common.REEFLauncher;
import org.apache.reef.runtime.common.files.ClasspathProvider;
import org.apache.reef.runtime.common.files.REEFFileNames;
import org.apache.reef.runtime.common.launch.JavaLaunchCommandBuilder;
import org.apache.reef.runtime.yarn.client.unmanaged.YarnProxyUser;
import org.apache.reef.runtime.yarn.util.YarnTypes;
import java.io.IOException;
import java.util.*;
import java.util.logging.Level;
import java.util.logging.Logger;
/**
* Helper code that wraps the YARN Client API for our purposes.
*/
public final class YarnSubmissionHelper implements AutoCloseable {
private static final Logger LOG = Logger.getLogger(YarnSubmissionHelper.class.getName());
private final YarnClient yarnClient;
private final GetNewApplicationResponse applicationResponse;
private final ApplicationSubmissionContext applicationSubmissionContext;
private final ApplicationId applicationId;
private final Map<String, LocalResource> resources = new HashMap<>();
private final ClasspathProvider classpath;
private final YarnProxyUser yarnProxyUser;
private final SecurityTokenProvider tokenProvider;
private final boolean isUnmanaged;
private final List<String> commandPrefixList;
private String driverStdoutFilePath;
private String driverStderrFilePath;
private Class launcherClazz = REEFLauncher.class;
private List<String> configurationFilePaths;
private final Map<String, String> environmentVariablesMap = new HashMap<>();
public YarnSubmissionHelper(final YarnConfiguration yarnConfiguration,
final REEFFileNames fileNames,
final ClasspathProvider classpath,
final YarnProxyUser yarnProxyUser,
final SecurityTokenProvider tokenProvider,
final boolean isUnmanaged,
final List<String> commandPrefixList) throws IOException, YarnException {
this.classpath = classpath;
this.yarnProxyUser = yarnProxyUser;
this.isUnmanaged = isUnmanaged;
this.driverStdoutFilePath =
ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/" + fileNames.getDriverStdoutFileName();
this.driverStderrFilePath =
ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/" + fileNames.getDriverStderrFileName();
LOG.log(Level.FINE, "Initializing YARN Client");
this.yarnClient = YarnClient.createYarnClient();
this.yarnClient.init(yarnConfiguration);
this.yarnClient.start();
LOG.log(Level.FINE, "Initialized YARN Client");
LOG.log(Level.FINE, "Requesting Application ID from YARN.");
final YarnClientApplication yarnClientApplication = this.yarnClient.createApplication();
this.applicationResponse = yarnClientApplication.getNewApplicationResponse();
this.applicationSubmissionContext = yarnClientApplication.getApplicationSubmissionContext();
this.applicationSubmissionContext.setUnmanagedAM(isUnmanaged);
this.applicationId = this.applicationSubmissionContext.getApplicationId();
this.tokenProvider = tokenProvider;
this.commandPrefixList = commandPrefixList;
this.configurationFilePaths = Collections.singletonList(fileNames.getDriverConfigurationPath());
LOG.log(Level.INFO, "YARN Application ID: {0}", this.applicationId);
}
public YarnSubmissionHelper(final YarnConfiguration yarnConfiguration,
final REEFFileNames fileNames,
final ClasspathProvider classpath,
final YarnProxyUser yarnProxyUser,
final SecurityTokenProvider tokenProvider,
final boolean isUnmanaged) throws IOException, YarnException {
this(yarnConfiguration, fileNames, classpath, yarnProxyUser, tokenProvider, isUnmanaged, null);
}
/**
*
* @return the application ID assigned by YARN.
*/
public int getApplicationId() {
return this.applicationId.getId();
}
/**
*
* @return the application ID string representation assigned by YARN.
*/
public String getStringApplicationId() {
return this.applicationId.toString();
}
/**
* Set the name of the application to be submitted.
* @param applicationName
* @return
*/
public YarnSubmissionHelper setApplicationName(final String applicationName) {
applicationSubmissionContext.setApplicationName(applicationName);
return this;
}
/**
* Set the amount of memory to be allocated to the Driver.
* @param megabytes
* @return
*/
public YarnSubmissionHelper setDriverMemory(final int megabytes) {
applicationSubmissionContext.setResource(Resource.newInstance(getMemory(megabytes), 1));
return this;
}
/**
* Add a file to be localized on the driver.
* @param resourceName
* @param resource
* @return
*/
public YarnSubmissionHelper addLocalResource(final String resourceName, final LocalResource resource) {
resources.put(resourceName, resource);
return this;
}
/**
* Set the priority of the job.
* @param priority
* @return
*/
public YarnSubmissionHelper setPriority(final int priority) {
this.applicationSubmissionContext.setPriority(Priority.newInstance(priority));
return this;
}
/**
* Set whether or not the resource manager should preserve evaluators across driver restarts.
* @param preserveEvaluators
* @return
*/
public YarnSubmissionHelper setPreserveEvaluators(final boolean preserveEvaluators) {
if (preserveEvaluators) {
// when supported, set KeepContainersAcrossApplicationAttempts to be true
// so that when driver (AM) crashes, evaluators will still be running and we can recover later.
if (YarnTypes.isAtOrAfterVersion(YarnTypes.MIN_VERSION_KEEP_CONTAINERS_AVAILABLE)) {
LOG.log(
Level.FINE,
"Hadoop version is {0} or after with KeepContainersAcrossApplicationAttempts supported," +
" will set it to true.",
YarnTypes.MIN_VERSION_KEEP_CONTAINERS_AVAILABLE);
applicationSubmissionContext.setKeepContainersAcrossApplicationAttempts(true);
} else {
LOG.log(Level.WARNING,
"Hadoop version does not yet support KeepContainersAcrossApplicationAttempts. Driver restarts " +
"will not support recovering evaluators.");
applicationSubmissionContext.setKeepContainersAcrossApplicationAttempts(false);
}
} else {
applicationSubmissionContext.setKeepContainersAcrossApplicationAttempts(false);
}
return this;
}
/**
* Sets the maximum application attempts for the application.
* @param maxApplicationAttempts
* @return
*/
public YarnSubmissionHelper setMaxApplicationAttempts(final int maxApplicationAttempts) {
applicationSubmissionContext.setMaxAppAttempts(maxApplicationAttempts);
return this;
}
/**
* Assign this job submission to a queue.
* @param queueName
* @return
*/
public YarnSubmissionHelper setQueue(final String queueName) {
this.applicationSubmissionContext.setQueue(queueName);
return this;
}
/**
* Sets the launcher class for the job.
* @param launcherClass
* @return
*/
public YarnSubmissionHelper setLauncherClass(final Class launcherClass) {
this.launcherClazz = launcherClass;
return this;
}
/**
* Sets the configuration file for the job.
* Note that this does not have to be Driver TANG configuration. In the bootstrap
* launch case, this can be the set of the Avro files that supports the generation of a driver
* configuration file natively at the Launcher.
* @param configurationFilePaths
* @return
*/
public YarnSubmissionHelper setConfigurationFilePaths(final List<String> configurationFilePaths) {
this.configurationFilePaths = configurationFilePaths;
return this;
}
/**
* Sets environment variable map.
* @param map
* @return
*/
public YarnSubmissionHelper setJobSubmissionEnvMap(final Map<String, String> map) {
for (final Map.Entry<String, String> entry : map.entrySet()) {
environmentVariablesMap.put(entry.getKey(), entry.getValue());
}
return this;
}
/**
* Adds a job submission environment variable.
* @param key
* @param value
* @return
*/
public YarnSubmissionHelper setJobSubmissionEnvVariable(final String key, final String value) {
environmentVariablesMap.put(key, value);
return this;
}
/**
* Sets the Driver stdout file path.
* @param driverStdoutPath
* @return
*/
public YarnSubmissionHelper setDriverStdoutPath(final String driverStdoutPath) {
this.driverStdoutFilePath = driverStdoutPath;
return this;
}
/**
* Sets the Driver stderr file path.
* @param driverStderrPath
* @return
*/
public YarnSubmissionHelper setDriverStderrPath(final String driverStderrPath) {
this.driverStderrFilePath = driverStderrPath;
return this;
}
public void submit() throws IOException, YarnException {
// SET EXEC COMMAND
final List<String> launchCommand = new JavaLaunchCommandBuilder(launcherClazz, commandPrefixList)
.setConfigurationFilePaths(configurationFilePaths)
.setClassPath(this.classpath.getDriverClasspath())
.setMemory(this.applicationSubmissionContext.getResource().getMemory())
.setStandardOut(driverStdoutFilePath)
.setStandardErr(driverStderrFilePath)
.build();
if (this.applicationSubmissionContext.getKeepContainersAcrossApplicationAttempts() &&
this.applicationSubmissionContext.getMaxAppAttempts() == 1) {
LOG.log(Level.WARNING, "Application will not be restarted even though preserve evaluators is set to true" +
" since the max application submissions is 1. Proceeding to submit application...");
}
final ContainerLaunchContext containerLaunchContext = YarnTypes.getContainerLaunchContext(
launchCommand, this.resources, tokenProvider.getTokens(), environmentVariablesMap);
this.applicationSubmissionContext.setAMContainerSpec(containerLaunchContext);
LOG.log(Level.INFO, "Submitting REEF Application to YARN. ID: {0}, driver core: {1}",
new Object[] {this.applicationId, this.applicationSubmissionContext.getResource().getVirtualCores()});
if (LOG.isLoggable(Level.INFO)) {
LOG.log(Level.INFO, "REEF app command: {0}", StringUtils.join(launchCommand, ' '));
}
this.yarnClient.submitApplication(applicationSubmissionContext);
if (this.isUnmanaged) {
// For Unmanaged AM mode, add a new app token to the
// current process so it can talk to the RM as an AM.
final Token<AMRMTokenIdentifier> token = this.yarnClient.getAMRMToken(this.applicationId);
this.yarnProxyUser.set("reef-proxy", UserGroupInformation.getCurrentUser(), token);
this.tokenProvider.addTokens(UserCredentialSecurityTokenProvider.serializeToken(token));
}
}
/**
* Extract the desired driver memory from jobSubmissionProto.
* <p>
* returns maxMemory if that desired amount is more than maxMemory
*/
private int getMemory(final int requestedMemory) {
final int maxMemory = applicationResponse.getMaximumResourceCapability().getMemory();
final int amMemory;
if (requestedMemory <= maxMemory) {
amMemory = requestedMemory;
} else {
LOG.log(Level.WARNING,
"Requested {0}MB of memory for the driver. " +
"The max on this YARN installation is {1}. " +
"Using {1} as the memory for the driver.",
new Object[]{requestedMemory, maxMemory});
amMemory = maxMemory;
}
return amMemory;
}
@Override
public void close() {
LOG.log(Level.FINE, "Closing YARN application: {0}", this.applicationId);
this.yarnClient.stop(); // same as yarnClient.close()
}
}