YARN-11297. [Federation] Improve Yarn Router Reservation Submission Code. (#4863)
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java
index 04452af..ec6f5fb 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java
@@ -949,7 +949,13 @@
// Second, determine whether the current ReservationId has a corresponding subCluster.
// If it does not exist, add it. If it exists, update it.
Boolean exists = existsReservationHomeSubCluster(reservationId);
- if (!exists) {
+
+ // We may encounter the situation of repeated submission of Reservation,
+ // at this time we should try to use the reservation that has been allocated
+ // !exists indicates that the reservation does not exist and needs to be added
+ // i==0, mainly to consider repeated submissions,
+ // so the first time to apply for reservation, try to use the original reservation
+ if (!exists || i == 0) {
addReservationHomeSubCluster(reservationId, reservationHomeSubCluster);
} else {
updateReservationHomeSubCluster(subClusterId, reservationId, reservationHomeSubCluster);
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestFederationClientInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestFederationClientInterceptor.java
index 93a759b..ac980b4 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestFederationClientInterceptor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestFederationClientInterceptor.java
@@ -1308,13 +1308,6 @@
GetNewReservationResponse response = interceptor.getNewReservation(request);
Assert.assertNotNull(response);
- // allow plan follower to synchronize, manually trigger an assignment
- Map<SubClusterId, MockRM> mockRMs = interceptor.getMockRMs();
- for (MockRM mockRM : mockRMs.values()) {
- ReservationSystem reservationSystem = mockRM.getReservationSystem();
- reservationSystem.synchronizePlan("root.decided", true);
- }
-
// Submit Reservation
ReservationId reservationId = response.getReservationId();
ReservationDefinition rDefinition = createReservationDefinition(1024, 1);
@@ -1384,13 +1377,6 @@
GetNewReservationResponse response = interceptor.getNewReservation(request);
Assert.assertNotNull(response);
- // allow plan follower to synchronize, manually trigger an assignment
- Map<SubClusterId, MockRM> mockRMs = interceptor.getMockRMs();
- for (MockRM mockRM : mockRMs.values()) {
- ReservationSystem reservationSystem = mockRM.getReservationSystem();
- reservationSystem.synchronizePlan("root.decided", true);
- }
-
// First Submit Reservation
ReservationId reservationId = response.getReservationId();
ReservationDefinition rDefinition = createReservationDefinition(1024, 1);
@@ -1404,10 +1390,12 @@
Assert.assertNotNull(subClusterId1);
Assert.assertTrue(subClusters.contains(subClusterId1));
- // First Retry
+ // First Retry, repeat the submission
ReservationSubmissionResponse submissionResponse1 =
interceptor.submitReservation(rSubmissionRequest);
Assert.assertNotNull(submissionResponse1);
+
+ // Expect reserved clusters to be consistent
SubClusterId subClusterId2 = stateStoreUtil.queryReservationHomeSC(reservationId);
Assert.assertNotNull(subClusterId2);
Assert.assertEquals(subClusterId1, subClusterId2);
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestableFederationClientInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestableFederationClientInterceptor.java
index 4e4df3f..8279899 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestableFederationClientInterceptor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestableFederationClientInterceptor.java
@@ -28,13 +28,16 @@
import java.util.Map;
import java.util.HashMap;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeoutException;
+import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableSet;
import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse;
import org.apache.hadoop.yarn.api.records.NodeAttribute;
import org.apache.hadoop.yarn.api.records.NodeAttributeType;
+import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.nodelabels.NodeAttributesManager;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
@@ -43,6 +46,8 @@
import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
import org.apache.hadoop.yarn.server.resourcemanager.RMAppManager;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.Plan;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystem;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.security.QueueACLsManager;
import org.apache.hadoop.yarn.server.resourcemanager.security.RMDelegationTokenSecretManager;
@@ -95,6 +100,7 @@
mockRMs.put(subClusterId, mockRM);
}
initNodeAttributes(subClusterId, mockRM);
+ initReservationSystem(mockRM);
return mockRM.getClientRMService();
}
}
@@ -167,6 +173,23 @@
}
}
+ private void initReservationSystem(MockRM mockRM) throws YarnException {
+ try {
+ // Ensure that the reserved resources of the RM#Reservation System are allocated
+ String planName = "root.decided";
+ ReservationSystem reservationSystem = mockRM.getReservationSystem();
+ reservationSystem.synchronizePlan(planName, true);
+
+ GenericTestUtils.waitFor(() -> {
+ Plan plan = reservationSystem.getPlan(planName);
+ Resource resource = plan.getTotalCapacity();
+ return (resource.getMemorySize() > 0 && resource.getVirtualCores() > 0);
+ }, 100, 2000);
+ } catch (TimeoutException | InterruptedException e) {
+ throw new YarnException(e);
+ }
+ }
+
@Override
public void shutdown() {
if (mockRMs != null && !mockRMs.isEmpty()) {
@@ -193,4 +216,4 @@
mockRMs.clear();
super.shutdown();
}
-}
+}
\ No newline at end of file