[Improve] Check the validity of the Docker image (#3686)

* [Improve] Check the validity of the Docker image
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/DockerImageService.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/DockerImageService.java
new file mode 100644
index 0000000..6a7e5d5
--- /dev/null
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/DockerImageService.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.console.core.service;
+
+public interface DockerImageService {
+
+  /**
+   * Check the validity of the Docker image
+   *
+   * @param imageName
+   * @return
+   */
+  Boolean isDockerImageExist(String imageName);
+}
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/DockerImageServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/DockerImageServiceImpl.java
new file mode 100644
index 0000000..abb33e8
--- /dev/null
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/DockerImageServiceImpl.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.console.core.service.impl;
+
+import org.apache.streampark.console.core.service.DockerImageService;
+import org.apache.streampark.flink.packer.docker.DockerImageExist;
+
+public class DockerImageServiceImpl implements DockerImageService {
+
+  @Override
+  public Boolean isDockerImageExist(String imageName) {
+    DockerImageExist dockerImageExist = new DockerImageExist();
+    return dockerImageExist.doesDockerImageExist(imageName);
+  }
+}
diff --git a/streampark-flink/streampark-flink-packer/pom.xml b/streampark-flink/streampark-flink-packer/pom.xml
index d537746..6b5312f 100644
--- a/streampark-flink/streampark-flink-packer/pom.xml
+++ b/streampark-flink/streampark-flink-packer/pom.xml
@@ -140,6 +140,13 @@
             <groupId>com.google.guava</groupId>
             <artifactId>guava</artifactId>
         </dependency>
+
+        <!-- jackson-->
+        <dependency>
+            <groupId>com.fasterxml.jackson.core</groupId>
+            <artifactId>jackson-annotations</artifactId>
+            <version>2.12.0</version>
+        </dependency>
     </dependencies>
 
     <profiles>
diff --git a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/docker/DockerImageExist.scala b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/docker/DockerImageExist.scala
new file mode 100644
index 0000000..11c26e1
--- /dev/null
+++ b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/docker/DockerImageExist.scala
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.flink.packer.docker
+
+import com.github.dockerjava.api.exception.NotFoundException
+
+class DockerImageExist {
+  def doesDockerImageExist(imageName: String): Boolean = {
+    usingDockerClient {
+      dockerClient =>
+        try {
+          dockerClient.inspectImageCmd(imageName).exec()
+          true
+        } catch {
+          case _: NotFoundException => false
+        }
+    }(err => throw new Exception(s"Check docker image failed, imageName=$imageName", err))
+  }
+}
diff --git a/streampark-flink/streampark-flink-packer/src/test/scala/org/apache/streampark/flink/packer/DockerClientTest.scala b/streampark-flink/streampark-flink-packer/src/test/scala/org/apache/streampark/flink/packer/DockerClientTest.scala
index c5c6b00..8cb2608 100644
--- a/streampark-flink/streampark-flink-packer/src/test/scala/org/apache/streampark/flink/packer/DockerClientTest.scala
+++ b/streampark-flink/streampark-flink-packer/src/test/scala/org/apache/streampark/flink/packer/DockerClientTest.scala
@@ -17,6 +17,7 @@
 package org.apache.streampark.flink.packer
 
 import org.apache.streampark.common.conf.{CommonConfig, InternalConfigHolder}
+import org.apache.streampark.flink.packer.docker.DockerImageExist
 import org.apache.streampark.flink.packer.docker.DockerRetriever.dockerClientConf
 
 import com.github.dockerjava.httpclient5.ApacheDockerHttpClient
@@ -40,4 +41,20 @@
           InternalConfigHolder.get(CommonConfig.DOCKER_RESPONSE_TIMEOUT_SEC)))
   }
 
+  "Docker Image Exist" should {
+    "return true if the image exists" in {
+      val dockerImageExist = new DockerImageExist()
+      val imageName = "flink:1.18.1-scala_2.12-java8"
+      val result = dockerImageExist.doesDockerImageExist(imageName)
+      assert(result)
+    }
+
+    "return false if the image does not exist" in {
+      val dockerImageExist = new DockerImageExist()
+      val imageName = "flink:1.18.1-scala_2.12-java8-fail"
+      val result = dockerImageExist.doesDockerImageExist(imageName)
+      assert(!result)
+    }
+  }
+
 }