YARN-11297. [Federation] Improve Yarn Router Reservation Submission Code. (#4863)

diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java
index 04452af..ec6f5fb 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java
@@ -949,7 +949,13 @@
         // Second, determine whether the current ReservationId has a corresponding subCluster.
         // If it does not exist, add it. If it exists, update it.
         Boolean exists = existsReservationHomeSubCluster(reservationId);
-        if (!exists) {
+
+        // We may encounter the situation of repeated submission of Reservation,
+        // at this time we should try to use the reservation that has been allocated
+        // !exists indicates that the reservation does not exist and needs to be added
+        // i==0, mainly to consider repeated submissions,
+        // so the first time to apply for reservation, try to use the original reservation
+        if (!exists || i == 0) {
           addReservationHomeSubCluster(reservationId, reservationHomeSubCluster);
         } else {
           updateReservationHomeSubCluster(subClusterId, reservationId, reservationHomeSubCluster);
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestFederationClientInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestFederationClientInterceptor.java
index 93a759b..ac980b4 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestFederationClientInterceptor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestFederationClientInterceptor.java
@@ -1308,13 +1308,6 @@
     GetNewReservationResponse response = interceptor.getNewReservation(request);
     Assert.assertNotNull(response);
 
-    // allow plan follower to synchronize, manually trigger an assignment
-    Map<SubClusterId, MockRM> mockRMs = interceptor.getMockRMs();
-    for (MockRM mockRM : mockRMs.values()) {
-      ReservationSystem reservationSystem = mockRM.getReservationSystem();
-      reservationSystem.synchronizePlan("root.decided", true);
-    }
-
     // Submit Reservation
     ReservationId reservationId = response.getReservationId();
     ReservationDefinition rDefinition = createReservationDefinition(1024, 1);
@@ -1384,13 +1377,6 @@
     GetNewReservationResponse response = interceptor.getNewReservation(request);
     Assert.assertNotNull(response);
 
-    // allow plan follower to synchronize, manually trigger an assignment
-    Map<SubClusterId, MockRM> mockRMs = interceptor.getMockRMs();
-    for (MockRM mockRM : mockRMs.values()) {
-      ReservationSystem reservationSystem = mockRM.getReservationSystem();
-      reservationSystem.synchronizePlan("root.decided", true);
-    }
-
     // First Submit Reservation
     ReservationId reservationId = response.getReservationId();
     ReservationDefinition rDefinition = createReservationDefinition(1024, 1);
@@ -1404,10 +1390,12 @@
     Assert.assertNotNull(subClusterId1);
     Assert.assertTrue(subClusters.contains(subClusterId1));
 
-    // First Retry
+    // First Retry, repeat the submission
     ReservationSubmissionResponse submissionResponse1 =
         interceptor.submitReservation(rSubmissionRequest);
     Assert.assertNotNull(submissionResponse1);
+
+    // Expect reserved clusters to be consistent
     SubClusterId subClusterId2 = stateStoreUtil.queryReservationHomeSC(reservationId);
     Assert.assertNotNull(subClusterId2);
     Assert.assertEquals(subClusterId1, subClusterId2);
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestableFederationClientInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestableFederationClientInterceptor.java
index 4e4df3f..8279899 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestableFederationClientInterceptor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestableFederationClientInterceptor.java
@@ -28,13 +28,16 @@
 import java.util.Map;
 import java.util.HashMap;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeoutException;
 
+import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableSet;
 import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
 import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse;
 import org.apache.hadoop.yarn.api.records.NodeAttribute;
 import org.apache.hadoop.yarn.api.records.NodeAttributeType;
+import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.nodelabels.NodeAttributesManager;
 import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
@@ -43,6 +46,8 @@
 import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
 import org.apache.hadoop.yarn.server.resourcemanager.RMAppManager;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.Plan;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystem;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.security.QueueACLsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.security.RMDelegationTokenSecretManager;
@@ -95,6 +100,7 @@
         mockRMs.put(subClusterId, mockRM);
       }
       initNodeAttributes(subClusterId, mockRM);
+      initReservationSystem(mockRM);
       return mockRM.getClientRMService();
     }
   }
@@ -167,6 +173,23 @@
     }
   }
 
+  private void initReservationSystem(MockRM mockRM) throws YarnException {
+    try {
+      // Ensure that the reserved resources of the RM#Reservation System are allocated
+      String planName = "root.decided";
+      ReservationSystem reservationSystem = mockRM.getReservationSystem();
+      reservationSystem.synchronizePlan(planName, true);
+
+      GenericTestUtils.waitFor(() -> {
+        Plan plan = reservationSystem.getPlan(planName);
+        Resource resource = plan.getTotalCapacity();
+        return (resource.getMemorySize() > 0 && resource.getVirtualCores() > 0);
+      }, 100, 2000);
+    } catch (TimeoutException | InterruptedException e) {
+      throw new YarnException(e);
+    }
+  }
+
   @Override
   public void shutdown() {
     if (mockRMs != null && !mockRMs.isEmpty()) {
@@ -193,4 +216,4 @@
     mockRMs.clear();
     super.shutdown();
   }
-}
+}
\ No newline at end of file