blob: f746dc2f1883afe8ba6e6ec7afc9c72e4ebaefc3 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager;
import org.apache.hadoop.yarn.api.protocolrecords.GetNewReservationRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetNewReservationResponse;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteRequest;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteResponse;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionResponse;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateResponse;
import org.apache.hadoop.yarn.api.records.QueueInfo;
import org.apache.hadoop.yarn.api.records.ReservationDefinition;
import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.proto.YarnProtos.ReservationAllocationStateProto;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.Plan;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationAllocation;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystemTestUtil;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.UTCClock;
import org.junit.Assert;
import org.junit.Test;
import java.util.Map;
public class TestReservationSystemWithRMHA extends RMHATestBase {
@Override
public void setup() throws Exception {
CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
ReservationSystemTestUtil.setupQueueConfiguration(conf);
conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
ResourceScheduler.class);
conf.setBoolean(YarnConfiguration.RM_RESERVATION_SYSTEM_ENABLE, true);
configuration = conf;
super.setup();
}
@Test
public void testSubmitReservationAndCheckAfterFailover() throws Exception {
startRMs();
addNodeCapacityToPlan(rm1, 102400, 100);
ClientRMService clientService = rm1.getClientRMService();
ReservationId reservationID = getNewReservation(clientService)
.getReservationId();
// create a reservation
ReservationSubmissionRequest request = createReservationSubmissionRequest(
reservationID);
ReservationSubmissionResponse response = null;
try {
response = clientService.submitReservation(request);
} catch (Exception e) {
Assert.fail(e.getMessage());
}
Assert.assertNotNull(response);
Assert.assertNotNull(reservationID);
LOG.info("Submit reservation response: " + reservationID);
// Do the failover
explicitFailover();
rm2.registerNode("127.0.0.1:1", 102400, 100);
RMState state = rm2.getRMContext().getStateStore().loadState();
Map<ReservationId, ReservationAllocationStateProto> reservationStateMap =
state.getReservationState().get(ReservationSystemTestUtil.reservationQ);
Assert.assertNotNull(reservationStateMap);
Assert.assertNotNull(reservationStateMap.get(reservationID));
}
@Test
public void testUpdateReservationAndCheckAfterFailover() throws Exception {
startRMs();
addNodeCapacityToPlan(rm1, 102400, 100);
ClientRMService clientService = rm1.getClientRMService();
ReservationId reservationID = getNewReservation(clientService)
.getReservationId();
// create a reservation
ReservationSubmissionRequest request = createReservationSubmissionRequest(
reservationID);
ReservationSubmissionResponse response = null;
try {
response = clientService.submitReservation(request);
} catch (Exception e) {
Assert.fail(e.getMessage());
}
Assert.assertNotNull(response);
Assert.assertNotNull(reservationID);
LOG.info("Submit reservation response: " + reservationID);
ReservationDefinition reservationDefinition =
request.getReservationDefinition();
// Change any field
long newDeadline = reservationDefinition.getDeadline() + 100;
reservationDefinition.setDeadline(newDeadline);
ReservationUpdateRequest updateRequest = ReservationUpdateRequest
.newInstance(reservationDefinition, reservationID);
rm1.updateReservationState(updateRequest);
// Do the failover
explicitFailover();
rm2.registerNode("127.0.0.1:1", 102400, 100);
RMState state = rm2.getRMContext().getStateStore().loadState();
Map<ReservationId, ReservationAllocationStateProto> reservationStateMap =
state.getReservationState().get(ReservationSystemTestUtil.reservationQ);
Assert.assertNotNull(reservationStateMap);
ReservationAllocationStateProto reservationState =
reservationStateMap.get(reservationID);
Assert.assertEquals(newDeadline,
reservationState.getReservationDefinition().getDeadline());
}
@Test
public void testDeleteReservationAndCheckAfterFailover() throws Exception {
startRMs();
addNodeCapacityToPlan(rm1, 102400, 100);
ClientRMService clientService = rm1.getClientRMService();
ReservationId reservationID = getNewReservation(clientService)
.getReservationId();
// create a reservation
ReservationSubmissionRequest request = createReservationSubmissionRequest(
reservationID);
ReservationSubmissionResponse response = null;
try {
response = clientService.submitReservation(request);
} catch (Exception e) {
Assert.fail(e.getMessage());
}
Assert.assertNotNull(response);
Assert.assertNotNull(reservationID);
// Delete the reservation
ReservationDeleteRequest deleteRequest =
ReservationDeleteRequest.newInstance(reservationID);
clientService.deleteReservation(deleteRequest);
// Do the failover
explicitFailover();
rm2.registerNode("127.0.0.1:1", 102400, 100);
RMState state = rm2.getRMContext().getStateStore().loadState();
Assert.assertNull(state.getReservationState()
.get(ReservationSystemTestUtil.reservationQ));
}
private void addNodeCapacityToPlan(MockRM rm, int memory, int vCores) {
try {
rm.registerNode("127.0.0.1:1", memory, vCores);
int attempts = 10;
do {
rm1.drainEvents();
rm.getRMContext().getReservationSystem()
.synchronizePlan(ReservationSystemTestUtil.reservationQ, false);
if (rm.getRMContext().getReservationSystem()
.getPlan(ReservationSystemTestUtil.reservationQ).getTotalCapacity()
.getMemorySize() > 0) {
break;
}
LOG.info("Waiting for node capacity to be added to plan");
Thread.sleep(100);
} while (attempts-- > 0);
if (attempts <= 0) {
Assert.fail("Exhausted attempts in checking if node capacity was "
+ "added to the plan");
}
} catch (Exception e) {
Assert.fail(e.getMessage());
}
}
private ReservationSubmissionRequest createReservationSubmissionRequest(
ReservationId reservationId) {
Clock clock = new UTCClock();
long arrival = clock.getTime();
long duration = 60000;
long deadline = (long) (arrival + duration + 1500);
return ReservationSystemTestUtil.createSimpleReservationRequest(
reservationId, 4, arrival, deadline, duration);
}
private void validateReservation(Plan plan, ReservationId resId,
ReservationDefinition rDef) {
ReservationAllocation reservation = plan.getReservationById(resId);
Assert.assertNotNull(reservation);
Assert.assertEquals(rDef.getDeadline(),
reservation.getReservationDefinition().getDeadline());
}
@Test
public void testSubmitReservationFailoverAndDelete() throws Exception {
startRMs();
addNodeCapacityToPlan(rm1, 102400, 100);
ClientRMService clientService = rm1.getClientRMService();
ReservationId reservationID = getNewReservation(clientService)
.getReservationId();
// create a reservation
ReservationSubmissionRequest request = createReservationSubmissionRequest(
reservationID);
ReservationSubmissionResponse response = null;
try {
response = clientService.submitReservation(request);
} catch (Exception e) {
Assert.fail(e.getMessage());
}
Assert.assertNotNull(response);
Assert.assertNotNull(reservationID);
LOG.info("Submit reservation response: " + reservationID);
ReservationDefinition reservationDefinition =
request.getReservationDefinition();
// Do the failover
explicitFailover();
addNodeCapacityToPlan(rm2, 102400, 100);
// check if reservation exists after failover
Plan plan = rm2.getRMContext().getReservationSystem()
.getPlan(ReservationSystemTestUtil.reservationQ);
validateReservation(plan, reservationID, reservationDefinition);
// delete the reservation
ReservationDeleteRequest deleteRequest =
ReservationDeleteRequest.newInstance(reservationID);
ReservationDeleteResponse deleteResponse = null;
clientService = rm2.getClientRMService();
try {
deleteResponse = clientService.deleteReservation(deleteRequest);
} catch (Exception e) {
Assert.fail(e.getMessage());
}
Assert.assertNotNull(deleteResponse);
Assert.assertNull(plan.getReservationById(reservationID));
}
@Test
public void testFailoverAndSubmitReservation() throws Exception {
startRMs();
addNodeCapacityToPlan(rm1, 102400, 100);
// Do the failover
explicitFailover();
addNodeCapacityToPlan(rm2, 102400, 100);
ClientRMService clientService = rm2.getClientRMService();
ReservationId reservationID = getNewReservation(clientService)
.getReservationId();
// create a reservation
ReservationSubmissionRequest request = createReservationSubmissionRequest(
reservationID);
ReservationSubmissionResponse response = null;
try {
response = clientService.submitReservation(request);
} catch (Exception e) {
Assert.fail(e.getMessage());
}
Assert.assertNotNull(response);
Assert.assertNotNull(reservationID);
LOG.info("Submit reservation response: " + reservationID);
ReservationDefinition reservationDefinition =
request.getReservationDefinition();
// check if reservation is submitted successfully
Plan plan = rm2.getRMContext().getReservationSystem()
.getPlan(ReservationSystemTestUtil.reservationQ);
validateReservation(plan, reservationID, reservationDefinition);
}
@Test
public void testSubmitReservationFailoverAndUpdate() throws Exception {
startRMs();
addNodeCapacityToPlan(rm1, 102400, 100);
ClientRMService clientService = rm1.getClientRMService();
ReservationId reservationID = getNewReservation(clientService)
.getReservationId();
// create a reservation
ReservationSubmissionRequest request = createReservationSubmissionRequest(
reservationID);
ReservationSubmissionResponse response = null;
try {
response = clientService.submitReservation(request);
} catch (Exception e) {
Assert.fail(e.getMessage());
}
Assert.assertNotNull(response);
Assert.assertNotNull(reservationID);
LOG.info("Submit reservation response: " + reservationID);
ReservationDefinition reservationDefinition =
request.getReservationDefinition();
// Do the failover
explicitFailover();
addNodeCapacityToPlan(rm2, 102400, 100);
// check if reservation exists after failover
Plan plan = rm2.getRMContext().getReservationSystem()
.getPlan(ReservationSystemTestUtil.reservationQ);
validateReservation(plan, reservationID, reservationDefinition);
// update the reservation
long newDeadline = reservationDefinition.getDeadline() + 100;
reservationDefinition.setDeadline(newDeadline);
ReservationUpdateRequest updateRequest = ReservationUpdateRequest
.newInstance(reservationDefinition, reservationID);
ReservationUpdateResponse updateResponse = null;
clientService = rm2.getClientRMService();
try {
updateResponse = clientService.updateReservation(updateRequest);
} catch (Exception e) {
Assert.fail(e.getMessage());
}
Assert.assertNotNull(updateResponse);
validateReservation(plan, reservationID, reservationDefinition);
}
@Test
public void testSubmitUpdateReservationFailoverAndDelete() throws Exception {
startRMs();
addNodeCapacityToPlan(rm1, 102400, 100);
ClientRMService clientService = rm1.getClientRMService();
ReservationId reservationID = getNewReservation(clientService)
.getReservationId();
// create a reservation
ReservationSubmissionRequest request = createReservationSubmissionRequest(
reservationID);
ReservationSubmissionResponse response = null;
try {
response = clientService.submitReservation(request);
} catch (Exception e) {
Assert.fail(e.getMessage());
}
Assert.assertNotNull(response);
Assert.assertNotNull(reservationID);
LOG.info("Submit reservation response: " + reservationID);
ReservationDefinition reservationDefinition =
request.getReservationDefinition();
// check if reservation is submitted successfully
Plan plan = rm1.getRMContext().getReservationSystem()
.getPlan(ReservationSystemTestUtil.reservationQ);
validateReservation(plan, reservationID, reservationDefinition);
// update the reservation
long newDeadline = reservationDefinition.getDeadline() + 100;
reservationDefinition.setDeadline(newDeadline);
ReservationUpdateRequest updateRequest = ReservationUpdateRequest
.newInstance(reservationDefinition, reservationID);
ReservationUpdateResponse updateResponse = null;
try {
updateResponse = clientService.updateReservation(updateRequest);
} catch (Exception e) {
Assert.fail(e.getMessage());
}
Assert.assertNotNull(updateResponse);
validateReservation(plan, reservationID, reservationDefinition);
// Do the failover
explicitFailover();
addNodeCapacityToPlan(rm2, 102400, 100);
// check if reservation exists after failover
plan = rm2.getRMContext().getReservationSystem()
.getPlan(ReservationSystemTestUtil.reservationQ);
validateReservation(plan, reservationID, reservationDefinition);
// delete the reservation
ReservationDeleteRequest deleteRequest =
ReservationDeleteRequest.newInstance(reservationID);
ReservationDeleteResponse deleteResponse = null;
clientService = rm2.getClientRMService();
try {
deleteResponse = clientService.deleteReservation(deleteRequest);
} catch (Exception e) {
Assert.fail(e.getMessage());
}
Assert.assertNotNull(deleteResponse);
Assert.assertNull(plan.getReservationById(reservationID));
}
@Test
public void testReservationResizeAfterFailover() throws Exception {
startRMs();
addNodeCapacityToPlan(rm1, 102400, 100);
ClientRMService clientService = rm1.getClientRMService();
ReservationId resID1 = getNewReservation(clientService)
.getReservationId();
// create a reservation
ReservationSubmissionRequest request = createReservationSubmissionRequest(
resID1);
ReservationDefinition reservationDefinition =
request.getReservationDefinition();
ReservationSubmissionResponse response = null;
try {
response = clientService.submitReservation(request);
} catch (Exception e) {
Assert.fail(e.getMessage());
}
Assert.assertNotNull(response);
Assert.assertNotNull(resID1);
LOG.info("Submit reservation response: " + resID1);
ReservationId resID2 = getNewReservation(clientService)
.getReservationId();
request.setReservationId(resID2);
try {
response = clientService.submitReservation(request);
} catch (Exception e) {
Assert.fail(e.getMessage());
}
Assert.assertNotNull(response);
Assert.assertNotNull(resID2);
LOG.info("Submit reservation response: " + resID2);
ReservationId resID3 = getNewReservation(clientService)
.getReservationId();
request.setReservationId(resID3);
try {
response = clientService.submitReservation(request);
} catch (Exception e) {
Assert.fail(e.getMessage());
}
Assert.assertNotNull(response);
Assert.assertNotNull(resID3);
LOG.info("Submit reservation response: " + resID3);
// allow the reservations to become active
waitForReservationActivation(rm1, resID1,
ReservationSystemTestUtil.reservationQ);
// validate reservations before failover
Plan plan = rm1.getRMContext().getReservationSystem()
.getPlan(ReservationSystemTestUtil.reservationQ);
validateReservation(plan, resID1, reservationDefinition);
validateReservation(plan, resID2, reservationDefinition);
validateReservation(plan, resID3, reservationDefinition);
ResourceScheduler scheduler = rm1.getResourceScheduler();
QueueInfo resQ1 = scheduler.getQueueInfo(resID1.toString(), false, false);
Assert.assertEquals(0.05, resQ1.getCapacity(), 0.001f);
QueueInfo resQ2 = scheduler.getQueueInfo(resID2.toString(), false, false);
Assert.assertEquals(0.05, resQ2.getCapacity(), 0.001f);
QueueInfo resQ3 = scheduler.getQueueInfo(resID3.toString(), false, false);
Assert.assertEquals(0.05, resQ3.getCapacity(), 0.001f);
// Do the failover
explicitFailover();
addNodeCapacityToPlan(rm2, 5120, 5);
// check if reservations exists after failover
plan = rm2.getRMContext().getReservationSystem()
.getPlan(ReservationSystemTestUtil.reservationQ);
validateReservation(plan, resID1, reservationDefinition);
validateReservation(plan, resID3, reservationDefinition);
// verify if the reservations have been resized
scheduler = rm2.getResourceScheduler();
resQ1 = scheduler.getQueueInfo(resID1.toString(), false, false);
Assert.assertEquals(1f / 3f, resQ1.getCapacity(), 0.001f);
resQ2 = scheduler.getQueueInfo(resID2.toString(), false, false);
Assert.assertEquals(1f / 3f, resQ2.getCapacity(), 0.001f);
resQ3 = scheduler.getQueueInfo(resID3.toString(), false, false);
Assert.assertEquals(1f / 3f, resQ3.getCapacity(), 0.001f);
}
private void waitForReservationActivation(MockRM rm,
ReservationId reservationId, String planName) {
try {
int attempts = 20;
do {
rm.getRMContext().getReservationSystem().synchronizePlan(planName,
false);
if (rm.getResourceScheduler()
.getQueueInfo(reservationId.toString(), false, false)
.getCapacity() > 0f) {
break;
}
LOG.info("Waiting for reservation to be active");
Thread.sleep(100);
} while (attempts-- > 0);
if (attempts <= 0) {
Assert
.fail("Exceeded attempts in waiting for reservation to be active");
}
} catch (Exception e) {
Assert.fail(e.getMessage());
}
}
private GetNewReservationResponse getNewReservation(ClientRMService
clientRMService) {
GetNewReservationRequest newReservationRequest = GetNewReservationRequest
.newInstance();
GetNewReservationResponse getNewReservationResponse = null;
try {
getNewReservationResponse = clientRMService.getNewReservation(
newReservationRequest);
} catch (Exception e) {
Assert.fail(e.getMessage());
}
return getNewReservationResponse;
}
}