slug: flink-development-framework-streampark title: Flink 开发利器 StreamPark tags: [StreamPark, DataStream, FlinkSQL]

1. 背景

Hadoop 体系虽然在目前应用非常广泛,但架构繁琐、运维复杂度过高、版本升级困难,且由于部门原因,数据中台需求排期较长,我们急需探索敏捷性开发的数据平台模式。在目前云原生架构的普及和湖仓一体化的大背景下,我们已经确定了将 Doris 作为离线数据仓库,将 TiDB(目前已经应用于生产)作为实时数据平台,同时因为 Doris 具有 on MySQL 的 ODBC 能力,所以又可以对外部数据库资源进行整合,统一对外输出报表



2. 遇到的问题

在数据引擎上,我们确定使用 Spark 和 Flink

  • 使用 Spark on K8s client 客户端模式做离线数据处理
  • 使用 Flink on K8s Native-Application/Session 模式做实时任务流管理

在这里,实际上有一些问题我们一直没有彻底解决:

用过 Native-Application 模式的朋友都知道,每提交一个任务,都需要打包新的镜像,提交到私有仓库,然后再调用 Flink Run 指令沟通 K8s,去拉取镜像运行 Pod。任务提交之后,还需要去 K8s 查看 log, 但是:

  1. 任务运行监控怎么处理?
  2. 使用 Cluster 模式还是 NodePort 暴露端口访问 Web UI?
  3. 提交任务能否简化打包镜像的流程?
  4. 如何减少开发压力?



3. 解决问题的过程

以上的这些其实都是需要解决的问题,如果单纯地使用命令行去提交每个任务,是不现实的,任务量大了,会变得不可维护。如何解决这些问题变成一个不得不面对的问题。

简化镜像构建

首先,针对 Flink 原生镜像需要二次 build 的问题:我们利用了 MinIO 作为外部存储,并使用 s3-fuse 通过 DaemonSet 的方式直接挂载在了每个宿主节点上,我们所需要提交的 jar 包都可以放到上面统一管理。这样的话,即使扩缩容 Flink 节点,也能实现 S3 挂载自动伸缩。

Flink 从 1.13 版本开始,就支持 Pod Template,我们可以在 Pod Template 中利用数据卷挂载的方式再将宿主机目录挂载到每个 pod 中,从而无需镜像打包而直接在 K8s 上运行 Flink 程序。如上图,我们将 S3 先通过 s3-fuse Pod 挂载在 Node 1、Node 2 的 /mnt/data-s3fs 目录下,然后再将 /mnt/data-s3fs 挂载到 Pod A 中。

但是,因为对象存储随机写入或追加文件需要重写整个对象,导致这种方式仅适合于频繁读。而这刚好满足我们现在的场景。

引入 StreamPark

之前我们写 Flink SQL 基本上都是使用 Java 包装 SQL,打 jar 包,提交到 S3 平台上。通过命令行方式提交代码,但这种方式始终不友好,流程繁琐,开发和运维成本太大。我们希望能够进一步简化流程,将 Flink TableEnvironment 抽象出来,有平台负责初始化、打包运行 Flink 任务,实现 Flink 应用程序的构建、测试和部署自动化。

这是个开源兴起的时代,我们自然而然的将目光投向开源领域中:在一众开源项目中,经过对比各个项目综合评估发现 <span style={{color:‘red’}}> Zeppelin 和 <span style={{color:‘red’}}> StreamPark 这两个项目对 Flink 的支持较为完善,都宣称支持 <span style={{color:‘red’}}> Flink on K8s ,最终进入到我们的目标选择范围中,以下是两者在 K8s 相关支持的简单比较(目前如果有更新,麻烦批评指正)。

调研过程中,我们与两者的主开发人员都进行了多次沟通。经过我们反复研究之后,还是决定将 StreamPark 作为我们目前的 Flink 开发工具来使用。

经过开发同学长时间开发测试,StreamPark 目前已经具备:

  • 完善的<span style={{“color”: “red”}}>SQL 校验功能
  • 实现了<span style={{“color”: “red”}}>自动 build/push 镜像
  • 使用自定义类加载器,通过 Child-first 加载方式 <span style={{“color”: “red”}}>解决了 YARN 和 K8s 两种运行模式、<span style={{“color”: “red”}}>支持了自由切换 Flink 多版本
  • 与 Flink-Kubernetes 进行深度整合,提交任务后返回 WebUI,通过 remote rest api + remote K8s,<span style={{“color”: “red”}}>追踪任务执行状态
  • 同时支持了 <span style={{“color”: “red”}}>Flink 1.12、1.13、1.14 等版本

以上基本解决了我们目前开发和运维中存在的大部分问题。

在目前最新发布的 1.2.0 版本中,StreamPark 较为完善地支持了 K8s-Native-Application 和 K8s-Session-Application 模式。

K8s Native Application 模式

在 StreamPark 中,我们只需要配置相应的参数,并在 Maven POM 中填写相应的依赖,或者上传依赖 jar 包,点击 Apply,相应的依赖就会生成。这就意味着我们也可以将所有使用的 UDF 打成 jar 包,以及各种 connector.jar,直接在 SQL 中使用。如下图:

SQL 校验能力和 Zeppelin 基本一致:

我们也可以指定资源,指定 Flink Run 中的动态参数 Dynamic Option,甚至参数可以整合 Pod Template

程序保存后,点击运行时,也可以指定 savepoint。任务提交成功后,StreamPark 会根据 FlinkPod 网络 Exposed Type(loadBalancer/NodePort/ClusterIp),返回相应的 WebURL,从而自然的实现 WebUI 跳转。但是,目前因为线上私有 K8s 集群出于安全性考虑,尚未打通 Pod 与客户端节点网络(目前也没有这个规划)。所以么,我们只使用 NodePort。如果后续任务数过多,有使用 ClusterIP 的需求的话,我们可能会将 StreamPark 部署在 K8s,或者同 Ingress 做进一步整合。

注意:K8s master 如果使用 vip 做均衡代理的情况下,Flink 1.13 版本会返回 vip 的 ip 地址,在 1.14 版本中已经修复该问题。

下面是 K8s Application 模式下具体提交流程

K8s Native Session 模式

StreamPark 还较好地支持了 <span style={{“color”: “red”}}> K8s Native-Sesson 模式,这为我们后续做离线 FlinkSQL 开发或部分资源隔离做了较好的技术支持。

Native-Session 模式需要事先使用 Flink 命令创建一个运行在 K8s 中的 Flink 集群。如下:

./kubernetes-session.sh \
-Dkubernetes.cluster-id=flink-on-k8s-flinkSql-test \
-Dkubernetes.context=XXX \
-Dkubernetes.namespace=XXXX \
-Dkubernetes.service-account=XXXX \
-Dkubernetes.container.image=XXXX \
-Dkubernetes.container.image.pull-policy=Always \
-Dkubernetes.taskmanager.node-selector=XXXX \
-Dkubernetes.rest-service.exposed.type=Nodeport

如上图,使用该 ClusterId 作为 StreamPark 的任务参数 Kubernetes ClusterId。保存提交任务后,任务会很快处于 Running 状态:

我们顺着 application info 的 WebUI 点击跳转:

可以看到,其实 StreamPark 是将 jar 包通过 REST API 上传到 Flink 集群上,并调度执行任务的。

Custom Code 模式

另我们惊喜的是,StreamPark 还支持代码编写 DataStream/FlinkSQL 任务。对于特殊需求,我们可以自己写 Java/Scala 实现。可以根据 StreamPark 推荐的脚手架方式编写任务,也可以编写一个标准普通的 Flink 任务,通过这种方式我们可以将代码管理交由 git 实现,平台可以用来自动化编译打包与部署。当然,如果能用 SQL 实现的功能,我们会尽量避免自定义 DataStream,减少不必要的运维麻烦。



4. 意见和规划

改进意见

当然 StreamPark 还有很多需要改进的地方,就目前测试来看:

  • 资源管理还有待加强:多文件系统jar包等资源管理功能尚未添加,任务版本功能有待加强。
  • 前端 button 功能还不够丰富:比如任务添加后续可以增加复制等功能按钮。
  • 任务提交日志也需要可视化展示:任务提交伴随着加载 class 文件,打 jar 包,build 镜像,提交镜像,提交任务等过程,每一个环节出错,都会导致任务的失败,但是失败日志往往不明确,或者因为某种原因导致异常未正常抛出,没有转换任务状态,用户会无从下手改进。

众所周知,一个新事物的出现一开始总会不是那么完美。尽管有些许问题和需要改进的 point,但是瑕不掩瑜,我们仍然选择 StreamPark 作为我们的 Flink DevOps,我们也将会和主开发人员一道共同完善 StreamPark,也欢迎更多的人来使用,为 StreamPark 带来更多进步。

未来规划

  • 我们会继续跟进 Doris,并将业务数据 + 日志数据统一入 Doris,通过 Flink 实现湖仓一体;
  • 我们也会逐步将探索 StreamPark 同 DolphinScheduler 2.x 进行整合,完善DolphinScheduler 离线任务,逐步用 Flink 替换掉 Spark,实现真正的流批一体;
  • 基于我们自身在 S3 上的探索积累,fat-jar 包 build 完成之后不再构建镜像,直接利用 Pod Tempelet 挂载 PVC 到 Flink Pod 中的目录,进一步优化代码提交流程;
  • 将 StreamPark 持续应用到我们生产中,并汇同社区开发人员,共同努力,增强 StreamPark 在 Flink 流上的开发部署能力与运行监控能力,努力把 StreamPark 打造成一个功能完善的流数据 DevOps。

附:

StreamPark GitHub:https://github.com/apache/incubator-streampark
Doris GitHub:https://github.com/apache/doris