blob: 3fef5e278b0d0764e65b857271f93080fde82df4 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapreduce.v2.hs.server;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.security.PrivilegedExceptionAction;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.WritableRpcEngine;
import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.Groups;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.AccessControlList;
import org.apache.hadoop.security.authorize.ProxyUsers;
import org.apache.hadoop.yarn.logaggregation.AggregatedLogDeletionService;
import org.apache.hadoop.security.proto.RefreshUserMappingsProtocolProtos.RefreshUserMappingsProtocolService;
import org.apache.hadoop.security.protocolPB.RefreshUserMappingsProtocolPB;
import org.apache.hadoop.security.protocolPB.RefreshUserMappingsProtocolServerSideTranslatorPB;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.tools.proto.GetUserMappingsProtocolProtos.GetUserMappingsProtocolService;
import org.apache.hadoop.tools.protocolPB.GetUserMappingsProtocolPB;
import org.apache.hadoop.tools.protocolPB.GetUserMappingsProtocolServerSideTranslatorPB;
import org.apache.hadoop.mapreduce.v2.api.HSAdminProtocol;
import org.apache.hadoop.mapreduce.v2.api.HSAdminRefreshProtocolPB;
import org.apache.hadoop.mapreduce.v2.app.security.authorize.ClientHSPolicyProvider;
import org.apache.hadoop.mapreduce.v2.hs.HSAuditLogger;
import org.apache.hadoop.mapreduce.v2.hs.HSAuditLogger.AuditConstants;
import org.apache.hadoop.mapreduce.v2.hs.JobHistory;
import org.apache.hadoop.mapreduce.v2.hs.proto.HSAdminRefreshProtocolProtos.HSAdminRefreshProtocolService;
import org.apache.hadoop.mapreduce.v2.hs.protocolPB.HSAdminRefreshProtocolServerSideTranslatorPB;
import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.BlockingService;
@Private
public class HSAdminServer extends AbstractService implements HSAdminProtocol {
private static final Log LOG = LogFactory.getLog(HSAdminServer.class);
private AccessControlList adminAcl;
private AggregatedLogDeletionService aggLogDelService = null;
/** The RPC server that listens to requests from clients */
protected RPC.Server clientRpcServer;
protected InetSocketAddress clientRpcAddress;
private static final String HISTORY_ADMIN_SERVER = "HSAdminServer";
private JobHistory jobHistoryService = null;
private UserGroupInformation loginUGI;
public HSAdminServer(AggregatedLogDeletionService aggLogDelService,
JobHistory jobHistoryService) {
super(HSAdminServer.class.getName());
this.aggLogDelService = aggLogDelService;
this.jobHistoryService = jobHistoryService;
}
@Override
public void serviceInit(Configuration conf) throws Exception {
RPC.setProtocolEngine(conf, RefreshUserMappingsProtocolPB.class,
ProtobufRpcEngine.class);
RefreshUserMappingsProtocolServerSideTranslatorPB refreshUserMappingXlator = new RefreshUserMappingsProtocolServerSideTranslatorPB(
this);
BlockingService refreshUserMappingService = RefreshUserMappingsProtocolService
.newReflectiveBlockingService(refreshUserMappingXlator);
GetUserMappingsProtocolServerSideTranslatorPB getUserMappingXlator = new GetUserMappingsProtocolServerSideTranslatorPB(
this);
BlockingService getUserMappingService = GetUserMappingsProtocolService
.newReflectiveBlockingService(getUserMappingXlator);
HSAdminRefreshProtocolServerSideTranslatorPB refreshHSAdminProtocolXlator = new HSAdminRefreshProtocolServerSideTranslatorPB(
this);
BlockingService refreshHSAdminProtocolService = HSAdminRefreshProtocolService
.newReflectiveBlockingService(refreshHSAdminProtocolXlator);
WritableRpcEngine.ensureInitialized();
clientRpcAddress = conf.getSocketAddr(
JHAdminConfig.MR_HISTORY_BIND_HOST,
JHAdminConfig.JHS_ADMIN_ADDRESS,
JHAdminConfig.DEFAULT_JHS_ADMIN_ADDRESS,
JHAdminConfig.DEFAULT_JHS_ADMIN_PORT);
clientRpcServer = new RPC.Builder(conf)
.setProtocol(RefreshUserMappingsProtocolPB.class)
.setInstance(refreshUserMappingService)
.setBindAddress(clientRpcAddress.getHostName())
.setPort(clientRpcAddress.getPort()).setVerbose(false).build();
addProtocol(conf, GetUserMappingsProtocolPB.class, getUserMappingService);
addProtocol(conf, HSAdminRefreshProtocolPB.class,
refreshHSAdminProtocolService);
// Enable service authorization?
if (conf.getBoolean(
CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION,
false)) {
clientRpcServer.refreshServiceAcl(conf, new ClientHSPolicyProvider());
}
adminAcl = new AccessControlList(conf.get(JHAdminConfig.JHS_ADMIN_ACL,
JHAdminConfig.DEFAULT_JHS_ADMIN_ACL));
}
@Override
protected void serviceStart() throws Exception {
if (UserGroupInformation.isSecurityEnabled()) {
loginUGI = UserGroupInformation.getLoginUser();
} else {
loginUGI = UserGroupInformation.getCurrentUser();
}
clientRpcServer.start();
}
@VisibleForTesting
UserGroupInformation getLoginUGI() {
return loginUGI;
}
@VisibleForTesting
void setLoginUGI(UserGroupInformation ugi) {
loginUGI = ugi;
}
@Override
protected void serviceStop() throws Exception {
if (clientRpcServer != null) {
clientRpcServer.stop();
}
}
private void addProtocol(Configuration conf, Class<?> protocol,
BlockingService blockingService) throws IOException {
RPC.setProtocolEngine(conf, protocol, ProtobufRpcEngine.class);
clientRpcServer.addProtocol(RPC.RpcKind.RPC_PROTOCOL_BUFFER, protocol,
blockingService);
}
private UserGroupInformation checkAcls(String method) throws IOException {
UserGroupInformation user;
try {
user = UserGroupInformation.getCurrentUser();
} catch (IOException ioe) {
LOG.warn("Couldn't get current user", ioe);
HSAuditLogger.logFailure("UNKNOWN", method, adminAcl.toString(),
HISTORY_ADMIN_SERVER, "Couldn't get current user");
throw ioe;
}
if (!adminAcl.isUserAllowed(user)) {
LOG.warn("User " + user.getShortUserName() + " doesn't have permission"
+ " to call '" + method + "'");
HSAuditLogger.logFailure(user.getShortUserName(), method,
adminAcl.toString(), HISTORY_ADMIN_SERVER,
AuditConstants.UNAUTHORIZED_USER);
throw new AccessControlException("User " + user.getShortUserName()
+ " doesn't have permission" + " to call '" + method + "'");
}
LOG.info("HS Admin: " + method + " invoked by user "
+ user.getShortUserName());
return user;
}
@Override
public String[] getGroupsForUser(String user) throws IOException {
return UserGroupInformation.createRemoteUser(user).getGroupNames();
}
@Override
public void refreshUserToGroupsMappings() throws IOException {
UserGroupInformation user = checkAcls("refreshUserToGroupsMappings");
Groups.getUserToGroupsMappingService().refresh();
HSAuditLogger.logSuccess(user.getShortUserName(),
"refreshUserToGroupsMappings", HISTORY_ADMIN_SERVER);
}
@Override
public void refreshSuperUserGroupsConfiguration() throws IOException {
UserGroupInformation user = checkAcls("refreshSuperUserGroupsConfiguration");
ProxyUsers.refreshSuperUserGroupsConfiguration(createConf());
HSAuditLogger.logSuccess(user.getShortUserName(),
"refreshSuperUserGroupsConfiguration", HISTORY_ADMIN_SERVER);
}
protected Configuration createConf() {
return new Configuration();
}
@Override
public void refreshAdminAcls() throws IOException {
UserGroupInformation user = checkAcls("refreshAdminAcls");
Configuration conf = createConf();
adminAcl = new AccessControlList(conf.get(JHAdminConfig.JHS_ADMIN_ACL,
JHAdminConfig.DEFAULT_JHS_ADMIN_ACL));
HSAuditLogger.logSuccess(user.getShortUserName(), "refreshAdminAcls",
HISTORY_ADMIN_SERVER);
}
@Override
public void refreshLoadedJobCache() throws IOException {
UserGroupInformation user = checkAcls("refreshLoadedJobCache");
try {
jobHistoryService.refreshLoadedJobCache();
} catch (UnsupportedOperationException e) {
HSAuditLogger.logFailure(user.getShortUserName(),
"refreshLoadedJobCache", adminAcl.toString(), HISTORY_ADMIN_SERVER,
e.getMessage());
throw e;
}
HSAuditLogger.logSuccess(user.getShortUserName(), "refreshLoadedJobCache",
HISTORY_ADMIN_SERVER);
}
@Override
public void refreshLogRetentionSettings() throws IOException {
UserGroupInformation user = checkAcls("refreshLogRetentionSettings");
try {
loginUGI.doAs(new PrivilegedExceptionAction<Void>() {
@Override
public Void run() throws IOException {
aggLogDelService.refreshLogRetentionSettings();
return null;
}
});
} catch (InterruptedException e) {
throw new IOException(e);
}
HSAuditLogger.logSuccess(user.getShortUserName(),
"refreshLogRetentionSettings", "HSAdminServer");
}
@Override
public void refreshJobRetentionSettings() throws IOException {
UserGroupInformation user = checkAcls("refreshJobRetentionSettings");
try {
loginUGI.doAs(new PrivilegedExceptionAction<Void>() {
@Override
public Void run() throws IOException {
jobHistoryService.refreshJobRetentionSettings();
return null;
}
});
} catch (InterruptedException e) {
throw new IOException(e);
}
HSAuditLogger.logSuccess(user.getShortUserName(),
"refreshJobRetentionSettings", HISTORY_ADMIN_SERVER);
}
}