[FLINK-39275][docs] Fix some issues in the "Performance Tuning" document (#27893)
- Add anchor links to performance tuning sections
- Update configuration and add MultiJoin optimization details
Co-authored-by: klion26 <qcx978132955@gmail.com>
Co-authored-by: Weijie Guo <reswqa@163.com>
Co-authored-by: Yuepeng Pan <panyuepeng@apache.org>
diff --git a/docs/content.zh/docs/dev/table/tuning.md b/docs/content.zh/docs/dev/table/tuning.md
index 6b75716d..73fedea 100644
--- a/docs/content.zh/docs/dev/table/tuning.md
+++ b/docs/content.zh/docs/dev/table/tuning.md
@@ -1,5 +1,4 @@
---
-title: "Performance Tuning"
title: "性能调优"
weight: 11
type: docs
@@ -25,16 +24,19 @@
under the License.
-->
-# Performance Tuning
+<a name="performance-tuning"></a>
+
+# 性能调优
SQL 是数据分析中使用最广泛的语言。Flink Table API 和 SQL 使用户能够以更少的时间和精力定义高效的流分析应用程序。此外,Flink Table API 和 SQL 是高效优化过的,它集成了许多查询优化和算子优化。但并不是所有的优化都是默认开启的,因此对于某些工作负载,可以通过打开某些选项来提高性能。
在这一页,我们将介绍一些实用的优化选项以及流式聚合和普通连接的内部原理,它们在某些情况下能带来很大的提升。
{{< hint info >}}
-目前 [分组聚合] ({{< ref "docs/sql/reference/queries/group-agg" >}}) 和 [窗口表值函数聚合]({{< ref "docs/sql/reference/queries/window-agg" >}}) (会话窗口表值函数聚合除外)都支持本页提到的流式聚合优化。
+目前 [分组聚合]({{< ref "docs/sql/reference/queries/group-agg" >}}) 和 [窗口表值函数聚合]({{< ref "docs/sql/reference/queries/window-agg" >}}) (会话窗口表值函数聚合除外)都支持本页提到的流式聚合优化。
{{< /hint >}}
+<a name="minibatch-aggregation"></a>
## MiniBatch 聚合
@@ -49,8 +51,8 @@
默认情况下,对于无界聚合算子来说,mini-batch 优化是被禁用的。开启这项优化,需要设置选项 `table.exec.mini-batch.enabled`、`table.exec.mini-batch.allow-latency` 和 `table.exec.mini-batch.size`。更多详细信息请参见[配置]({{< ref "docs/dev/table/config" >}}#execution-options)页面。
{{< hint info >}}
-MiniBatch optimization is always enabled for [Window TVF Aggregation]({{< ref "docs/sql/reference/queries/window-agg" >}}), regardless of the above configuration.
-Window TVF aggregation buffer records in [managed memory]({{< ref "docs/deployment/memory/mem_setup_tm">}}#managed-memory) instead of JVM Heap, so there is no risk of overloading GC or OOM issues.
+MiniBatch 优化对于 [Window TVF 聚合]({{< ref "docs/sql/reference/queries/window-agg" >}})始终启用,不受上述配置影响。
+Window TVF 聚合将记录缓冲在[托管内存]({{< ref "docs/deployment/memory/mem_setup_tm">}}#managed-memory)中,而非 JVM 堆内存,因此不存在 GC 过载或 OOM 的风险。
{{< /hint >}}
下面的例子显示如何启用这些选项。
@@ -97,6 +99,8 @@
{{< /tab >}}
{{< /tabs >}}
+<a name="local-global-aggregation"></a>
+
## Local-Global 聚合
Local-Global 聚合是为解决数据倾斜问题提出的,通过将一组聚合分为两个阶段,首先在上游进行本地聚合,然后在下游进行全局聚合,类似于 MapReduce 中的 Combine + Reduce 模式。例如,就以下 SQL 而言:
@@ -124,12 +128,12 @@
TableEnvironment tEnv = ...;
// access flink configuration
-Configuration configuration = tEnv.getConfig().getConfiguration();
+TableConfig configuration = tEnv.getConfig();
// set low-level key-value options
-configuration.setString("table.exec.mini-batch.enabled", "true"); // local-global aggregation depends on mini-batch is enabled
-configuration.setString("table.exec.mini-batch.allow-latency", "5 s");
-configuration.setString("table.exec.mini-batch.size", "5000");
-configuration.setString("table.optimizer.agg-phase-strategy", "TWO_PHASE"); // enable two-phase, i.e. local-global aggregation
+configuration.set("table.exec.mini-batch.enabled", "true"); // local-global aggregation depends on mini-batch is enabled
+configuration.set("table.exec.mini-batch.allow-latency", "5 s");
+configuration.set("table.exec.mini-batch.size", "5000");
+configuration.set("table.optimizer.agg-phase-strategy", "TWO_PHASE"); // enable two-phase, i.e. local-global aggregation
```
{{< /tab >}}
{{< tab "Scala" >}}
@@ -162,6 +166,8 @@
{{< /tab >}}
{{< /tabs >}}
+<a name="split-distinct-aggregation"></a>
+
## 拆分 distinct 聚合
Local-Global 优化可有效消除常规聚合的数据倾斜,例如 SUM、COUNT、MAX、MIN、AVG。但是在处理 distinct 聚合时,其性能并不令人满意。
@@ -198,7 +204,9 @@
注意:上面是可以从这个优化中受益的最简单的示例。除此之外,Flink 还支持拆分更复杂的聚合查询,例如,多个具有不同 distinct key (例如 `COUNT(DISTINCT a), SUM(DISTINCT b)` )的 distinct 聚合,可以与其他非 distinct 聚合(例如 `SUM`、`MAX`、`MIN`、`COUNT` )一起使用。
-<span class="label label-danger">注意</span> 当前,拆分优化不支持包含用户定义的 AggregateFunction 聚合。
+{{< hint info >}}
+当前,拆分优化不支持包含用户定义的 AggregateFunction 聚合。
+{{< /hint >}}
下面的例子显示了如何启用拆分 distinct 聚合优化。
@@ -231,6 +239,8 @@
{{< /tab >}}
{{< /tabs >}}
+<a name="use-filter-modifier-on-distinct-aggregates"></a>
+
## 在 distinct 聚合上使用 FILTER 修饰符
在某些情况下,用户可能需要从不同维度计算 UV(独立访客)的数量,例如来自 Android 的 UV、iPhone 的 UV、Web 的 UV 和总 UV。很多人会选择 `CASE WHEN`,例如:
@@ -259,6 +269,8 @@
Flink SQL 优化器可以识别相同的 distinct key 上的不同过滤器参数。例如,在上面的示例中,三个 COUNT DISTINCT 都在 `user_id` 一列上。Flink 可以只使用一个共享状态实例,而不是三个状态实例,以减少状态访问和状态大小。在某些工作负载下,可以获得显著的性能提升。
+<a name="minibatch-regular-joins"></a>
+
## MiniBatch Regular Joins
默认情况下,regular join 算子是逐条处理输入的记录,即:(1)根据当前输入记录的 join key 关联对方状态中的记录,(2)根据当前记录写入或者撤回状态中的记录,(3)根据当前的输入记录和关联到的记录输出结果。
@@ -288,7 +300,101 @@
默认情况下,对于 regular join 算子来说,mini-batch 优化是被禁用的。开启这项优化,需要设置选项 `table.exec.mini-batch.enabled`、`table.exec.mini-batch.allow-latency` 和 `table.exec.mini-batch.size`。更多详细信息请参见[配置]({{< ref "docs/dev/table/config" >}}#execution-options)页面。
-{{< top >}}
+<a name="multiple-regular-joins"></a>
+
+## Multiple Regular Joins
+
+{{< label Streaming >}}
+
+在流处理场景中,包含多个非时态 Regular Join 的 Flink 作业,往往因状态过大而频繁出现运行不稳定和性能下降的问题。究其根本,是由于多个 Join 串联后产生的中间状态,往往远超原始输入数据本身的规模。为此,Flink 2.1 引入了全新的 Multi Join 算子,这一优化专为存在记录膨胀和大量中间状态的 Join 流水线而设计,旨在大幅压缩状态规模、提升整体性能。该算子通过同时处理多个输入流来完成跨表 Join,从根本上消除了在多表 Join 链路中存储中间状态的必要性。这种“零中间状态”的处理方式以降低状态规模为核心目标,在某些场景下可显著减少资源消耗、提升运行稳定性。当然,这一方案本质上是一种以算力换存储的权衡取舍,中间状态不再持久化保存,而是在需要时按需重新计算。
+
+在大多数 Join 操作中,相当一部分处理时间都花费在从状态中读取记录上。MultiJoin 算子的效率在很大程度上取决于中间状态的大小以及公共 Join Key 的选择性。在一种常见场景中——即流水线出现记录膨胀(每次 Join 产生的数据量和记录数均多于上一次)时,MultiJoin 算子的优势更为突出。这是因为它使算子所需操作的状态始终保持在较小的规模,从而让算子运行更加稳定。即便某条 Join 链路实际产生的状态比原始记录还要小,MultiJoin 算子整体上仍然占用更少的状态。不过在这种特殊情况下,二元 Join 反而可能表现更好,因为最终几个 Join 所需操作的状态本身已经很小。
+
+<a name="the-multijoin-operator"></a>
+
+### MultiJoin 算子
+
+MultiJoin 算子的主要优势:
+
+1) 状态规模大幅缩减:得益于零中间状态机制,状态占用显著降低。
+2) 链式 Join 性能提升:在存在记录膨胀的场景下,整体处理性能得到明显改善。
+3) 稳定性增强:状态随处理记录数呈线性增长,而非像二元 Join 那样呈多项式级增长。
+
+此外,使用 MultiJoin 替代二元 Join 的流水线通常具有更快地初始化和故障恢复速度,这得益于更小的状态规模以及更少的算子节点。
+
+<a name="when-to-enable-the-multijoin"></a>
+
+### 何时启用 MultiJoin?
+
+如果作业中存在多个 Join 共享至少一个公共 Join Key 的 Join 操作,并且中间 Join 产生的中间状态比原始输入数据源还要大,建议考虑开启 MultiJoin 算子。
+
+推荐使用场景:
+- 公共 Join Key 具有较高选择性(即每个键对应的记录数较少)
+- 包含多个链式 Join 且中间状态很大的 SQL 语句
+- 公共 Join Key 没有明显的数据倾斜
+- Join 操作产生了大量状态(状态规模达 50 GB 及以上)
+
+如果公共 Join Key 的选择性较低(即大量记录共享相同键值),MultiJoin 算子所需的中间状态重新计算将对性能产生严重影响。在此类场景下,建议使用二元 Join,因为二元 Join 会利用所有 Join Key 对数据进行分区,从而避免重算影响性能。
+
+<a name="how-to-enable-the-multijoin"></a>
+
+### 如何启用 MultiJoin?
+
+要对所有符合条件的 Join 全局启用此优化,请设置以下配置:
+
+```sql
+SET 'table.optimizer.multi-join.enabled' = 'true';
+```
+
+或者,你可以使用 `MULTI_JOIN` hint 为特定表启用 MultiJoin 算子:
+
+```sql
+SELECT /*+ MULTI_JOIN(t1, t2, t3) */ * FROM t1
+JOIN t2 ON t1.id = t2.id
+JOIN t3 ON t1.id = t3.id;
+```
+
+Hint 方式允许你有针对性地将 MultiJoin 优化应用于特定查询块,而无需全局启用。有关 `MULTI_JOIN` hint 的更多详情,请参阅 [Join Hints]({{< ref "docs/sql/reference/queries/hints" >}}#multi_join)。需要注意的是,配置项的优先级高于 hint。
+
+重要提示:该功能目前处于实验性阶段,后续可能会进行优化或引入破坏性变更。当前仅支持流式 INNER/LEFT Join。由于记录分区的机制,Join 条件之间至少需要一个公共 Key,详见下方示例:
+
+- 支持:A JOIN B ON A.key = B.key JOIN C ON A.key = C.key(按 key 分区)
+- 支持:A JOIN B ON A.key = B.key JOIN C ON B.key = C.key(通过传递性 key 分区)
+- 不支持:A JOIN B ON A.key1 = B.key1 JOIN C ON B.key2 = C.key2(没有单个 key 可以将 A、B、C 同时分区到单个算子中。这将被拆分为多个 MultiJoin 算子)
+
+<a name="multijoin-operator-example---benchmark"></a>
+
+### MultiJoin 算子示例 - 基准测试
+
+以下是默认二元 Join 与 MultiJoin 算子之间的 10-way 基准测试对比。可以在第一部分观察到中间状态的数量,第二部分观察算子达到 100% 忙碌时处理的记录数,第三部分是 checkpoint。
+
+{{< img src="/fig/table-streaming/multijoin_operator.png" height="100%" >}}
+
+对于上面这个涉及记录放大的 10-way join,可以看到有显著提升。这里有一些大概数据:
+
+- 性能:当两者都处于 100% 忙碌时,处理记录量提升 2 倍到超过 100 倍。
+- 状态大小:中间状态大小缩小 3 倍到超过 1000 倍。
+
+MultiJoin 算子的总状态始终更小。在这种情况下,初始性能相同,但随着中间状态增长,二元 Join 的性能逐渐下降,而 Multi Join 保持稳定并表现更出色。
+
+这个 10-way Join 通用基准测试使用以下配置运行:每个 tenant_id 1 条记录(高选择性),10 个 upsert Kafka topic,并行度 10,每个 topic 每秒 1 条记录。我们使用了基于 RocksDB 的未对齐 checkpoint 和增量 checkpoint。每个作业运行在 8GB 进程内存的 TaskManager 中,1GB 堆外内存和 20% 网络内存。JobManager 有 4GB 进程内存。主机包含 M1 处理器芯片,32GB RAM 和 1TB SSD。sink 使用 blackhole connector,因此我们只对 Join 进行基准测试。用于生成基准测试数据的 SQL 结构如下:
+
+```sql
+INSERT INTO JoinResultsMJ
+SELECT *all fields*
+FROM TenantKafka t
+ LEFT JOIN SuppliersKafka s ON t.tenant_id = s.tenant_id AND ...
+ LEFT JOIN ProductsKafka p ON t.tenant_id = p.tenant_id AND ...
+ LEFT JOIN CategoriesKafka c ON t.tenant_id = c.tenant_id AND ...
+ LEFT JOIN OrdersKafka o ON t.tenant_id = o.tenant_id AND ...
+ LEFT JOIN CustomersKafka cust ON t.tenant_id = cust.tenant_id AND ...
+ LEFT JOIN WarehousesKafka w ON t.tenant_id = w.tenant_id AND ...
+ LEFT JOIN ShippingKafka sh ON t.tenant_id = sh.tenant_id AND ...
+ LEFT JOIN PaymentKafka pay ON t.tenant_id = pay.tenant_id AND ...
+ LEFT JOIN InventoryKafka i ON t.tenant_id = i.tenant_id AND ...;
+```
+
+<a name="delta-joins"></a>
## Delta Joins
@@ -301,6 +407,8 @@
1. 作业拓扑结构满足优化条件。具体可以查看[支持的功能和限制]({{< ref "docs/dev/table/tuning" >}}#supported-features-and-limitations)。
2. 源表所在的外部存储系统提供了可供 delta join 快速查询的索引信息。目前 [Apache Fluss(Incubating)](https://fluss.apache.org/blog/fluss-open-source/) 已支持在 Flink 中提供表级别的索引信息,其上的表可作为 delta join 的源表。具体可参考 [Fluss 文档](https://fluss.apache.org/docs/engine-flink/delta-joins/#flink-version-support)。
+<a name="working-principle"></a>
+
### 工作原理
在 Flink 中,regular join 将来自两个输入端的所有输入数据存储在状态中,以确保当对侧的数据到达时,能够正确地匹配对应的记录。
@@ -309,6 +417,8 @@
{{< img src="/fig/table-streaming/delta_join.png" width="70%" height="70%" >}}
+<a name="important-configurations"></a>
+
### 关键参数
Delta join 优化默认启用。您可以通过设置以下配置手动禁用此功能:
@@ -327,7 +437,7 @@
详细信息请参见[配置]({{< ref "docs/dev/table/config" >}}#execution-options)页面。
-<a name="supported-features-and-limitations" />
+<a name="supported-features-and-limitations"></a>
### 支持的功能和限制
@@ -346,3 +456,5 @@
4. 当消费 **CDC 流**时,**join key** 必须是**主键**的一部分。
5. 当消费 **CDC 流**时,所有 **filter** 必须应用于 **upsert key** 上。
6. 所有 project 和 filter 都不能包含**非确定性函数**。
+
+{{< top >}}