blob: b41ba83679880bc69e59162ca1cb14bbd910289d [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest;
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.resourcemanager.ACLsTestBase;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.junit.Assert;
import org.junit.Test;
public class TestApplicationPriorityACLs extends ACLsTestBase {
private final int defaultPriorityQueueA = 3;
private final int defaultPriorityQueueB = 10;
private final int maxPriorityQueueA = 5;
private final int maxPriorityQueueB = 11;
private final int clusterMaxPriority = 10;
@Test
public void testApplicationACLs() throws Exception {
/*
* Cluster Max-priority is 10. User 'queueA_user' has permission to submit
* apps only at priority 5. Default priority for this user is 3.
*/
// Case 1: App will be submitted with priority 5.
verifyAppSubmitWithPrioritySuccess(QUEUE_A_USER, QUEUEA, 5);
// Case 2: App will be rejected as submitted priority was 6.
verifyAppSubmitWithPriorityFailure(QUEUE_A_USER, QUEUEA, 6);
// Case 3: App will be submitted w/o priority, hence consider default 3.
verifyAppSubmitWithPrioritySuccess(QUEUE_A_USER, QUEUEA, -1);
// Case 4: App will be submitted with priority 11.
verifyAppSubmitWithPrioritySuccess(QUEUE_B_USER, QUEUEB, 11);
}
private void verifyAppSubmitWithPrioritySuccess(String submitter,
String queueName, int priority) throws Exception {
Priority appPriority = null;
if (priority > 0) {
appPriority = Priority.newInstance(priority);
} else {
// RM will consider default priority for the submitted user. So update
// priority to the default value to compare.
priority = defaultPriorityQueueA;
}
ApplicationSubmissionContext submissionContext = prepareForAppSubmission(
submitter, queueName, appPriority);
submitAppToRMWithValidAcl(submitter, submissionContext);
// Ideally get app report here and check the priority.
verifyAppPriorityIsAccepted(submitter, submissionContext.getApplicationId(),
priority);
}
private void verifyAppSubmitWithPriorityFailure(String submitter,
String queueName, int priority) throws Exception {
Priority appPriority = Priority.newInstance(priority);
ApplicationSubmissionContext submissionContext = prepareForAppSubmission(
submitter, queueName, appPriority);
submitAppToRMWithInValidAcl(submitter, submissionContext);
}
private ApplicationSubmissionContext prepareForAppSubmission(String submitter,
String queueName, Priority priority) throws Exception {
GetNewApplicationRequest newAppRequest = GetNewApplicationRequest
.newInstance();
ApplicationClientProtocol submitterClient = getRMClientForUser(submitter);
ApplicationId applicationId = submitterClient
.getNewApplication(newAppRequest).getApplicationId();
Resource resource = BuilderUtils.newResource(1024, 1);
ContainerLaunchContext amContainerSpec = ContainerLaunchContext
.newInstance(null, null, null, null, null, null);
ApplicationSubmissionContext appSubmissionContext = ApplicationSubmissionContext
.newInstance(applicationId, "applicationName", queueName, null,
amContainerSpec, false, true, 1, resource, "applicationType");
appSubmissionContext.setApplicationId(applicationId);
appSubmissionContext.setQueue(queueName);
if (null != priority) {
appSubmissionContext.setPriority(priority);
}
return appSubmissionContext;
}
private void submitAppToRMWithValidAcl(String submitter,
ApplicationSubmissionContext appSubmissionContext)
throws YarnException, IOException, InterruptedException {
ApplicationClientProtocol submitterClient = getRMClientForUser(submitter);
SubmitApplicationRequest submitRequest = SubmitApplicationRequest
.newInstance(appSubmissionContext);
submitterClient.submitApplication(submitRequest);
resourceManager.waitForState(appSubmissionContext.getApplicationId(),
RMAppState.ACCEPTED);
}
private void submitAppToRMWithInValidAcl(String submitter,
ApplicationSubmissionContext appSubmissionContext)
throws YarnException, IOException, InterruptedException {
ApplicationClientProtocol submitterClient = getRMClientForUser(submitter);
SubmitApplicationRequest submitRequest = SubmitApplicationRequest
.newInstance(appSubmissionContext);
try {
submitterClient.submitApplication(submitRequest);
} catch (YarnException ex) {
Assert.assertTrue(ex.getCause() instanceof RemoteException);
}
}
private void verifyAppPriorityIsAccepted(String submitter,
ApplicationId applicationId, int priority)
throws IOException, InterruptedException {
ApplicationClientProtocol submitterClient = getRMClientForUser(submitter);
/**
* If priority is greater than cluster max, RM will auto set to cluster max
* Consider this scenario as a special case.
*/
if (priority > clusterMaxPriority) {
priority = clusterMaxPriority;
}
GetApplicationReportRequest request = GetApplicationReportRequest
.newInstance(applicationId);
try {
GetApplicationReportResponse response = submitterClient
.getApplicationReport(request);
Assert.assertEquals(response.getApplicationReport().getPriority(),
Priority.newInstance(priority));
} catch (YarnException e) {
Assert.fail("Application submission should not fail.");
}
}
@Override
protected Configuration createConfiguration() {
CapacitySchedulerConfiguration csConf = new CapacitySchedulerConfiguration();
csConf.setQueues(CapacitySchedulerConfiguration.ROOT,
new String[]{QUEUEA, QUEUEB, QUEUEC});
csConf.setCapacity(CapacitySchedulerConfiguration.ROOT + "." + QUEUEA, 50f);
csConf.setCapacity(CapacitySchedulerConfiguration.ROOT + "." + QUEUEB, 25f);
csConf.setCapacity(CapacitySchedulerConfiguration.ROOT + "." + QUEUEC, 25f);
String[] aclsForA = new String[2];
aclsForA[0] = QUEUE_A_USER;
aclsForA[1] = QUEUE_A_GROUP;
csConf.setPriorityAcls(CapacitySchedulerConfiguration.ROOT + "." + QUEUEA,
Priority.newInstance(maxPriorityQueueA),
Priority.newInstance(defaultPriorityQueueA), aclsForA);
String[] aclsForB = new String[2];
aclsForB[0] = QUEUE_B_USER;
aclsForB[1] = QUEUE_B_GROUP;
csConf.setPriorityAcls(CapacitySchedulerConfiguration.ROOT + "." + QUEUEB,
Priority.newInstance(maxPriorityQueueB),
Priority.newInstance(defaultPriorityQueueB), aclsForB);
csConf.setBoolean(YarnConfiguration.YARN_ACL_ENABLE, true);
csConf.set(YarnConfiguration.RM_SCHEDULER,
CapacityScheduler.class.getName());
return csConf;
}
}