blob: 3ba4bebc86e837cb3f9cd41a8c497897b1cf52cb [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server;
import static org.junit.Assert.fail;
import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.minikdc.KerberosSecurityTestcase;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.SecretManager.InvalidToken;
import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainersResponse;
import org.apache.hadoop.yarn.api.protocolrecords.StopContainersRequest;
import org.apache.hadoop.yarn.api.protocolrecords.StopContainersResponse;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.SerializedException;
import org.apache.hadoop.yarn.api.records.Token;
import org.apache.hadoop.yarn.client.NMProxy;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
import org.apache.hadoop.yarn.security.NMTokenIdentifier;
import org.apache.hadoop.yarn.server.nodemanager.Context;
import org.apache.hadoop.yarn.server.nodemanager.NodeManager;
import org.apache.hadoop.yarn.server.nodemanager.security.NMTokenSecretManagerInNM;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.MockRMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
import org.apache.hadoop.yarn.server.security.BaseNMTokenSecretManager;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.util.Records;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
@RunWith(Parameterized.class)
public class TestContainerManagerSecurity extends KerberosSecurityTestcase {
static Log LOG = LogFactory.getLog(TestContainerManagerSecurity.class);
static final RecordFactory recordFactory = RecordFactoryProvider
.getRecordFactory(null);
private static MiniYARNCluster yarnCluster;
private static final File testRootDir = new File("target",
TestContainerManagerSecurity.class.getName() + "-root");
private static File httpSpnegoKeytabFile = new File(testRootDir,
"httpSpnegoKeytabFile.keytab");
private static String httpSpnegoPrincipal = "HTTP/localhost@EXAMPLE.COM";
private Configuration conf;
@Before
public void setUp() throws Exception {
testRootDir.mkdirs();
httpSpnegoKeytabFile.deleteOnExit();
getKdc().createPrincipal(httpSpnegoKeytabFile, httpSpnegoPrincipal);
UserGroupInformation.setConfiguration(conf);
yarnCluster =
new MiniYARNCluster(TestContainerManagerSecurity.class.getName(), 1, 1,
1);
yarnCluster.init(conf);
yarnCluster.start();
}
@After
public void tearDown() {
if (yarnCluster != null) {
yarnCluster.stop();
yarnCluster = null;
}
testRootDir.delete();
}
/*
* Run two tests: one with no security ("simple") and one with "Secure"
* The first parameter is just the test name to make it easier to debug
* and to give details in say an IDE. The second is the configuraiton
* object to use.
*/
@Parameters(name = "{0}")
public static Collection<Object[]> configs() {
Configuration configurationWithoutSecurity = new Configuration();
configurationWithoutSecurity.set(
CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, "simple");
Configuration configurationWithSecurity = new Configuration();
configurationWithSecurity.set(
CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, "kerberos");
configurationWithSecurity.set(
YarnConfiguration.RM_WEBAPP_SPNEGO_USER_NAME_KEY, httpSpnegoPrincipal);
configurationWithSecurity.set(
YarnConfiguration.RM_WEBAPP_SPNEGO_KEYTAB_FILE_KEY,
httpSpnegoKeytabFile.getAbsolutePath());
configurationWithSecurity.set(
YarnConfiguration.NM_WEBAPP_SPNEGO_USER_NAME_KEY, httpSpnegoPrincipal);
configurationWithSecurity.set(
YarnConfiguration.NM_WEBAPP_SPNEGO_KEYTAB_FILE_KEY,
httpSpnegoKeytabFile.getAbsolutePath());
return Arrays.asList(new Object[][] {
{"Simple", configurationWithoutSecurity},
{"Secure", configurationWithSecurity}});
}
public TestContainerManagerSecurity(String name, Configuration conf) {
LOG.info("RUNNING TEST " + name);
conf.setLong(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, 100000L);
this.conf = conf;
}
@Test
public void testContainerManager() throws Exception {
// TestNMTokens.
testNMTokens(conf);
// Testing for container token tampering
testContainerToken(conf);
// Testing for container token tampering with epoch
testContainerTokenWithEpoch(conf);
}
/**
* Run a series of tests using different NMTokens. A configuration is
* provided for managing creating of the tokens and rpc.
*/
private void testNMTokens(Configuration testConf) throws Exception {
NMTokenSecretManagerInRM nmTokenSecretManagerRM =
yarnCluster.getResourceManager().getRMContext()
.getNMTokenSecretManager();
NMTokenSecretManagerInNM nmTokenSecretManagerNM =
yarnCluster.getNodeManager(0).getNMContext().getNMTokenSecretManager();
RMContainerTokenSecretManager containerTokenSecretManager =
yarnCluster.getResourceManager().getRMContext().
getContainerTokenSecretManager();
NodeManager nm = yarnCluster.getNodeManager(0);
waitForNMToReceiveNMTokenKey(nmTokenSecretManagerNM);
// Both id should be equal.
Assert.assertEquals(nmTokenSecretManagerNM.getCurrentKey().getKeyId(),
nmTokenSecretManagerRM.getCurrentKey().getKeyId());
/*
* Below cases should be tested.
* 1) If Invalid NMToken is used then it should be rejected.
* 2) If valid NMToken but belonging to another Node is used then that
* too should be rejected.
* 3) NMToken for say appAttempt-1 is used for starting/stopping/retrieving
* status for container with containerId for say appAttempt-2 should
* be rejected.
* 4) After start container call is successful nmtoken should have been
* saved in NMTokenSecretManagerInNM.
* 5) If start container call was successful (no matter if container is
* still running or not), appAttempt->NMToken should be present in
* NMTokenSecretManagerInNM's cache. Any future getContainerStatus call
* for containerId belonging to that application attempt using
* applicationAttempt's older nmToken should not get any invalid
* nmToken error. (This can be best tested if we roll over NMToken
* master key twice).
*/
YarnRPC rpc = YarnRPC.create(testConf);
String user = "test";
Resource r = Resource.newInstance(1024, 1);
ApplicationId appId = ApplicationId.newInstance(1, 1);
MockRMApp m = new MockRMApp(appId.getId(), appId.getClusterTimestamp(),
RMAppState.NEW);
yarnCluster.getResourceManager().getRMContext().getRMApps().put(appId, m);
ApplicationAttemptId validAppAttemptId =
ApplicationAttemptId.newInstance(appId, 1);
ContainerId validContainerId =
ContainerId.newContainerId(validAppAttemptId, 0);
NodeId validNode = yarnCluster.getNodeManager(0).getNMContext().getNodeId();
NodeId invalidNode = NodeId.newInstance("InvalidHost", 1234);
org.apache.hadoop.yarn.api.records.Token validNMToken =
nmTokenSecretManagerRM.createNMToken(validAppAttemptId, validNode, user);
org.apache.hadoop.yarn.api.records.Token validContainerToken =
containerTokenSecretManager.createContainerToken(validContainerId,
0, validNode, user, r, Priority.newInstance(10), 1234);
ContainerTokenIdentifier identifier =
BuilderUtils.newContainerTokenIdentifier(validContainerToken);
Assert.assertEquals(Priority.newInstance(10), identifier.getPriority());
Assert.assertEquals(1234, identifier.getCreationTime());
StringBuilder sb;
// testInvalidNMToken ... creating NMToken using different secret manager.
NMTokenSecretManagerInRM tempManager = new NMTokenSecretManagerInRM(testConf);
tempManager.rollMasterKey();
do {
tempManager.rollMasterKey();
tempManager.activateNextMasterKey();
// Making sure key id is different.
} while (tempManager.getCurrentKey().getKeyId() == nmTokenSecretManagerRM
.getCurrentKey().getKeyId());
// Testing that NM rejects the requests when we don't send any token.
if (UserGroupInformation.isSecurityEnabled()) {
sb = new StringBuilder("Client cannot authenticate via:[TOKEN]");
} else {
sb =
new StringBuilder(
"SIMPLE authentication is not enabled. Available:[TOKEN]");
}
String errorMsg = testStartContainer(rpc, validAppAttemptId, validNode,
validContainerToken, null, true);
Assert.assertTrue("In calling " + validNode + " exception was '"
+ errorMsg + "' but doesn't contain '"
+ sb.toString() + "'", errorMsg.contains(sb.toString()));
org.apache.hadoop.yarn.api.records.Token invalidNMToken =
tempManager.createNMToken(validAppAttemptId, validNode, user);
sb = new StringBuilder("Given NMToken for application : ");
sb.append(validAppAttemptId.toString())
.append(" seems to have been generated illegally.");
Assert.assertTrue(sb.toString().contains(
testStartContainer(rpc, validAppAttemptId, validNode,
validContainerToken, invalidNMToken, true)));
// valid NMToken but belonging to other node
invalidNMToken =
nmTokenSecretManagerRM.createNMToken(validAppAttemptId, invalidNode,
user);
sb = new StringBuilder("Given NMToken for application : ");
sb.append(validAppAttemptId)
.append(" is not valid for current node manager.expected : ")
.append(validNode.toString())
.append(" found : ").append(invalidNode.toString());
Assert.assertTrue(sb.toString().contains(
testStartContainer(rpc, validAppAttemptId, validNode,
validContainerToken, invalidNMToken, true)));
// using correct tokens. nmtoken for app attempt should get saved.
testConf.setInt(YarnConfiguration.RM_CONTAINER_ALLOC_EXPIRY_INTERVAL_MS,
4 * 60 * 1000);
validContainerToken =
containerTokenSecretManager.createContainerToken(validContainerId,
0, validNode, user, r, Priority.newInstance(0), 0);
Assert.assertTrue(testStartContainer(rpc, validAppAttemptId, validNode,
validContainerToken, validNMToken, false).isEmpty());
Assert.assertTrue(nmTokenSecretManagerNM
.isAppAttemptNMTokenKeyPresent(validAppAttemptId));
// using a new compatible version nmtoken, expect container can be started
// successfully.
ApplicationAttemptId validAppAttemptId2 =
ApplicationAttemptId.newInstance(appId, 2);
ContainerId validContainerId2 =
ContainerId.newContainerId(validAppAttemptId2, 0);
org.apache.hadoop.yarn.api.records.Token validContainerToken2 =
containerTokenSecretManager.createContainerToken(validContainerId2,
0, validNode, user, r, Priority.newInstance(0), 0);
org.apache.hadoop.yarn.api.records.Token validNMToken2 =
nmTokenSecretManagerRM.createNMToken(validAppAttemptId2, validNode, user);
// First, get a new NMTokenIdentifier.
NMTokenIdentifier newIdentifier = new NMTokenIdentifier();
byte[] tokenIdentifierContent = validNMToken2.getIdentifier().array();
DataInputBuffer dib = new DataInputBuffer();
dib.reset(tokenIdentifierContent, tokenIdentifierContent.length);
newIdentifier.readFields(dib);
// Then, generate a new version NMTokenIdentifier (NMTokenIdentifierNewForTest)
// with additional field of message.
NMTokenIdentifierNewForTest newVersionIdentifier =
new NMTokenIdentifierNewForTest(newIdentifier, "message");
// check new version NMTokenIdentifier has correct info.
Assert.assertEquals("The ApplicationAttemptId is changed after set to " +
"newVersionIdentifier", validAppAttemptId2.getAttemptId(),
newVersionIdentifier.getApplicationAttemptId().getAttemptId()
);
Assert.assertEquals("The message is changed after set to newVersionIdentifier",
"message", newVersionIdentifier.getMessage());
Assert.assertEquals("The NodeId is changed after set to newVersionIdentifier",
validNode, newVersionIdentifier.getNodeId());
// create new Token based on new version NMTokenIdentifier.
org.apache.hadoop.yarn.api.records.Token newVersionedNMToken =
BaseNMTokenSecretManager.newInstance(
nmTokenSecretManagerRM.retrievePassword(newVersionIdentifier),
newVersionIdentifier);
// Verify startContainer is successful and no exception is thrown.
Assert.assertTrue(testStartContainer(rpc, validAppAttemptId2, validNode,
validContainerToken2, newVersionedNMToken, false).isEmpty());
Assert.assertTrue(nmTokenSecretManagerNM
.isAppAttemptNMTokenKeyPresent(validAppAttemptId2));
//Now lets wait till container finishes and is removed from node manager.
waitForContainerToFinishOnNM(validContainerId);
sb = new StringBuilder("Attempt to relaunch the same container with id ");
sb.append(validContainerId);
Assert.assertTrue(testStartContainer(rpc, validAppAttemptId, validNode,
validContainerToken, validNMToken, true).contains(sb.toString()));
// Container is removed from node manager's memory by this time.
// trying to stop the container. It should not throw any exception.
testStopContainer(rpc, validAppAttemptId, validNode, validContainerId,
validNMToken, false);
// Rolling over master key twice so that we can check whether older keys
// are used for authentication.
rollNMTokenMasterKey(nmTokenSecretManagerRM, nmTokenSecretManagerNM);
// Key rolled over once.. rolling over again
rollNMTokenMasterKey(nmTokenSecretManagerRM, nmTokenSecretManagerNM);
// trying get container status. Now saved nmToken should be used for
// authentication... It should complain saying container was recently
// stopped.
sb = new StringBuilder("Container ");
sb.append(validContainerId);
sb.append(" was recently stopped on node manager");
Assert.assertTrue(testGetContainer(rpc, validAppAttemptId, validNode,
validContainerId, validNMToken, true).contains(sb.toString()));
// Now lets remove the container from nm-memory
nm.getNodeStatusUpdater().clearFinishedContainersFromCache();
// This should fail as container is removed from recently tracked finished
// containers.
sb = new StringBuilder("Container ");
sb.append(validContainerId.toString());
sb.append(" is not handled by this NodeManager");
Assert.assertTrue(testGetContainer(rpc, validAppAttemptId, validNode,
validContainerId, validNMToken, false).contains(sb.toString()));
// using appAttempt-1 NMtoken for launching container for appAttempt-2
// should succeed.
ApplicationAttemptId attempt2 = ApplicationAttemptId.newInstance(appId, 2);
Token attempt1NMToken =
nmTokenSecretManagerRM
.createNMToken(validAppAttemptId, validNode, user);
org.apache.hadoop.yarn.api.records.Token newContainerToken =
containerTokenSecretManager.createContainerToken(
ContainerId.newContainerId(attempt2, 1), 0, validNode, user, r,
Priority.newInstance(0), 0);
Assert.assertTrue(testStartContainer(rpc, attempt2, validNode,
newContainerToken, attempt1NMToken, false).isEmpty());
}
private void waitForContainerToFinishOnNM(ContainerId containerId) {
Context nmContext = yarnCluster.getNodeManager(0).getNMContext();
int interval = 4 * 60; // Max time for container token to expire.
Assert.assertNotNull(nmContext.getContainers().containsKey(containerId));
// Get the container first, as it may be removed from the Context
// by asynchronous calls.
// This was leading to a flakey test as otherwise the container could
// be removed and end up null.
Container waitContainer = nmContext.getContainers().get(containerId);
while ((interval-- > 0)
&& !waitContainer.cloneAndGetContainerStatus()
.getState().equals(ContainerState.COMPLETE)) {
try {
LOG.info("Waiting for " + containerId + " to complete.");
Thread.sleep(1000);
} catch (InterruptedException e) {
}
}
// Normally, Containers will be removed from NM context after they are
// explicitly acked by RM. Now, manually remove it for testing.
yarnCluster.getNodeManager(0).getNodeStatusUpdater()
.addCompletedContainer(containerId);
LOG.info("Removing container from NMContext, containerID = " + containerId);
nmContext.getContainers().remove(containerId);
}
protected void waitForNMToReceiveNMTokenKey(
NMTokenSecretManagerInNM nmTokenSecretManagerNM)
throws InterruptedException {
int attempt = 60;
while (nmTokenSecretManagerNM.getNodeId() == null && attempt-- > 0) {
Thread.sleep(2000);
}
}
protected void rollNMTokenMasterKey(
NMTokenSecretManagerInRM nmTokenSecretManagerRM,
NMTokenSecretManagerInNM nmTokenSecretManagerNM) throws Exception {
int oldKeyId = nmTokenSecretManagerRM.getCurrentKey().getKeyId();
nmTokenSecretManagerRM.rollMasterKey();
int interval = 40;
while (nmTokenSecretManagerNM.getCurrentKey().getKeyId() == oldKeyId
&& interval-- > 0) {
Thread.sleep(1000);
}
nmTokenSecretManagerRM.activateNextMasterKey();
Assert.assertTrue((nmTokenSecretManagerNM.getCurrentKey().getKeyId()
== nmTokenSecretManagerRM.getCurrentKey().getKeyId()));
}
private String testStopContainer(YarnRPC rpc,
ApplicationAttemptId appAttemptId, NodeId nodeId,
ContainerId containerId, Token nmToken, boolean isExceptionExpected) {
try {
stopContainer(rpc, nmToken,
Arrays.asList(new ContainerId[] {containerId}), appAttemptId,
nodeId);
if (isExceptionExpected) {
fail("Exception was expected!!");
}
return "";
} catch (Exception e) {
e.printStackTrace();
return e.getMessage();
}
}
private String testGetContainer(YarnRPC rpc,
ApplicationAttemptId appAttemptId, NodeId nodeId,
ContainerId containerId,
org.apache.hadoop.yarn.api.records.Token nmToken,
boolean isExceptionExpected) {
try {
getContainerStatus(rpc, nmToken, containerId, appAttemptId, nodeId,
isExceptionExpected);
if (isExceptionExpected) {
fail("Exception was expected!!");
}
return "";
} catch (Exception e) {
e.printStackTrace();
return e.getMessage();
}
}
private String testStartContainer(YarnRPC rpc,
ApplicationAttemptId appAttemptId, NodeId nodeId,
org.apache.hadoop.yarn.api.records.Token containerToken,
org.apache.hadoop.yarn.api.records.Token nmToken,
boolean isExceptionExpected) {
try {
startContainer(rpc, nmToken, containerToken, nodeId,
appAttemptId.toString());
if (isExceptionExpected){
fail("Exception was expected!!");
}
return "";
} catch (Exception e) {
e.printStackTrace();
return e.getMessage();
}
}
private void stopContainer(YarnRPC rpc, Token nmToken,
List<ContainerId> containerId, ApplicationAttemptId appAttemptId,
NodeId nodeId) throws Exception {
StopContainersRequest request =
StopContainersRequest.newInstance(containerId);
ContainerManagementProtocol proxy = null;
try {
proxy =
getContainerManagementProtocolProxy(rpc, nmToken, nodeId,
appAttemptId.toString());
StopContainersResponse response = proxy.stopContainers(request);
if (response.getFailedRequests() != null &&
response.getFailedRequests().containsKey(containerId)) {
parseAndThrowException(response.getFailedRequests().get(containerId)
.deSerialize());
}
} catch (Exception e) {
if (proxy != null) {
rpc.stopProxy(proxy, conf);
}
}
}
private void
getContainerStatus(YarnRPC rpc,
org.apache.hadoop.yarn.api.records.Token nmToken,
ContainerId containerId,
ApplicationAttemptId appAttemptId, NodeId nodeId,
boolean isExceptionExpected) throws Exception {
List<ContainerId> containerIds = new ArrayList<ContainerId>();
containerIds.add(containerId);
GetContainerStatusesRequest request =
GetContainerStatusesRequest.newInstance(containerIds);
ContainerManagementProtocol proxy = null;
try {
proxy =
getContainerManagementProtocolProxy(rpc, nmToken, nodeId,
appAttemptId.toString());
GetContainerStatusesResponse statuses
= proxy.getContainerStatuses(request);
if (statuses.getFailedRequests() != null
&& statuses.getFailedRequests().containsKey(containerId)) {
parseAndThrowException(statuses.getFailedRequests().get(containerId)
.deSerialize());
}
} finally {
if (proxy != null) {
rpc.stopProxy(proxy, conf);
}
}
}
private void startContainer(final YarnRPC rpc,
org.apache.hadoop.yarn.api.records.Token nmToken,
org.apache.hadoop.yarn.api.records.Token containerToken,
NodeId nodeId, String user) throws Exception {
ContainerLaunchContext context =
Records.newRecord(ContainerLaunchContext.class);
StartContainerRequest scRequest =
StartContainerRequest.newInstance(context, containerToken);
List<StartContainerRequest> list = new ArrayList<StartContainerRequest>();
list.add(scRequest);
StartContainersRequest allRequests =
StartContainersRequest.newInstance(list);
ContainerManagementProtocol proxy = null;
try {
proxy = getContainerManagementProtocolProxy(rpc, nmToken, nodeId, user);
StartContainersResponse response = proxy.startContainers(allRequests);
for(SerializedException ex : response.getFailedRequests().values()){
parseAndThrowException(ex.deSerialize());
}
} finally {
if (proxy != null) {
rpc.stopProxy(proxy, conf);
}
}
}
private void parseAndThrowException(Throwable t) throws YarnException,
IOException {
if (t instanceof YarnException) {
throw (YarnException) t;
} else if (t instanceof InvalidToken) {
throw (InvalidToken) t;
} else {
throw (IOException) t;
}
}
protected ContainerManagementProtocol getContainerManagementProtocolProxy(
final YarnRPC rpc, org.apache.hadoop.yarn.api.records.Token nmToken,
NodeId nodeId, String user) {
ContainerManagementProtocol proxy;
UserGroupInformation ugi = UserGroupInformation.createRemoteUser(user);
final InetSocketAddress addr =
new InetSocketAddress(nodeId.getHost(), nodeId.getPort());
if (nmToken != null) {
ugi.addToken(ConverterUtils.convertFromYarn(nmToken, addr));
}
proxy =
NMProxy.createNMProxy(conf, ContainerManagementProtocol.class, ugi,
rpc, addr);
return proxy;
}
/**
* This tests a malice user getting a proper token but then messing with it by
* tampering with containerID/Resource etc.. His/her containers should be
* rejected.
*
* @throws IOException
* @throws InterruptedException
* @throws YarnException
*/
private void testContainerToken(Configuration conf) throws IOException,
InterruptedException, YarnException {
LOG.info("Running test for malice user");
/*
* We need to check for containerToken (authorization).
* Here we will be assuming that we have valid NMToken
* 1) ContainerToken used is expired.
* 2) ContainerToken is tampered (resource is modified).
*/
NMTokenSecretManagerInRM nmTokenSecretManagerInRM =
yarnCluster.getResourceManager().getRMContext()
.getNMTokenSecretManager();
ApplicationId appId = ApplicationId.newInstance(1, 1);
ApplicationAttemptId appAttemptId =
ApplicationAttemptId.newInstance(appId, 0);
ContainerId cId = ContainerId.newContainerId(appAttemptId, 0);
NodeManager nm = yarnCluster.getNodeManager(0);
NMTokenSecretManagerInNM nmTokenSecretManagerInNM =
nm.getNMContext().getNMTokenSecretManager();
String user = "test";
waitForNMToReceiveNMTokenKey(nmTokenSecretManagerInNM);
NodeId nodeId = nm.getNMContext().getNodeId();
// Both id should be equal.
Assert.assertEquals(nmTokenSecretManagerInNM.getCurrentKey().getKeyId(),
nmTokenSecretManagerInRM.getCurrentKey().getKeyId());
RMContainerTokenSecretManager containerTokenSecretManager =
yarnCluster.getResourceManager().getRMContext().
getContainerTokenSecretManager();
Resource r = Resource.newInstance(1230, 2);
Token containerToken =
containerTokenSecretManager.createContainerToken(
cId, 0, nodeId, user, r, Priority.newInstance(0), 0);
ContainerTokenIdentifier containerTokenIdentifier =
getContainerTokenIdentifierFromToken(containerToken);
// Verify new compatible version ContainerTokenIdentifier
// can work successfully.
ContainerTokenIdentifierForTest newVersionTokenIdentifier =
new ContainerTokenIdentifierForTest(containerTokenIdentifier,
"message");
byte[] password =
containerTokenSecretManager.createPassword(newVersionTokenIdentifier);
Token newContainerToken = BuilderUtils.newContainerToken(
nodeId, password, newVersionTokenIdentifier);
Token nmToken =
nmTokenSecretManagerInRM.createNMToken(appAttemptId, nodeId, user);
YarnRPC rpc = YarnRPC.create(conf);
Assert.assertTrue(testStartContainer(rpc, appAttemptId, nodeId,
newContainerToken, nmToken, false).isEmpty());
// Creating a tampered Container Token
RMContainerTokenSecretManager tamperedContainerTokenSecretManager =
new RMContainerTokenSecretManager(conf);
tamperedContainerTokenSecretManager.rollMasterKey();
do {
tamperedContainerTokenSecretManager.rollMasterKey();
tamperedContainerTokenSecretManager.activateNextMasterKey();
} while (containerTokenSecretManager.getCurrentKey().getKeyId()
== tamperedContainerTokenSecretManager.getCurrentKey().getKeyId());
ContainerId cId2 = ContainerId.newContainerId(appAttemptId, 1);
// Creating modified containerToken
Token containerToken2 =
tamperedContainerTokenSecretManager.createContainerToken(cId2, 0,
nodeId, user, r, Priority.newInstance(0), 0);
StringBuilder sb = new StringBuilder("Given Container ");
sb.append(cId2);
sb.append(" seems to have an illegally generated token.");
Assert.assertTrue(testStartContainer(rpc, appAttemptId, nodeId,
containerToken2, nmToken, true).contains(sb.toString()));
}
private ContainerTokenIdentifier getContainerTokenIdentifierFromToken(
Token containerToken) throws IOException {
ContainerTokenIdentifier containerTokenIdentifier;
containerTokenIdentifier = new ContainerTokenIdentifier();
byte[] tokenIdentifierContent = containerToken.getIdentifier().array();
DataInputBuffer dib = new DataInputBuffer();
dib.reset(tokenIdentifierContent, tokenIdentifierContent.length);
containerTokenIdentifier.readFields(dib);
return containerTokenIdentifier;
}
/**
* This tests whether a containerId is serialized/deserialized with epoch.
*
* @throws IOException
* @throws InterruptedException
* @throws YarnException
*/
private void testContainerTokenWithEpoch(Configuration conf)
throws IOException, InterruptedException, YarnException {
LOG.info("Running test for serializing/deserializing containerIds");
NMTokenSecretManagerInRM nmTokenSecretManagerInRM =
yarnCluster.getResourceManager().getRMContext()
.getNMTokenSecretManager();
ApplicationId appId = ApplicationId.newInstance(1, 1);
ApplicationAttemptId appAttemptId =
ApplicationAttemptId.newInstance(appId, 0);
ContainerId cId = ContainerId.newContainerId(appAttemptId, (5L << 40) | 3L);
NodeManager nm = yarnCluster.getNodeManager(0);
NMTokenSecretManagerInNM nmTokenSecretManagerInNM =
nm.getNMContext().getNMTokenSecretManager();
String user = "test";
waitForNMToReceiveNMTokenKey(nmTokenSecretManagerInNM);
NodeId nodeId = nm.getNMContext().getNodeId();
// Both id should be equal.
Assert.assertEquals(nmTokenSecretManagerInNM.getCurrentKey().getKeyId(),
nmTokenSecretManagerInRM.getCurrentKey().getKeyId());
// Creating a normal Container Token
RMContainerTokenSecretManager containerTokenSecretManager =
yarnCluster.getResourceManager().getRMContext().
getContainerTokenSecretManager();
Resource r = Resource.newInstance(1230, 2);
Token containerToken =
containerTokenSecretManager.createContainerToken(cId, 0, nodeId, user,
r, Priority.newInstance(0), 0);
ContainerTokenIdentifier containerTokenIdentifier =
new ContainerTokenIdentifier();
byte[] tokenIdentifierContent = containerToken.getIdentifier().array();
DataInputBuffer dib = new DataInputBuffer();
dib.reset(tokenIdentifierContent, tokenIdentifierContent.length);
containerTokenIdentifier.readFields(dib);
Assert.assertEquals(cId, containerTokenIdentifier.getContainerID());
Assert.assertEquals(
cId.toString(), containerTokenIdentifier.getContainerID().toString());
Token nmToken =
nmTokenSecretManagerInRM.createNMToken(appAttemptId, nodeId, user);
YarnRPC rpc = YarnRPC.create(conf);
testStartContainer(rpc, appAttemptId, nodeId, containerToken, nmToken,
false);
List<ContainerId> containerIds = new LinkedList<ContainerId>();
containerIds.add(cId);
ContainerManagementProtocol proxy
= getContainerManagementProtocolProxy(rpc, nmToken, nodeId, user);
GetContainerStatusesResponse res = proxy.getContainerStatuses(
GetContainerStatusesRequest.newInstance(containerIds));
Assert.assertNotNull(res.getContainerStatuses().get(0));
Assert.assertEquals(
cId, res.getContainerStatuses().get(0).getContainerId());
Assert.assertEquals(cId.toString(),
res.getContainerStatuses().get(0).getContainerId().toString());
}
}