blob: 89e2b09bad5dacca3478fc33de22c74ed489cf06 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server;
import static org.junit.Assert.fail;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileWriter;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.security.PrivilegedAction;
import java.util.ArrayList;
import java.util.List;
import junit.framework.Assert;
import org.apache.avro.AvroRuntimeException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.UnsupportedFileSystemException;
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.SecurityInfo;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.yarn.api.AMRMProtocol;
import org.apache.hadoop.yarn.api.ContainerManager;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusRequest;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationMaster;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerToken;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.URL;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.security.ApplicationTokenIdentifier;
import org.apache.hadoop.yarn.security.ApplicationTokenSecretManager;
import org.apache.hadoop.yarn.security.ContainerManagerSecurityInfo;
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
import org.apache.hadoop.yarn.security.SchedulerSecurityInfo;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.junit.BeforeClass;
import org.junit.Test;
public class TestContainerTokenSecretManager {
private static Log LOG = LogFactory
.getLog(TestContainerTokenSecretManager.class);
private static final RecordFactory recordFactory = RecordFactoryProvider
.getRecordFactory(null);
private static FileContext localFS = null;
private static final File localDir = new File("target",
TestContainerTokenSecretManager.class.getName() + "-localDir")
.getAbsoluteFile();
@BeforeClass
public static void setup() throws AccessControlException,
FileNotFoundException, UnsupportedFileSystemException, IOException {
localFS = FileContext.getLocalFSFileContext();
localFS.delete(new Path(localDir.getAbsolutePath()), true);
localDir.mkdir();
}
@Test
public void test() throws IOException, InterruptedException {
final ApplicationId appID = recordFactory.newRecordInstance(ApplicationId.class);
appID.setClusterTimestamp(1234);
appID.setId(5);
final Configuration conf = new Configuration();
conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,
"kerberos");
// Set AM expiry interval to be very long.
conf.setLong(YarnConfiguration.AM_EXPIRY_INTERVAL, 100000L);
UserGroupInformation.setConfiguration(conf);
MiniYARNCluster yarnCluster =
new MiniYARNCluster(TestContainerTokenSecretManager.class.getName());
yarnCluster.init(conf);
yarnCluster.start();
ResourceManager resourceManager = yarnCluster.getResourceManager();
final YarnRPC yarnRPC = YarnRPC.create(conf);
// Submit an application
ApplicationSubmissionContext appSubmissionContext =
recordFactory.newRecordInstance(ApplicationSubmissionContext.class);
appSubmissionContext.setApplicationId(appID);
appSubmissionContext.setMasterCapability(recordFactory
.newRecordInstance(Resource.class));
appSubmissionContext.getMasterCapability().setMemory(1024);
// appSubmissionContext.resources = new HashMap<String, URL>();
appSubmissionContext.setUser("testUser");
// appSubmissionContext.environment = new HashMap<String, String>();
// appSubmissionContext.command = new ArrayList<String>();
appSubmissionContext.addCommand("sleep");
appSubmissionContext.addCommand("100");
// TODO: Use a resource to work around bugs. Today NM doesn't create local
// app-dirs if there are no file to download!!
File file = new File(localDir.getAbsolutePath(), "testFile");
FileWriter tmpFile = new FileWriter(file);
tmpFile.write("testing");
tmpFile.close();
URL testFileURL =
ConverterUtils.getYarnUrlFromPath(FileContext.getFileContext()
.makeQualified(new Path(localDir.getAbsolutePath(), "testFile")));
LocalResource rsrc = recordFactory.newRecordInstance(LocalResource.class);
rsrc.setResource(testFileURL);
rsrc.setSize(file.length());
rsrc.setTimestamp(file.lastModified());
rsrc.setType(LocalResourceType.FILE);
rsrc.setVisibility(LocalResourceVisibility.PRIVATE);
appSubmissionContext.setResourceTodo("testFile", rsrc);
SubmitApplicationRequest submitRequest = recordFactory
.newRecordInstance(SubmitApplicationRequest.class);
submitRequest.setApplicationSubmissionContext(appSubmissionContext);
resourceManager.getClientRMService().submitApplication(submitRequest);
// Wait till container gets allocated for AM
int waitCounter = 0;
RMApp app = resourceManager.getRMContext().getRMApps().get(appID);
RMAppAttempt appAttempt = app == null ? null : app.getCurrentAppAttempt();
RMAppAttemptState state = appAttempt == null ? null : appAttempt
.getAppAttemptState();
while ((app == null || appAttempt == null || state == null
|| !state.equals(RMAppAttemptState.LAUNCHED)) && waitCounter++ != 20) {
LOG.info("Waiting for applicationAttempt to be created.. ");
Thread.sleep(1000);
app = resourceManager.getRMContext().getRMApps().get(appID);
appAttempt = app == null ? null : app.getCurrentAppAttempt();
state = appAttempt == null ? null : appAttempt.getAppAttemptState();
}
Assert.assertNotNull(app);
Assert.assertNotNull(appAttempt);
Assert.assertNotNull(state);
Assert.assertEquals(RMAppAttemptState.LAUNCHED, state);
UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
// Ask for a container from the RM
String schedulerAddressString =
conf.get(YarnConfiguration.SCHEDULER_ADDRESS,
YarnConfiguration.DEFAULT_SCHEDULER_BIND_ADDRESS);
final InetSocketAddress schedulerAddr =
NetUtils.createSocketAddr(schedulerAddressString);
ApplicationTokenIdentifier appTokenIdentifier =
new ApplicationTokenIdentifier(appID);
ApplicationTokenSecretManager appTokenSecretManager =
new ApplicationTokenSecretManager();
appTokenSecretManager.setMasterKey(ApplicationTokenSecretManager
.createSecretKey("Dummy".getBytes())); // TODO: FIX. Be in Sync with
// ResourceManager.java
Token<ApplicationTokenIdentifier> appToken =
new Token<ApplicationTokenIdentifier>(appTokenIdentifier,
appTokenSecretManager);
appToken.setService(new Text(schedulerAddressString));
currentUser.addToken(appToken);
conf.setClass(
YarnConfiguration.YARN_SECURITY_INFO,
SchedulerSecurityInfo.class, SecurityInfo.class);
AMRMProtocol scheduler =
currentUser.doAs(new PrivilegedAction<AMRMProtocol>() {
@Override
public AMRMProtocol run() {
return (AMRMProtocol) yarnRPC.getProxy(AMRMProtocol.class,
schedulerAddr, conf);
}
});
// Register the appMaster
RegisterApplicationMasterRequest request =
recordFactory
.newRecordInstance(RegisterApplicationMasterRequest.class);
ApplicationMaster applicationMaster = recordFactory
.newRecordInstance(ApplicationMaster.class);
request.setApplicationAttemptId(resourceManager.getRMContext()
.getRMApps().get(appID).getCurrentAppAttempt().getAppAttemptId());
scheduler.registerApplicationMaster(request);
// Now request a container allocation.
List<ResourceRequest> ask = new ArrayList<ResourceRequest>();
ResourceRequest rr = recordFactory.newRecordInstance(ResourceRequest.class);
rr.setCapability(recordFactory.newRecordInstance(Resource.class));
rr.getCapability().setMemory(1024);
rr.setHostName("*");
rr.setNumContainers(1);
rr.setPriority(recordFactory.newRecordInstance(Priority.class));
rr.getPriority().setPriority(0);
ask.add(rr);
ArrayList<ContainerId> release = new ArrayList<ContainerId>();
AllocateRequest allocateRequest =
recordFactory.newRecordInstance(AllocateRequest.class);
allocateRequest.setApplicationAttemptId(appAttempt.getAppAttemptId());
allocateRequest.setResponseId(0);
allocateRequest.addAllAsks(ask);
allocateRequest.addAllReleases(release);
List<Container> allocatedContainers = scheduler.allocate(allocateRequest)
.getAMResponse().getNewContainerList();
waitCounter = 0;
while ((allocatedContainers == null || allocatedContainers.size() == 0)
&& waitCounter++ != 20) {
LOG.info("Waiting for container to be allocated..");
Thread.sleep(1000);
allocateRequest.setResponseId(allocateRequest.getResponseId() + 1);
allocatedContainers =
scheduler.allocate(allocateRequest).getAMResponse()
.getNewContainerList();
}
Assert.assertNotNull("Container is not allocted!", allocatedContainers);
Assert.assertEquals("Didn't get one container!", 1,
allocatedContainers.size());
// Now talk to the NM for launching the container.
final Container allocatedContainer = allocatedContainers.get(0);
ContainerToken containerToken = allocatedContainer.getContainerToken();
Token<ContainerTokenIdentifier> token =
new Token<ContainerTokenIdentifier>(
containerToken.getIdentifier().array(),
containerToken.getPassword().array(), new Text(
containerToken.getKind()), new Text(
containerToken.getService()));
currentUser.addToken(token);
conf.setClass(
YarnConfiguration.YARN_SECURITY_INFO,
ContainerManagerSecurityInfo.class, SecurityInfo.class);
currentUser.doAs(new PrivilegedAction<Void>() {
@Override
public Void run() {
ContainerManager client = (ContainerManager) yarnRPC.getProxy(
ContainerManager.class, NetUtils
.createSocketAddr(allocatedContainer.getNodeId().toString()),
conf);
try {
LOG.info("Going to make a getContainerStatus() legal request");
GetContainerStatusRequest request =
recordFactory
.newRecordInstance(GetContainerStatusRequest.class);
ContainerId containerID =
recordFactory.newRecordInstance(ContainerId.class);
containerID.setAppId(appID);
containerID.setId(1);
request.setContainerId(containerID);
client.getContainerStatus(request);
} catch (YarnRemoteException e) {
LOG.info("Error", e);
} catch (AvroRuntimeException e) {
LOG.info("Got the expected exception");
}
return null;
}
});
UserGroupInformation maliceUser =
UserGroupInformation.createRemoteUser(currentUser.getShortUserName());
byte[] identifierBytes = containerToken.getIdentifier().array();
DataInputBuffer di = new DataInputBuffer();
di.reset(identifierBytes, identifierBytes.length);
ContainerTokenIdentifier dummyIdentifier = new ContainerTokenIdentifier();
dummyIdentifier.readFields(di);
Resource modifiedResource = recordFactory.newRecordInstance(Resource.class);
modifiedResource.setMemory(2048);
ContainerTokenIdentifier modifiedIdentifier =
new ContainerTokenIdentifier(dummyIdentifier.getContainerID(),
dummyIdentifier.getNmHostName(), modifiedResource);
// Malice user modifies the resource amount
Token<ContainerTokenIdentifier> modifiedToken =
new Token<ContainerTokenIdentifier>(modifiedIdentifier.getBytes(),
containerToken.getPassword().array(), new Text(
containerToken.getKind()), new Text(
containerToken.getService()));
maliceUser.addToken(modifiedToken);
maliceUser.doAs(new PrivilegedAction<Void>() {
@Override
public Void run() {
ContainerManager client = (ContainerManager) yarnRPC.getProxy(
ContainerManager.class, NetUtils
.createSocketAddr(allocatedContainer.getNodeId().toString()),
conf);
ContainerId containerID;
LOG.info("Going to contact NM: ilLegal request");
GetContainerStatusRequest request =
recordFactory
.newRecordInstance(GetContainerStatusRequest.class);
containerID =
recordFactory.newRecordInstance(ContainerId.class);
containerID.setAppId(appID);
containerID.setId(1);
request.setContainerId(containerID);
try {
client.getContainerStatus(request);
fail("Connection initiation with illegally modified "
+ "tokens is expected to fail.");
} catch (YarnRemoteException e) {
LOG.error("Got exception", e);
fail("Cannot get a YARN remote exception as " +
"it will indicate RPC success");
} catch (Exception e) {
Assert.assertEquals(
java.lang.reflect.UndeclaredThrowableException.class
.getCanonicalName(), e.getClass().getCanonicalName());
Assert
.assertEquals(
"DIGEST-MD5: digest response format violation. Mismatched response.",
e.getCause().getCause().getMessage());
}
return null;
}
});
}
}