blob: b2e76cbc04234341072d931e762dad73404abc95 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.nodemanager.containermanager;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
import java.util.regex.Pattern;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.security.authorize.AccessControlList;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.records.AuxServiceConfiguration;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.records.AuxServiceFile;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.records.AuxServiceRecord;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.records.AuxServiceRecords;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.service.Service;
import org.apache.hadoop.service.ServiceStateChangeListener;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.api.records.URL;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.server.api.ApplicationInitializationContext;
import org.apache.hadoop.yarn.server.api.ApplicationTerminationContext;
import org.apache.hadoop.yarn.server.api.AuxiliaryLocalPathHandler;
import org.apache.hadoop.yarn.server.api.AuxiliaryService;
import org.apache.hadoop.yarn.server.api.ContainerInitializationContext;
import org.apache.hadoop.yarn.server.api.ContainerTerminationContext;
import org.apache.hadoop.yarn.server.nodemanager.Context;
import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.deletion.task.FileDeletionTask;
import org.apache.hadoop.yarn.util.FSDownload;
import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions;
public class AuxServices extends AbstractService
implements ServiceStateChangeListener, EventHandler<AuxServicesEvent> {
public static final String NM_AUX_SERVICE_DIR = "nmAuxService";
public static final FsPermission NM_AUX_SERVICE_DIR_PERM =
new FsPermission((short) 0700);
public static final String CLASS_NAME = "class.name";
public static final String SYSTEM_CLASSES = "system.classes";
static final String STATE_STORE_ROOT_NAME = "nm-aux-services";
private static final Logger LOG =
LoggerFactory.getLogger(AuxServices.class);
private static final String DEL_SUFFIX = "_DEL_";
private final Map<String, AuxiliaryService> serviceMap;
private final Map<String, AuxServiceRecord> serviceRecordMap;
private final Map<String, ByteBuffer> serviceMetaData;
private final AuxiliaryLocalPathHandler auxiliaryLocalPathHandler;
private final LocalDirsHandlerService dirsHandler;
private final DeletionService delService;
private final UserGroupInformation userUGI;
private final FsPermission storeDirPerms = new FsPermission((short)0700);
private Path stateStoreRoot = null;
private FileSystem stateStoreFs = null;
private volatile boolean manifestEnabled = false;
private volatile Path manifest;
private volatile FileSystem manifestFS;
private Timer manifestReloadTimer;
private TimerTask manifestReloadTask;
private long manifestReloadInterval;
private long manifestModifyTS = -1;
private final ObjectMapper mapper;
private final Pattern p = Pattern.compile("^[A-Za-z_]+[A-Za-z0-9_]*$");
AuxServices(AuxiliaryLocalPathHandler auxiliaryLocalPathHandler,
Context nmContext, DeletionService deletionService) {
super(AuxServices.class.getName());
serviceMap =
Collections.synchronizedMap(new HashMap<String, AuxiliaryService>());
serviceRecordMap =
Collections.synchronizedMap(new HashMap<String, AuxServiceRecord>());
serviceMetaData =
Collections.synchronizedMap(new HashMap<String,ByteBuffer>());
this.auxiliaryLocalPathHandler = auxiliaryLocalPathHandler;
this.dirsHandler = nmContext.getLocalDirsHandler();
this.delService = deletionService;
this.userUGI = getRemoteUgi();
this.mapper = new ObjectMapper();
mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
// Obtain services from configuration in init()
}
/**
* Returns whether aux services manifest / dynamic loading is enabled.
*/
public boolean isManifestEnabled() {
return manifestEnabled;
}
/**
* Adds a service to the service map.
*
* @param name aux service name
* @param service aux service
* @param serviceRecord aux service record
*/
protected final synchronized void addService(String name,
AuxiliaryService service, AuxServiceRecord serviceRecord) {
LOG.info("Adding auxiliary service " + serviceRecord.getName() +
" version " + serviceRecord.getVersion());
serviceMap.put(name, service);
serviceRecordMap.put(name, serviceRecord);
}
Collection<AuxiliaryService> getServices() {
return Collections.unmodifiableCollection(serviceMap.values());
}
/**
* Gets current aux service records.
*
* @return a collection of service records
*/
public Collection<AuxServiceRecord> getServiceRecords() {
return Collections.unmodifiableCollection(serviceRecordMap.values());
}
/**
* @return the meta data for all registered services, that have been started.
* If a service has not been started no metadata will be available. The key
* is the name of the service as defined in the configuration.
*/
public Map<String, ByteBuffer> getMetaData() {
Map<String, ByteBuffer> metaClone = new HashMap<>(serviceMetaData.size());
synchronized (serviceMetaData) {
for (Entry<String, ByteBuffer> entry : serviceMetaData.entrySet()) {
metaClone.put(entry.getKey(), entry.getValue().duplicate());
}
}
return metaClone;
}
/**
* Creates an auxiliary service from a specification using the Configuration
* classloader.
*
* @param service aux service record
* @return auxiliary service
*/
private AuxiliaryService createAuxServiceFromConfiguration(AuxServiceRecord
service) {
Configuration c = new Configuration(false);
c.set(CLASS_NAME, getClassName(service));
Class<? extends AuxiliaryService> sClass = c.getClass(CLASS_NAME,
null, AuxiliaryService.class);
if (sClass == null) {
throw new YarnRuntimeException("No class defined for auxiliary " +
"service" + service.getName());
}
return ReflectionUtils.newInstance(sClass, null);
}
/**
* Creates an auxiliary service from a specification using a custom local
* classpath.
*
* @param service aux service record
* @param appLocalClassPath local class path
* @param conf configuration
* @return auxiliary service
* @throws IOException
* @throws ClassNotFoundException
*/
private AuxiliaryService createAuxServiceFromLocalClasspath(
AuxServiceRecord service, String appLocalClassPath, Configuration conf)
throws IOException, ClassNotFoundException {
Preconditions.checkArgument(appLocalClassPath != null &&
!appLocalClassPath.isEmpty(),
"local classpath was null in createAuxServiceFromLocalClasspath");
final String sName = service.getName();
final String className = getClassName(service);
if (service.getConfiguration() != null && service.getConfiguration()
.getFiles().size() > 0) {
throw new YarnRuntimeException("The aux service:" + sName
+ " has configured local classpath:" + appLocalClassPath
+ " and config files:" + service.getConfiguration().getFiles()
+ ". Only one of them should be configured.");
}
return AuxiliaryServiceWithCustomClassLoader.getInstance(conf, className,
appLocalClassPath, getSystemClasses(service, className));
}
/**
* Creates an auxiliary service from a specification.
*
* @param service aux service record
* @param conf configuration
* @param fromConfiguration true if from configuration, false if from manifest
* @return auxiliary service
* @throws IOException
* @throws ClassNotFoundException
*/
private AuxiliaryService createAuxService(AuxServiceRecord service,
Configuration conf, boolean fromConfiguration) throws IOException,
ClassNotFoundException {
final String sName = service.getName();
final String className = getClassName(service);
if (className == null || className.isEmpty()) {
throw new YarnRuntimeException("Class name not provided for auxiliary " +
"service " + sName);
}
if (fromConfiguration) {
// aux services from the configuration have an additional configuration
// option specifying a local classpath that will not be localized
final String appLocalClassPath = conf.get(String.format(
YarnConfiguration.NM_AUX_SERVICES_CLASSPATH, sName));
if (appLocalClassPath != null && !appLocalClassPath.isEmpty()) {
return createAuxServiceFromLocalClasspath(service, appLocalClassPath,
conf);
}
}
AuxServiceConfiguration serviceConf = service.getConfiguration();
List<Path> destFiles = new ArrayList<>();
if (serviceConf != null) {
List<AuxServiceFile> files = serviceConf.getFiles();
if (files != null) {
for (AuxServiceFile file : files) {
// localize file (if needed) and add it to the list of paths that
// will become the classpath
destFiles.add(maybeDownloadJars(sName, className, file.getSrcFile(),
file.getType(), conf));
}
}
}
if (destFiles.size() > 0) {
// create aux service using a custom localized classpath
LOG.info("The aux service:" + sName
+ " is using the custom classloader with classpath " + destFiles);
return AuxiliaryServiceWithCustomClassLoader.getInstance(conf,
className, StringUtils.join(File.pathSeparatorChar, destFiles),
getSystemClasses(service, className));
} else {
return createAuxServiceFromConfiguration(service);
}
}
/**
* Copies the specified remote file to local NM aux service directory. If the
* same file already exists (as determined by modification time), the file
* will not be copied again.
*
* @param sName service name
* @param className service class name
* @param remoteFile location of the file to download
* @param type type of file (STATIC for a jar or ARCHIVE for a tarball)
* @param conf configuration
* @return path of the downloaded file
* @throws IOException
*/
@VisibleForTesting
protected Path maybeDownloadJars(String sName, String className, String
remoteFile, AuxServiceFile.TypeEnum type, Configuration conf)
throws IOException {
// load AuxiliaryService from remote classpath
FileContext localLFS = getLocalFileContext(conf);
// create NM aux-service dir in NM localdir if it does not exist.
Path nmAuxDir = dirsHandler.getLocalPathForWrite("."
+ Path.SEPARATOR + NM_AUX_SERVICE_DIR);
if (!localLFS.util().exists(nmAuxDir)) {
try {
localLFS.mkdir(nmAuxDir, NM_AUX_SERVICE_DIR_PERM, true);
} catch (IOException ex) {
throw new YarnRuntimeException("Fail to create dir:"
+ nmAuxDir.toString(), ex);
}
}
Path src = new Path(remoteFile);
FileContext remoteLFS = getRemoteFileContext(src.toUri(), conf);
FileStatus scFileStatus = remoteLFS.getFileStatus(src);
if (!scFileStatus.getOwner().equals(
this.userUGI.getShortUserName())) {
throw new YarnRuntimeException("The remote jarfile owner:"
+ scFileStatus.getOwner() + " is not the same as the NM user:"
+ this.userUGI.getShortUserName() + ".");
}
if ((scFileStatus.getPermission().toShort() & 0022) != 0) {
throw new YarnRuntimeException("The remote jarfile should not "
+ "be writable by group or others. "
+ "The current Permission is "
+ scFileStatus.getPermission().toShort());
}
Path downloadDest = new Path(nmAuxDir,
className + "_" + scFileStatus.getModificationTime());
// check whether we need to re-download the jar
// from remote directory
Path targetDirPath = new Path(downloadDest,
scFileStatus.getPath().getName());
FileStatus[] allSubDirs = localLFS.util().listStatus(nmAuxDir);
for (FileStatus sub : allSubDirs) {
if (sub.getPath().getName().equals(downloadDest.getName())) {
return targetDirPath;
} else {
if (sub.getPath().getName().contains(className) &&
!sub.getPath().getName().endsWith(DEL_SUFFIX)) {
Path delPath = new Path(sub.getPath().getParent(),
sub.getPath().getName() + DEL_SUFFIX);
localLFS.rename(sub.getPath(), delPath);
LOG.info("delete old aux service jar dir:"
+ delPath.toString());
FileDeletionTask deletionTask = new FileDeletionTask(
this.delService, null, delPath, null);
this.delService.delete(deletionTask);
}
}
}
LocalResourceType srcType;
if (type == AuxServiceFile.TypeEnum.STATIC) {
srcType = LocalResourceType.FILE;
} else if (type == AuxServiceFile.TypeEnum.ARCHIVE) {
srcType = LocalResourceType.ARCHIVE;
} else {
throw new YarnRuntimeException(
"Cannot unpack file of type " + type + " from remote-file-path:" +
src + "for aux-service:" + ".\n");
}
LocalResource scRsrc = LocalResource.newInstance(
URL.fromURI(src.toUri()),
srcType, LocalResourceVisibility.PRIVATE,
scFileStatus.getLen(), scFileStatus.getModificationTime());
FSDownload download = new FSDownload(localLFS, null, conf,
downloadDest, scRsrc, null);
try {
// don't need to convert downloaded path into a dir
// since it's already a jar path.
return download.call();
} catch (Exception ex) {
throw new YarnRuntimeException(
"Exception happend while downloading files "
+ "for aux-service:" + sName + " and remote-file-path:"
+ src + ".\n" + ex.getMessage());
}
}
/**
* If recovery is enabled, creates a recovery directory for the named
* service and sets it on the service.
*
* @param sName name of the service
* @param s auxiliary service
* @throws IOException
*/
private void setStateStoreDir(String sName, AuxiliaryService s) throws
IOException {
if (stateStoreRoot != null) {
Path storePath = new Path(stateStoreRoot, sName);
stateStoreFs.mkdirs(storePath, storeDirPerms);
s.setRecoveryPath(storePath);
}
}
/**
* Removes a service from the service map and stops it, if it exists.
*
* @param sName name of the service
*/
private synchronized void maybeRemoveAuxService(String sName) {
AuxiliaryService s;
s = serviceMap.remove(sName);
serviceRecordMap.remove(sName);
serviceMetaData.remove(sName);
if (s != null) {
LOG.info("Removing aux service " + sName);
stopAuxService(s);
}
}
/**
* Constructs an AuxiliaryService then configures and initializes it based
* on a service specification.
*
* @param service aux service record
* @param conf configuration
* @param fromConfiguration true if from configuration, false if from manifest
* @return aux service
* @throws IOException
*/
private AuxiliaryService initAuxService(AuxServiceRecord service,
Configuration conf, boolean fromConfiguration) throws IOException {
final String sName = service.getName();
AuxiliaryService s;
try {
Preconditions
.checkArgument(
validateAuxServiceName(sName),
"The auxiliary service name: " + sName + " is invalid. " +
"The valid service name should only contain a-zA-Z0-9_ " +
"and cannot start with numbers.");
s = createAuxService(service, conf, fromConfiguration);
if (s == null) {
throw new YarnRuntimeException("No auxiliary service class loaded for" +
" " + sName);
}
// TODO better use s.getName()?
if (!sName.equals(s.getName())) {
LOG.warn("The Auxiliary Service named '" + sName + "' in the "
+ "configuration is for " + s.getClass() + " which has "
+ "a name of '" + s.getName() + "'. Because these are "
+ "not the same tools trying to send ServiceData and read "
+ "Service Meta Data may have issues unless the refer to "
+ "the name in the config.");
}
s.setAuxiliaryLocalPathHandler(auxiliaryLocalPathHandler);
setStateStoreDir(sName, s);
Configuration customConf = new Configuration(conf);
if (service.getConfiguration() != null) {
for (Entry<String, String> entry : service.getConfiguration()
.getProperties().entrySet()) {
customConf.set(entry.getKey(), entry.getValue());
}
}
s.init(customConf);
LOG.info("Initialized auxiliary service " + sName);
} catch (RuntimeException e) {
LOG.error("Failed to initialize " + sName, e);
throw e;
} catch (ClassNotFoundException e) {
throw new YarnRuntimeException(e);
}
return s;
}
/**
* Reloads auxiliary services manifest. Must be called after service init.
*
* @throws IOException if manifest can't be loaded
*/
@VisibleForTesting
protected void reloadManifest() throws IOException {
loadManifest(getConfig(), true);
}
/**
* Reloads auxiliary services. Must be called after service init.
*
* @param services a list of auxiliary services
* @throws IOException if aux services have not been started yet or dynamic
* reloading is not enabled
*/
public synchronized void reload(AuxServiceRecords services) throws
IOException {
if (!manifestEnabled) {
throw new IOException("Dynamic reloading is not enabled via " +
YarnConfiguration.NM_AUX_SERVICES_MANIFEST_ENABLED);
}
if (getServiceState() != Service.STATE.STARTED) {
throw new IOException("Auxiliary services have not been started yet, " +
"please retry later");
}
LOG.info("Received list of auxiliary services: " + mapper
.writeValueAsString(services));
loadServices(services, getConfig(), true);
}
private boolean checkManifestPermissions(FileStatus status) throws
IOException {
if ((status.getPermission().toShort() & 0022) != 0) {
LOG.error("Manifest file and parents must not be writable by group or " +
"others. The current Permission of " + status.getPath() + " is " +
status.getPermission());
return false;
}
Path parent = status.getPath().getParent();
if (parent == null) {
return true;
}
return checkManifestPermissions(manifestFS.getFileStatus(parent));
}
private boolean checkManifestOwnerAndPermissions(FileStatus status) throws
IOException {
AccessControlList yarnAdminAcl = new AccessControlList(getConfig().get(
YarnConfiguration.YARN_ADMIN_ACL,
YarnConfiguration.DEFAULT_YARN_ADMIN_ACL));
if (!yarnAdminAcl.isUserAllowed(
UserGroupInformation.createRemoteUser(status.getOwner()))) {
LOG.error("Manifest must be owned by YARN admin: " + manifest);
return false;
}
return checkManifestPermissions(status);
}
/**
* Reads the manifest file if it is configured, exists, and has not been
* modified since the last read.
*
* @return aux service records
* @throws IOException
*/
private synchronized AuxServiceRecords maybeReadManifestFile() throws
IOException {
if (manifest == null) {
return null;
}
if (!manifestFS.exists(manifest)) {
LOG.warn("Manifest file " + manifest + " doesn't exist");
return null;
}
FileStatus status;
try {
status = manifestFS.getFileStatus(manifest);
} catch (FileNotFoundException e) {
LOG.warn("Manifest file " + manifest + " doesn't exist");
return null;
}
if (!status.isFile()) {
LOG.warn("Manifest file " + manifest + " is not a file");
}
if (!checkManifestOwnerAndPermissions(status)) {
return null;
}
if (status.getModificationTime() == manifestModifyTS) {
return null;
}
manifestModifyTS = status.getModificationTime();
LOG.info("Reading auxiliary services manifest " + manifest);
try (FSDataInputStream in = manifestFS.open(manifest)) {
return mapper.readValue((InputStream) in, AuxServiceRecords.class);
}
}
/**
* Updates current aux services based on changes found in the manifest.
*
* @param conf configuration
* @param startServices if true starts services, otherwise only inits services
* @throws IOException
*/
@VisibleForTesting
protected synchronized void loadManifest(Configuration conf, boolean
startServices) throws IOException {
if (!manifestEnabled) {
throw new IOException("Dynamic reloading is not enabled via " +
YarnConfiguration.NM_AUX_SERVICES_MANIFEST_ENABLED);
}
if (manifest == null) {
return;
}
if (!manifestFS.exists(manifest)) {
if (serviceMap.isEmpty()) {
return;
}
LOG.info("Manifest file " + manifest + " doesn't exist, stopping " +
"auxiliary services");
Set<String> servicesToRemove = new HashSet<>(serviceMap.keySet());
for (String sName : servicesToRemove) {
maybeRemoveAuxService(sName);
}
return;
}
AuxServiceRecords services = maybeReadManifestFile();
loadServices(services, conf, startServices);
}
/**
* Updates current aux services based on changes found in the service list.
*
* @param services list of auxiliary services
* @param conf configuration
* @param startServices if true starts services, otherwise only inits services
* @throws IOException
*/
private synchronized void loadServices(AuxServiceRecords services,
Configuration conf, boolean startServices) throws IOException {
if (services == null) {
// read did not occur or no changes detected
return;
}
Set<String> loadedAuxServices = new HashSet<>();
boolean foundChanges = false;
if (services.getServices() != null) {
for (AuxServiceRecord service : services.getServices()) {
AuxServiceRecord existingService = serviceRecordMap.get(service
.getName());
loadedAuxServices.add(service.getName());
if (existingService != null && existingService.equals(service)) {
LOG.debug("Auxiliary service already loaded: {}", service.getName());
continue;
}
foundChanges = true;
try {
// stop aux service
maybeRemoveAuxService(service.getName());
// init aux service
AuxiliaryService s = initAuxService(service, conf, false);
if (startServices) {
// start aux service
startAuxService(service.getName(), s, service);
}
// add aux service to serviceMap
addService(service.getName(), s, service);
} catch (IOException e) {
LOG.error("Failed to load auxiliary service " + service.getName());
}
}
}
// remove aux services that do not appear in the new list
Set<String> servicesToRemove = new HashSet<>(serviceMap.keySet());
servicesToRemove.removeAll(loadedAuxServices);
for (String sName : servicesToRemove) {
foundChanges = true;
maybeRemoveAuxService(sName);
}
if (!foundChanges) {
LOG.info("No auxiliary services changes detected");
}
}
private static String getClassName(AuxServiceRecord service) {
AuxServiceConfiguration serviceConf = service.getConfiguration();
if (serviceConf == null) {
return null;
}
return serviceConf.getProperty(CLASS_NAME);
}
private static String[] getSystemClasses(AuxServiceRecord service, String
className) {
AuxServiceConfiguration serviceConf =
service.getConfiguration();
if (serviceConf == null) {
return new String[]{className};
}
return StringUtils.split(serviceConf.getProperty(SYSTEM_CLASSES,
className));
}
/**
* Translates an aux service specified in the Configuration to an aux
* service record.
*
* @param sName aux service name
* @param conf configuration
* @return
*/
private static AuxServiceRecord createServiceRecordFromConfiguration(String
sName, Configuration conf) {
String className = conf.get(String.format(
YarnConfiguration.NM_AUX_SERVICE_FMT, sName));
String remoteClassPath = conf.get(String.format(
YarnConfiguration.NM_AUX_SERVICE_REMOTE_CLASSPATH, sName));
String[] systemClasses = conf.getTrimmedStrings(String.format(
YarnConfiguration.NM_AUX_SERVICES_SYSTEM_CLASSES, sName));
AuxServiceConfiguration serviceConf = new AuxServiceConfiguration();
if (className != null) {
serviceConf.setProperty(CLASS_NAME, className);
}
if (systemClasses != null) {
serviceConf.setProperty(SYSTEM_CLASSES, StringUtils.join(",",
systemClasses));
}
if (remoteClassPath != null) {
AuxServiceFile.TypeEnum type;
String lcClassPath = StringUtils.toLowerCase(remoteClassPath);
if (lcClassPath.endsWith(".jar")) {
type = AuxServiceFile.TypeEnum.STATIC;
} else if (lcClassPath.endsWith(".zip") ||
lcClassPath.endsWith(".tar.gz") || lcClassPath.endsWith(".tgz") ||
lcClassPath.endsWith(".tar")) {
type = AuxServiceFile.TypeEnum.ARCHIVE;
} else {
throw new YarnRuntimeException("Cannot unpack file from " +
"remote-file-path:" + remoteClassPath + "for aux-service:" +
sName + ".\n");
}
AuxServiceFile file = new AuxServiceFile().srcFile(remoteClassPath)
.type(type);
serviceConf.getFiles().add(file);
}
return new AuxServiceRecord().name(sName).configuration(serviceConf);
}
@Override
public synchronized void serviceInit(Configuration conf) throws Exception {
boolean recoveryEnabled = conf.getBoolean(
YarnConfiguration.NM_RECOVERY_ENABLED,
YarnConfiguration.DEFAULT_NM_RECOVERY_ENABLED);
if (recoveryEnabled) {
stateStoreRoot = new Path(conf.get(YarnConfiguration.NM_RECOVERY_DIR),
STATE_STORE_ROOT_NAME);
stateStoreFs = FileSystem.getLocal(conf);
}
manifestEnabled = conf.getBoolean(
YarnConfiguration.NM_AUX_SERVICES_MANIFEST_ENABLED,
YarnConfiguration.DEFAULT_NM_AUX_SERVICES_MANIFEST_ENABLED);
if (!manifestEnabled) {
Collection<String> auxNames = conf.getStringCollection(
YarnConfiguration.NM_AUX_SERVICES);
for (final String sName : auxNames) {
AuxServiceRecord service = createServiceRecordFromConfiguration(sName,
conf);
maybeRemoveAuxService(sName);
AuxiliaryService s = initAuxService(service, conf, true);
addService(sName, s, service);
}
} else {
String manifestStr = conf.get(YarnConfiguration.NM_AUX_SERVICES_MANIFEST);
if (manifestStr != null) {
manifest = new Path(manifestStr);
manifestFS = FileSystem.get(new URI(manifestStr), conf);
loadManifest(conf, false);
manifestReloadInterval = conf.getLong(
YarnConfiguration.NM_AUX_SERVICES_MANIFEST_RELOAD_MS,
YarnConfiguration.DEFAULT_NM_AUX_SERVICES_MANIFEST_RELOAD_MS);
manifestReloadTask = new ManifestReloadTask();
} else {
LOG.info("Auxiliary services manifest is enabled, but no manifest " +
"file is specified in the configuration.");
}
}
super.serviceInit(conf);
}
private void startAuxService(String name, AuxiliaryService service,
AuxServiceRecord serviceRecord) {
service.start();
service.registerServiceListener(this);
ByteBuffer meta = service.getMetaData();
if (meta != null) {
serviceMetaData.put(name, meta);
}
serviceRecord.setLaunchTime(new Date());
}
private void stopAuxService(Service service) {
if (service.getServiceState() == Service.STATE.STARTED) {
service.unregisterServiceListener(this);
service.stop();
}
}
@Override
public synchronized void serviceStart() throws Exception {
// TODO fork(?) services running as configured user
// monitor for health, shutdown/restart(?) if any should die
for (Map.Entry<String, AuxiliaryService> entry : serviceMap.entrySet()) {
AuxiliaryService service = entry.getValue();
String name = entry.getKey();
startAuxService(name, service, serviceRecordMap.get(name));
}
if (manifestEnabled && manifest != null && manifestReloadInterval > 0) {
LOG.info("Scheduling reloading auxiliary services manifest file at " +
"interval " + manifestReloadInterval + " ms");
manifestReloadTimer = new Timer("AuxServicesManifestReload-Timer",
true);
manifestReloadTimer.schedule(manifestReloadTask,
manifestReloadInterval, manifestReloadInterval);
}
super.serviceStart();
}
@Override
public synchronized void serviceStop() throws Exception {
try {
for (Service service : serviceMap.values()) {
stopAuxService(service);
}
serviceMap.clear();
serviceRecordMap.clear();
serviceMetaData.clear();
if (manifestFS != null) {
manifestFS.close();
}
if (manifestReloadTimer != null) {
manifestReloadTimer.cancel();
}
} finally {
super.serviceStop();
}
}
@Override
public void stateChanged(Service service) {
// services changing state is expected on reload
LOG.info("Service " + service.getName() + " changed state: " +
service.getServiceState());
}
@Override
public void handle(AuxServicesEvent event) {
LOG.info("Got event " + event.getType() + " for appId "
+ event.getApplicationID());
switch (event.getType()) {
case APPLICATION_INIT:
LOG.info("Got APPLICATION_INIT for service " + event.getServiceID());
AuxiliaryService service = null;
try {
service = serviceMap.get(event.getServiceID());
service
.initializeApplication(new ApplicationInitializationContext(event
.getUser(), event.getApplicationID(), event.getServiceData()));
} catch (Throwable th) {
logWarningWhenAuxServiceThrowExceptions(service,
AuxServicesEventType.APPLICATION_INIT, th);
}
break;
case APPLICATION_STOP:
for (AuxiliaryService serv : serviceMap.values()) {
try {
serv.stopApplication(new ApplicationTerminationContext(event
.getApplicationID()));
} catch (Throwable th) {
logWarningWhenAuxServiceThrowExceptions(serv,
AuxServicesEventType.APPLICATION_STOP, th);
}
}
break;
case CONTAINER_INIT:
for (AuxiliaryService serv : serviceMap.values()) {
try {
serv.initializeContainer(new ContainerInitializationContext(
event.getContainer().getUser(),
event.getContainer().getContainerId(),
event.getContainer().getResource(), event.getContainer()
.getContainerTokenIdentifier().getContainerType()));
} catch (Throwable th) {
logWarningWhenAuxServiceThrowExceptions(serv,
AuxServicesEventType.CONTAINER_INIT, th);
}
}
break;
case CONTAINER_STOP:
for (AuxiliaryService serv : serviceMap.values()) {
try {
serv.stopContainer(new ContainerTerminationContext(
event.getUser(), event.getContainer().getContainerId(),
event.getContainer().getResource(), event.getContainer()
.getContainerTokenIdentifier().getContainerType()));
} catch (Throwable th) {
logWarningWhenAuxServiceThrowExceptions(serv,
AuxServicesEventType.CONTAINER_STOP, th);
}
}
break;
default:
throw new RuntimeException("Unknown type: " + event.getType());
}
}
private boolean validateAuxServiceName(String name) {
if (name == null || name.trim().isEmpty()) {
return false;
}
return p.matcher(name).matches();
}
private void logWarningWhenAuxServiceThrowExceptions(AuxiliaryService service,
AuxServicesEventType eventType, Throwable th) {
LOG.warn((null == service ? "The auxService is null"
: "The auxService name is " + service.getName())
+ " and it got an error at event: " + eventType, th);
}
FileContext getLocalFileContext(Configuration conf) {
try {
return FileContext.getLocalFSFileContext(conf);
} catch (IOException e) {
throw new YarnRuntimeException("Failed to access local fs");
}
}
FileContext getRemoteFileContext(final URI path, Configuration conf) {
try {
return FileContext.getFileContext(path, conf);
} catch (IOException e) {
throw new YarnRuntimeException("Failed to access remote fs");
}
}
private UserGroupInformation getRemoteUgi() {
UserGroupInformation remoteUgi;
try {
remoteUgi = UserGroupInformation.getCurrentUser();
} catch (IOException e) {
String msg = "Cannot obtain the user-name. Got exception: "
+ StringUtils.stringifyException(e);
LOG.warn(msg);
throw new YarnRuntimeException(msg);
}
return remoteUgi;
}
protected static AuxServiceRecord newAuxService(String name, String
className) {
AuxServiceConfiguration serviceConf = new AuxServiceConfiguration();
serviceConf.setProperty(CLASS_NAME, className);
return new AuxServiceRecord().name(name).configuration(serviceConf);
}
protected static void setClasspath(AuxServiceRecord service, String
classpath) {
service.getConfiguration().getFiles().add(new AuxServiceFile()
.srcFile(classpath).type(AuxServiceFile.TypeEnum.STATIC));
}
protected static void setSystemClasses(AuxServiceRecord service, String
systemClasses) {
service.getConfiguration().setProperty(SYSTEM_CLASSES, systemClasses);
}
/**
* Class which is used by the {@link Timer} class to periodically execute the
* manifest reload.
*/
private final class ManifestReloadTask extends TimerTask {
@Override
public void run() {
try {
reloadManifest();
} catch (Throwable t) {
// Prevent uncaught exceptions from killing this thread
LOG.warn("Error while reloading manifest: ", t);
}
}
}
}