blob: 9e58d460d17835ac17e3c6783386997f74e4ff9e [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapreduce.security;
import static org.junit.Assert.fail;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.security.PrivilegedAction;
import java.security.PrivilegedExceptionAction;
import org.apache.hadoop.security.token.SecretManager;
import org.apache.hadoop.test.LambdaTestUtils;
import org.junit.Assert;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.v2.api.HSClientProtocol;
import org.apache.hadoop.mapreduce.v2.api.MRClientProtocol;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.CancelDelegationTokenRequest;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetDelegationTokenRequest;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetJobReportRequest;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.RenewDelegationTokenRequest;
import org.apache.hadoop.mapreduce.v2.hs.HistoryServerStateStoreService;
import org.apache.hadoop.mapreduce.v2.hs.JHSDelegationTokenSecretManager;
import org.apache.hadoop.mapreduce.v2.hs.JobHistoryServer;
import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig;
import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
import org.apache.hadoop.yarn.api.records.Token;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.util.Records;
import org.apache.log4j.Level;
import org.apache.log4j.LogManager;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Tests delegation-token handling of the {@link JobHistoryServer}: a token is
 * fetched, used over RPC, renewed, allowed to expire and finally cancelled.
 */
public class TestJHSSecurity {

  private static final Logger LOG =
      LoggerFactory.getLogger(TestJHSSecurity.class);

  /**
   * Starts a {@link JobHistoryServer} whose secret manager uses short token
   * intervals and walks one delegation token through fetch, use, renewal and
   * expiry, then a second token through fetch, use and cancellation.
   *
   * @throws Exception on any unexpected failure while driving the server
   */
  @Test
  public void testDelegationToken() throws Exception {
    final org.apache.log4j.Logger rootLogger = LogManager.getRootLogger();
    // Remember the current level so the DEBUG override below does not leak
    // into whatever runs after this test in the same JVM.
    final Level previousLevel = rootLogger.getLevel();
    rootLogger.setLevel(Level.DEBUG);

    final YarnConfiguration conf = new YarnConfiguration(new JobConf());
    // Just a random principal
    conf.set(JHAdminConfig.MR_HISTORY_PRINCIPAL,
        "RandomOrc/localhost@apache.org");
    conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,
        "kerberos");
    UserGroupInformation.setConfiguration(conf);

    // Values handed to the JHSDelegationTokenSecretManager constructor below.
    final long initialInterval = 10000L;
    final long maxLifetime = 20000L;
    final long renewInterval = 10000L;

    JobHistoryServer jobHistoryServer = null;
    try {
      jobHistoryServer = new JobHistoryServer() {
        @Override
        protected void doSecureLogin(Configuration conf) throws IOException {
          // no keytab based login
        }

        @Override
        protected JHSDelegationTokenSecretManager createJHSSecretManager(
            Configuration conf, HistoryServerStateStoreService store) {
          return new JHSDelegationTokenSecretManager(initialInterval,
              maxLifetime, renewInterval, 3600000, store);
        }
      };
      jobHistoryServer.init(conf);
      jobHistoryServer.start();
      final MRClientProtocol hsService =
          jobHistoryServer.getClientService().getClientHandler();

      // Fake the authentication-method
      final UserGroupInformation loggedInUser =
          UserGroupInformation.createRemoteUser("testrenewer@APACHE.ORG");
      // Default realm is APACHE.ORG
      Assert.assertEquals("testrenewer", loggedInUser.getShortUserName());
      loggedInUser.setAuthenticationMethod(AuthenticationMethod.KERBEROS);

      final Token expiringToken = getDelegationToken(loggedInUser, hsService,
          loggedInUser.getShortUserName());
      final long tokenFetchTime = System.currentTimeMillis();
      LOG.info("Got delegation token at: {}", tokenFetchTime);

      // Now try talking to JHS using the delegation token
      final MRClientProtocol expiringTokenClient = getMRClientProtocol(
          expiringToken, jobHistoryServer.getClientService().getBindAddress(),
          "TheDarkLord", conf);

      final GetJobReportRequest jobReportRequest =
          Records.newRecord(GetJobReportRequest.class);
      jobReportRequest.setJobId(MRBuilderUtils.newJobId(123456, 1, 1));

      assertTokenAccepted(expiringTokenClient, jobReportRequest);

      // Renew after 50% of token age.
      while (System.currentTimeMillis()
          < tokenFetchTime + initialInterval / 2) {
        Thread.sleep(500L);
      }
      final long nextExpTime =
          renewDelegationToken(loggedInUser, hsService, expiringToken);
      final long renewalTime = System.currentTimeMillis();
      LOG.info("Renewed token at: {}, NextExpiryTime: {}", renewalTime,
          nextExpTime);

      // Wait for first expiry, but before renewed expiry.
      // NOTE(review): the renewal above happens at roughly
      // tokenFetchTime + initialInterval / 2, so the first condition is
      // normally false and this loop does not wait; the check below then runs
      // before the original expiry. A '<' comparison may have been intended.
      // Changing it shifts the timing of the expiry assertion further down
      // (which may depend on how long the RPC connection has been idle), so
      // confirm the whole sequence before altering it.
      while (System.currentTimeMillis() > tokenFetchTime + initialInterval
          && System.currentTimeMillis() < nextExpTime) {
        Thread.sleep(500L);
      }
      Thread.sleep(50L);

      // Valid token because of renewal.
      assertTokenAccepted(expiringTokenClient, jobReportRequest);

      // Wait for expiry.
      while (System.currentTimeMillis() < renewalTime + renewInterval) {
        Thread.sleep(500L);
      }
      Thread.sleep(50L);
      LOG.info("At time: {}, token should be invalid",
          System.currentTimeMillis());

      // Token should have expired.
      LambdaTestUtils.intercept(SecretManager.InvalidToken.class,
          "has expired",
          () -> expiringTokenClient.getJobReport(jobReportRequest));

      // Test cancellation with a fresh token and a fresh proxy.
      final Token cancellableToken = getDelegationToken(loggedInUser,
          hsService, loggedInUser.getShortUserName());
      final long secondFetchTime = System.currentTimeMillis();
      LOG.info("Got delegation token at: {}", secondFetchTime);

      // Now try talking to HSService using the delegation token
      final MRClientProtocol cancellableTokenClient = getMRClientProtocol(
          cancellableToken,
          jobHistoryServer.getClientService().getBindAddress(),
          "loginuser2", conf);
      try {
        cancellableTokenClient.getJobReport(jobReportRequest);
      } catch (IOException e) {
        // Same message as before, but keep the cause for diagnosis.
        throw new AssertionError("Unexpected exception" + e, e);
      }
      cancelDelegationToken(loggedInUser, hsService, cancellableToken);

      // Testing the token with different renewer to cancel the token
      final Token tokenWithDifferentRenewer =
          getDelegationToken(loggedInUser, hsService, "yarn");
      cancelDelegationToken(loggedInUser, hsService,
          tokenWithDifferentRenewer);

      // Creating a new connection that presents the cancelled token.
      final MRClientProtocol cancelledTokenClient = getMRClientProtocol(
          cancellableToken,
          jobHistoryServer.getClientService().getBindAddress(),
          "loginuser2", conf);
      LOG.info("Cancelled delegation token at: {}",
          System.currentTimeMillis());

      // Verify cancellation worked.
      try {
        cancelledTokenClient.getJobReport(jobReportRequest);
        fail("Should not have succeeded with a cancelled delegation token");
      } catch (IOException expected) {
        // Any IOException is accepted here as the rejection signal.
        LOG.info("Request with the cancelled token failed as expected",
            expected);
      }
    } finally {
      try {
        // The server is still null if its construction failed; guarding here
        // keeps an NPE from hiding the original failure.
        if (jobHistoryServer != null) {
          jobHistoryServer.stop();
        }
      } finally {
        rootLogger.setLevel(previousLevel);
      }
    }
  }

  /**
   * Issues a job-report request and accepts either a normal return or an
   * {@link IOException} whose message is exactly the "Unknown job" message for
   * the fabricated job id; presumably the latter means the call got past
   * authentication and only failed at job lookup.
   *
   * @param client proxy that presents the delegation token
   * @param request job-report request to send
   */
  private static void assertTokenAccepted(MRClientProtocol client,
      GetJobReportRequest request) {
    try {
      client.getJobReport(request);
    } catch (IOException e) {
      Assert.assertEquals("Unknown job job_123456_0001", e.getMessage());
    }
  }

  /**
   * Fetches a delegation token by calling the history server's client handler
   * directly as {@code loggedInUser}.
   *
   * @param loggedInUser user on whose behalf the request is made
   * @param hsService handler that issues the token
   * @param renewerString renewer to set on the request
   * @return the delegation token returned by the handler
   * @throws IOException if the request fails
   * @throws InterruptedException if the privileged action is interrupted
   */
  private Token getDelegationToken(
      final UserGroupInformation loggedInUser,
      final MRClientProtocol hsService, final String renewerString)
      throws IOException, InterruptedException {
    // Get the delegation token directly as it is a little difficult to setup
    // the kerberos based rpc.
    return loggedInUser.doAs((PrivilegedExceptionAction<Token>) () -> {
      GetDelegationTokenRequest request =
          Records.newRecord(GetDelegationTokenRequest.class);
      request.setRenewer(renewerString);
      return hsService.getDelegationToken(request).getDelegationToken();
    });
  }

  /**
   * Renews {@code dToken} through the handler as {@code loggedInUser}.
   *
   * @param loggedInUser user on whose behalf the request is made
   * @param hsService handler that renews the token
   * @param dToken token to renew
   * @return the next expiration time reported by the renew response
   * @throws IOException if the request fails
   * @throws InterruptedException if the privileged action is interrupted
   */
  private long renewDelegationToken(final UserGroupInformation loggedInUser,
      final MRClientProtocol hsService, final Token dToken)
      throws IOException, InterruptedException {
    return loggedInUser.doAs((PrivilegedExceptionAction<Long>) () -> {
      RenewDelegationTokenRequest request =
          Records.newRecord(RenewDelegationTokenRequest.class);
      request.setDelegationToken(dToken);
      return hsService.renewDelegationToken(request).getNextExpirationTime();
    });
  }

  /**
   * Cancels {@code dToken} through the handler as {@code loggedInUser}.
   *
   * @param loggedInUser user on whose behalf the request is made
   * @param hsService handler that cancels the token
   * @param dToken token to cancel
   * @throws IOException if the request fails
   * @throws InterruptedException if the privileged action is interrupted
   */
  private void cancelDelegationToken(final UserGroupInformation loggedInUser,
      final MRClientProtocol hsService, final Token dToken)
      throws IOException, InterruptedException {
    loggedInUser.doAs((PrivilegedExceptionAction<Void>) () -> {
      CancelDelegationTokenRequest request =
          Records.newRecord(CancelDelegationTokenRequest.class);
      request.setDelegationToken(dToken);
      hsService.cancelDelegationToken(request);
      return null;
    });
  }

  /**
   * Builds an RPC proxy to the history server for a remote user that carries
   * only the given delegation token.
   *
   * @param token delegation token to attach to the user
   * @param hsAddress address of the history server client service
   * @param user name of the remote user to create
   * @param conf configuration used to create the RPC layer and the proxy
   * @return a proxy created while running as the token-carrying user
   */
  private MRClientProtocol getMRClientProtocol(Token token,
      final InetSocketAddress hsAddress, String user,
      final Configuration conf) {
    UserGroupInformation ugi = UserGroupInformation.createRemoteUser(user);
    ugi.addToken(ConverterUtils.convertFromYarn(token, hsAddress));

    final YarnRPC rpc = YarnRPC.create(conf);
    return ugi.doAs((PrivilegedAction<MRClientProtocol>) () ->
        (MRClientProtocol) rpc.getProxy(HSClientProtocol.class, hsAddress,
            conf));
  }
}