blob: 8229c63f8997ccba80564a3174a3c2c1e550293c [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.slider.core.launch;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.client.api.YarnClientApplication;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.util.Records;
import org.apache.slider.client.SliderYarnClientImpl;
import org.apache.slider.common.tools.CoreFileSystem;
import org.apache.slider.common.tools.SliderUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.Map;
import java.util.Set;
public class AppMasterLauncher extends AbstractLauncher {
private static final Logger log =
LoggerFactory.getLogger(AppMasterLauncher.class);
protected final YarnClientApplication application;
private final String name;
private final String type;
private final ApplicationSubmissionContext submissionContext;
private final ApplicationId appId;
private final boolean secureCluster;
private int maxAppAttempts = 0;
private boolean keepContainersOverRestarts = true;
private String queue = YarnConfiguration.DEFAULT_QUEUE_NAME;
private int priority = 1;
private final Resource resource = Records.newRecord(Resource.class);
private final SliderYarnClientImpl yarnClient;
/**
* Build the AM Launcher
* @param name app name
* @param type applicatin type
* @param conf hadoop config
* @param fs filesystem binding
* @param application precreated YARN client app instance
* @param secureCluster is the cluster secure?
* @param options map of options. All values are extracted in this constructor only
* -the map is not retained.
*/
public AppMasterLauncher(String name,
String type,
Configuration conf,
CoreFileSystem fs,
SliderYarnClientImpl yarnClient,
boolean secureCluster,
Map<String, String> options,
Set<String> applicationTags
) throws IOException, YarnException {
super(conf, fs);
this.yarnClient = yarnClient;
this.application = yarnClient.createApplication();
this.name = name;
this.type = type;
this.secureCluster = secureCluster;
submissionContext = application.getApplicationSubmissionContext();
appId = submissionContext.getApplicationId();
// set the application name;
submissionContext.setApplicationName(name);
// app type used in service enum;
submissionContext.setApplicationType(type);
if (!applicationTags.isEmpty()) {
submissionContext.setApplicationTags(applicationTags);
}
extractResourceRequirements(resource, options);
}
public void setMaxAppAttempts(int maxAppAttempts) {
this.maxAppAttempts = maxAppAttempts;
}
public void setKeepContainersOverRestarts(boolean keepContainersOverRestarts) {
this.keepContainersOverRestarts = keepContainersOverRestarts;
}
public Resource getResource() {
return resource;
}
public void setMemory(int memory) {
resource.setMemory(memory);
}
public void setVirtualCores(int cores) {
resource.setVirtualCores(cores);
}
public ApplicationId getApplicationId() {
return appId;
}
public int getMaxAppAttempts() {
return maxAppAttempts;
}
public boolean isKeepContainersOverRestarts() {
return keepContainersOverRestarts;
}
public String getQueue() {
return queue;
}
public int getPriority() {
return priority;
}
public void setQueue(String queue) {
this.queue = queue;
}
public void setPriority(int priority) {
this.priority = priority;
}
/**
* Complete the launch context (copy in env vars, etc).
* @return the container to launch
*/
public ApplicationSubmissionContext completeAppMasterLaunch() throws
IOException {
//queue priority
Priority pri = Records.newRecord(Priority.class);
pri.setPriority(priority);
submissionContext.setPriority(pri);
// Set the queue to which this application is to be submitted in the RM
// Queue for App master
submissionContext.setQueue(queue);
//container requirements
submissionContext.setResource(resource);
if (keepContainersOverRestarts) {
log.debug("Requesting cluster stays running over AM failure");
submissionContext.setKeepContainersAcrossApplicationAttempts(true);
}
if (maxAppAttempts > 0) {
log.debug("Setting max AM attempts to {}", maxAppAttempts);
submissionContext.setMaxAppAttempts(maxAppAttempts);
}
if (secureCluster) {
addSecurityTokens();
} else {
propagateUsernameInInsecureCluster();
}
completeContainerLaunch();
submissionContext.setAMContainerSpec(containerLaunchContext);
return submissionContext;
}
/**
* Add the security tokens if this is a secure cluster
* @throws IOException
*/
private void addSecurityTokens() throws IOException {
String tokenRenewer = getConf().get(YarnConfiguration.RM_PRINCIPAL);
if (SliderUtils.isUnset(tokenRenewer)) {
throw new IOException(
"Can't get Master Kerberos principal for the RM to use as renewer: "
+ YarnConfiguration.RM_PRINCIPAL
);
}
if (this.getConf().get("mapreduce.job.credentials.binary") == null) {
// For now, only getting tokens for the default file-system.
FileSystem fs = coreFileSystem.getFileSystem();
fs.addDelegationTokens(tokenRenewer, credentials);
}
}
public LaunchedApplication submitApplication() throws IOException, YarnException {
completeAppMasterLaunch();
log.info("Submitting application to Resource Manager");
ApplicationId applicationId =
yarnClient.submitApplication(submissionContext);
return new LaunchedApplication(applicationId, yarnClient);
}
}