blob: 225f8bdfedbc592252c191d91b48f4339648f21c [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.service.client;
import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.service.SystemServiceManager;
import org.apache.hadoop.yarn.service.api.records.Service;
import org.apache.hadoop.yarn.service.api.records.ServiceState;
import org.apache.hadoop.yarn.service.conf.YarnServiceConf;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.lang.reflect.UndeclaredThrowableException;
import java.security.PrivilegedExceptionAction;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import static org.apache.hadoop.yarn.service.utils.ServiceApiUtil.jsonSerDeser;
/**
 * SystemServiceManager implementation.
 * Scans the configured system service directory for service definitions and
 * submits them on behalf of the users that own them.
 *
 * <p>The service path structure is as follows:
 * <pre>
 * SYSTEM_SERVICE_DIR_PATH
 * |---- sync
 * |     |--- user1
 * |     |    |---- service1.yarnfile
 * |     |    |---- service2.yarnfile
 * |     |--- user2
 * |     |    |---- service1.yarnfile
 * |     |    ....
 * |     |
 * |---- async
 * |     |--- user3
 * |     |    |---- service1.yarnfile
 * |     |    |---- service2.yarnfile
 * |     |--- user4
 * |     |    |---- service1.yarnfile
 * |     |    ....
 * |     |
 * </pre>
 *
 * <p>sync: These services are launched synchronously at the time of service
 * start. It is a blocking service start.
 *
 * <p>async: These services are launched in a separate thread without any
 * delay after service start. Non-blocking service start.
 */
public class SystemServiceManagerImpl extends AbstractService
    implements SystemServiceManager {

  private static final Logger LOG =
      LoggerFactory.getLogger(SystemServiceManagerImpl.class);

  /** Only files with this suffix are treated as service definitions. */
  private static final String YARN_FILE_SUFFIX = ".yarnfile";
  /** Name of the launch-type directory whose services block service start. */
  private static final String SYNC = "sync";
  /** Name of the launch-type directory whose services start in background. */
  private static final String ASYNC = "async";

  private FileSystem fs;
  /** Root of the scan; stays null when the directory is not configured. */
  private Path systemServiceDir;
  /** Set on stop; makes every directory listing report "no more entries". */
  private final AtomicBoolean stopExecutors = new AtomicBoolean(false);
  /** user name -> services found under the sync directory. */
  private final Map<String, Set<Service>> syncUserServices = new HashMap<>();
  /** user name -> services found under the async directory. */
  private final Map<String, Set<Service>> asyncUserServices = new HashMap<>();
  private UserGroupInformation loginUGI;
  /** Background thread that submits the async services. */
  private Thread serviceLauncher;

  /** Number of files skipped because they lack the .yarnfile suffix. */
  @VisibleForTesting
  private int skipCounter;

  /** user name -> number of duplicate service definitions ignored. */
  @VisibleForTesting
  private final Map<String, Integer> ignoredUserServices = new HashMap<>();

  public SystemServiceManagerImpl() {
    super(SystemServiceManagerImpl.class.getName());
  }

  /**
   * Reads the system service directory from the configuration. When it is
   * set, resolves the directory's file system and records the user on whose
   * behalf proxy users are created later (the login user when security is
   * enabled, otherwise the current user). When it is not set nothing is
   * initialized and the later scan is a no-op.
   *
   * @param conf service configuration
   * @throws Exception if the file system or the user cannot be resolved
   */
  @Override
  protected void serviceInit(Configuration conf) throws Exception {
    String dirPath =
        conf.get(YarnServiceConf.YARN_SERVICES_SYSTEM_SERVICE_DIRECTORY);
    if (dirPath != null) {
      systemServiceDir = new Path(dirPath);
      LOG.info("System Service Directory is configured to {}",
          systemServiceDir);
      fs = systemServiceDir.getFileSystem(conf);
      this.loginUGI = UserGroupInformation.isSecurityEnabled() ?
          UserGroupInformation.getLoginUser() :
          UserGroupInformation.getCurrentUser();
      LOG.info("UserGroupInformation initialized to {}", loginUGI);
    }
  }

  /**
   * Scans the system service directory, launches the sync services on the
   * calling thread and then hands the async services to a background thread.
   *
   * @throws Exception if the directory scan fails
   */
  @Override
  protected void serviceStart() throws Exception {
    scanForUserServices();
    launchUserService(syncUserServices);
    // Submit the async services from a separate thread; doing it inline
    // would block the caller and lengthen RM switch time.
    serviceLauncher = new Thread(createRunnable());
    serviceLauncher.setName("System service launcher");
    serviceLauncher.start();
  }

  /**
   * Signals the scanners to stop, interrupts the background launcher and
   * waits for it to finish.
   *
   * @throws Exception declared by the parent contract; not thrown here
   */
  @Override
  protected void serviceStop() throws Exception {
    LOG.info("Stopping {}", getName());
    stopExecutors.set(true);
    if (serviceLauncher != null) {
      serviceLauncher.interrupt();
      try {
        serviceLauncher.join();
      } catch (InterruptedException ie) {
        LOG.warn("Interrupted Exception while stopping", ie);
        // Keep the interrupt visible to whoever is driving the stop.
        Thread.currentThread().interrupt();
      }
    }
  }

  /**
   * @return the task run by the background launcher thread: submit every
   *         async service
   */
  private Runnable createRunnable() {
    return () -> launchUserService(asyncUserServices);
  }

  /**
   * Submits every service in the given map, one user at a time. A dedicated
   * {@link ServiceClient} is created per user and closed once that user's
   * services have been processed. A failure for one service or one user is
   * logged and does not stop the remaining ones; an interrupt stops the
   * whole loop.
   *
   * @param userServices user name to the set of services to launch for
   *                     that user
   */
  void launchUserService(Map<String, Set<Service>> userServices) {
    for (Map.Entry<String, Set<Service>> entry : userServices.entrySet()) {
      String user = entry.getKey();
      Set<Service> services = entry.getValue();
      if (services.isEmpty()) {
        continue;
      }
      ServiceClient serviceClient = null;
      boolean interrupted = false;
      try {
        UserGroupInformation userUgi = getProxyUser(user);
        serviceClient = createServiceClient(userUgi);
        for (Service service : services) {
          LOG.info("POST: createService = {} user = {}", service, userUgi);
          try {
            launchServices(userUgi, serviceClient, service);
          } catch (IOException | UndeclaredThrowableException e) {
            if (e.getCause() != null) {
              // Wrapped failure: report the underlying reason without the
              // stack trace, but say which service it belongs to.
              LOG.warn("Failed to create service {} : {}",
                  service.getName(), e.getCause().getMessage());
            } else {
              LOG.error("Failed to create service {} : ",
                  service.getName(), e);
            }
          }
        }
      } catch (InterruptedException e) {
        LOG.warn("System service launcher thread interrupted", e);
        interrupted = true;
      } catch (Exception e) {
        LOG.error("Error while submitting services for user " + user, e);
      } finally {
        if (serviceClient != null) {
          try {
            serviceClient.close();
          } catch (IOException e) {
            LOG.warn("Error while closing serviceClient for user {}",
                user, e);
          }
        }
      }
      if (interrupted) {
        // Restore the interrupt status only after the client has been
        // closed, so the cleanup above does not run with the flag set.
        Thread.currentThread().interrupt();
        break;
      }
    }
  }

  /**
   * Creates, initializes and starts a {@link ServiceClient} while running
   * as the given user.
   *
   * @param userUgi user to run the client creation as
   * @return a started service client
   * @throws IOException if the privileged action fails with an I/O error
   * @throws InterruptedException if the privileged action is interrupted
   */
  private ServiceClient createServiceClient(UserGroupInformation userUgi)
      throws IOException, InterruptedException {
    return userUgi.doAs(new PrivilegedExceptionAction<ServiceClient>() {
      @Override
      public ServiceClient run() throws IOException, YarnException {
        ServiceClient sc = getServiceClient();
        sc.init(getConfig());
        sc.start();
        return sc;
      }
    });
  }

  /**
   * Submits a single service as the given user. A definition whose state is
   * {@link ServiceState#STOPPED} is only built (saved); any other state is
   * created, which yields an application id.
   *
   * @param userUgi user to run the action as
   * @param serviceClient client used to reach the service framework
   * @param service service definition to build or create
   * @throws IOException if the privileged action fails with an I/O error
   * @throws InterruptedException if the privileged action is interrupted
   */
  private void launchServices(UserGroupInformation userUgi,
      ServiceClient serviceClient, Service service)
      throws IOException, InterruptedException {
    if (service.getState() == ServiceState.STOPPED) {
      userUgi.doAs(new PrivilegedExceptionAction<Void>() {
        @Override
        public Void run() throws IOException, YarnException {
          serviceClient.actionBuild(service);
          return null;
        }
      });
      LOG.info("Service {} version {} saved.", service.getName(),
          service.getVersion());
    } else {
      ApplicationId applicationId =
          userUgi.doAs(new PrivilegedExceptionAction<ApplicationId>() {
            @Override
            public ApplicationId run() throws IOException, YarnException {
              return serviceClient.actionCreate(service);
            }
          });
      LOG.info("Service {} submitted with Application ID: {}",
          service.getName(), applicationId);
    }
  }

  /**
   * Factory for the client used to submit services; package-private so it
   * can be replaced by a subclass.
   *
   * @return a new, not yet initialized service client
   */
  ServiceClient getServiceClient() {
    return new ServiceClient();
  }

  /**
   * Builds the identity used to submit services for a user: a proxy user on
   * top of the recorded login UGI when security is enabled, a plain remote
   * user otherwise.
   *
   * @param user user name taken from the directory layout
   * @return the UGI to run that user's submissions as
   */
  private UserGroupInformation getProxyUser(String user) {
    if (UserGroupInformation.isSecurityEnabled()) {
      return UserGroupInformation.createProxyUser(user, loginUGI);
    }
    return UserGroupInformation.createRemoteUser(user);
  }

  /**
   * Scans the system service directory for both launch types, i.e. the
   * {@code sync} and {@code async} sub-directories, and fills the matching
   * user-to-services maps. Does nothing when no directory is configured;
   * a missing directory is logged and otherwise ignored.
   *
   * @throws IOException if listing a directory fails for a reason other
   *                     than the directory not existing
   */
  void scanForUserServices() throws IOException {
    if (systemServiceDir == null) {
      return;
    }
    try {
      LOG.info("Scan for launch type on {}", systemServiceDir);
      RemoteIterator<FileStatus> iterLaunchType = list(systemServiceDir);
      while (iterLaunchType.hasNext()) {
        FileStatus launchType = iterLaunchType.next();
        Path launchTypePath = launchType.getPath();
        if (!launchType.isDirectory()) {
          LOG.debug("Scanner skips for unknown file {}", launchTypePath);
          continue;
        }
        String launchTypeName = launchTypePath.getName();
        if (launchTypeName.equals(SYNC)) {
          scanForUserServiceDefinition(launchTypePath, syncUserServices);
        } else if (launchTypeName.equals(ASYNC)) {
          scanForUserServiceDefinition(launchTypePath, asyncUserServices);
        } else {
          LOG.debug("Scanner skips for unknown dir {}.", launchTypePath);
        }
      }
    } catch (FileNotFoundException e) {
      LOG.warn("System service directory {} doesn't exist.",
          systemServiceDir);
    }
  }

  /**
   * Scans one launch-type directory. Files are laid out as
   * {@code <launch type>/<user>/<service>.yarnfile}, so two levels are
   * walked: the first for user directories, the second for the service
   * definitions of each user.
   *
   * @param userDirPath launch-type directory that contains user directories
   * @param userServices map to fill with the services found per user
   * @throws IOException if listing a directory fails
   */
  private void scanForUserServiceDefinition(Path userDirPath,
      Map<String, Set<Service>> userServices) throws IOException {
    LOG.info("Scan for users on {}", userDirPath);
    RemoteIterator<FileStatus> iterUsers = list(userDirPath);
    while (iterUsers.hasNext()) {
      FileStatus userDir = iterUsers.next();
      // If the 1st level entry is not a user directory then skip it.
      if (!userDir.isDirectory()) {
        LOG.info(
            "Service definition {} doesn't belong to any user. Ignoring.. ",
            userDir.getPath().getName());
        continue;
      }
      String userName = userDir.getPath().getName();
      LOG.info("Scanning service definitions for user {}.", userName);
      // 2nd level scan: service definition files of this user.
      RemoteIterator<FileStatus> iterServices = list(userDir.getPath());
      while (iterServices.hasNext()) {
        FileStatus serviceCache = iterServices.next();
        String filename = serviceCache.getPath().getName();
        if (!serviceCache.isFile()) {
          LOG.info("Scanner skips for unknown dir {}", filename);
          continue;
        }
        if (!filename.endsWith(YARN_FILE_SUFFIX)) {
          LOG.info("Scanner skips for unknown file extension, filename = {}",
              filename);
          skipCounter++;
          continue;
        }
        Service service = getServiceDefinition(serviceCache.getPath());
        if (service != null) {
          addUserService(userName, service, filename, userServices);
        }
      }
    }
  }

  /**
   * Records a loaded service for a user. A service equal to one already
   * recorded for that user is not added again; it is counted in the ignored
   * statistics instead.
   *
   * @param userName owner of the service
   * @param service loaded service definition
   * @param filename file the definition was loaded from, for logging only
   * @param userServices map to add the service to
   */
  private void addUserService(String userName, Service service,
      String filename, Map<String, Set<Service>> userServices) {
    Set<Service> services =
        userServices.computeIfAbsent(userName, k -> new HashSet<>());
    if (services.add(service)) {
      LOG.info("Added service {} for the user {}, filename = {}",
          service.getName(), userName, filename);
    } else {
      ignoredUserServices.merge(userName, 1, Integer::sum);
      LOG.warn(
          "Ignoring service {} for the user {} as it is already present,"
              + " filename = {}", service.getName(), userName, filename);
    }
  }

  /**
   * Loads a service definition from the file system.
   *
   * @param filePath path of the service definition file
   * @return the parsed service, or null when the file cannot be loaded
   */
  private Service getServiceDefinition(Path filePath) {
    LOG.debug("Loading service definition from FS: {}", filePath);
    try {
      return jsonSerDeser.load(fs, filePath);
    } catch (IOException e) {
      // A broken definition must not abort the scan; report which file
      // failed and carry on with the others.
      LOG.warn("Error while loading service definition from FS: {}",
          filePath, e);
      return null;
    }
  }

  /**
   * Lists a directory through an iterator that stops yielding entries once
   * this service has been asked to stop.
   *
   * @param path directory to list
   * @return iterator over the directory entries
   * @throws IOException if the listing cannot be started
   */
  private RemoteIterator<FileStatus> list(Path path) throws IOException {
    return new StoppableRemoteIterator(fs.listStatusIterator(path));
  }

  @VisibleForTesting
  Map<String, Integer> getIgnoredUserServices() {
    return ignoredUserServices;
  }

  /**
   * Wraps a directory listing so that {@link #hasNext()} reports false as
   * soon as the enclosing service's stop flag is set, which ends any scan
   * loop that is still running.
   */
  private class StoppableRemoteIterator implements RemoteIterator<FileStatus> {
    private final RemoteIterator<FileStatus> remote;

    StoppableRemoteIterator(RemoteIterator<FileStatus> remote) {
      this.remote = remote;
    }

    @Override
    public boolean hasNext() throws IOException {
      return !stopExecutors.get() && remote.hasNext();
    }

    @Override
    public FileStatus next() throws IOException {
      return remote.next();
    }
  }

  @VisibleForTesting
  Map<String, Set<Service>> getSyncUserServices() {
    return syncUserServices;
  }

  @VisibleForTesting
  int getSkipCounter() {
    return skipCounter;
  }
}