blob: e6a885bff5ed2262ef3cf145ad9c4b35e719806f [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.sharedcachemanager;
import java.io.IOException;
import java.net.InetSocketAddress;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.security.YarnAuthorizationProvider;
import org.apache.hadoop.yarn.server.api.SCMAdminProtocol;
import org.apache.hadoop.yarn.server.api.protocolrecords.RunSharedCacheCleanerTaskRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.RunSharedCacheCleanerTaskResponse;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.ipc.RPCUtil;
import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* This service handles all SCMAdminProtocol rpc calls from administrators
* to the shared cache manager.
*/
@Private
@Unstable
public class SCMAdminProtocolService extends AbstractService implements
SCMAdminProtocol {
private static final Logger LOG =
LoggerFactory.getLogger(SCMAdminProtocolService.class);
private final RecordFactory recordFactory = RecordFactoryProvider
.getRecordFactory(null);
private Server server;
InetSocketAddress clientBindAddress;
private final CleanerService cleanerService;
private YarnAuthorizationProvider authorizer;
public SCMAdminProtocolService(CleanerService cleanerService) {
super(SCMAdminProtocolService.class.getName());
this.cleanerService = cleanerService;
}
@Override
protected void serviceInit(Configuration conf) throws Exception {
this.clientBindAddress = getBindAddress(conf);
authorizer = YarnAuthorizationProvider.getInstance(conf);
super.serviceInit(conf);
}
InetSocketAddress getBindAddress(Configuration conf) {
return conf.getSocketAddr(YarnConfiguration.SCM_ADMIN_ADDRESS,
YarnConfiguration.DEFAULT_SCM_ADMIN_ADDRESS,
YarnConfiguration.DEFAULT_SCM_ADMIN_PORT);
}
@Override
protected void serviceStart() throws Exception {
Configuration conf = getConfig();
YarnRPC rpc = YarnRPC.create(conf);
this.server =
rpc.getServer(SCMAdminProtocol.class, this,
clientBindAddress,
conf, null, // Secret manager null for now (security not supported)
conf.getInt(YarnConfiguration.SCM_ADMIN_CLIENT_THREAD_COUNT,
YarnConfiguration.DEFAULT_SCM_ADMIN_CLIENT_THREAD_COUNT));
// TODO: Enable service authorization (see YARN-2774)
this.server.start();
clientBindAddress =
conf.updateConnectAddr(YarnConfiguration.SCM_ADMIN_ADDRESS,
server.getListenerAddress());
super.serviceStart();
}
@Override
protected void serviceStop() throws Exception {
if (this.server != null) {
this.server.stop();
}
super.serviceStop();
}
private void checkAcls(String method) throws YarnException {
UserGroupInformation user;
try {
user = UserGroupInformation.getCurrentUser();
} catch (IOException ioe) {
LOG.warn("Couldn't get current user", ioe);
throw RPCUtil.getRemoteException(ioe);
}
if (!authorizer.isAdmin(user)) {
LOG.warn("User " + user.getShortUserName() + " doesn't have permission" +
" to call '" + method + "'");
throw RPCUtil.getRemoteException(
new AccessControlException("User " + user.getShortUserName() +
" doesn't have permission" + " to call '" + method + "'"));
}
LOG.info("SCM Admin: " + method + " invoked by user " +
user.getShortUserName());
}
@Override
public RunSharedCacheCleanerTaskResponse runCleanerTask(
RunSharedCacheCleanerTaskRequest request) throws YarnException {
checkAcls("runCleanerTask");
RunSharedCacheCleanerTaskResponse response =
recordFactory.newRecordInstance(RunSharedCacheCleanerTaskResponse.class);
this.cleanerService.runCleanerTask();
// if we are here, then we have submitted the request to the cleaner
// service, ack the request to the admin client
response.setAccepted(true);
return response;
}
}