[FLINK-32882][SlotManager] Fix NPE when PendingTaskmanager clear pending allocations twice. (#23224)

[FLINK-32882][SlotManager] Fix NPE when PendingTaskmanager clear pending allocations twice.
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/PendingTaskManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/PendingTaskManager.java
index 8033b18..ff9e479 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/PendingTaskManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/PendingTaskManager.java
@@ -25,6 +25,7 @@
 
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Optional;
 
 /** Represents a pending task manager in the {@link SlotManager}. */
 public class PendingTaskManager {
@@ -82,8 +83,11 @@
     }
 
     public void clearPendingAllocationsOfJob(JobID jobId) {
-        ResourceCounter resourceCounter = pendingSlotAllocationRecords.remove(jobId);
-        unusedResource = unusedResource.merge(resourceCounter.getTotalResource());
+        Optional.ofNullable(pendingSlotAllocationRecords.remove(jobId))
+                .ifPresent(
+                        resourceCounter ->
+                                unusedResource =
+                                        unusedResource.merge(resourceCounter.getTotalResource()));
     }
 
     private ResourceProfile calculateUnusedResourceProfile() {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManagerTest.java
index 3e1e1bd3..c9c2746 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManagerTest.java
@@ -985,4 +985,72 @@
             }
         };
     }
+
+    @Test
+    void testClearResourceRequirementsWithPendingTaskManager() throws Exception {
+        new Context() {
+            {
+                final JobID jobId = new JobID();
+                final CompletableFuture<Void> allocateResourceFuture = new CompletableFuture<>();
+
+                resourceAllocatorBuilder.setDeclareResourceNeededConsumer(
+                        (resourceDeclarations) -> allocateResourceFuture.complete(null));
+
+                final PendingTaskManager pendingTaskManager1 =
+                        new PendingTaskManager(
+                                DEFAULT_TOTAL_RESOURCE_PROFILE, DEFAULT_NUM_SLOTS_PER_WORKER);
+                final PendingTaskManager pendingTaskManager2 =
+                        new PendingTaskManager(
+                                DEFAULT_TOTAL_RESOURCE_PROFILE, DEFAULT_NUM_SLOTS_PER_WORKER);
+                resourceAllocationStrategyBuilder.setTryFulfillRequirementsFunction(
+                        ((jobIDCollectionMap, taskManagerResourceInfoProvider) ->
+                                ResourceAllocationResult.builder()
+                                        .addPendingTaskManagerAllocate(pendingTaskManager1)
+                                        .addPendingTaskManagerAllocate(pendingTaskManager2)
+                                        .addAllocationOnPendingResource(
+                                                jobId,
+                                                pendingTaskManager1.getPendingTaskManagerId(),
+                                                DEFAULT_SLOT_RESOURCE_PROFILE)
+                                        .addAllocationOnPendingResource(
+                                                jobId,
+                                                pendingTaskManager2.getPendingTaskManagerId(),
+                                                DEFAULT_SLOT_RESOURCE_PROFILE)
+                                        .build()));
+                runTest(
+                        () -> {
+                            // assign allocations to pending task managers
+                            runInMainThread(
+                                    () ->
+                                            getSlotManager()
+                                                    .processResourceRequirements(
+                                                            createResourceRequirements(jobId, 2)));
+                            assertFutureCompleteAndReturn(allocateResourceFuture);
+
+                            // cancel all slot requests, will trigger
+                            // PendingTaskManager#clearPendingAllocationsOfJob
+                            runInMainThreadAndWait(
+                                    () ->
+                                            getSlotManager()
+                                                    .processResourceRequirements(
+                                                            ResourceRequirements.empty(
+                                                                    jobId, "foobar")));
+
+                            // disconnect to job master,will trigger
+                            // PendingTaskManager#clearPendingAllocationsOfJob again
+                            CompletableFuture<Void> clearFuture = new CompletableFuture<>();
+                            runInMainThread(
+                                    () -> {
+                                        try {
+                                            getSlotManager().clearResourceRequirements(jobId);
+                                        } catch (Exception e) {
+                                            clearFuture.completeExceptionally(e);
+                                        }
+                                        clearFuture.complete(null);
+                                    });
+
+                            assertFutureCompleteAndReturn(clearFuture);
+                        });
+            }
+        };
+    }
 }