| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.hadoop.yarn.service.client; |
| |
| import com.google.common.annotations.VisibleForTesting; |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.fs.FileStatus; |
| import org.apache.hadoop.fs.FileSystem; |
| import org.apache.hadoop.fs.Path; |
| import org.apache.hadoop.fs.RemoteIterator; |
| import org.apache.hadoop.security.UserGroupInformation; |
| import org.apache.hadoop.service.AbstractService; |
| import org.apache.hadoop.yarn.api.records.ApplicationId; |
| import org.apache.hadoop.yarn.exceptions.YarnException; |
| import org.apache.hadoop.yarn.server.service.SystemServiceManager; |
| import org.apache.hadoop.yarn.service.api.records.Service; |
| import org.apache.hadoop.yarn.service.api.records.ServiceState; |
| import org.apache.hadoop.yarn.service.conf.YarnServiceConf; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import java.io.FileNotFoundException; |
| import java.io.IOException; |
| import java.lang.reflect.UndeclaredThrowableException; |
| import java.security.PrivilegedExceptionAction; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.Map; |
| import java.util.Set; |
| import java.util.concurrent.atomic.AtomicBoolean; |
| |
| import static org.apache.hadoop.yarn.service.utils.ServiceApiUtil.jsonSerDeser; |
| |
| /** |
| * SystemServiceManager implementation. |
| * Scan for configure system service path. |
| * |
| * The service path structure is as follows: |
| * SYSTEM_SERVICE_DIR_PATH |
| * |---- sync |
| * | |--- user1 |
| * | | |---- service1.yarnfile |
| * | | |---- service2.yarnfile |
| * | |--- user2 |
| * | | |---- service1.yarnfile |
| * | | .... |
| * | | |
| * |---- async |
| * | |--- user3 |
| * | | |---- service1.yarnfile |
| * | | |---- service2.yarnfile |
| * | |--- user4 |
| * | | |---- service1.yarnfile |
| * | | .... |
| * | | |
| * |
| * sync: These services are launched at the time of service start synchronously. |
| * It is a blocking service start. |
| * async: These services are launched in separate thread without any delay after |
| * service start. Non-blocking service start. |
| */ |
| public class SystemServiceManagerImpl extends AbstractService |
| implements SystemServiceManager { |
| |
| private static final Logger LOG = |
| LoggerFactory.getLogger(SystemServiceManagerImpl.class); |
| |
| private static final String YARN_FILE_SUFFIX = ".yarnfile"; |
| private static final String SYNC = "sync"; |
| private static final String ASYNC = "async"; |
| |
| private FileSystem fs; |
| private Path systemServiceDir; |
| private AtomicBoolean stopExecutors = new AtomicBoolean(false); |
| private Map<String, Set<Service>> syncUserServices = new HashMap<>(); |
| private Map<String, Set<Service>> asyncUserServices = new HashMap<>(); |
| private UserGroupInformation loginUGI; |
| private Thread serviceLaucher; |
| |
| @VisibleForTesting |
| private int skipCounter; |
| @VisibleForTesting |
| private Map<String, Integer> ignoredUserServices = |
| new HashMap<>(); |
| |
| public SystemServiceManagerImpl() { |
| super(SystemServiceManagerImpl.class.getName()); |
| } |
| |
| @Override |
| protected void serviceInit(Configuration conf) throws Exception { |
| String dirPath = |
| conf.get(YarnServiceConf.YARN_SERVICES_SYSTEM_SERVICE_DIRECTORY); |
| if (dirPath != null) { |
| systemServiceDir = new Path(dirPath); |
| LOG.info("System Service Directory is configured to {}", |
| systemServiceDir); |
| fs = systemServiceDir.getFileSystem(conf); |
| this.loginUGI = UserGroupInformation.isSecurityEnabled() ? |
| UserGroupInformation.getLoginUser() : |
| UserGroupInformation.getCurrentUser(); |
| LOG.info("UserGroupInformation initialized to {}", loginUGI); |
| } |
| } |
| |
| @Override |
| protected void serviceStart() throws Exception { |
| scanForUserServices(); |
| launchUserService(syncUserServices); |
| // Create a thread and submit services in background otherwise it |
| // block RM switch time. |
| serviceLaucher = new Thread(createRunnable()); |
| serviceLaucher.setName("System service launcher"); |
| serviceLaucher.start(); |
| } |
| |
| @Override |
| protected void serviceStop() throws Exception { |
| LOG.info("Stopping {}", getName()); |
| stopExecutors.set(true); |
| |
| if (serviceLaucher != null) { |
| serviceLaucher.interrupt(); |
| try { |
| serviceLaucher.join(); |
| } catch (InterruptedException ie) { |
| LOG.warn("Interrupted Exception while stopping", ie); |
| } |
| } |
| } |
| |
| private Runnable createRunnable() { |
| return new Runnable() { |
| @Override |
| public void run() { |
| launchUserService(asyncUserServices); |
| } |
| }; |
| } |
| |
| void launchUserService(Map<String, Set<Service>> userServices) { |
| for (Map.Entry<String, Set<Service>> entry : userServices.entrySet()) { |
| String user = entry.getKey(); |
| Set<Service> services = entry.getValue(); |
| if (services.isEmpty()) { |
| continue; |
| } |
| ServiceClient serviceClient = null; |
| try { |
| UserGroupInformation userUgi = getProxyUser(user); |
| serviceClient = createServiceClient(userUgi); |
| for (Service service : services) { |
| LOG.info("POST: createService = {} user = {}", service, userUgi); |
| try { |
| launchServices(userUgi, serviceClient, service); |
| } catch (IOException | UndeclaredThrowableException e) { |
| if (e.getCause() != null) { |
| LOG.warn(e.getCause().getMessage()); |
| } else { |
| String message = |
| "Failed to create service " + service.getName() + " : "; |
| LOG.error(message, e); |
| } |
| } |
| } |
| } catch (InterruptedException e) { |
| LOG.warn("System service launcher thread interrupted", e); |
| break; |
| } catch (Exception e) { |
| LOG.error("Error while submitting services for user " + user, e); |
| } finally { |
| if (serviceClient != null) { |
| try { |
| serviceClient.close(); |
| } catch (IOException e) { |
| LOG.warn("Error while closing serviceClient for user {}", user); |
| } |
| } |
| } |
| } |
| } |
| |
| private ServiceClient createServiceClient(UserGroupInformation userUgi) |
| throws IOException, InterruptedException { |
| ServiceClient serviceClient = |
| userUgi.doAs(new PrivilegedExceptionAction<ServiceClient>() { |
| @Override public ServiceClient run() |
| throws IOException, YarnException { |
| ServiceClient sc = getServiceClient(); |
| sc.init(getConfig()); |
| sc.start(); |
| return sc; |
| } |
| }); |
| return serviceClient; |
| } |
| |
| private void launchServices(UserGroupInformation userUgi, |
| ServiceClient serviceClient, Service service) |
| throws IOException, InterruptedException { |
| if (service.getState() == ServiceState.STOPPED) { |
| userUgi.doAs(new PrivilegedExceptionAction<Void>() { |
| @Override public Void run() throws IOException, YarnException { |
| serviceClient.actionBuild(service); |
| return null; |
| } |
| }); |
| LOG.info("Service {} version {} saved.", service.getName(), |
| service.getVersion()); |
| } else { |
| ApplicationId applicationId = |
| userUgi.doAs(new PrivilegedExceptionAction<ApplicationId>() { |
| @Override public ApplicationId run() |
| throws IOException, YarnException { |
| ApplicationId applicationId = serviceClient.actionCreate(service); |
| return applicationId; |
| } |
| }); |
| LOG.info("Service {} submitted with Application ID: {}", |
| service.getName(), applicationId); |
| } |
| } |
| |
| ServiceClient getServiceClient() { |
| return new ServiceClient(); |
| } |
| |
| private UserGroupInformation getProxyUser(String user) { |
| UserGroupInformation ugi; |
| if (UserGroupInformation.isSecurityEnabled()) { |
| ugi = UserGroupInformation.createProxyUser(user, loginUGI); |
| } else { |
| ugi = UserGroupInformation.createRemoteUser(user); |
| } |
| return ugi; |
| } |
| |
| // scan for both launch service types i.e sync and async |
| void scanForUserServices() throws IOException { |
| if (systemServiceDir == null) { |
| return; |
| } |
| try { |
| LOG.info("Scan for launch type on {}", systemServiceDir); |
| RemoteIterator<FileStatus> iterLaunchType = list(systemServiceDir); |
| while (iterLaunchType.hasNext()) { |
| FileStatus launchType = iterLaunchType.next(); |
| if (!launchType.isDirectory()) { |
| LOG.debug("Scanner skips for unknown file {}", launchType.getPath()); |
| continue; |
| } |
| if (launchType.getPath().getName().equals(SYNC)) { |
| scanForUserServiceDefinition(launchType.getPath(), syncUserServices); |
| } else if (launchType.getPath().getName().equals(ASYNC)) { |
| scanForUserServiceDefinition(launchType.getPath(), asyncUserServices); |
| } else { |
| LOG.debug("Scanner skips for unknown dir {}.", launchType.getPath()); |
| } |
| } |
| } catch (FileNotFoundException e) { |
| LOG.warn("System service directory {} doesn't not exist.", |
| systemServiceDir); |
| } |
| } |
| |
| // Files are under systemServiceDir/<users>. Scan for 2 levels |
| // 1st level for users |
| // 2nd level for service definitions under user |
| private void scanForUserServiceDefinition(Path userDirPath, |
| Map<String, Set<Service>> userServices) throws IOException { |
| LOG.info("Scan for users on {}", userDirPath); |
| RemoteIterator<FileStatus> iterUsers = list(userDirPath); |
| while (iterUsers.hasNext()) { |
| FileStatus userDir = iterUsers.next(); |
| // if 1st level is not user directory then skip it. |
| if (!userDir.isDirectory()) { |
| LOG.info( |
| "Service definition {} doesn't belong to any user. Ignoring.. ", |
| userDir.getPath().getName()); |
| continue; |
| } |
| String userName = userDir.getPath().getName(); |
| LOG.info("Scanning service definitions for user {}.", userName); |
| |
| //2nd level scan |
| RemoteIterator<FileStatus> iterServices = list(userDir.getPath()); |
| while (iterServices.hasNext()) { |
| FileStatus serviceCache = iterServices.next(); |
| String filename = serviceCache.getPath().getName(); |
| if (!serviceCache.isFile()) { |
| LOG.info("Scanner skips for unknown dir {}", filename); |
| continue; |
| } |
| if (!filename.endsWith(YARN_FILE_SUFFIX)) { |
| LOG.info("Scanner skips for unknown file extension, filename = {}", |
| filename); |
| skipCounter++; |
| continue; |
| } |
| Service service = getServiceDefinition(serviceCache.getPath()); |
| if (service != null) { |
| Set<Service> services = userServices.get(userName); |
| if (services == null) { |
| services = new HashSet<>(); |
| userServices.put(userName, services); |
| } |
| if (!services.add(service)) { |
| int count = ignoredUserServices.containsKey(userName) ? |
| ignoredUserServices.get(userName) : 0; |
| ignoredUserServices.put(userName, count + 1); |
| LOG.warn( |
| "Ignoring service {} for the user {} as it is already present," |
| + " filename = {}", service.getName(), userName, filename); |
| } |
| LOG.info("Added service {} for the user {}, filename = {}", |
| service.getName(), userName, filename); |
| } |
| } |
| } |
| } |
| |
| private Service getServiceDefinition(Path filePath) { |
| Service service = null; |
| try { |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("Loading service definition from FS: " + filePath); |
| } |
| service = jsonSerDeser.load(fs, filePath); |
| } catch (IOException e) { |
| LOG.info("Error while loading service definition from FS: {}", e); |
| } |
| return service; |
| } |
| |
| private RemoteIterator<FileStatus> list(Path path) throws IOException { |
| return new StoppableRemoteIterator(fs.listStatusIterator(path)); |
| } |
| |
| @VisibleForTesting Map<String, Integer> getIgnoredUserServices() { |
| return ignoredUserServices; |
| } |
| |
| private class StoppableRemoteIterator implements RemoteIterator<FileStatus> { |
| private final RemoteIterator<FileStatus> remote; |
| |
| StoppableRemoteIterator(RemoteIterator<FileStatus> remote) { |
| this.remote = remote; |
| } |
| |
| @Override public boolean hasNext() throws IOException { |
| return !stopExecutors.get() && remote.hasNext(); |
| } |
| |
| @Override public FileStatus next() throws IOException { |
| return remote.next(); |
| } |
| } |
| |
| @VisibleForTesting |
| Map<String, Set<Service>> getSyncUserServices() { |
| return syncUserServices; |
| } |
| |
| @VisibleForTesting int getSkipCounter() { |
| return skipCounter; |
| } |
| } |