[Improve] Check the validity of the Docker image (#3686)
* [Improve] Check the validity of the Docker image
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/DockerImageService.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/DockerImageService.java
new file mode 100644
index 0000000..6a7e5d5
--- /dev/null
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/DockerImageService.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.console.core.service;
+
+public interface DockerImageService {
+
+ /**
+ * Check the validity of the Docker image
+ *
+ * @param imageName
+ * @return
+ */
+ Boolean isDockerImageExist(String imageName);
+}
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/DockerImageServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/DockerImageServiceImpl.java
new file mode 100644
index 0000000..abb33e8
--- /dev/null
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/DockerImageServiceImpl.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.console.core.service.impl;
+
+import org.apache.streampark.console.core.service.DockerImageService;
+import org.apache.streampark.flink.packer.docker.DockerImageExist;
+
+public class DockerImageServiceImpl implements DockerImageService {
+
+ @Override
+ public Boolean isDockerImageExist(String imageName) {
+ DockerImageExist dockerImageExist = new DockerImageExist();
+ return dockerImageExist.doesDockerImageExist(imageName);
+ }
+}
diff --git a/streampark-flink/streampark-flink-packer/pom.xml b/streampark-flink/streampark-flink-packer/pom.xml
index d537746..6b5312f 100644
--- a/streampark-flink/streampark-flink-packer/pom.xml
+++ b/streampark-flink/streampark-flink-packer/pom.xml
@@ -140,6 +140,13 @@
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</dependency>
+
+ <!-- jackson-->
+ <dependency>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-annotations</artifactId>
+ <version>2.12.0</version>
+ </dependency>
</dependencies>
<profiles>
diff --git a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/docker/DockerImageExist.scala b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/docker/DockerImageExist.scala
new file mode 100644
index 0000000..11c26e1
--- /dev/null
+++ b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/docker/DockerImageExist.scala
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.flink.packer.docker
+
+import com.github.dockerjava.api.exception.NotFoundException
+
+class DockerImageExist {
+ def doesDockerImageExist(imageName: String): Boolean = {
+ usingDockerClient {
+ dockerClient =>
+ try {
+ dockerClient.inspectImageCmd(imageName).exec()
+ true
+ } catch {
+ case _: NotFoundException => false
+ }
+ }(err => throw new Exception(s"Check docker image failed, imageName=$imageName", err))
+ }
+}
diff --git a/streampark-flink/streampark-flink-packer/src/test/scala/org/apache/streampark/flink/packer/DockerClientTest.scala b/streampark-flink/streampark-flink-packer/src/test/scala/org/apache/streampark/flink/packer/DockerClientTest.scala
index c5c6b00..8cb2608 100644
--- a/streampark-flink/streampark-flink-packer/src/test/scala/org/apache/streampark/flink/packer/DockerClientTest.scala
+++ b/streampark-flink/streampark-flink-packer/src/test/scala/org/apache/streampark/flink/packer/DockerClientTest.scala
@@ -17,6 +17,7 @@
package org.apache.streampark.flink.packer
import org.apache.streampark.common.conf.{CommonConfig, InternalConfigHolder}
+import org.apache.streampark.flink.packer.docker.DockerImageExist
import org.apache.streampark.flink.packer.docker.DockerRetriever.dockerClientConf
import com.github.dockerjava.httpclient5.ApacheDockerHttpClient
@@ -40,4 +41,20 @@
InternalConfigHolder.get(CommonConfig.DOCKER_RESPONSE_TIMEOUT_SEC)))
}
+ "Docker Image Exist" should {
+ "return true if the image exists" in {
+ val dockerImageExist = new DockerImageExist()
+ val imageName = "flink:1.18.1-scala_2.12-java8"
+ val result = dockerImageExist.doesDockerImageExist(imageName)
+ assert(result)
+ }
+
+ "return false if the image does not exist" in {
+ val dockerImageExist = new DockerImageExist()
+ val imageName = "flink:1.18.1-scala_2.12-java8-fail"
+ val result = dockerImageExist.doesDockerImageExist(imageName)
+ assert(!result)
+ }
+ }
+
}