| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.hadoop.yarn.server.nodemanager.containermanager; |
| |
| import java.nio.ByteBuffer; |
| import java.util.Collection; |
| import java.util.Collections; |
| import java.util.HashMap; |
| import java.util.Map; |
| import java.util.Map.Entry; |
| import java.util.regex.Pattern; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.fs.FileSystem; |
| import org.apache.hadoop.fs.Path; |
| import org.apache.hadoop.fs.permission.FsPermission; |
| import org.apache.hadoop.service.AbstractService; |
| import org.apache.hadoop.service.Service; |
| import org.apache.hadoop.service.ServiceStateChangeListener; |
| import org.apache.hadoop.util.ReflectionUtils; |
| import org.apache.hadoop.yarn.conf.YarnConfiguration; |
| import org.apache.hadoop.yarn.event.EventHandler; |
| import org.apache.hadoop.yarn.server.api.ApplicationInitializationContext; |
| import org.apache.hadoop.yarn.server.api.ApplicationTerminationContext; |
| import org.apache.hadoop.yarn.server.api.AuxiliaryLocalPathHandler; |
| import org.apache.hadoop.yarn.server.api.AuxiliaryService; |
| import org.apache.hadoop.yarn.server.api.ContainerInitializationContext; |
| import org.apache.hadoop.yarn.server.api.ContainerTerminationContext; |
| |
| import com.google.common.base.Preconditions; |
| |
| public class AuxServices extends AbstractService |
| implements ServiceStateChangeListener, EventHandler<AuxServicesEvent> { |
| |
| static final String STATE_STORE_ROOT_NAME = "nm-aux-services"; |
| |
| private static final Logger LOG = |
| LoggerFactory.getLogger(AuxServices.class); |
| |
| protected final Map<String,AuxiliaryService> serviceMap; |
| protected final Map<String,ByteBuffer> serviceMetaData; |
| private final AuxiliaryLocalPathHandler auxiliaryLocalPathHandler; |
| |
| private final Pattern p = Pattern.compile("^[A-Za-z_]+[A-Za-z0-9_]*$"); |
| |
| public AuxServices(AuxiliaryLocalPathHandler auxiliaryLocalPathHandler) { |
| super(AuxServices.class.getName()); |
| serviceMap = |
| Collections.synchronizedMap(new HashMap<String,AuxiliaryService>()); |
| serviceMetaData = |
| Collections.synchronizedMap(new HashMap<String,ByteBuffer>()); |
| this.auxiliaryLocalPathHandler = auxiliaryLocalPathHandler; |
| // Obtain services from configuration in init() |
| } |
| |
| protected final synchronized void addService(String name, |
| AuxiliaryService service) { |
| LOG.info("Adding auxiliary service " + |
| service.getName() + ", \"" + name + "\""); |
| serviceMap.put(name, service); |
| } |
| |
| Collection<AuxiliaryService> getServices() { |
| return Collections.unmodifiableCollection(serviceMap.values()); |
| } |
| |
| /** |
| * @return the meta data for all registered services, that have been started. |
| * If a service has not been started no metadata will be available. The key |
| * is the name of the service as defined in the configuration. |
| */ |
| public Map<String, ByteBuffer> getMetaData() { |
| Map<String, ByteBuffer> metaClone = new HashMap<String, ByteBuffer>( |
| serviceMetaData.size()); |
| synchronized (serviceMetaData) { |
| for (Entry<String, ByteBuffer> entry : serviceMetaData.entrySet()) { |
| metaClone.put(entry.getKey(), entry.getValue().duplicate()); |
| } |
| } |
| return metaClone; |
| } |
| |
| @Override |
| public void serviceInit(Configuration conf) throws Exception { |
| final FsPermission storeDirPerms = new FsPermission((short)0700); |
| Path stateStoreRoot = null; |
| FileSystem stateStoreFs = null; |
| boolean recoveryEnabled = conf.getBoolean( |
| YarnConfiguration.NM_RECOVERY_ENABLED, |
| YarnConfiguration.DEFAULT_NM_RECOVERY_ENABLED); |
| if (recoveryEnabled) { |
| stateStoreRoot = new Path(conf.get(YarnConfiguration.NM_RECOVERY_DIR), |
| STATE_STORE_ROOT_NAME); |
| stateStoreFs = FileSystem.getLocal(conf); |
| } |
| Collection<String> auxNames = conf.getStringCollection( |
| YarnConfiguration.NM_AUX_SERVICES); |
| for (final String sName : auxNames) { |
| try { |
| Preconditions |
| .checkArgument( |
| validateAuxServiceName(sName), |
| "The ServiceName: " + sName + " set in " + |
| YarnConfiguration.NM_AUX_SERVICES +" is invalid." + |
| "The valid service name should only contain a-zA-Z0-9_ " + |
| "and can not start with numbers"); |
| String classKey = String.format( |
| YarnConfiguration.NM_AUX_SERVICE_FMT, sName); |
| String className = conf.get(classKey); |
| final String appClassPath = conf.get(String.format( |
| YarnConfiguration.NM_AUX_SERVICES_CLASSPATH, sName)); |
| AuxiliaryService s = null; |
| boolean useCustomerClassLoader = appClassPath != null |
| && !appClassPath.isEmpty() && className != null |
| && !className.isEmpty(); |
| if (useCustomerClassLoader) { |
| s = AuxiliaryServiceWithCustomClassLoader.getInstance( |
| conf, className, appClassPath); |
| LOG.info("The aux service:" + sName |
| + " are using the custom classloader"); |
| } else { |
| Class<? extends AuxiliaryService> sClass = conf.getClass( |
| classKey, null, AuxiliaryService.class); |
| |
| if (sClass == null) { |
| throw new RuntimeException("No class defined for " + sName); |
| } |
| s = ReflectionUtils.newInstance(sClass, conf); |
| } |
| if (s == null) { |
| throw new RuntimeException("No object created for " + sName); |
| } |
| // TODO better use s.getName()? |
| if(!sName.equals(s.getName())) { |
| LOG.warn("The Auxiliary Service named '"+sName+"' in the " |
| +"configuration is for "+s.getClass()+" which has " |
| +"a name of '"+s.getName()+"'. Because these are " |
| +"not the same tools trying to send ServiceData and read " |
| +"Service Meta Data may have issues unless the refer to " |
| +"the name in the config."); |
| } |
| s.setAuxiliaryLocalPathHandler(auxiliaryLocalPathHandler); |
| addService(sName, s); |
| if (recoveryEnabled) { |
| Path storePath = new Path(stateStoreRoot, sName); |
| stateStoreFs.mkdirs(storePath, storeDirPerms); |
| s.setRecoveryPath(storePath); |
| } |
| s.init(conf); |
| } catch (RuntimeException e) { |
| LOG.error("Failed to initialize " + sName, e); |
| throw e; |
| } |
| } |
| super.serviceInit(conf); |
| } |
| |
| @Override |
| public void serviceStart() throws Exception { |
| // TODO fork(?) services running as configured user |
| // monitor for health, shutdown/restart(?) if any should die |
| for (Map.Entry<String, AuxiliaryService> entry : serviceMap.entrySet()) { |
| AuxiliaryService service = entry.getValue(); |
| String name = entry.getKey(); |
| service.start(); |
| service.registerServiceListener(this); |
| ByteBuffer meta = service.getMetaData(); |
| if(meta != null) { |
| serviceMetaData.put(name, meta); |
| } |
| } |
| super.serviceStart(); |
| } |
| |
| @Override |
| public void serviceStop() throws Exception { |
| try { |
| synchronized (serviceMap) { |
| for (Service service : serviceMap.values()) { |
| if (service.getServiceState() == Service.STATE.STARTED) { |
| service.unregisterServiceListener(this); |
| service.stop(); |
| } |
| } |
| serviceMap.clear(); |
| serviceMetaData.clear(); |
| } |
| } finally { |
| super.serviceStop(); |
| } |
| } |
| |
| @Override |
| public void stateChanged(Service service) { |
| LOG.error("Service " + service.getName() + " changed state: " + |
| service.getServiceState()); |
| stop(); |
| } |
| |
| @Override |
| public void handle(AuxServicesEvent event) { |
| LOG.info("Got event " + event.getType() + " for appId " |
| + event.getApplicationID()); |
| switch (event.getType()) { |
| case APPLICATION_INIT: |
| LOG.info("Got APPLICATION_INIT for service " + event.getServiceID()); |
| AuxiliaryService service = null; |
| try { |
| service = serviceMap.get(event.getServiceID()); |
| service |
| .initializeApplication(new ApplicationInitializationContext(event |
| .getUser(), event.getApplicationID(), event.getServiceData())); |
| } catch (Throwable th) { |
| logWarningWhenAuxServiceThrowExceptions(service, |
| AuxServicesEventType.APPLICATION_INIT, th); |
| } |
| break; |
| case APPLICATION_STOP: |
| for (AuxiliaryService serv : serviceMap.values()) { |
| try { |
| serv.stopApplication(new ApplicationTerminationContext(event |
| .getApplicationID())); |
| } catch (Throwable th) { |
| logWarningWhenAuxServiceThrowExceptions(serv, |
| AuxServicesEventType.APPLICATION_STOP, th); |
| } |
| } |
| break; |
| case CONTAINER_INIT: |
| for (AuxiliaryService serv : serviceMap.values()) { |
| try { |
| serv.initializeContainer(new ContainerInitializationContext( |
| event.getContainer().getUser(), |
| event.getContainer().getContainerId(), |
| event.getContainer().getResource(), event.getContainer() |
| .getContainerTokenIdentifier().getContainerType())); |
| } catch (Throwable th) { |
| logWarningWhenAuxServiceThrowExceptions(serv, |
| AuxServicesEventType.CONTAINER_INIT, th); |
| } |
| } |
| break; |
| case CONTAINER_STOP: |
| for (AuxiliaryService serv : serviceMap.values()) { |
| try { |
| serv.stopContainer(new ContainerTerminationContext( |
| event.getUser(), event.getContainer().getContainerId(), |
| event.getContainer().getResource(), event.getContainer() |
| .getContainerTokenIdentifier().getContainerType())); |
| } catch (Throwable th) { |
| logWarningWhenAuxServiceThrowExceptions(serv, |
| AuxServicesEventType.CONTAINER_STOP, th); |
| } |
| } |
| break; |
| default: |
| throw new RuntimeException("Unknown type: " + event.getType()); |
| } |
| } |
| |
| private boolean validateAuxServiceName(String name) { |
| if (name == null || name.trim().isEmpty()) { |
| return false; |
| } |
| return p.matcher(name).matches(); |
| } |
| |
| private void logWarningWhenAuxServiceThrowExceptions(AuxiliaryService service, |
| AuxServicesEventType eventType, Throwable th) { |
| LOG.warn((null == service ? "The auxService is null" |
| : "The auxService name is " + service.getName()) |
| + " and it got an error at event: " + eventType, th); |
| } |
| } |