[Improve] StreamPark supports a remote Docker daemon. (#2182)
* [Improve] Support communication between StreamPark deployed in a container (e.g. Kata) and the Docker daemon on the host.
Co-authored-by: lvlin241 <lvln241@163.com>
diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/conf/CommonConfig.scala b/streampark-common/src/main/scala/org/apache/streampark/common/conf/CommonConfig.scala
index 9b89105..1e62561 100644
--- a/streampark-common/src/main/scala/org/apache/streampark/common/conf/CommonConfig.scala
+++ b/streampark-common/src/main/scala/org/apache/streampark/common/conf/CommonConfig.scala
@@ -47,6 +47,12 @@
classType = classOf[String],
description = "yarn http auth type. ex: sample, kerberos")
+ val DOCKER_HOST: InternalOption = InternalOption( // docker host override; read by DockerRetriever.setDockerHost() and parsed with java.net.URI
+ key = "streampark.docker.http-client.docker-host",
+ defaultValue = "", // empty default: setDockerHost() only applies the value when Utils.notEmpty accepts it (presumably "" means no override - confirm)
+ classType = classOf[String],
+ description = "docker host for DockerHttpClient")
+
val DOCKER_MAX_CONNECTIONS: InternalOption = InternalOption(
key = "streampark.docker.http-client.max-connections",
defaultValue = 100,
diff --git a/streampark-console/streampark-console-service/src/main/resources/application.yml b/streampark-console/streampark-console-service/src/main/resources/application.yml
index 58b8cba..441b50f 100644
--- a/streampark-console/streampark-console-service/src/main/resources/application.yml
+++ b/streampark-console/streampark-console-service/src/main/resources/application.yml
@@ -93,6 +93,7 @@
max-connections: 10000
connection-timeout-sec: 10000
response-timeout-sec: 12000
+ docker-host: ""
# flink-k8s tracking configuration
flink-k8s:
diff --git a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/docker/DockerRetriever.scala b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/docker/DockerRetriever.scala
index bcb8214..85d0a4f 100644
--- a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/docker/DockerRetriever.scala
+++ b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/docker/DockerRetriever.scala
@@ -21,10 +21,11 @@
import com.github.dockerjava.core.{DefaultDockerClientConfig, DockerClientConfig, HackDockerClient}
import com.github.dockerjava.httpclient5.ApacheDockerHttpClient
import org.apache.streampark.common.conf.{CommonConfig, InternalConfigHolder}
+import org.apache.streampark.common.util.Utils
+import java.net.URI
import java.time.Duration
-
object DockerRetriever {
/**
@@ -51,8 +52,19 @@
* get new DockerClient instance
*/
def newDockerClient(): DockerClient = {
+ setDockerHost() // may set dockerHost on dockerHttpClientBuilder (only when a docker-host is configured) before build() is called below
HackDockerClient.getInstance(dockerClientConf, dockerHttpClientBuilder.build())
}
+ /**
+  * Apply the configured docker host to the docker http client builder.
+  *
+  * Reads [[CommonConfig.DOCKER_HOST]] from [[InternalConfigHolder]]; when
+  * `Utils.notEmpty` accepts the value it is parsed as a `java.net.URI` and set
+  * on `dockerHttpClientBuilder`, otherwise the builder is left untouched.
+  * Intended for StreamPark running inside a container (e.g. kata) that has to
+  * talk to the docker daemon of the host.
+  */
+ def setDockerHost(): Unit = {
+   val configuredHost: String = InternalConfigHolder.get(CommonConfig.DOCKER_HOST)
+   // no usable value configured: keep whatever host the builder already has
+   if (Utils.notEmpty(configuredHost)) {
+     dockerHttpClientBuilder.dockerHost(new URI(configuredHost))
+   }
+ }
}