[FLINK-32111] Add Null  check for checkpoints history obj

diff --git a/docs/content/docs/operations/plugins.md b/docs/content/docs/operations/plugins.md
index efbf34e..b25f055 100644
--- a/docs/content/docs/operations/plugins.md
+++ b/docs/content/docs/operations/plugins.md
@@ -107,7 +107,7 @@
 In order to enable your custom `FlinkResourceListener` you need to:
 
  1. Implement the interface
- 2. Add your listener class to `org.apache.flink.kubernetes.operator.listener.FlinkResourceListener` in `META-INF/services`
+ 2. Add your listener class to `org.apache.flink.kubernetes.operator.api.listener.FlinkResourceListener` in `META-INF/services`
  3. Package your JAR and add it to the plugins directory of your operator image (`/opt/flink/plugins`)
 
 
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/CheckpointHistoryWrapper.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/CheckpointHistoryWrapper.java
index f53d074..99a9abc 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/CheckpointHistoryWrapper.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/CheckpointHistoryWrapper.java
@@ -57,7 +57,7 @@
     private ArrayNode history;
 
     public Optional<PendingCheckpointInfo> getInProgressCheckpoint() {
-        if (history.isEmpty()) {
+        if (history == null || history.isEmpty()) {
             return Optional.empty();
         }
 
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/NativeFlinkServiceTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/NativeFlinkServiceTest.java
index d723901..805cf58 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/NativeFlinkServiceTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/NativeFlinkServiceTest.java
@@ -66,11 +66,13 @@
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Map;
+import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 
 import static org.apache.flink.kubernetes.operator.config.FlinkConfigBuilder.FLINK_VERSION;
 import static org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions.OPERATOR_HEALTH_PROBE_PORT;
 import static org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions.OPERATOR_SAVEPOINT_FORMAT_TYPE;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
@@ -308,6 +310,27 @@
     }
 
     @Test
+    public void testGetInProgressCheckpointsFromResponseWithoutHistoryDetails()
+            throws JsonProcessingException {
+        ObjectMapper objectMapper = RestMapperUtils.getStrictObjectMapper();
+        String response =
+                "{\"counts\":{\"restored\":0,\"total\":2,\"in_progress\":0,\"completed\":2,\"failed\":0}}";
+        var checkpointHistoryWrapper =
+                objectMapper.readValue(response, CheckpointHistoryWrapper.class);
+        Optional<CheckpointHistoryWrapper.PendingCheckpointInfo> optionalPendingCheckpointInfo =
+                assertDoesNotThrow(checkpointHistoryWrapper::getInProgressCheckpoint);
+        assertTrue(optionalPendingCheckpointInfo.isEmpty());
+    }
+
+    @Test
+    public void testGetInProgressCheckpointsWithoutHistory() {
+        CheckpointHistoryWrapper checkpointHistoryWrapper = new CheckpointHistoryWrapper();
+        Optional<CheckpointHistoryWrapper.PendingCheckpointInfo> optionalPendingCheckpointInfo =
+                assertDoesNotThrow(checkpointHistoryWrapper::getInProgressCheckpoint);
+        assertTrue(optionalPendingCheckpointInfo.isEmpty());
+    }
+
+    @Test
     public void testClusterInfoRestCompatibility() throws JsonProcessingException {
         ObjectMapper objectMapper = RestMapperUtils.getStrictObjectMapper();