blob: e8f142595ad1f8c87f84df5e58abad73ac2a8978 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.security.PrivilegedExceptionAction;
import java.util.HashMap;
import java.util.Map;
import org.junit.Assert;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.AccessControlList;
import org.apache.hadoop.service.Service.STATE;
import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest;
import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
public abstract class QueueACLsTestBase {
protected static final String COMMON_USER = "common_user";
protected static final String QUEUE_A_USER = "queueA_user";
protected static final String QUEUE_B_USER = "queueB_user";
protected static final String ROOT_ADMIN = "root_admin";
protected static final String QUEUE_A_ADMIN = "queueA_admin";
protected static final String QUEUE_B_ADMIN = "queueB_admin";
protected static final String QUEUEA = "queueA";
protected static final String QUEUEB = "queueB";
private static final Log LOG = LogFactory.getLog(TestApplicationACLs.class);
MockRM resourceManager;
Configuration conf;
YarnRPC rpc;
InetSocketAddress rmAddress;
@Before
public void setup() throws InterruptedException, IOException {
conf = createConfiguration();
rpc = YarnRPC.create(conf);
rmAddress = conf.getSocketAddr(
YarnConfiguration.RM_ADDRESS, YarnConfiguration.DEFAULT_RM_ADDRESS,
YarnConfiguration.DEFAULT_RM_PORT);
AccessControlList adminACL = new AccessControlList("");
conf.set(YarnConfiguration.YARN_ADMIN_ACL, adminACL.getAclString());
resourceManager = new MockRM(conf) {
protected ClientRMService createClientRMService() {
return new ClientRMService(getRMContext(), this.scheduler,
this.rmAppManager, this.applicationACLsManager,
this.queueACLsManager, getRMContext().getRMDelegationTokenSecretManager());
};
@Override
protected void doSecureLogin() throws IOException {
}
};
new Thread() {
public void run() {
resourceManager.start();
};
}.start();
int waitCount = 0;
while (resourceManager.getServiceState() == STATE.INITED
&& waitCount++ < 60) {
LOG.info("Waiting for RM to start...");
Thread.sleep(1500);
}
if (resourceManager.getServiceState() != STATE.STARTED) {
// RM could have failed.
throw new IOException("ResourceManager failed to start. Final state is "
+ resourceManager.getServiceState());
}
}
@After
public void tearDown() {
if (resourceManager != null) {
resourceManager.stop();
}
}
@Test
public void testApplicationACLs() throws Exception {
verifyKillAppSuccess(QUEUE_A_USER, QUEUE_A_USER, QUEUEA, true);
verifyKillAppSuccess(QUEUE_A_USER, QUEUE_A_ADMIN, QUEUEA, true);
verifyKillAppSuccess(QUEUE_A_USER, COMMON_USER, QUEUEA, true);
verifyKillAppSuccess(QUEUE_A_USER, ROOT_ADMIN, QUEUEA, true);
verifyKillAppFailure(QUEUE_A_USER, QUEUE_B_USER, QUEUEA, true);
verifyKillAppFailure(QUEUE_A_USER, QUEUE_B_ADMIN, QUEUEA, true);
verifyKillAppSuccess(QUEUE_B_USER, QUEUE_B_USER, QUEUEB, true);
verifyKillAppSuccess(QUEUE_B_USER, QUEUE_B_ADMIN, QUEUEB, true);
verifyKillAppSuccess(QUEUE_B_USER, COMMON_USER, QUEUEB, true);
verifyKillAppSuccess(QUEUE_B_USER, ROOT_ADMIN, QUEUEB, true);
verifyKillAppFailure(QUEUE_B_USER, QUEUE_A_USER, QUEUEB, true);
verifyKillAppFailure(QUEUE_B_USER, QUEUE_A_ADMIN, QUEUEB, true);
verifyKillAppSuccess(ROOT_ADMIN, ROOT_ADMIN, QUEUEA, false);
verifyKillAppSuccess(ROOT_ADMIN, ROOT_ADMIN, QUEUEB, false);
verifyGetClientAMToken(QUEUE_A_USER, ROOT_ADMIN, QUEUEA, true);
}
private void verifyGetClientAMToken(String submitter, String queueAdmin,
String queueName, boolean setupACLs) throws Exception {
ApplicationId applicationId =
submitAppAndGetAppId(submitter, queueName, setupACLs);
final GetApplicationReportRequest appReportRequest =
GetApplicationReportRequest.newInstance(applicationId);
ApplicationClientProtocol submitterClient = getRMClientForUser(submitter);
ApplicationClientProtocol adMinUserClient = getRMClientForUser(queueAdmin);
GetApplicationReportResponse submitterGetReport =
submitterClient.getApplicationReport(appReportRequest);
GetApplicationReportResponse adMinUserGetReport =
adMinUserClient.getApplicationReport(appReportRequest);
Assert.assertEquals(submitterGetReport.getApplicationReport()
.getClientToAMToken(), adMinUserGetReport.getApplicationReport()
.getClientToAMToken());
}
private void verifyKillAppFailure(String submitter, String killer,
String queueName, boolean setupACLs) throws Exception {
ApplicationId applicationId =
submitAppAndGetAppId(submitter, queueName, setupACLs);
final KillApplicationRequest finishAppRequest =
KillApplicationRequest.newInstance(applicationId);
ApplicationClientProtocol killerClient = getRMClientForUser(killer);
// Kill app as the killer
try {
killerClient.forceKillApplication(finishAppRequest);
Assert.fail("App killing by the enemy should fail!!");
} catch (YarnException e) {
LOG.info("Got exception while killing app as the enemy", e);
Assert.assertTrue(e.getMessage().contains(
"User " + killer + " cannot perform operation MODIFY_APP on "
+ applicationId));
}
getRMClientForUser(submitter).forceKillApplication(finishAppRequest);
}
private void verifyKillAppSuccess(String submitter, String killer,
String queueName, boolean setupACLs) throws Exception {
ApplicationId applicationId =
submitAppAndGetAppId(submitter, queueName, setupACLs);
final KillApplicationRequest finishAppRequest =
KillApplicationRequest.newInstance(applicationId);
ApplicationClientProtocol ownerClient = getRMClientForUser(killer);
// Kill app as killer
ownerClient.forceKillApplication(finishAppRequest);
resourceManager.waitForState(applicationId, RMAppState.KILLED);
}
private ApplicationId submitAppAndGetAppId(String submitter,
String queueName, boolean setupACLs) throws Exception {
GetNewApplicationRequest newAppRequest =
GetNewApplicationRequest.newInstance();
ApplicationClientProtocol submitterClient = getRMClientForUser(submitter);
ApplicationId applicationId =
submitterClient.getNewApplication(newAppRequest).getApplicationId();
Resource resource = BuilderUtils.newResource(1024, 1);
Map<ApplicationAccessType, String> acls = createACLs(submitter, setupACLs);
ContainerLaunchContext amContainerSpec =
ContainerLaunchContext.newInstance(null, null, null, null, null, acls);
ApplicationSubmissionContext appSubmissionContext =
ApplicationSubmissionContext.newInstance(applicationId,
"applicationName", queueName, null, amContainerSpec, false, true, 1,
resource, "applicationType");
appSubmissionContext.setApplicationId(applicationId);
appSubmissionContext.setQueue(queueName);
SubmitApplicationRequest submitRequest =
SubmitApplicationRequest.newInstance(appSubmissionContext);
submitterClient.submitApplication(submitRequest);
resourceManager.waitForState(applicationId, RMAppState.ACCEPTED);
return applicationId;
}
private Map<ApplicationAccessType, String> createACLs(String submitter,
boolean setupACLs) {
AccessControlList viewACL = new AccessControlList("");
AccessControlList modifyACL = new AccessControlList("");
if (setupACLs) {
viewACL.addUser(submitter);
viewACL.addUser(COMMON_USER);
modifyACL.addUser(submitter);
modifyACL.addUser(COMMON_USER);
}
Map<ApplicationAccessType, String> acls =
new HashMap<ApplicationAccessType, String>();
acls.put(ApplicationAccessType.VIEW_APP, viewACL.getAclString());
acls.put(ApplicationAccessType.MODIFY_APP, modifyACL.getAclString());
return acls;
}
private ApplicationClientProtocol getRMClientForUser(String user)
throws IOException, InterruptedException {
UserGroupInformation userUGI = UserGroupInformation.createRemoteUser(user);
ApplicationClientProtocol userClient =
userUGI
.doAs(new PrivilegedExceptionAction<ApplicationClientProtocol>() {
@Override
public ApplicationClientProtocol run() throws Exception {
return (ApplicationClientProtocol) rpc.getProxy(
ApplicationClientProtocol.class, rmAddress, conf);
}
});
return userClient;
}
protected abstract Configuration createConfiguration() throws IOException;
}