[FLINK-32882][SlotManager] Fix NPE when PendingTaskManager clears pending allocations twice. (#23224)
[FLINK-32882][SlotManager] Fix NPE when PendingTaskManager clears pending allocations twice.
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/PendingTaskManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/PendingTaskManager.java
index 8033b18..ff9e479 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/PendingTaskManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/PendingTaskManager.java
@@ -25,6 +25,7 @@
import java.util.HashMap;
import java.util.Map;
+import java.util.Optional;
/** Represents a pending task manager in the {@link SlotManager}. */
public class PendingTaskManager {
@@ -82,8 +83,11 @@
}
public void clearPendingAllocationsOfJob(JobID jobId) {
- ResourceCounter resourceCounter = pendingSlotAllocationRecords.remove(jobId);
- unusedResource = unusedResource.merge(resourceCounter.getTotalResource());
+ Optional.ofNullable(pendingSlotAllocationRecords.remove(jobId)) // null if no record exists
+ .ifPresent(
+ resourceCounter ->
+ unusedResource =
+ unusedResource.merge(resourceCounter.getTotalResource()));
}
private ResourceProfile calculateUnusedResourceProfile() {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManagerTest.java
index 3e1e1bd3..c9c2746 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManagerTest.java
@@ -985,4 +985,72 @@
}
};
}
+
+    @Test
+    void testClearResourceRequirementsWithPendingTaskManager() throws Exception {
+        new Context() {
+            {
+                final JobID jobId = new JobID();
+                final CompletableFuture<Void> allocateResourceFuture = new CompletableFuture<>();
+
+                resourceAllocatorBuilder.setDeclareResourceNeededConsumer(
+                        (resourceDeclarations) -> allocateResourceFuture.complete(null));
+
+                final PendingTaskManager pendingTaskManager1 =
+                        new PendingTaskManager(
+                                DEFAULT_TOTAL_RESOURCE_PROFILE, DEFAULT_NUM_SLOTS_PER_WORKER);
+                final PendingTaskManager pendingTaskManager2 =
+                        new PendingTaskManager(
+                                DEFAULT_TOTAL_RESOURCE_PROFILE, DEFAULT_NUM_SLOTS_PER_WORKER);
+                resourceAllocationStrategyBuilder.setTryFulfillRequirementsFunction(
+                        ((jobIDCollectionMap, taskManagerResourceInfoProvider) ->
+                                ResourceAllocationResult.builder()
+                                        .addPendingTaskManagerAllocate(pendingTaskManager1)
+                                        .addPendingTaskManagerAllocate(pendingTaskManager2)
+                                        .addAllocationOnPendingResource(
+                                                jobId,
+                                                pendingTaskManager1.getPendingTaskManagerId(),
+                                                DEFAULT_SLOT_RESOURCE_PROFILE)
+                                        .addAllocationOnPendingResource(
+                                                jobId,
+                                                pendingTaskManager2.getPendingTaskManagerId(),
+                                                DEFAULT_SLOT_RESOURCE_PROFILE)
+                                        .build()));
+                runTest(
+                        () -> {
+                            // assign the job's allocations to both pending task managers
+                            runInMainThread(
+                                    () ->
+                                            getSlotManager()
+                                                    .processResourceRequirements(
+                                                            createResourceRequirements(jobId, 2)));
+                            assertFutureCompleteAndReturn(allocateResourceFuture);
+
+                            // cancel all slot requests of the job, which will trigger
+                            // PendingTaskManager#clearPendingAllocationsOfJob
+                            runInMainThreadAndWait(
+                                    () ->
+                                            getSlotManager()
+                                                    .processResourceRequirements(
+                                                            ResourceRequirements.empty(
+                                                                    jobId, "foobar")));
+
+                            // disconnect from the job master, which will trigger
+                            // PendingTaskManager#clearPendingAllocationsOfJob a second time
+                            CompletableFuture<Void> clearFuture = new CompletableFuture<>();
+                            runInMainThread(
+                                    () -> {
+                                        try {
+                                            getSlotManager().clearResourceRequirements(jobId);
+                                            clearFuture.complete(null);
+                                        } catch (Exception e) {
+                                            clearFuture.completeExceptionally(e);
+                                        }
+                                    });
+
+                            assertFutureCompleteAndReturn(clearFuture);
+                        });
+            }
+        };
+    }
}