blob: 453619b97290e93718fc51c92abab275f3110c37 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.service.client;
import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.lang.StringUtils;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.RetryNTimes;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.registry.client.api.RegistryConstants;
import org.apache.hadoop.registry.client.api.RegistryOperations;
import org.apache.hadoop.registry.client.api.RegistryOperationsFactory;
import org.apache.hadoop.registry.client.binding.RegistryUtils;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.VersionInfo;
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsRequest;
import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationTimeoutsRequest;
import org.apache.hadoop.yarn.api.records.*;
import org.apache.hadoop.yarn.client.api.AppAdminClient;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.client.api.YarnClientApplication;
import org.apache.hadoop.yarn.client.cli.ApplicationCLI;
import org.apache.hadoop.yarn.client.util.YarnClientUtils;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.proto.ClientAMProtocol.ComponentCountProto;
import org.apache.hadoop.yarn.proto.ClientAMProtocol.FlexComponentsRequestProto;
import org.apache.hadoop.yarn.proto.ClientAMProtocol.GetStatusRequestProto;
import org.apache.hadoop.yarn.proto.ClientAMProtocol.GetStatusResponseProto;
import org.apache.hadoop.yarn.proto.ClientAMProtocol.RestartServiceRequestProto;
import org.apache.hadoop.yarn.proto.ClientAMProtocol.StopRequestProto;
import org.apache.hadoop.yarn.proto.ClientAMProtocol.UpgradeServiceRequestProto;
import org.apache.hadoop.yarn.service.ClientAMProtocol;
import org.apache.hadoop.yarn.service.ServiceMaster;
import org.apache.hadoop.yarn.service.api.records.Component;
import org.apache.hadoop.yarn.service.api.records.Service;
import org.apache.hadoop.yarn.service.api.records.ServiceState;
import org.apache.hadoop.yarn.service.conf.SliderExitCodes;
import org.apache.hadoop.yarn.service.conf.YarnServiceConf;
import org.apache.hadoop.yarn.service.conf.YarnServiceConstants;
import org.apache.hadoop.yarn.service.containerlaunch.ClasspathConstructor;
import org.apache.hadoop.yarn.service.containerlaunch.JavaCommandLineBuilder;
import org.apache.hadoop.yarn.service.exceptions.BadClusterStateException;
import org.apache.hadoop.yarn.service.exceptions.BadConfigException;
import org.apache.hadoop.yarn.service.exceptions.SliderException;
import org.apache.hadoop.yarn.service.provider.AbstractClientProvider;
import org.apache.hadoop.yarn.service.provider.ProviderUtils;
import org.apache.hadoop.yarn.service.utils.ServiceApiUtil;
import org.apache.hadoop.yarn.service.utils.ServiceRegistryUtils;
import org.apache.hadoop.yarn.service.utils.ServiceUtils;
import org.apache.hadoop.yarn.service.utils.SliderFileSystem;
import org.apache.hadoop.yarn.service.utils.ZookeeperUtils;
import org.apache.hadoop.yarn.util.DockerClientConfigHandler;
import org.apache.hadoop.yarn.util.Records;
import org.apache.hadoop.yarn.util.Times;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.ByteBuffer;
import java.text.MessageFormat;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import static org.apache.hadoop.yarn.api.records.YarnApplicationState.*;
import static org.apache.hadoop.yarn.service.conf.YarnServiceConf.*;
import static org.apache.hadoop.yarn.service.utils.ServiceApiUtil.jsonSerDeser;
import static org.apache.hadoop.yarn.service.utils.ServiceUtils.*;
@InterfaceAudience.Public
@InterfaceStability.Unstable
public class ServiceClient extends AppAdminClient implements SliderExitCodes,
YarnServiceConstants {
private static final Logger LOG =
LoggerFactory.getLogger(ServiceClient.class);
private SliderFileSystem fs;
//TODO disable retry so that client / rest API doesn't block?
protected YarnClient yarnClient;
// Avoid looking up applicationId from fs all the time.
private Map<String, AppInfo> cachedAppInfo = new ConcurrentHashMap<>();
private RegistryOperations registryClient;
private CuratorFramework curatorClient;
private YarnRPC rpc;
private static EnumSet<YarnApplicationState> terminatedStates =
EnumSet.of(FINISHED, FAILED, KILLED);
private static EnumSet<YarnApplicationState> liveStates =
EnumSet.of(NEW, NEW_SAVING, SUBMITTED, ACCEPTED, RUNNING);
private static EnumSet<YarnApplicationState> preRunningStates =
EnumSet.of(NEW, NEW_SAVING, SUBMITTED, ACCEPTED);
@Override protected void serviceInit(Configuration configuration)
throws Exception {
fs = new SliderFileSystem(configuration);
yarnClient = YarnClient.createYarnClient();
rpc = YarnRPC.create(configuration);
addService(yarnClient);
super.serviceInit(configuration);
}
@Override
protected void serviceStop() throws Exception {
if (registryClient != null) {
registryClient.stop();
}
super.serviceStop();
}
public Service loadAppJsonFromLocalFS(String fileName, String serviceName,
Long lifetime, String queue) throws IOException, YarnException {
File file = new File(fileName);
if (!file.exists() && fileName.equals(file.getName())) {
String examplesDirStr = System.getenv("YARN_SERVICE_EXAMPLES_DIR");
String[] examplesDirs;
if (examplesDirStr == null) {
String yarnHome = System
.getenv(ApplicationConstants.Environment.HADOOP_YARN_HOME.key());
examplesDirs = new String[]{
yarnHome + "/share/hadoop/yarn/yarn-service-examples",
yarnHome + "/yarn-service-examples"
};
} else {
examplesDirs = StringUtils.split(examplesDirStr, ":");
}
for (String dir : examplesDirs) {
file = new File(MessageFormat.format("{0}/{1}/{2}.json",
dir, fileName, fileName));
if (file.exists()) {
break;
}
// Then look for secondary location.
file = new File(MessageFormat.format("{0}/{1}.json",
dir, fileName));
if (file.exists()) {
break;
}
}
}
if (!file.exists()) {
throw new YarnException("File or example could not be found: " +
fileName);
}
Path filePath = new Path(file.getAbsolutePath());
LOG.info("Loading service definition from local FS: " + filePath);
Service service = jsonSerDeser
.load(FileSystem.getLocal(getConfig()), filePath);
if (!StringUtils.isEmpty(serviceName)) {
service.setName(serviceName);
}
if (lifetime != null && lifetime > 0) {
service.setLifetime(lifetime);
}
if (!StringUtils.isEmpty(queue)) {
service.setQueue(queue);
}
return service;
}
@Override
public int actionSave(String fileName, String serviceName, Long lifetime,
String queue) throws IOException, YarnException {
return actionBuild(loadAppJsonFromLocalFS(fileName, serviceName,
lifetime, queue));
}
public int actionBuild(Service service)
throws YarnException, IOException {
ServiceApiUtil.validateAndResolveService(service, fs, getConfig());
Path appDir = checkAppNotExistOnHdfs(service, false);
ServiceApiUtil.createDirAndPersistApp(fs, appDir, service);
return EXIT_SUCCESS;
}
@Override
public int actionUpgrade(String appName, String fileName)
throws IOException, YarnException {
checkAppExistOnHdfs(appName);
Service upgradeService = loadAppJsonFromLocalFS(fileName, appName,
null, null);
return actionUpgrade(upgradeService);
}
public int actionUpgrade(Service service) throws YarnException, IOException {
Service persistedService =
ServiceApiUtil.loadService(fs, service.getName());
if (!StringUtils.isEmpty(persistedService.getId())) {
cachedAppInfo.put(persistedService.getName(), new AppInfo(
ApplicationId.fromString(persistedService.getId()),
persistedService.getKerberosPrincipal().getPrincipalName()));
}
if (persistedService.getVersion().equals(service.getVersion())) {
String message =
service.getName() + " is already at version " + service.getVersion()
+ ". There is nothing to upgrade.";
LOG.error(message);
throw new YarnException(message);
}
Path serviceUpgradeDir = checkAppNotExistOnHdfs(service, true);
ServiceApiUtil.validateAndResolveService(service, fs, getConfig());
ServiceApiUtil.createDirAndPersistApp(fs, serviceUpgradeDir, service);
ApplicationReport appReport =
yarnClient.getApplicationReport(getAppId(service.getName()));
if (StringUtils.isEmpty(appReport.getHost())) {
throw new YarnException(service.getName() + " AM hostname is empty");
}
ClientAMProtocol proxy = createAMProxy(service.getName(), appReport);
UpgradeServiceRequestProto.Builder requestBuilder =
UpgradeServiceRequestProto.newBuilder();
requestBuilder.setVersion(service.getVersion());
proxy.upgrade(requestBuilder.build());
return EXIT_SUCCESS;
}
public int actionLaunch(String fileName, String serviceName, Long lifetime,
String queue) throws IOException, YarnException {
actionCreate(loadAppJsonFromLocalFS(fileName, serviceName, lifetime,
queue));
return EXIT_SUCCESS;
}
public ApplicationId actionCreate(Service service)
throws IOException, YarnException {
String serviceName = service.getName();
ServiceApiUtil.validateAndResolveService(service, fs, getConfig());
verifyNoLiveAppInRM(serviceName, "create");
Path appDir = checkAppNotExistOnHdfs(service, false);
// Write the definition first and then submit - AM will read the definition
ServiceApiUtil.createDirAndPersistApp(fs, appDir, service);
ApplicationId appId = submitApp(service);
cachedAppInfo.put(serviceName, new AppInfo(appId, service
.getKerberosPrincipal().getPrincipalName()));
service.setId(appId.toString());
// update app definition with appId
ServiceApiUtil.writeAppDefinition(fs, appDir, service);
return appId;
}
public int actionFlex(String serviceName, Map<String, String>
componentCountStrings) throws YarnException, IOException {
Map<String, Long> componentCounts =
new HashMap<>(componentCountStrings.size());
Service persistedService =
ServiceApiUtil.loadService(fs, serviceName);
if (!StringUtils.isEmpty(persistedService.getId())) {
cachedAppInfo.put(persistedService.getName(), new AppInfo(
ApplicationId.fromString(persistedService.getId()),
persistedService.getKerberosPrincipal().getPrincipalName()));
} else {
throw new YarnException(persistedService.getName()
+ " appId is null, may be not submitted to YARN yet");
}
for (Map.Entry<String, String> entry : componentCountStrings.entrySet()) {
String compName = entry.getKey();
ServiceApiUtil.validateNameFormat(compName, getConfig());
Component component = persistedService.getComponent(compName);
if (component == null) {
throw new IllegalArgumentException(entry.getKey() + " does not exist !");
}
long numberOfContainers =
parseNumberOfContainers(component, entry.getValue());
componentCounts.put(compName, numberOfContainers);
}
flexComponents(serviceName, componentCounts, persistedService);
return EXIT_SUCCESS;
}
// Parse the number of containers requested by user, e.g.
// +5 means add 5 additional containers
// -5 means reduce 5 containers, if it goes to negative, sets it to 0
// 5 means sets it to 5 containers.
private long parseNumberOfContainers(Component component, String newNumber) {
long orig = component.getNumberOfContainers();
if (newNumber.startsWith("+")) {
return orig + Long.parseLong(newNumber.substring(1));
} else if (newNumber.startsWith("-")) {
long ret = orig - Long.parseLong(newNumber.substring(1));
if (ret < 0) {
LOG.warn(MessageFormat.format(
"[COMPONENT {0}]: component count goes to negative ({1}{2} = {3}),"
+ " ignore and reset it to 0.",
component.getName(), orig, newNumber, ret));
ret = 0;
}
return ret;
} else {
return Long.parseLong(newNumber);
}
}
// Called by Rest Service
public Map<String, Long> flexByRestService(String serviceName,
Map<String, Long> componentCounts) throws YarnException, IOException {
// load app definition
Service persistedService = ServiceApiUtil.loadService(fs, serviceName);
if (StringUtils.isEmpty(persistedService.getId())) {
throw new YarnException(
serviceName + " appId is null, may be not submitted to YARN yet");
}
cachedAppInfo.put(persistedService.getName(), new AppInfo(
ApplicationId.fromString(persistedService.getId()), persistedService
.getKerberosPrincipal().getPrincipalName()));
return flexComponents(serviceName, componentCounts, persistedService);
}
private Map<String, Long> flexComponents(String serviceName,
Map<String, Long> componentCounts, Service persistedService)
throws YarnException, IOException {
ServiceApiUtil.validateNameFormat(serviceName, getConfig());
Map<String, Long> original = new HashMap<>(componentCounts.size());
ComponentCountProto.Builder countBuilder = ComponentCountProto.newBuilder();
FlexComponentsRequestProto.Builder requestBuilder =
FlexComponentsRequestProto.newBuilder();
for (Component persistedComp : persistedService.getComponents()) {
String name = persistedComp.getName();
if (componentCounts.containsKey(persistedComp.getName())) {
original.put(name, persistedComp.getNumberOfContainers());
persistedComp.setNumberOfContainers(componentCounts.get(name));
// build the request
countBuilder.setName(persistedComp.getName())
.setNumberOfContainers(persistedComp.getNumberOfContainers());
requestBuilder.addComponents(countBuilder.build());
}
}
if (original.size() < componentCounts.size()) {
componentCounts.keySet().removeAll(original.keySet());
throw new YarnException("Components " + componentCounts.keySet()
+ " do not exist in app definition.");
}
jsonSerDeser
.save(fs.getFileSystem(), ServiceApiUtil.getServiceJsonPath(fs, serviceName),
persistedService, true);
ApplicationId appId = getAppId(serviceName);
if (appId == null) {
String message = "Application ID doesn't exist for " + serviceName;
LOG.error(message);
throw new YarnException(message);
}
ApplicationReport appReport =
yarnClient.getApplicationReport(appId);
if (appReport.getYarnApplicationState() != RUNNING) {
String message =
serviceName + " is at " + appReport.getYarnApplicationState()
+ " state, flex can only be invoked when service is running";
LOG.error(message);
throw new YarnException(message);
}
if (StringUtils.isEmpty(appReport.getHost())) {
throw new YarnException(serviceName + " AM hostname is empty");
}
ClientAMProtocol proxy =
createAMProxy(serviceName, appReport);
proxy.flexComponents(requestBuilder.build());
for (Map.Entry<String, Long> entry : original.entrySet()) {
LOG.info("[COMPONENT {}]: number of containers changed from {} to {}",
entry.getKey(), entry.getValue(),
componentCounts.get(entry.getKey()));
}
return original;
}
@Override
public int actionStop(String serviceName)
throws YarnException, IOException {
return actionStop(serviceName, true);
}
public int actionStop(String serviceName, boolean waitForAppStopped)
throws YarnException, IOException {
ServiceApiUtil.validateNameFormat(serviceName, getConfig());
ApplicationId currentAppId = getAppId(serviceName);
if (currentAppId == null) {
LOG.info("Application ID doesn't exist for service {}", serviceName);
cleanUpRegistry(serviceName);
return EXIT_COMMAND_ARGUMENT_ERROR;
}
ApplicationReport report = yarnClient.getApplicationReport(currentAppId);
if (terminatedStates.contains(report.getYarnApplicationState())) {
LOG.info("Service {} is already in a terminated state {}", serviceName,
report.getYarnApplicationState());
cleanUpRegistry(serviceName);
return EXIT_COMMAND_ARGUMENT_ERROR;
}
if (preRunningStates.contains(report.getYarnApplicationState())) {
String msg = serviceName + " is at " + report.getYarnApplicationState()
+ ", forcefully killed by user!";
yarnClient.killApplication(currentAppId, msg);
LOG.info(msg);
cleanUpRegistry(serviceName);
return EXIT_SUCCESS;
}
if (StringUtils.isEmpty(report.getHost())) {
throw new YarnException(serviceName + " AM hostname is empty");
}
LOG.info("Stopping service {}, with appId = {}", serviceName, currentAppId);
try {
ClientAMProtocol proxy =
createAMProxy(serviceName, report);
cachedAppInfo.remove(serviceName);
if (proxy != null) {
// try to stop the app gracefully.
StopRequestProto request = StopRequestProto.newBuilder().build();
proxy.stop(request);
LOG.info("Service " + serviceName + " is being gracefully stopped...");
} else {
yarnClient.killApplication(currentAppId,
serviceName + " is forcefully killed by user!");
LOG.info("Forcefully kill the service: " + serviceName);
cleanUpRegistry(serviceName);
return EXIT_SUCCESS;
}
if (!waitForAppStopped) {
cleanUpRegistry(serviceName);
return EXIT_SUCCESS;
}
// Wait until the app is killed.
long startTime = System.currentTimeMillis();
int pollCount = 0;
while (true) {
Thread.sleep(2000);
report = yarnClient.getApplicationReport(currentAppId);
if (terminatedStates.contains(report.getYarnApplicationState())) {
LOG.info("Service " + serviceName + " is stopped.");
break;
}
// Forcefully kill after 10 seconds.
if ((System.currentTimeMillis() - startTime) > 10000) {
LOG.info("Stop operation timeout stopping, forcefully kill the app "
+ serviceName);
yarnClient.killApplication(currentAppId,
"Forcefully kill the app by user");
break;
}
if (++pollCount % 10 == 0) {
LOG.info("Waiting for service " + serviceName + " to be stopped.");
}
}
} catch (IOException | YarnException | InterruptedException e) {
LOG.info("Failed to stop " + serviceName + " gracefully due to: "
+ e.getMessage() + ", forcefully kill the app.");
yarnClient.killApplication(currentAppId, "Forcefully kill the app");
}
cleanUpRegistry(serviceName);
return EXIT_SUCCESS;
}
@Override
public int actionDestroy(String serviceName) throws YarnException,
IOException {
ServiceApiUtil.validateNameFormat(serviceName, getConfig());
verifyNoLiveAppInRM(serviceName, "destroy");
Path appDir = fs.buildClusterDirPath(serviceName);
FileSystem fileSystem = fs.getFileSystem();
// remove from the appId cache
cachedAppInfo.remove(serviceName);
int ret = EXIT_SUCCESS;
if (fileSystem.exists(appDir)) {
if (fileSystem.delete(appDir, true)) {
LOG.info("Successfully deleted service dir for " + serviceName + ": "
+ appDir);
} else {
String message =
"Failed to delete service + " + serviceName + " at: " + appDir;
LOG.info(message);
throw new YarnException(message);
}
} else {
LOG.info("Service '" + serviceName + "' doesn't exist at hdfs path: "
+ appDir);
ret = EXIT_NOT_FOUND;
}
try {
deleteZKNode(serviceName);
// don't set destroySucceed to false if no ZK node exists because not
// all services use a ZK node
} catch (Exception e) {
throw new IOException("Could not delete zk node for " + serviceName, e);
}
if (!cleanUpRegistry(serviceName)) {
if (ret == EXIT_SUCCESS) {
ret = EXIT_OTHER_FAILURE;
}
}
if (ret == EXIT_SUCCESS) {
LOG.info("Successfully destroyed service {}", serviceName);
return ret;
} else if (ret == EXIT_NOT_FOUND) {
LOG.error("Error on destroy '" + serviceName + "': not found.");
return ret;
} else {
LOG.error("Error on destroy '" + serviceName + "': error cleaning up " +
"registry.");
return ret;
}
}
private boolean cleanUpRegistry(String serviceName) throws SliderException {
String registryPath =
ServiceRegistryUtils.registryPathForInstance(serviceName);
try {
if (getRegistryClient().exists(registryPath)) {
getRegistryClient().delete(registryPath, true);
} else {
LOG.info(
"Service '" + serviceName + "' doesn't exist at ZK registry path: "
+ registryPath);
// not counted as a failure if the registry entries don't exist
}
} catch (IOException e) {
LOG.warn("Error deleting registry entry {}", registryPath, e);
return false;
}
return true;
}
private synchronized RegistryOperations getRegistryClient()
throws SliderException, IOException {
if (registryClient == null) {
registryClient =
RegistryOperationsFactory.createInstance("ServiceClient", getConfig());
registryClient.init(getConfig());
registryClient.start();
}
return registryClient;
}
/**
* Delete service's ZK node. This is a different node from the service's
* registry entry and is set aside for the service to use for its own ZK data.
*
* @param serviceName service name
* @return true if the node was deleted, false if the node doesn't exist
* @throws Exception if the node couldn't be deleted
*/
private boolean deleteZKNode(String serviceName) throws Exception {
CuratorFramework curatorFramework = getCuratorClient();
String user = RegistryUtils.currentUser();
String zkPath = ServiceRegistryUtils.mkServiceHomePath(user, serviceName);
if (curatorFramework.checkExists().forPath(zkPath) != null) {
curatorFramework.delete().deletingChildrenIfNeeded().forPath(zkPath);
LOG.info("Deleted zookeeper path: " + zkPath);
return true;
} else {
LOG.info(
"Service '" + serviceName + "' doesn't exist at ZK path: " + zkPath);
return false;
}
}
private synchronized CuratorFramework getCuratorClient()
throws BadConfigException {
String registryQuorum =
getConfig().get(RegistryConstants.KEY_REGISTRY_ZK_QUORUM);
// though if neither is set: trouble
if (ServiceUtils.isUnset(registryQuorum)) {
throw new BadConfigException(
"No Zookeeper quorum provided in the" + " configuration property "
+ RegistryConstants.KEY_REGISTRY_ZK_QUORUM);
}
ZookeeperUtils.splitToHostsAndPortsStrictly(registryQuorum);
if (curatorClient == null) {
curatorClient =
CuratorFrameworkFactory.builder().connectString(registryQuorum)
.sessionTimeoutMs(10000).retryPolicy(new RetryNTimes(5, 2000))
.build();
curatorClient.start();
}
return curatorClient;
}
private void verifyNoLiveAppInRM(String serviceName, String action)
throws IOException, YarnException {
Set<String> types = new HashSet<>(1);
types.add(YarnServiceConstants.APP_TYPE);
Set<String> tags = null;
if (serviceName != null) {
tags = Collections.singleton(ServiceUtils.createNameTag(serviceName));
}
GetApplicationsRequest request = GetApplicationsRequest.newInstance();
request.setApplicationTypes(types);
request.setApplicationTags(tags);
request.setApplicationStates(liveStates);
String user = UserGroupInformation.getCurrentUser().getUserName();
if (user != null) {
request.setUsers(Collections.singleton(user));
}
List<ApplicationReport> reports = yarnClient.getApplications(request);
if (!reports.isEmpty()) {
String message = "";
if (action.equals("destroy")) {
message = "Failed to destroy service " + serviceName
+ ", because it is still running.";
} else {
message = "Failed to " + action + " service " + serviceName
+ ", because it already exists.";
}
throw new YarnException(message);
}
}
@VisibleForTesting
ApplicationId submitApp(Service app) throws IOException, YarnException {
String serviceName = app.getName();
Configuration conf = getConfig();
Path appRootDir = fs.buildClusterDirPath(app.getName());
YarnClientApplication yarnApp = yarnClient.createApplication();
ApplicationSubmissionContext submissionContext =
yarnApp.getApplicationSubmissionContext();
ServiceApiUtil.validateCompResourceSize(
yarnApp.getNewApplicationResponse().getMaximumResourceCapability(),
app);
submissionContext.setKeepContainersAcrossApplicationAttempts(true);
if (app.getLifetime() > 0) {
Map<ApplicationTimeoutType, Long> appTimeout = new HashMap<>();
appTimeout.put(ApplicationTimeoutType.LIFETIME, app.getLifetime());
submissionContext.setApplicationTimeouts(appTimeout);
}
submissionContext.setMaxAppAttempts(YarnServiceConf
.getInt(YarnServiceConf.AM_RESTART_MAX, DEFAULT_AM_RESTART_MAX, app
.getConfiguration(), conf));
setLogAggregationContext(app, conf, submissionContext);
Map<String, LocalResource> localResources = new HashMap<>();
// copy local slideram-log4j.properties to hdfs and add to localResources
boolean hasAMLog4j =
addAMLog4jResource(serviceName, conf, localResources);
// copy jars to hdfs and add to localResources
addJarResource(serviceName, localResources);
// add keytab if in secure env
addKeytabResourceIfSecure(fs, localResources, app);
if (LOG.isDebugEnabled()) {
printLocalResources(localResources);
}
Map<String, String> env = addAMEnv();
// create AM CLI
String cmdStr = buildCommandLine(app, conf, appRootDir, hasAMLog4j);
submissionContext.setResource(Resource.newInstance(YarnServiceConf
.getLong(YarnServiceConf.AM_RESOURCE_MEM,
YarnServiceConf.DEFAULT_KEY_AM_RESOURCE_MEM, app.getConfiguration(),
conf), 1));
String queue = app.getQueue();
if (StringUtils.isEmpty(queue)) {
queue = conf.get(YARN_QUEUE, DEFAULT_YARN_QUEUE);
}
submissionContext.setQueue(queue);
submissionContext.setApplicationName(serviceName);
submissionContext.setApplicationType(YarnServiceConstants.APP_TYPE);
Set<String> appTags =
AbstractClientProvider.createApplicationTags(serviceName, null, null);
if (!appTags.isEmpty()) {
submissionContext.setApplicationTags(appTags);
}
ContainerLaunchContext amLaunchContext =
Records.newRecord(ContainerLaunchContext.class);
amLaunchContext.setCommands(Collections.singletonList(cmdStr));
amLaunchContext.setEnvironment(env);
amLaunchContext.setLocalResources(localResources);
addCredentials(amLaunchContext, app);
submissionContext.setAMContainerSpec(amLaunchContext);
yarnClient.submitApplication(submissionContext);
return submissionContext.getApplicationId();
}
private void setLogAggregationContext(Service app, Configuration conf,
ApplicationSubmissionContext submissionContext) {
LogAggregationContext context = Records.newRecord(LogAggregationContext
.class);
String finalLogInclude = YarnServiceConf.get
(FINAL_LOG_INCLUSION_PATTERN, null, app.getConfiguration(), conf);
if (!StringUtils.isEmpty(finalLogInclude)) {
context.setIncludePattern(finalLogInclude);
}
String finalLogExclude = YarnServiceConf.get
(FINAL_LOG_EXCLUSION_PATTERN, null, app.getConfiguration(), conf);
if (!StringUtils.isEmpty(finalLogExclude)) {
context.setExcludePattern(finalLogExclude);
}
String rollingLogInclude = YarnServiceConf.get
(ROLLING_LOG_INCLUSION_PATTERN, null, app.getConfiguration(), conf);
if (!StringUtils.isEmpty(rollingLogInclude)) {
context.setRolledLogsIncludePattern(rollingLogInclude);
}
String rollingLogExclude = YarnServiceConf.get
(ROLLING_LOG_EXCLUSION_PATTERN, null, app.getConfiguration(), conf);
if (!StringUtils.isEmpty(rollingLogExclude)) {
context.setRolledLogsExcludePattern(rollingLogExclude);
}
submissionContext.setLogAggregationContext(context);
}
private void printLocalResources(Map<String, LocalResource> map) {
LOG.debug("Added LocalResource for localization: ");
StringBuilder builder = new StringBuilder();
for (Map.Entry<String, LocalResource> entry : map.entrySet()) {
builder.append(entry.getKey()).append(" -> ")
.append(entry.getValue().getResource().getFile())
.append(System.lineSeparator());
}
LOG.debug(builder.toString());
}
private String buildCommandLine(Service app, Configuration conf,
Path appRootDir, boolean hasSliderAMLog4j) throws BadConfigException {
JavaCommandLineBuilder CLI = new JavaCommandLineBuilder();
CLI.forceIPv4().headless();
String jvmOpts = YarnServiceConf
.get(YarnServiceConf.JVM_OPTS, "", app.getConfiguration(), conf);
if (!jvmOpts.contains("-Xmx")) {
jvmOpts += DEFAULT_AM_JVM_XMX;
}
CLI.setJVMOpts(jvmOpts);
if (hasSliderAMLog4j) {
CLI.sysprop(SYSPROP_LOG4J_CONFIGURATION, YARN_SERVICE_LOG4J_FILENAME);
CLI.sysprop(SYSPROP_LOG_DIR, ApplicationConstants.LOG_DIR_EXPANSION_VAR);
}
CLI.add(ServiceMaster.class.getCanonicalName());
//TODO debugAM CLI.add(Arguments.ARG_DEBUG)
CLI.add("-" + ServiceMaster.YARNFILE_OPTION, new Path(appRootDir,
app.getName() + ".json"));
// pass the registry binding
CLI.addConfOptionToCLI(conf, RegistryConstants.KEY_REGISTRY_ZK_ROOT,
RegistryConstants.DEFAULT_ZK_REGISTRY_ROOT);
CLI.addMandatoryConfOption(conf, RegistryConstants.KEY_REGISTRY_ZK_QUORUM);
// write out the path output
CLI.addOutAndErrFiles(STDOUT_AM, STDERR_AM);
String cmdStr = CLI.build();
LOG.debug("AM launch command: {}", cmdStr);
return cmdStr;
}
private Map<String, String> addAMEnv() throws IOException {
Map<String, String> env = new HashMap<>();
ClasspathConstructor classpath =
buildClasspath(YarnServiceConstants.SUBMITTED_CONF_DIR, "lib", fs, getConfig()
.getBoolean(YarnConfiguration.IS_MINI_YARN_CLUSTER, false));
env.put("CLASSPATH", classpath.buildClasspath());
env.put("LANG", "en_US.UTF-8");
env.put("LC_ALL", "en_US.UTF-8");
env.put("LANGUAGE", "en_US.UTF-8");
String jaas = System.getenv("HADOOP_JAAS_DEBUG");
if (jaas != null) {
env.put("HADOOP_JAAS_DEBUG", jaas);
}
if (!UserGroupInformation.isSecurityEnabled()) {
String userName = UserGroupInformation.getCurrentUser().getUserName();
LOG.debug("Run as user " + userName);
// HADOOP_USER_NAME env is used by UserGroupInformation when log in
// This env makes AM run as this user
env.put("HADOOP_USER_NAME", userName);
}
LOG.debug("AM env: \n{}", stringifyMap(env));
return env;
}
protected Path addJarResource(String serviceName,
Map<String, LocalResource> localResources)
throws IOException, SliderException {
Path libPath = fs.buildClusterDirPath(serviceName);
ProviderUtils
.addProviderJar(localResources, ServiceMaster.class, SERVICE_CORE_JAR, fs,
libPath, "lib", false);
Path dependencyLibTarGzip = fs.getDependencyTarGzip();
if (fs.isFile(dependencyLibTarGzip)) {
LOG.info("Loading lib tar from " + dependencyLibTarGzip);
fs.submitTarGzipAndUpdate(localResources);
} else {
if (dependencyLibTarGzip != null) {
LOG.warn("Property {} has a value {}, but is not a valid file",
YarnServiceConf.DEPENDENCY_TARBALL_PATH, dependencyLibTarGzip);
}
String[] libs = ServiceUtils.getLibDirs();
LOG.info("Uploading all dependency jars to HDFS. For faster submission of"
+ " apps, set config property {} to the dependency tarball location."
+ " Dependency tarball can be uploaded to any HDFS path directly"
+ " or by using command: yarn app -{} [<Destination Folder>]",
YarnServiceConf.DEPENDENCY_TARBALL_PATH,
ApplicationCLI.ENABLE_FAST_LAUNCH);
for (String libDirProp : libs) {
ProviderUtils.addAllDependencyJars(localResources, fs, libPath, "lib",
libDirProp);
}
}
return libPath;
}
private boolean addAMLog4jResource(String serviceName, Configuration conf,
Map<String, LocalResource> localResources)
throws IOException, BadClusterStateException {
boolean hasAMLog4j = false;
String hadoopConfDir =
System.getenv(ApplicationConstants.Environment.HADOOP_CONF_DIR.name());
if (hadoopConfDir != null) {
File localFile =
new File(hadoopConfDir, YarnServiceConstants.YARN_SERVICE_LOG4J_FILENAME);
if (localFile.exists()) {
Path localFilePath = createLocalPath(localFile);
Path appDirPath = fs.buildClusterDirPath(serviceName);
Path remoteConfPath =
new Path(appDirPath, YarnServiceConstants.SUBMITTED_CONF_DIR);
Path remoteFilePath =
new Path(remoteConfPath, YarnServiceConstants.YARN_SERVICE_LOG4J_FILENAME);
copy(conf, localFilePath, remoteFilePath);
LocalResource localResource =
fs.createAmResource(remoteConfPath, LocalResourceType.FILE);
localResources.put(localFilePath.getName(), localResource);
hasAMLog4j = true;
} else {
LOG.warn("AM log4j property file doesn't exist: " + localFile);
}
}
return hasAMLog4j;
}
@Override
public int actionStart(String serviceName) throws YarnException, IOException {
ServiceApiUtil.validateNameFormat(serviceName, getConfig());
Service liveService = getStatus(serviceName);
if (liveService == null ||
!liveService.getState().equals(ServiceState.UPGRADING)) {
Path appDir = checkAppExistOnHdfs(serviceName);
Service service = ServiceApiUtil.loadService(fs, serviceName);
ServiceApiUtil.validateAndResolveService(service, fs, getConfig());
// see if it is actually running and bail out;
verifyNoLiveAppInRM(serviceName, "start");
ApplicationId appId = submitApp(service);
service.setId(appId.toString());
// write app definition on to hdfs
Path appJson = ServiceApiUtil.writeAppDefinition(fs, appDir, service);
LOG.info("Persisted service " + service.getName() + " at " + appJson);
return 0;
} else {
LOG.info("Finalize service {} upgrade");
ApplicationReport appReport =
yarnClient.getApplicationReport(getAppId(serviceName));
if (StringUtils.isEmpty(appReport.getHost())) {
throw new YarnException(serviceName + " AM hostname is empty");
}
ClientAMProtocol proxy = createAMProxy(serviceName, appReport);
RestartServiceRequestProto.Builder requestBuilder =
RestartServiceRequestProto.newBuilder();
proxy.restart(requestBuilder.build());
return 0;
}
}
/**
* Verifies that the service definition does not exist on hdfs.
*
* @param service service
* @param isUpgrade true for upgrades; false otherwise
* @return path to the service definition..
* @throws IOException
* @throws SliderException
*/
private Path checkAppNotExistOnHdfs(Service service, boolean isUpgrade)
throws IOException, SliderException {
Path appDir = !isUpgrade ? fs.buildClusterDirPath(service.getName()) :
fs.buildClusterUpgradeDirPath(service.getName(), service.getVersion());
fs.verifyDirectoryNonexistent(
new Path(appDir, service.getName() + ".json"));
return appDir;
}
/**
* Verifies that the service exists on hdfs.
* @param serviceName service name
* @return path to the service definition.
* @throws IOException
* @throws SliderException
*/
private Path checkAppExistOnHdfs(String serviceName)
throws IOException, SliderException {
Path appDir = fs.buildClusterDirPath(serviceName);
fs.verifyPathExists(new Path(appDir, serviceName + ".json"));
return appDir;
}
private void addCredentials(ContainerLaunchContext amContext, Service app)
throws IOException {
Credentials allCreds = new Credentials();
// HDFS DT
if (UserGroupInformation.isSecurityEnabled()) {
String tokenRenewer = YarnClientUtils.getRmPrincipal(getConfig());
if (StringUtils.isEmpty(tokenRenewer)) {
throw new IOException(
"Can't get Master Kerberos principal for the RM to use as renewer");
}
final org.apache.hadoop.security.token.Token<?>[] tokens =
fs.getFileSystem().addDelegationTokens(tokenRenewer, allCreds);
if (LOG.isDebugEnabled()) {
if (tokens != null && tokens.length != 0) {
for (Token<?> token : tokens) {
LOG.debug("Got DT: " + token);
}
}
}
}
if (!StringUtils.isEmpty(app.getDockerClientConfig())) {
allCreds.addAll(DockerClientConfigHandler.readCredentialsFromConfigFile(
new Path(app.getDockerClientConfig()), getConfig(), app.getName()));
}
if (allCreds.numberOfTokens() > 0) {
DataOutputBuffer dob = new DataOutputBuffer();
allCreds.writeTokenStorageToStream(dob);
ByteBuffer tokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
amContext.setTokens(tokens);
}
}
private void addKeytabResourceIfSecure(SliderFileSystem fileSystem,
Map<String, LocalResource> localResource, Service service)
throws IOException, YarnException {
if (!UserGroupInformation.isSecurityEnabled()) {
return;
}
String principalName = service.getKerberosPrincipal().getPrincipalName();
if (StringUtils.isEmpty(principalName)) {
LOG.warn("No Kerberos principal name specified for " + service.getName());
return;
}
if(StringUtils.isEmpty(service.getKerberosPrincipal().getKeytab())) {
LOG.warn("No Kerberos keytab specified for " + service.getName());
return;
}
URI keytabURI;
try {
keytabURI = new URI(service.getKerberosPrincipal().getKeytab());
} catch (URISyntaxException e) {
throw new YarnException(e);
}
switch (keytabURI.getScheme()) {
case "hdfs":
Path keytabOnhdfs = new Path(keytabURI);
if (!fileSystem.getFileSystem().exists(keytabOnhdfs)) {
LOG.warn(service.getName() + "'s keytab (principalName = " +
principalName + ") doesn't exist at: " + keytabOnhdfs);
return;
}
LocalResource keytabRes =
fileSystem.createAmResource(keytabOnhdfs, LocalResourceType.FILE);
localResource.put(String.format(YarnServiceConstants.KEYTAB_LOCATION,
service.getName()), keytabRes);
LOG.debug("Adding " + service.getName() + "'s keytab for " +
"localization, uri = " + keytabOnhdfs);
break;
case "file":
LOG.debug("Using a keytab from localhost: " + keytabURI);
break;
default:
LOG.warn("Unsupported URI scheme " + keytabURI);
break;
}
}
public String updateLifetime(String serviceName, long lifetime)
throws YarnException, IOException {
ApplicationId currentAppId = getAppId(serviceName);
if (currentAppId == null) {
throw new YarnException("Application ID not found for " + serviceName);
}
ApplicationReport report = yarnClient.getApplicationReport(currentAppId);
if (report == null) {
throw new YarnException("Service not found for " + serviceName);
}
ApplicationId appId = report.getApplicationId();
LOG.info("Updating lifetime of an service: serviceName = " + serviceName
+ ", appId = " + appId + ", lifetime = " + lifetime);
Map<ApplicationTimeoutType, String> map = new HashMap<>();
String newTimeout =
Times.formatISO8601(System.currentTimeMillis() + lifetime * 1000);
map.put(ApplicationTimeoutType.LIFETIME, newTimeout);
UpdateApplicationTimeoutsRequest request =
UpdateApplicationTimeoutsRequest.newInstance(appId, map);
yarnClient.updateApplicationTimeouts(request);
LOG.info(
"Successfully updated lifetime for an service: serviceName = " + serviceName
+ ", appId = " + appId + ". New expiry time in ISO8601 format is "
+ newTimeout);
return newTimeout;
}
public ServiceState convertState(YarnApplicationState state) {
switch (state) {
case NEW:
case NEW_SAVING:
case SUBMITTED:
case ACCEPTED:
return ServiceState.ACCEPTED;
case RUNNING:
return ServiceState.STARTED;
case FINISHED:
case KILLED:
return ServiceState.STOPPED;
case FAILED:
return ServiceState.FAILED;
default:
return ServiceState.ACCEPTED;
}
}
@Override
public String getStatusString(String appIdOrName)
throws IOException, YarnException {
try {
// try parsing appIdOrName, if it succeeds, it means it's appId
ApplicationId appId = ApplicationId.fromString(appIdOrName);
return getStatusByAppId(appId);
} catch (IllegalArgumentException e) {
// not appId format, it could be appName.
Service status = getStatus(appIdOrName);
return ServiceApiUtil.jsonSerDeser.toJson(status);
}
}
private String getStatusByAppId(ApplicationId appId)
throws IOException, YarnException {
ApplicationReport appReport =
yarnClient.getApplicationReport(appId);
if (appReport.getYarnApplicationState() != RUNNING) {
return "";
}
if (StringUtils.isEmpty(appReport.getHost())) {
return "";
}
ClientAMProtocol amProxy = createAMProxy(appReport.getName(), appReport);
GetStatusResponseProto response =
amProxy.getStatus(GetStatusRequestProto.newBuilder().build());
return response.getStatus();
}
public Service getStatus(String serviceName)
throws IOException, YarnException {
ServiceApiUtil.validateNameFormat(serviceName, getConfig());
Service appSpec = new Service();
appSpec.setName(serviceName);
ApplicationId currentAppId = getAppId(serviceName);
if (currentAppId == null) {
LOG.info("Service {} does not have an application ID", serviceName);
return appSpec;
}
ApplicationReport appReport = yarnClient.getApplicationReport(currentAppId);
appSpec.setState(convertState(appReport.getYarnApplicationState()));
ApplicationTimeout lifetime =
appReport.getApplicationTimeouts().get(ApplicationTimeoutType.LIFETIME);
if (lifetime != null) {
appSpec.setLifetime(lifetime.getRemainingTime());
}
if (appReport.getYarnApplicationState() != RUNNING) {
LOG.info("Service {} is at {} state", serviceName,
appReport.getYarnApplicationState());
return appSpec;
}
if (StringUtils.isEmpty(appReport.getHost())) {
LOG.warn(serviceName + " AM hostname is empty");
return appSpec;
}
ClientAMProtocol amProxy =
createAMProxy(serviceName, appReport);
GetStatusResponseProto response =
amProxy.getStatus(GetStatusRequestProto.newBuilder().build());
appSpec = jsonSerDeser.fromJson(response.getStatus());
if (lifetime != null) {
appSpec.setLifetime(lifetime.getRemainingTime());
}
return appSpec;
}
public YarnClient getYarnClient() {
return this.yarnClient;
}
public int enableFastLaunch(String destinationFolder)
throws IOException, YarnException {
return actionDependency(destinationFolder, true);
}
public int actionDependency(String destinationFolder, boolean overwrite)
throws IOException, YarnException {
String currentUser = RegistryUtils.currentUser();
LOG.info("Running command as user {}", currentUser);
if (destinationFolder == null) {
destinationFolder = String.format(YarnServiceConstants.DEPENDENCY_DIR,
VersionInfo.getVersion());
}
Path dependencyLibTarGzip = new Path(destinationFolder,
YarnServiceConstants.DEPENDENCY_TAR_GZ_FILE_NAME
+ YarnServiceConstants.DEPENDENCY_TAR_GZ_FILE_EXT);
// Check if dependency has already been uploaded, in which case log
// appropriately and exit success (unless overwrite has been requested)
if (fs.isFile(dependencyLibTarGzip) && !overwrite) {
System.out.println(String.format(
"Dependency libs are already uploaded to %s.", dependencyLibTarGzip
.toUri()));
return EXIT_SUCCESS;
}
String[] libDirs = ServiceUtils.getLibDirs();
if (libDirs.length > 0) {
File tempLibTarGzipFile = File.createTempFile(
YarnServiceConstants.DEPENDENCY_TAR_GZ_FILE_NAME + "_",
YarnServiceConstants.DEPENDENCY_TAR_GZ_FILE_EXT);
// copy all jars
tarGzipFolder(libDirs, tempLibTarGzipFile, createJarFilter());
LOG.info("Version Info: " + VersionInfo.getBuildVersion());
fs.copyLocalFileToHdfs(tempLibTarGzipFile, dependencyLibTarGzip,
new FsPermission(YarnServiceConstants.DEPENDENCY_DIR_PERMISSIONS));
LOG.info("To let apps use this tarball, in yarn-site set config property "
+ "{} to {}", YarnServiceConf.DEPENDENCY_TARBALL_PATH,
dependencyLibTarGzip);
return EXIT_SUCCESS;
} else {
return EXIT_FALSE;
}
}
protected ClientAMProtocol createAMProxy(String serviceName,
ApplicationReport appReport) throws IOException, YarnException {
if (UserGroupInformation.isSecurityEnabled()) {
if (!cachedAppInfo.containsKey(serviceName)) {
Service persistedService = ServiceApiUtil.loadService(fs, serviceName);
cachedAppInfo.put(serviceName, new AppInfo(appReport.getApplicationId(),
persistedService.getKerberosPrincipal().getPrincipalName()));
}
String principalName = cachedAppInfo.get(serviceName).principalName;
// Inject the principal into hadoop conf, because Hadoop
// SaslRpcClient#getServerPrincipal requires a config for the
// principal
if (!StringUtils.isEmpty(principalName)) {
getConfig().set(PRINCIPAL, principalName);
} else {
throw new YarnException("No principal specified in the persisted " +
"service definition, fail to connect to AM.");
}
}
InetSocketAddress address =
NetUtils.createSocketAddrForHost(appReport.getHost(), appReport
.getRpcPort());
return ClientAMProxy.createProxy(getConfig(), ClientAMProtocol.class,
UserGroupInformation.getCurrentUser(), rpc, address);
}
@VisibleForTesting
void setFileSystem(SliderFileSystem fileSystem)
throws IOException {
this.fs = fileSystem;
}
@VisibleForTesting
void setYarnClient(YarnClient yarnClient) {
this.yarnClient = yarnClient;
}
public synchronized ApplicationId getAppId(String serviceName)
throws IOException, YarnException {
if (cachedAppInfo.containsKey(serviceName)) {
return cachedAppInfo.get(serviceName).appId;
}
Service persistedService = ServiceApiUtil.loadService(fs, serviceName);
if (persistedService == null) {
throw new YarnException("Service " + serviceName
+ " doesn't exist on hdfs. Please check if the app exists in RM");
}
if (persistedService.getId() == null) {
return null;
}
ApplicationId currentAppId = ApplicationId.fromString(persistedService
.getId());
cachedAppInfo.put(serviceName, new AppInfo(currentAppId, persistedService
.getKerberosPrincipal().getPrincipalName()));
return currentAppId;
}
private static class AppInfo {
ApplicationId appId;
String principalName;
AppInfo(ApplicationId appId, String principalName) {
this.appId = appId;
this.principalName = principalName;
}
}
}