blob: c8ee00e60bcb63ac8fb9fe2d63ea74b2098d3ddc [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.io.PrintWriter;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.authorize.AccessControlList;
import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.GetNewReservationRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetNewReservationResponse;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteRequest;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationListRequest;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationListResponse;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionResponse;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateRequest;
import org.apache.hadoop.yarn.api.records.ReservationACL;
import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.api.records.ReservationDefinition;
import org.apache.hadoop.yarn.api.records.ReservationRequest;
import org.apache.hadoop.yarn.api.records.ReservationRequestInterpreter;
import org.apache.hadoop.yarn.api.records.ReservationRequests;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.Plan;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairSchedulerConfiguration;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
@RunWith(Parameterized.class)
public class ReservationACLsTestBase extends ACLsTestBase {
// Duration handed to the default reservation request below. In
// makeSimpleReservationDefinition() it is also scaled by 1.1 and added to a
// System.currentTimeMillis() arrival time, so it is treated as milliseconds.
private final int defaultDuration = 600000;
// Single reservation request reused by every definition built in this test.
// NOTE(review): the literals look like 1024 MB / 1 vcore, 1 container and a
// concurrency of 1 -- confirm against BuilderUtils and ReservationRequest.
private final ReservationRequest defaultRequest = ReservationRequest
.newInstance(BuilderUtils.newResource(1024, 1), 1, 1,
defaultDuration);
// Request list wrapping defaultRequest, using the R_ALL interpreter.
private final ReservationRequests defaultRequests = ReservationRequests
.newInstance(Collections.singletonList(defaultRequest),
ReservationRequestInterpreter.R_ALL);
// Scheduler configuration received through the constructor and handed back
// by createConfiguration().
private Configuration configuration;
// When true, the test addresses queues as
// CapacitySchedulerConfiguration.ROOT + "." + name instead of the bare name.
private boolean useFullQueuePath;
/**
 * Creates one parameterized instance of the test.
 *
 * @param conf scheduler configuration later returned by
 *          {@link #createConfiguration()}
 * @param useFullPath whether queue names are prefixed with
 *          {@code CapacitySchedulerConfiguration.ROOT + "."} when the test
 *          talks to the resource manager
 */
public ReservationACLsTestBase(Configuration conf, boolean useFullPath) {
  this.useFullQueuePath = useFullPath;
  this.configuration = conf;
}
/** Stops the resource manager after each test, when one exists. */
@After
public void tearDown() {
  if (resourceManager == null) {
    // Nothing to shut down.
    return;
  }
  resourceManager.stop();
}
/**
 * Supplies the constructor arguments for each parameterized run: a
 * capacity-scheduler configuration addressed by short queue names, followed
 * by a fair-scheduler configuration addressed by full queue paths.
 *
 * @return one {@code {Configuration, Boolean}} pair per scheduler
 * @throws IOException if the fair-scheduler configuration cannot be created
 */
@Parameterized.Parameters
public static Collection<Object[]> data() throws IOException {
  Object[] capacitySchedulerCase =
      {createCapacitySchedulerConfiguration(), false};
  Object[] fairSchedulerCase = {createFairSchedulerConfiguration(), true};
  return Arrays.asList(capacitySchedulerCase, fairSchedulerCase);
}
/**
 * Walks through the submit, list, delete and update reservation operations
 * for every user/queue combination and checks that each one is allowed or
 * rejected according to the reservation ACLs configured for queues A and B.
 * Queue C has ACLs enabled but none defined, so every operation is expected
 * to succeed there.
 */
@Test
public void testApplicationACLs() throws Exception {
  registerNode("test:1234", 8192, 8);

  // Queues are addressed either by short name or by full path, depending on
  // how this parameterized instance was created.
  final String queueA = useFullQueuePath
      ? CapacitySchedulerConfiguration.ROOT + "." + QUEUEA : QUEUEA;
  final String queueB = useFullQueuePath
      ? CapacitySchedulerConfiguration.ROOT + "." + QUEUEB : QUEUEB;
  final String queueC = useFullQueuePath
      ? CapacitySchedulerConfiguration.ROOT + "." + QUEUEC : QUEUEC;

  // User groups that share the same expected outcome in a given section.
  final String[] submittersOnC = {QUEUE_B_USER, QUEUE_B_ADMIN, QUEUE_A_USER,
      QUEUE_A_ADMIN, COMMON_USER};
  final String[] allButAAdmin = {QUEUE_A_USER, QUEUE_B_USER, QUEUE_B_ADMIN,
      COMMON_USER};
  final String[] allButAUser = {QUEUE_A_ADMIN, QUEUE_B_USER, QUEUE_B_ADMIN,
      COMMON_USER};
  final String[] outsidersOfA = {COMMON_USER, QUEUE_B_USER, QUEUE_B_ADMIN};
  final String[] outsidersOfB = {COMMON_USER, QUEUE_A_USER, QUEUE_A_ADMIN};
  final String[] anyUser = {COMMON_USER, QUEUE_B_USER, QUEUE_B_ADMIN,
      QUEUE_A_USER, QUEUE_A_ADMIN};

  // ---------------------------------------------------------------------
  // Submit reservations
  // ---------------------------------------------------------------------

  // Queue A accepts its own user and admin and rejects queue B's.
  verifySubmitReservationSuccess(QUEUE_A_USER, queueA);
  verifySubmitReservationSuccess(QUEUE_A_ADMIN, queueA);
  verifySubmitReservationFailure(QUEUE_B_USER, queueA);
  verifySubmitReservationFailure(QUEUE_B_ADMIN, queueA);

  // Queue B accepts its own user and admin and rejects queue A's.
  verifySubmitReservationSuccess(QUEUE_B_USER, queueB);
  verifySubmitReservationSuccess(QUEUE_B_ADMIN, queueB);
  verifySubmitReservationFailure(QUEUE_A_USER, queueB);
  verifySubmitReservationFailure(QUEUE_A_ADMIN, queueB);

  // Queue C accepts submissions from everybody.
  for (String submitter : submittersOnC) {
    verifySubmitReservationSuccess(submitter, queueC);
  }

  // ---------------------------------------------------------------------
  // List reservations
  // ---------------------------------------------------------------------

  // Queue A: the queue admin and the holder of the list ACL (COMMON_USER)
  // can list reservations submitted by anyone.
  verifyListReservationSuccess(QUEUE_A_ADMIN, QUEUE_A_USER, queueA);
  verifyListReservationSuccess(COMMON_USER, QUEUE_A_ADMIN, queueA);
  verifyListReservationSuccess(COMMON_USER, QUEUE_A_USER, queueA);
  verifyListReservationSuccess(QUEUE_A_ADMIN, QUEUE_A_ADMIN, queueA);
  // Queue A: a plain user cannot list the queue, but can look up their own
  // reservation by id (and nobody else's).
  verifyListReservationFailure(QUEUE_A_USER, QUEUE_A_USER, queueA);
  verifyListReservationFailure(QUEUE_A_USER, QUEUE_A_ADMIN, queueA);
  verifyListReservationByIdSuccess(QUEUE_A_USER, QUEUE_A_USER, queueA);
  verifyListReservationByIdFailure(QUEUE_A_USER, QUEUE_A_ADMIN, queueA);

  // Queue B: same expectations as queue A, with queue B's principals.
  verifyListReservationSuccess(QUEUE_B_ADMIN, QUEUE_B_USER, queueB);
  verifyListReservationSuccess(COMMON_USER, QUEUE_B_ADMIN, queueB);
  verifyListReservationSuccess(COMMON_USER, QUEUE_B_USER, queueB);
  verifyListReservationSuccess(QUEUE_B_ADMIN, QUEUE_B_ADMIN, queueB);
  verifyListReservationFailure(QUEUE_B_USER, QUEUE_B_USER, queueB);
  verifyListReservationFailure(QUEUE_B_USER, QUEUE_B_ADMIN, queueB);
  verifyListReservationByIdSuccess(QUEUE_B_USER, QUEUE_B_USER, queueB);
  verifyListReservationByIdFailure(QUEUE_B_USER, QUEUE_B_ADMIN, queueB);

  // Being admin of one queue grants no list rights on the other queue.
  verifyListReservationFailure(QUEUE_B_ADMIN, QUEUE_A_ADMIN, queueA);
  verifyListReservationFailure(QUEUE_B_ADMIN, QUEUE_A_USER, queueA);
  verifyListReservationFailure(QUEUE_A_ADMIN, QUEUE_B_ADMIN, queueB);
  verifyListReservationFailure(QUEUE_A_ADMIN, QUEUE_B_USER, queueB);

  // Queue C: everybody can list, by queue and by id, whoever submitted.
  for (String lister : allButAAdmin) {
    verifyListReservationSuccess(lister, QUEUE_A_ADMIN, queueC);
  }
  for (String lister : allButAUser) {
    verifyListReservationSuccess(lister, QUEUE_A_USER, queueC);
  }
  for (String lister : allButAAdmin) {
    verifyListReservationByIdSuccess(lister, QUEUE_A_ADMIN, queueC);
  }
  for (String lister : allButAUser) {
    verifyListReservationByIdSuccess(lister, QUEUE_A_USER, queueC);
  }

  // ---------------------------------------------------------------------
  // Delete reservations
  // ---------------------------------------------------------------------

  // Queue A: the owner and the queue admin may delete; users that neither
  // own the reservation nor administer queue A may not.
  verifyDeleteReservationSuccess(QUEUE_A_USER, QUEUE_A_USER, queueA);
  verifyDeleteReservationSuccess(QUEUE_A_ADMIN, QUEUE_A_USER, queueA);
  for (String outsider : outsidersOfA) {
    verifyDeleteReservationFailure(outsider, QUEUE_A_USER, queueA);
  }

  // Queue B: same expectations with queue B's principals.
  verifyDeleteReservationSuccess(QUEUE_B_USER, QUEUE_B_USER, queueB);
  verifyDeleteReservationSuccess(QUEUE_B_ADMIN, QUEUE_B_USER, queueB);
  for (String outsider : outsidersOfB) {
    verifyDeleteReservationFailure(outsider, QUEUE_B_USER, queueB);
  }

  // Queue C: anybody may delete anybody's reservation.
  for (String killer : anyUser) {
    verifyDeleteReservationSuccess(killer, QUEUE_B_ADMIN, queueC);
  }

  // ---------------------------------------------------------------------
  // Update reservations
  // ---------------------------------------------------------------------

  // Queue A: the owner and the queue admin may update; users that neither
  // own the reservation nor administer queue A may not.
  verifyUpdateReservationSuccess(QUEUE_A_USER, QUEUE_A_USER, queueA);
  verifyUpdateReservationSuccess(QUEUE_A_ADMIN, QUEUE_A_USER, queueA);
  for (String outsider : outsidersOfA) {
    verifyUpdateReservationFailure(outsider, QUEUE_A_USER, queueA);
  }

  // Queue B: same expectations with queue B's principals.
  verifyUpdateReservationSuccess(QUEUE_B_USER, QUEUE_B_USER, queueB);
  verifyUpdateReservationSuccess(QUEUE_B_ADMIN, QUEUE_B_USER, queueB);
  for (String outsider : outsidersOfB) {
    verifyUpdateReservationFailure(outsider, QUEUE_B_USER, queueB);
  }

  // Queue C: anybody may update anybody's reservation.
  for (String updater : anyUser) {
    verifyUpdateReservationSuccess(updater, QUEUE_B_ADMIN, queueC);
  }
}
/**
 * Checks that {@code submitter} can submit a reservation on
 * {@code queueName}; any exception from the submission propagates and fails
 * the test. The reservation is deleted again by the same user afterwards.
 */
private void verifySubmitReservationSuccess(String submitter,
    String queueName) throws Exception {
  final ReservationId freshId = createReservation(submitter);
  submitReservation(submitter, queueName, freshId);
  // Remove the reservation so it does not show up in later checks.
  deleteReservation(submitter, freshId);
}
/**
 * Checks that {@code submitter} is refused when submitting a reservation on
 * {@code queueName}: the call must throw a {@link YarnException} whose
 * message names the SUBMIT_RESERVATIONS operation.
 */
private void verifySubmitReservationFailure(String submitter,
    String queueName) throws Exception {
  try {
    submitReservation(submitter, queueName, createReservation(submitter));
    Assert.fail("Submit reservation by the enemy should fail!");
  } catch (YarnException denied) {
    handleAdministerException(denied, submitter, queueName,
        ReservationACL.SUBMIT_RESERVATIONS.name());
  }
}
/**
 * Submits a reservation as {@code originalSubmitter} on {@code queueName},
 * lists the queue's reservations as {@code lister}, and checks that exactly
 * one reservation comes back and that it belongs to the submitter. The
 * reservation is deleted by the submitter afterwards.
 *
 * <p>The checks use JUnit assertions rather than the {@code assert} keyword
 * so that they are always evaluated, regardless of the JVM's {@code -ea}
 * setting, and report the offending values on failure.
 */
private void verifyListReservationSuccess(String lister,
    String originalSubmitter, String queueName) throws Exception {
  ReservationId reservationId = createReservation(originalSubmitter);
  submitReservation(originalSubmitter, queueName, reservationId);
  ReservationListResponse listResponse = listReservation(lister, queueName);
  Assert.assertEquals("Unexpected number of reservations listed by " + lister
      + " on queue " + queueName, 1,
      listResponse.getReservationAllocationState().size());
  Assert.assertEquals("Listed reservation has an unexpected owner",
      originalSubmitter,
      listResponse.getReservationAllocationState().get(0).getUser());
  deleteReservation(originalSubmitter, reservationId);
}
/**
 * Submits a reservation as {@code originalSubmitter} on {@code queueName}
 * and checks that {@code lister} is refused when listing the queue's
 * reservations: the call must throw a {@link YarnException} whose message
 * names the LIST_RESERVATIONS operation. The reservation is deleted by the
 * submitter afterwards.
 */
private void verifyListReservationFailure(String lister,
    String originalSubmitter, String queueName) throws Exception {
  final ReservationId ownedId = createReservation(originalSubmitter);
  submitReservation(originalSubmitter, queueName, ownedId);
  final String deniedOperation = ReservationACL.LIST_RESERVATIONS.name();
  try {
    listReservation(lister, queueName);
    Assert.fail("List reservation by the enemy should fail!");
  } catch (YarnException denied) {
    handleAdministerException(denied, lister, queueName, deniedOperation);
  }
  deleteReservation(originalSubmitter, ownedId);
}
/**
 * Submits a reservation as {@code originalSubmitter} on {@code queueName},
 * looks it up by id as {@code lister}, and checks that exactly one
 * reservation comes back and that it belongs to the submitter. The
 * reservation is deleted by the submitter afterwards.
 *
 * <p>The checks use JUnit assertions rather than the {@code assert} keyword
 * so that they are always evaluated, regardless of the JVM's {@code -ea}
 * setting, and report the offending values on failure.
 */
private void verifyListReservationByIdSuccess(String lister,
    String originalSubmitter, String queueName) throws Exception {
  ReservationId reservationId = createReservation(originalSubmitter);
  submitReservation(originalSubmitter, queueName, reservationId);
  ReservationListResponse listResponse = listReservationById(lister,
      reservationId, queueName);
  Assert.assertEquals("Unexpected number of reservations returned to "
      + lister + " for id " + reservationId, 1,
      listResponse.getReservationAllocationState().size());
  Assert.assertEquals("Listed reservation has an unexpected owner",
      originalSubmitter,
      listResponse.getReservationAllocationState().get(0).getUser());
  deleteReservation(originalSubmitter, reservationId);
}
/**
 * Submits a reservation as {@code originalSubmitter} on {@code queueName}
 * and checks that {@code lister} is refused when looking it up by id: the
 * call must throw a {@link YarnException} whose message names the
 * LIST_RESERVATIONS operation. The reservation is deleted by the submitter
 * afterwards.
 */
private void verifyListReservationByIdFailure(String lister,
    String originalSubmitter, String queueName) throws Exception {
  final ReservationId ownedId = createReservation(originalSubmitter);
  submitReservation(originalSubmitter, queueName, ownedId);
  final String deniedOperation = ReservationACL.LIST_RESERVATIONS.name();
  try {
    listReservationById(lister, ownedId, queueName);
    Assert.fail("List reservation by the enemy should fail!");
  } catch (YarnException denied) {
    handleAdministerException(denied, lister, queueName, deniedOperation);
  }
  deleteReservation(originalSubmitter, ownedId);
}
/**
 * Submits a reservation as {@code originalSubmitter} on {@code queueName}
 * and checks that {@code killer} can delete it; any exception from the
 * deletion propagates and fails the test.
 */
private void verifyDeleteReservationSuccess(String killer,
    String originalSubmitter, String queueName) throws Exception {
  final ReservationId targetId = createReservation(originalSubmitter);
  submitReservation(originalSubmitter, queueName, targetId);
  deleteReservation(killer, targetId);
}
/**
 * Submits a reservation as {@code originalSubmitter} on {@code queueName}
 * and checks that {@code killer} is refused when deleting it: the call must
 * throw a {@link YarnException} whose message names the
 * ADMINISTER_RESERVATIONS operation. The reservation is then deleted by the
 * submitter.
 */
private void verifyDeleteReservationFailure(String killer,
    String originalSubmitter, String queueName) throws Exception {
  final ReservationId targetId = createReservation(originalSubmitter);
  submitReservation(originalSubmitter, queueName, targetId);
  final String deniedOperation =
      ReservationACL.ADMINISTER_RESERVATIONS.name();
  try {
    deleteReservation(killer, targetId);
    Assert.fail("Reservation deletion by the enemy should fail!");
  } catch (YarnException denied) {
    handleAdministerException(denied, killer, queueName, deniedOperation);
  }
  // The rejected delete left the reservation in place; clean it up.
  deleteReservation(originalSubmitter, targetId);
}
/**
 * Submits a reservation as {@code originalSubmitter} on {@code queueName}
 * and checks that {@code updater} can replace its definition; any exception
 * from the update propagates and fails the test. The reservation is deleted
 * by {@code updater} afterwards.
 */
private void verifyUpdateReservationSuccess(String updater,
    String originalSubmitter, String queueName) throws Exception {
  final ReservationId targetId = createReservation(originalSubmitter);
  submitReservation(originalSubmitter, queueName, targetId);

  final ReservationDefinition replacement = makeSimpleReservationDefinition();
  final ReservationUpdateRequest request =
      ReservationUpdateRequest.newInstance(replacement, targetId);
  getRMClientForUser(updater).updateReservation(request);

  deleteReservation(updater, targetId);
}
/**
 * Submits a reservation as {@code originalSubmitter} on {@code queueName}
 * and checks that {@code updater} is refused when updating it: the call must
 * throw a {@link YarnException} whose message names the
 * ADMINISTER_RESERVATIONS operation. The reservation is then deleted by the
 * submitter.
 */
private void verifyUpdateReservationFailure(String updater,
    String originalSubmitter, String queueName) throws Exception {
  final ReservationId targetId = createReservation(originalSubmitter);
  submitReservation(originalSubmitter, queueName, targetId);

  final ReservationDefinition replacement = makeSimpleReservationDefinition();
  final ReservationUpdateRequest request =
      ReservationUpdateRequest.newInstance(replacement, targetId);
  final ApplicationClientProtocol deniedClient = getRMClientForUser(updater);
  try {
    deniedClient.updateReservation(request);
    Assert.fail("Reservation updating by the enemy should fail.");
  } catch (YarnException denied) {
    handleAdministerException(denied, updater, queueName,
        ReservationACL.ADMINISTER_RESERVATIONS.name());
  }

  deleteReservation(originalSubmitter, targetId);
}
/**
 * Builds a reservation definition that arrives now, has a deadline
 * {@code 1.1 * defaultDuration} after the arrival time, carries the shared
 * default requests and is named with a random UUID.
 */
private ReservationDefinition makeSimpleReservationDefinition() {
  final long arrival = System.currentTimeMillis();
  final String uniqueName = UUID.randomUUID().toString();
  // Leave 10% slack beyond the requested duration.
  final long deadline = arrival + (int) (defaultDuration * 1.1);
  return ReservationDefinition.newInstance(arrival, deadline,
      defaultRequests, uniqueName);
}
/**
 * Lists reservations on {@code queueName} as user {@code lister}, filtered
 * by the string form of {@code reservationId}.
 *
 * @return the response of the resource manager's list call
 */
private ReservationListResponse listReservationById(String lister,
    ReservationId reservationId, String queueName) throws Exception {
  final String idFilter = reservationId.toString();
  final ReservationListRequest request = ReservationListRequest.newInstance(
      queueName, idFilter, -1, -1, false);
  return getRMClientForUser(lister).listReservations(request);
}
/**
 * Lists reservations on {@code queueName} as user {@code lister}, without
 * a reservation-id filter.
 *
 * @return the response of the resource manager's list call
 */
private ReservationListResponse listReservation(String lister,
    String queueName) throws Exception {
  final ReservationListRequest request = ReservationListRequest.newInstance(
      queueName, null, -1, -1, false);
  return getRMClientForUser(lister).listReservations(request);
}
/**
 * Asks the resource manager, as user {@code deleter}, to delete the
 * reservation identified by {@code id}.
 */
private void deleteReservation(String deleter, ReservationId id) throws
    Exception {
  getRMClientForUser(deleter)
      .deleteReservation(ReservationDeleteRequest.newInstance(id));
}
/**
 * Requests a new reservation id from the resource manager as user
 * {@code creator}.
 *
 * @return the id contained in the resource manager's response
 */
private ReservationId createReservation(String creator) throws Exception {
  return getRMClientForUser(creator)
      .getNewReservation(GetNewReservationRequest.newInstance())
      .getReservationId();
}
/**
 * Submits a freshly built simple reservation definition to
 * {@code queueName} under {@code reservationId}, acting as user
 * {@code submitter}. The submission response is not inspected; failures
 * surface as exceptions from the client call.
 */
private void submitReservation(String submitter,
    String queueName, ReservationId reservationId) throws Exception {
  ApplicationClientProtocol submitterClient = getRMClientForUser(submitter);
  ReservationSubmissionRequest reservationSubmissionRequest =
      ReservationSubmissionRequest.newInstance(
          makeSimpleReservationDefinition(), queueName, reservationId);
  submitterClient.submitReservation(reservationSubmissionRequest);
}
/**
 * Logs a rejection raised by the resource manager and verifies that its
 * message identifies the user, operation and queue involved.
 *
 * @param e exception thrown by the rejected call
 * @param user user that attempted the operation
 * @param queue queue name as it was passed to the resource manager
 * @param operation name of the reservation ACL expected in the message
 */
private void handleAdministerException(Exception e, String user, String
    queue, String operation) {
  LOG.info("Got exception while performing reservation operation "
      + operation + " as the enemy", e);
  String expectedFragment = "User " + user + " cannot perform operation "
      + operation + " on queue " + queue;
  String actualMessage = e.getMessage();
  // A null message fails the assertion instead of raising a
  // NullPointerException, and the failure shows both strings.
  Assert.assertTrue("Expected exception message to contain <"
      + expectedFragment + "> but was <" + actualMessage + ">",
      actualMessage != null && actualMessage.contains(expectedFragment));
}
/**
 * Registers a node with the resource manager and waits until at least one
 * reservation plan reports non-zero memory capacity.
 *
 * <p>The plans are checked up to 11 times, 100 ms apart. Success is tracked
 * with an explicit flag so that capacity appearing on the very last check is
 * still accepted. Exceptions are propagated unchanged so their cause and
 * stack trace reach the test report.
 *
 * @param host node address passed to the resource manager
 * @param memory memory value passed to the resource manager
 * @param vCores virtual core count passed to the resource manager
 * @throws Exception if registration or polling fails, or the wait is
 *           interrupted
 */
private void registerNode(String host, int memory, int vCores) throws
    Exception {
  resourceManager.registerNode(host, memory, vCores);
  final int maxChecks = 11;
  boolean capacityAdded = false;
  for (int check = 0; check < maxChecks && !capacityAdded; check++) {
    resourceManager.drainEvents();
    LOG.info("Waiting for node capacity to be added to plan");
    Collection<Plan> plans = resourceManager.getRMContext()
        .getReservationSystem().getAllPlans().values();
    capacityAdded = checkCapacity(plans);
    if (!capacityAdded) {
      Thread.sleep(100);
    }
  }
  Assert.assertTrue("Exhausted attempts in checking if node capacity was "
      + "added to the plan", capacityAdded);
}
/**
 * Tells whether any of the given plans reports a total capacity with a
 * memory size greater than zero.
 *
 * @param plans plans to inspect
 * @return {@code true} as soon as one plan has positive memory capacity,
 *         {@code false} if none does (including when there are no plans)
 */
private boolean checkCapacity(Collection<Plan> plans) {
  for (Plan candidate : plans) {
    final long totalMemory = candidate.getTotalCapacity().getMemorySize();
    if (totalMemory > 0) {
      return true;
    }
  }
  return false;
}
/**
 * Builds the capacity-scheduler configuration used by the first
 * parameterized run: three reservable queues under the root (A at 50,
 * B at 20, C at 30), reservation ACLs on queues A and B only, and the
 * reservation system plus ACL checks switched on.
 *
 * @return the populated configuration
 */
private static Configuration createCapacitySchedulerConfiguration() {
  final String root = CapacitySchedulerConfiguration.ROOT;
  final String pathA = root + "." + QUEUEA;
  final String pathB = root + "." + QUEUEB;
  final String pathC = root + "." + QUEUEC;

  final CapacitySchedulerConfiguration conf =
      new CapacitySchedulerConfiguration();
  conf.setQueues(root, new String[] {QUEUEA, QUEUEB, QUEUEC});

  conf.setCapacity(pathA, 50f);
  conf.setCapacity(pathB, 20f);
  conf.setCapacity(pathC, 30f);

  conf.setReservable(pathA, true);
  conf.setReservable(pathB, true);
  conf.setReservable(pathC, true);

  // Queue A: its user submits, its admin administers, COMMON_USER lists.
  final Map<ReservationACL, AccessControlList> aclsForA = new HashMap<>();
  aclsForA.put(ReservationACL.SUBMIT_RESERVATIONS,
      new AccessControlList(QUEUE_A_USER));
  aclsForA.put(ReservationACL.ADMINISTER_RESERVATIONS,
      new AccessControlList(QUEUE_A_ADMIN));
  aclsForA.put(ReservationACL.LIST_RESERVATIONS,
      new AccessControlList(COMMON_USER));
  conf.setReservationAcls(pathA, aclsForA);

  // Queue B: its user submits, its admin administers, COMMON_USER lists.
  final Map<ReservationACL, AccessControlList> aclsForB = new HashMap<>();
  aclsForB.put(ReservationACL.SUBMIT_RESERVATIONS,
      new AccessControlList(QUEUE_B_USER));
  aclsForB.put(ReservationACL.ADMINISTER_RESERVATIONS,
      new AccessControlList(QUEUE_B_ADMIN));
  aclsForB.put(ReservationACL.LIST_RESERVATIONS,
      new AccessControlList(COMMON_USER));
  conf.setReservationAcls(pathB, aclsForB);

  // Queue C deliberately gets no reservation ACLs.

  conf.setBoolean(YarnConfiguration.RM_RESERVATION_SYSTEM_ENABLE, true);
  conf.setBoolean(YarnConfiguration.YARN_ACL_ENABLE, true);
  conf.setBoolean(YarnConfiguration.YARN_RESERVATION_ACL_ENABLE, true);
  conf.set(YarnConfiguration.RM_SCHEDULER,
      CapacityScheduler.class.getName());
  return conf;
}
/**
 * Builds the fair-scheduler configuration used by the second parameterized
 * run. An allocation file named {@code test-queues.xml} is written into the
 * directory given by the {@code test.build.data} system property
 * ({@code /tmp} when unset); it declares queueA and queueB with reservation
 * ACLs and queueC without any. The reservation system and ACL checks are
 * switched on.
 *
 * <p>The writer is managed by try-with-resources so the file handle is
 * released even if writing fails, and the writer's error flag is checked
 * because {@link PrintWriter} does not throw on write errors.
 *
 * @return the populated configuration
 * @throws IOException if the allocation file cannot be opened or written
 */
private static Configuration createFairSchedulerConfiguration() throws
    IOException {
  FairSchedulerConfiguration fsConf = new FairSchedulerConfiguration();
  final String testDir = new File(System.getProperty("test.build.data",
      "/tmp")).getAbsolutePath();
  final String allocFile = new File(testDir, "test-queues.xml")
      .getAbsolutePath();
  try (PrintWriter out = new PrintWriter(new FileWriter(allocFile))) {
    out.println("<?xml version=\"1.0\"?>");
    out.println("<allocations>");
    out.println(" <defaultQueueSchedulingPolicy>drf" +
        "</defaultQueueSchedulingPolicy>");
    out.println(" <queue name=\"queueA\">");
    out.println(" <aclSubmitReservations>" +
        "queueA_user,common_user " +
        "</aclSubmitReservations>");
    out.println(" <aclAdministerReservations>" +
        "queueA_admin " +
        "</aclAdministerReservations>");
    out.println(" <aclListReservations>common_user </aclListReservations>");
    out.println(" <aclSubmitApps>queueA_user,common_user </aclSubmitApps>");
    out.println(" <aclAdministerApps>queueA_admin </aclAdministerApps>");
    out.println(" <reservation> </reservation>");
    out.println(" </queue>");
    out.println(" <queue name=\"queueB\">");
    out.println(" <aclSubmitApps>queueB_user,common_user </aclSubmitApps>");
    out.println(" <aclAdministerApps>queueB_admin </aclAdministerApps>");
    out.println(" <aclSubmitReservations>" +
        "queueB_user,common_user " +
        "</aclSubmitReservations>");
    out.println(" <aclAdministerReservations>" +
        "queueB_admin " +
        "</aclAdministerReservations>");
    out.println(" <aclListReservations>common_user </aclListReservations>");
    out.println(" <reservation> </reservation>");
    out.println(" </queue>");
    out.println(" <queue name=\"queueC\">");
    out.println(" <reservation> </reservation>");
    out.println(" </queue>");
    out.println("</allocations>");
    // checkError() flushes and reports any write failure PrintWriter hid.
    if (out.checkError()) {
      throw new IOException("Failed to write fair scheduler allocation file "
          + allocFile);
    }
  }
  fsConf.set(FairSchedulerConfiguration.ALLOCATION_FILE, allocFile);
  fsConf.setBoolean(YarnConfiguration.RM_RESERVATION_SYSTEM_ENABLE, true);
  fsConf.setBoolean(YarnConfiguration.YARN_ACL_ENABLE, true);
  fsConf.setBoolean(YarnConfiguration.YARN_RESERVATION_ACL_ENABLE, true);
  fsConf.set(YarnConfiguration.RM_SCHEDULER, FairScheduler.class.getName());
  return fsConf;
}
/**
 * Returns the scheduler configuration this parameterized instance was
 * constructed with.
 */
@Override
protected Configuration createConfiguration() {
  return this.configuration;
}
}