[FLINK-37543][table-planner] Support sink reuse in batch mode (#26379)
diff --git a/docs/layouts/shortcodes/generated/optimizer_config_configuration.html b/docs/layouts/shortcodes/generated/optimizer_config_configuration.html
index 2127bb6..de4a77d 100644
--- a/docs/layouts/shortcodes/generated/optimizer_config_configuration.html
+++ b/docs/layouts/shortcodes/generated/optimizer_config_configuration.html
@@ -84,6 +84,12 @@
<td>When true, the optimizer will try to find out duplicated sub-plans by digest to build optimize blocks (a.k.a. common sub-graphs). Each optimize block will be optimized independently.</td>
</tr>
<tr>
+ <td><h5>table.optimizer.reuse-sink-enabled</h5><br> <span class="label label-primary">Batch</span></td>
+ <td style="word-wrap: break-word;">false</td>
+ <td>Boolean</td>
+ <td>When it is true, the optimizer will try to find out duplicated table sinks and reuse them. This works only when table.optimizer.reuse-sub-plan-enabled is true.</td>
+ </tr>
+ <tr>
<td><h5>table.optimizer.reuse-source-enabled</h5><br> <span class="label label-primary">Batch</span> <span class="label label-primary">Streaming</span></td>
<td style="word-wrap: break-word;">true</td>
<td>Boolean</td>
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/OptimizerConfigOptions.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/OptimizerConfigOptions.java
index 6f58eda..075188f 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/OptimizerConfigOptions.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/OptimizerConfigOptions.java
@@ -105,6 +105,17 @@
+ TABLE_OPTIMIZER_REUSE_SUB_PLAN_ENABLED.key()
+ " is true.");
+ @Documentation.TableOption(execMode = Documentation.ExecMode.BATCH)
+ public static final ConfigOption<Boolean> TABLE_OPTIMIZER_REUSE_SINK_ENABLED =
+ key("table.optimizer.reuse-sink-enabled")
+ .booleanType()
+ .defaultValue(false)
+ .withDescription(
+ "When it is true, the optimizer will try to find out duplicated table sinks and "
+ + "reuse them. This works only when "
+ + TABLE_OPTIMIZER_REUSE_SUB_PLAN_ENABLED.key()
+ + " is true.");
+
@Documentation.TableOption(execMode = Documentation.ExecMode.BATCH_STREAMING)
public static final ConfigOption<Boolean> TABLE_OPTIMIZER_SOURCE_REPORT_STATISTICS_ENABLED =
key("table.optimizer.source.report-statistics-enabled")
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/reuse/SinkReuser.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/reuse/SinkReuser.java
new file mode 100644
index 0000000..b5da0be
--- /dev/null
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/reuse/SinkReuser.java
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.reuse;
+
+import org.apache.flink.table.planner.plan.abilities.sink.SinkAbilitySpec;
+import org.apache.flink.table.planner.plan.nodes.calcite.Sink;
+import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalSink;
+import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalUnion;
+import org.apache.flink.table.planner.plan.utils.RelExplainUtil;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Union;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.IdentityHashMap;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * This checks whether there are duplicated sinks that can be reused. If so, these duplicated
+ * sinks will be merged into one. Only table sinks with the same digest, specs and input trait
+ * set can be reused. This is an optimization so that we do not need to process multiple sinks
+ * that actually represent the same destination table.
+ *
+ * <p>This optimization only applies to a STATEMENT SET containing multiple INSERT INTO statements.
+ *
+ * <p>An example in SQL looks like:
+ *
+ * <pre>{@code
+ * BEGIN STATEMENT SET;
+ * INSERT INTO sink1 SELECT * FROM source1;
+ * INSERT INTO sink1 SELECT * FROM source2;
+ * INSERT INTO sink2 SELECT * FROM source3;
+ * END;
+ * }</pre>
+ *
+ * <p>The plan is as follows:
+ *
+ * <pre>{@code
+ * TableScan1 -- Sink1
+ * TableScan2 -- Sink1
+ * TableScan3 -- Sink2
+ * }</pre>
+ *
+ * <p>After reuse, the plan is changed as follows:
+ *
+ * <pre>{@code
+ * TableScan1 --\
+ * Union -- Sink1
+ * TableScan2 --/
+ *
+ * TableScan3 -- Sink2
+ * }</pre>
+ */
+public class SinkReuser {
+
+ public List<RelNode> reuseDuplicatedSink(List<RelNode> relNodes) {
+ // Find all sinks
+ List<Sink> allSinkNodes =
+ relNodes.stream()
+ .filter(node -> node instanceof Sink)
+ .map(node -> (Sink) node)
+ .collect(Collectors.toList());
+ List<ReusableSinkGroup> reusableSinkGroups = groupReusableSink(allSinkNodes);
+
+ Set<Sink> reusedSinkNodes = reuseSinkAndAddUnion(reusableSinkGroups);
+
+ // Remove all unused sink nodes
+ return relNodes.stream()
+ .filter(root -> !(root instanceof Sink) || reusedSinkNodes.contains(root))
+ .collect(Collectors.toList());
+ }
+
+ private Set<Sink> reuseSinkAndAddUnion(List<ReusableSinkGroup> reusableSinkGroups) {
+ final Set<Sink> reusedSinkNodes = Collections.newSetFromMap(new IdentityHashMap<>());
+ reusableSinkGroups.forEach(
+ group -> {
+ List<Sink> originalSinks = group.originalSinks;
+ if (originalSinks.size() <= 1) {
+ Preconditions.checkState(originalSinks.size() == 1);
+ reusedSinkNodes.add(originalSinks.get(0));
+ return;
+ }
+ List<RelNode> allSinkInputs = new ArrayList<>();
+ for (Sink sinkNode : originalSinks) {
+ allSinkInputs.add(sinkNode.getInput());
+ }
+
+ // Use the first sink node as the final reused sink node
+ Sink reusedSink = originalSinks.get(0);
+
+ Union unionForReusedSinks;
+
+ unionForReusedSinks =
+ new BatchPhysicalUnion(
+ reusedSink.getCluster(),
+ group.inputTraitSet,
+ allSinkInputs,
+ true,
+ // use sink input row type
+ reusedSink.getRowType());
+
+ reusedSink.replaceInput(0, unionForReusedSinks);
+ reusedSinkNodes.add(reusedSink);
+ });
+ return reusedSinkNodes;
+ }
+
+ /**
+ * Groups sinks that can be reused with each other.
+ *
+ * @param allSinkNodes all sink nodes in the plan.
+ * @return a list containing all sink groups.
+ */
+ private List<ReusableSinkGroup> groupReusableSink(List<Sink> allSinkNodes) {
+ List<ReusableSinkGroup> reusableSinkGroups = new ArrayList<>();
+
+ for (Sink currentSinkNode : allSinkNodes) {
+ Optional<ReusableSinkGroup> targetGroup =
+ reusableSinkGroups.stream()
+ .filter(
+ reusableSinkGroup ->
+ reusableSinkGroup.canBeReused(currentSinkNode))
+ .findFirst();
+
+ if (targetGroup.isPresent()) {
+ targetGroup.get().originalSinks.add(currentSinkNode);
+ } else {
+ // If the current sink cannot be reused with any existing groups, create a new
+ // group.
+ reusableSinkGroups.add(new ReusableSinkGroup(currentSinkNode));
+ }
+ }
+ return reusableSinkGroups;
+ }
+
+ private String getDigest(Sink sink) {
+ List<String> digest = new ArrayList<>();
+ digest.add(sink.contextResolvedTable().getIdentifier().asSummaryString());
+
+ int[][] targetColumns = sink.targetColumns();
+ if (targetColumns != null && targetColumns.length > 0) {
+ digest.add(
+ "targetColumns=["
+ + Arrays.stream(targetColumns)
+ .map(Arrays::toString)
+ .collect(Collectors.joining(","))
+ + "]");
+ }
+
+ String fieldTypes =
+ sink.getRowType().getFieldList().stream()
+ .map(f -> f.getType().toString())
+ .collect(Collectors.joining(", "));
+ digest.add("fieldTypes=[" + fieldTypes + "]");
+ if (!sink.hints().isEmpty()) {
+ digest.add("hints=" + RelExplainUtil.hintsToString(sink.hints()));
+ }
+
+ return digest.toString();
+ }
+
+ private class ReusableSinkGroup {
+ private final List<Sink> originalSinks = new ArrayList<>();
+
+ private final SinkAbilitySpec[] sinkAbilitySpecs;
+
+ private final RelTraitSet inputTraitSet;
+
+ private final String digest;
+
+ ReusableSinkGroup(Sink sink) {
+ this.originalSinks.add(sink);
+ this.sinkAbilitySpecs = ((BatchPhysicalSink) sink).abilitySpecs();
+ this.inputTraitSet = sink.getInput().getTraitSet();
+ this.digest = getDigest(sink);
+ }
+
+ public boolean canBeReused(Sink sinkNode) {
+ String currentSinkDigest = getDigest(sinkNode);
+ SinkAbilitySpec[] currentSinkSpecs = ((BatchPhysicalSink) sinkNode).abilitySpecs();
+ RelTraitSet currentInputTraitSet = sinkNode.getInput().getTraitSet();
+
+ // Only table sinks with the same digest, specs and input trait set can be reused
+ return this.digest.equals(currentSinkDigest)
+ && Arrays.equals(this.sinkAbilitySpecs, currentSinkSpecs)
+ && this.inputTraitSet.equals(currentInputTraitSet);
+ }
+ }
+}
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala
index 03975fe6..da4f0b2 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala
@@ -393,7 +393,7 @@
@VisibleForTesting
private[flink] def optimize(relNodes: Seq[RelNode]): Seq[RelNode] = {
val optimizedRelNodes = getOptimizer.optimize(relNodes)
- require(optimizedRelNodes.size == relNodes.size)
+ require(optimizedRelNodes.size <= relNodes.size)
optimizedRelNodes
}
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalSink.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalSink.scala
index 8080ec1..9645c15 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalSink.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalSink.scala
@@ -42,7 +42,7 @@
contextResolvedTable: ContextResolvedTable,
tableSink: DynamicTableSink,
targetColumns: Array[Array[Int]],
- abilitySpecs: Array[SinkAbilitySpec])
+ val abilitySpecs: Array[SinkAbilitySpec])
extends Sink(cluster, traitSet, inputRel, hints, targetColumns, contextResolvedTable, tableSink)
with BatchPhysicalRel {
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/reuse/SubplanReuser.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/reuse/SubplanReuser.scala
index fd607d9..e0e4848 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/reuse/SubplanReuser.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/reuse/SubplanReuser.scala
@@ -63,11 +63,18 @@
val tableSourceReuseEnabled =
tableConfig.get(OptimizerConfigOptions.TABLE_OPTIMIZER_REUSE_SOURCE_ENABLED)
+ val tableSinkReuseEnabled =
+ tableConfig.get(OptimizerConfigOptions.TABLE_OPTIMIZER_REUSE_SINK_ENABLED)
+
var newRels = rels
if (tableSourceReuseEnabled) {
newRels = new ScanReuser(flinkContext, flinkTypeFactory).reuseDuplicatedScan(rels)
}
+ if (tableSinkReuseEnabled && flinkContext.isBatchMode) {
+ newRels = new SinkReuser().reuseDuplicatedSink(newRels)
+ }
+
val context = new SubplanReuseContext(tableSourceReuseEnabled, newRels: _*)
val reuseShuttle = new SubplanReuseShuttle(context)
newRels.map(_.accept(reuseShuttle))
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/batch/sql/BatchSinkReuseTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/batch/sql/BatchSinkReuseTest.java
new file mode 100644
index 0000000..643cd79
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/batch/sql/BatchSinkReuseTest.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.batch.sql;
+
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.planner.plan.common.SinkReuseTestBase;
+import org.apache.flink.table.planner.plan.reuse.SinkReuser;
+import org.apache.flink.table.planner.utils.TableTestUtil;
+
+/** Tests for {@link SinkReuser} in batch mode. */
+public class BatchSinkReuseTest extends SinkReuseTestBase {
+ @Override
+ protected TableTestUtil getTableTestUtil(TableConfig tableConfig) {
+ return batchTestUtil(tableConfig);
+ }
+}
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/common/SinkReuseTestBase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/common/SinkReuseTestBase.java
new file mode 100644
index 0000000..912e0bc
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/common/SinkReuseTestBase.java
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.common;
+
+import org.apache.flink.table.api.StatementSet;
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.api.config.OptimizerConfigOptions;
+import org.apache.flink.table.planner.plan.reuse.SinkReuser;
+import org.apache.flink.table.planner.utils.TableTestBase;
+import org.apache.flink.table.planner.utils.TableTestUtil;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+/** Base test class for {@link SinkReuser}. */
+public abstract class SinkReuseTestBase extends TableTestBase {
+ protected TableTestUtil util;
+
+ @BeforeEach
+ void setup() {
+ TableConfig tableConfig = TableConfig.getDefault();
+ tableConfig.set(OptimizerConfigOptions.TABLE_OPTIMIZER_REUSE_SUB_PLAN_ENABLED, true);
+ tableConfig.set(OptimizerConfigOptions.TABLE_OPTIMIZER_REUSE_SINK_ENABLED, true);
+ util = getTableTestUtil(tableConfig);
+
+ util.tableEnv()
+ .executeSql(
+ "CREATE TABLE sink1 (\n"
+ + " x BIGINT,\n"
+ + " y BIGINT\n"
+ + ") WITH (\n"
+ + " 'connector' = 'filesystem',\n"
+ + " 'format' = 'test-format',\n"
+ + " 'test-format.delimiter' = ',',\n"
+ + "'path' = 'ignore'\n"
+ + ")");
+
+ util.tableEnv()
+ .executeSql(
+ "CREATE TABLE sink2 (\n"
+ + " x BIGINT,\n"
+ + " y BIGINT\n"
+ + ") WITH (\n"
+ + " 'connector' = 'values'\n"
+ + ")");
+
+ util.tableEnv()
+ .executeSql(
+ "CREATE TABLE partitionedSink (\n"
+ + " a BIGINT,\n"
+ + " b BIGINT\n"
+ + ") PARTITIONED BY(`a`) WITH (\n"
+ + " 'connector' = 'values'\n"
+ + ")");
+
+ util.tableEnv()
+ .executeSql(
+ "CREATE TABLE source1 (\n"
+ + " x BIGINT,\n"
+ + " y BIGINT\n"
+ + ") WITH (\n"
+ + " 'connector' = 'values',\n"
+ + " 'disable-lookup' = 'true',\n"
+ + " 'bounded' = 'true'\n"
+ + ")");
+
+ util.tableEnv()
+ .executeSql(
+ "CREATE TABLE source2 (\n"
+ + " x BIGINT,\n"
+ + " y BIGINT\n"
+ + ") WITH (\n"
+ + " 'connector' = 'values',\n"
+ + " 'disable-lookup' = 'true',\n"
+ + " 'bounded' = 'true'\n"
+ + ")");
+ util.tableEnv()
+ .executeSql(
+ "CREATE TABLE source3 (\n"
+ + " x BIGINT,\n"
+ + " y BIGINT\n"
+ + ") WITH (\n"
+ + " 'connector' = 'values',\n"
+ + " 'disable-lookup' = 'true',\n"
+ + " 'bounded' = 'true'\n"
+ + ")");
+
+ util.tableEnv()
+ .executeSql(
+ "CREATE TABLE source4 (\n"
+ + " x BIGINT,\n"
+ + " y BIGINT\n"
+ + ") WITH (\n"
+ + " 'connector' = 'values',\n"
+ + " 'disable-lookup' = 'true',\n"
+ + " 'bounded' = 'true'\n"
+ + ")");
+ util.tableEnv()
+ .executeSql(
+ "CREATE TABLE filed_name_change_source (\n"
+ + " x1 BIGINT,\n"
+ + " y1 BIGINT\n"
+ + ") WITH (\n"
+ + " 'connector' = 'values',\n"
+ + " 'disable-lookup' = 'true',\n"
+ + " 'bounded' = 'true'\n"
+ + ")");
+
+ util.tableEnv()
+ .executeSql(
+ "CREATE TABLE type_coercion_source (\n"
+ + " x INT,\n"
+ + " y INT\n"
+ + ") WITH (\n"
+ + " 'connector' = 'values',\n"
+ + " 'disable-lookup' = 'true',\n"
+ + " 'bounded' = 'true'\n"
+ + ")");
+ }
+
+ @Test
+ public void testSinkReuse() {
+ StatementSet statementSet = util.tableEnv().createStatementSet();
+ statementSet.addInsertSql("INSERT INTO sink1 (SELECT * FROM source1)");
+ statementSet.addInsertSql("INSERT INTO sink1 (SELECT * FROM source2)");
+ statementSet.addInsertSql("INSERT INTO sink2 (SELECT * FROM source3)");
+ statementSet.addInsertSql("INSERT INTO sink2 (SELECT * FROM source4)");
+ util.verifyExecPlan(statementSet);
+ }
+
+ @Test
+ public void testSinkReuseFromSameSource() {
+ StatementSet statementSet = util.tableEnv().createStatementSet();
+ statementSet.addInsertSql("INSERT INTO sink1 (SELECT * FROM source1)");
+ statementSet.addInsertSql("INSERT INTO sink1 (SELECT * FROM source1)");
+ util.verifyExecPlan(statementSet);
+ }
+
+ @Test
+ public void testSinkReuseWithPartialColumns() {
+ StatementSet statementSet = util.tableEnv().createStatementSet();
+ statementSet.addInsertSql("INSERT INTO sink1(`x`) (SELECT x FROM source1)");
+ statementSet.addInsertSql("INSERT INTO sink1(`y`) (SELECT y FROM source1)");
+ statementSet.addInsertSql("INSERT INTO sink1(`x`) (SELECT x FROM source3)");
+ util.verifyExecPlan(statementSet);
+ }
+
+ @Test
+ public void testSinkReuseWithOverwrite() {
+ StatementSet statementSet = util.tableEnv().createStatementSet();
+ statementSet.addInsertSql("INSERT OVERWRITE sink1 (SELECT * FROM source1)");
+ statementSet.addInsertSql("INSERT INTO sink1 (SELECT * FROM source2)");
+ statementSet.addInsertSql("INSERT OVERWRITE sink1 (SELECT * FROM source3)");
+ util.verifyExecPlan(statementSet);
+ }
+
+ @Test
+ public void testSinkReuseWithPartition() {
+ StatementSet statementSet = util.tableEnv().createStatementSet();
+ statementSet.addInsertSql(
+ "INSERT INTO partitionedSink PARTITION(a = 1) (SELECT y FROM source1)");
+ statementSet.addInsertSql(
+ "INSERT INTO partitionedSink PARTITION(a = 1) (SELECT y FROM source2)");
+ statementSet.addInsertSql(
+ "INSERT INTO partitionedSink PARTITION(a = 2) (SELECT y FROM source3)");
+ statementSet.addInsertSql("INSERT INTO partitionedSink (SELECT * FROM source4)");
+ util.verifyExecPlan(statementSet);
+ }
+
+ @Test
+ public void testSinkReuseWithDifferentFieldNames() {
+ StatementSet statementSet = util.tableEnv().createStatementSet();
+ statementSet.addInsertSql("INSERT INTO sink1 (SELECT x, y FROM source1)");
+ statementSet.addInsertSql(
+ "INSERT INTO sink1 (SELECT x1, y1 FROM filed_name_change_source)");
+ util.verifyExecPlan(statementSet);
+ }
+
+ @Test
+ public void testSinkReuseWithHint() {
+ StatementSet statementSet = util.tableEnv().createStatementSet();
+ statementSet.addInsertSql(
+ "INSERT INTO sink1 /*+ OPTIONS('path' = 'ignore1') */ (SELECT * FROM source1)");
+ statementSet.addInsertSql(
+ "INSERT INTO sink1 /*+ OPTIONS('path' = 'ignore2') */ (SELECT * FROM source2)");
+ statementSet.addInsertSql(
+ "INSERT INTO sink1 /*+ OPTIONS('path' = 'ignore1') */ (SELECT * FROM source3)");
+ util.verifyExecPlan(statementSet);
+ }
+
+ @Test
+ public void testSinkReuseWithTypeCoercionSource() {
+ StatementSet statementSet = util.tableEnv().createStatementSet();
+ statementSet.addInsertSql("INSERT INTO sink1 (SELECT * FROM source1)");
+ statementSet.addInsertSql("INSERT INTO sink1 (SELECT * FROM type_coercion_source)");
+ util.verifyExecPlan(statementSet);
+ }
+
+ protected abstract TableTestUtil getTableTestUtil(TableConfig tableConfig);
+}
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/common/sql/SinkReuseITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/common/sql/SinkReuseITCase.java
new file mode 100644
index 0000000..900bfec
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/common/sql/SinkReuseITCase.java
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.runtime.common.sql;
+
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.StatementSet;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.api.config.OptimizerConfigOptions;
+import org.apache.flink.table.api.internal.TableEnvironmentImpl;
+import org.apache.flink.table.planner.factories.TestValuesTableFactory;
+import org.apache.flink.test.util.AbstractTestBase;
+import org.apache.flink.testutils.junit.extensions.parameterized.Parameter;
+import org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension;
+import org.apache.flink.testutils.junit.extensions.parameterized.Parameters;
+import org.apache.flink.types.Row;
+
+import org.junit.jupiter.api.TestTemplate;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** IT case for merging duplicated table sinks. */
+@ExtendWith(ParameterizedTestExtension.class)
+public class SinkReuseITCase extends AbstractTestBase {
+ @Parameter public Boolean isBatch;
+
+ @Parameters(name = "isBatch: {0}")
+ public static Collection<Boolean> parameters() {
+ return List.of(true);
+ }
+
+ TableEnvironment tEnv;
+
+ void setup(boolean isBatch) throws Exception {
+ EnvironmentSettings settings;
+ if (isBatch) {
+ settings = EnvironmentSettings.newInstance().inBatchMode().build();
+ tEnv = TableEnvironmentImpl.create(settings);
+ } else {
+ settings = EnvironmentSettings.newInstance().inStreamingMode().build();
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(4);
+ env.getConfig().enableObjectReuse();
+ tEnv = StreamTableEnvironment.create(env, settings);
+ }
+
+ tEnv.getConfig()
+ .getConfiguration()
+ .set(OptimizerConfigOptions.TABLE_OPTIMIZER_REUSE_SINK_ENABLED, true);
+
+ String dataId1 =
+ TestValuesTableFactory.registerData(
+ Arrays.asList(Row.of(1, 1.1d, "Tom"), Row.of(2, 1.2d, "Jerry")));
+
+ String dataId2 =
+ TestValuesTableFactory.registerData(
+ Arrays.asList(Row.of(1, 2.1d, "Alice"), Row.of(2, 2.2d, "Bob")));
+
+ String dataId3 =
+ TestValuesTableFactory.registerData(
+ Arrays.asList(Row.of(1, 3.1d, "Jack"), Row.of(2, 3.2d, "Rose")));
+
+ createSourceTable("source1", getSourceOptions(dataId1));
+ createSourceTable("source2", getSourceOptions(dataId2));
+ createSourceTable("source3", getSourceOptions(dataId3));
+
+ createSinkTable("sink1", getSinkOptions());
+ createSinkTable("sink2", getSinkOptions());
+ }
+
+ @TestTemplate
+ public void testSinkMergeFromSameSource() throws Exception {
+ setup(isBatch);
+ StatementSet statementSet = tEnv.createStatementSet();
+ statementSet.addInsertSql("INSERT INTO sink1 SELECT * FROM source1");
+ statementSet.addInsertSql("INSERT INTO sink1 SELECT * FROM source1");
+ statementSet.execute().await();
+
+ List<String> sink1Result = TestValuesTableFactory.getResultsAsStrings("sink1");
+ List<String> sink1Expected =
+ Arrays.asList(
+ "+I[1, 1.1, Tom]",
+ "+I[2, 1.2, Jerry]",
+ "+I[1, 1.1, Tom]",
+ "+I[2, 1.2, Jerry]");
+ assertResult(sink1Expected, sink1Result);
+ }
+
+ @TestTemplate
+ public void testMergeSink() throws Exception {
+ setup(isBatch);
+ StatementSet statementSet = tEnv.createStatementSet();
+ statementSet.addInsertSql("INSERT INTO sink1 SELECT * FROM source1");
+ statementSet.addInsertSql("INSERT INTO sink1 SELECT * FROM source2");
+ statementSet.addInsertSql("INSERT INTO sink2 SELECT * FROM source3");
+ statementSet.execute().await();
+
+ List<String> sink1Result = TestValuesTableFactory.getResultsAsStrings("sink1");
+ List<String> sink2Result = TestValuesTableFactory.getResultsAsStrings("sink2");
+
+ List<String> sink1Expected =
+ Arrays.asList(
+ "+I[1, 1.1, Tom]",
+ "+I[2, 1.2, Jerry]",
+ "+I[1, 2.1, Alice]",
+ "+I[2, 2.2, Bob]");
+ List<String> sink2Expected = Arrays.asList("+I[1, 3.1, Jack]", "+I[2, 3.2, Rose]");
+
+ assertResult(sink1Expected, sink1Result);
+ assertResult(sink2Expected, sink2Result);
+ }
+
+ private Map<String, String> getSourceOptions(String dataId) {
+ Map<String, String> sourceOptions = new HashMap<>();
+ sourceOptions.put("connector", "values");
+ sourceOptions.put("bounded", "true");
+ sourceOptions.put("data-id", dataId);
+ return sourceOptions;
+ }
+
+ private Map<String, String> getSinkOptions() {
+ Map<String, String> sinkOptions = new HashMap<>();
+ sinkOptions.put("connector", "values");
+ sinkOptions.put("bounded", "true");
+ return sinkOptions;
+ }
+
+ private void createSinkTable(String tableName, Map<String, String> options) {
+ String ddl =
+ String.format(
+ "CREATE TABLE `%s` (\n"
+ + " a int,\n"
+ + " b double,\n"
+ + " c string\n"
+ + ") WITH (\n"
+ + " %s \n"
+ + ")",
+ tableName, makeWithOptions(options));
+ tEnv.executeSql(ddl);
+ }
+
+ private void createSourceTable(String tableName, Map<String, String> options) {
+ String ddl =
+ String.format(
+ "CREATE TABLE `%s` (\n"
+ + " a int,\n"
+ + " b double,\n"
+ + " c string,\n"
+ + " PRIMARY KEY (a) NOT ENFORCED\n"
+ + ") WITH (\n"
+ + " %s \n"
+ + ")",
+ tableName, makeWithOptions(options));
+ tEnv.executeSql(ddl);
+ }
+
+ private String makeWithOptions(Map<String, String> options) {
+ return options.keySet().stream()
+ .map(key -> String.format(" '%s' = '%s'", key, options.get(key)))
+ .collect(Collectors.joining(",\n"));
+ }
+
+ private void assertResult(List<String> expected, List<String> actual) {
+ Collections.sort(expected);
+ Collections.sort(actual);
+ assertThat(actual).isEqualTo(expected);
+ }
+}
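
To complement the IT case above, a hypothetical end-to-end sketch (class name, table names, and connector choices are illustrative, not from this patch) that submits two INSERTs into the same table as one statement set. With the option enabled, the explained batch plan contains a single Sink fed by a Union instead of two independent Sink nodes:

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.StatementSet;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.config.OptimizerConfigOptions;

public class SinkReuseExample {
    public static void main(String[] args) throws Exception {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.newInstance().inBatchMode().build());
        tEnv.getConfig().set(OptimizerConfigOptions.TABLE_OPTIMIZER_REUSE_SINK_ENABLED, true);

        // Built-in connectors; schemas mirror the sources/sinks used in the tests above.
        tEnv.executeSql(
                "CREATE TABLE src1 (x BIGINT, y BIGINT) WITH "
                        + "('connector' = 'datagen', 'number-of-rows' = '5')");
        tEnv.executeSql(
                "CREATE TABLE src2 (x BIGINT, y BIGINT) WITH "
                        + "('connector' = 'datagen', 'number-of-rows' = '5')");
        tEnv.executeSql("CREATE TABLE snk (x BIGINT, y BIGINT) WITH ('connector' = 'print')");

        StatementSet stmtSet = tEnv.createStatementSet();
        stmtSet.addInsertSql("INSERT INTO snk SELECT * FROM src1");
        stmtSet.addInsertSql("INSERT INTO snk SELECT * FROM src2");

        // With sink reuse enabled, the optimized plan shows one Sink fed by a Union.
        System.out.println(stmtSet.explain());
        stmtSet.execute().await();
    }
}
```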
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/BatchSinkReuseTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/BatchSinkReuseTest.xml
new file mode 100644
index 0000000..f3c3392
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/BatchSinkReuseTest.xml
@@ -0,0 +1,245 @@
+<?xml version="1.0" ?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to you under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License. You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+<Root>
+ <TestCase name="testSinkReuse">
+ <Resource name="ast">
+ <![CDATA[
+LogicalSink(table=[default_catalog.default_database.sink1], fields=[x, y])
++- LogicalProject(x=[$0], y=[$1])
+ +- LogicalTableScan(table=[[default_catalog, default_database, source1]])
+
+LogicalSink(table=[default_catalog.default_database.sink1], fields=[x, y])
++- LogicalProject(x=[$0], y=[$1])
+ +- LogicalTableScan(table=[[default_catalog, default_database, source2]])
+
+LogicalSink(table=[default_catalog.default_database.sink2], fields=[x, y])
++- LogicalProject(x=[$0], y=[$1])
+ +- LogicalTableScan(table=[[default_catalog, default_database, source3]])
+
+LogicalSink(table=[default_catalog.default_database.sink2], fields=[x, y])
++- LogicalProject(x=[$0], y=[$1])
+ +- LogicalTableScan(table=[[default_catalog, default_database, source4]])
+]]>
+ </Resource>
+ <Resource name="optimized exec plan">
+ <![CDATA[
+Sink(table=[default_catalog.default_database.sink1], fields=[x, y])
++- Union(all=[true], union=[x, y])
+ :- TableSourceScan(table=[[default_catalog, default_database, source1]], fields=[x, y])
+ +- TableSourceScan(table=[[default_catalog, default_database, source2]], fields=[x, y])
+
+Sink(table=[default_catalog.default_database.sink2], fields=[x, y])
++- Union(all=[true], union=[x, y])
+ :- TableSourceScan(table=[[default_catalog, default_database, source3]], fields=[x, y])
+ +- TableSourceScan(table=[[default_catalog, default_database, source4]], fields=[x, y])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testSinkReuseFromSameSource">
+ <Resource name="ast">
+ <![CDATA[
+LogicalSink(table=[default_catalog.default_database.sink1], fields=[x, y])
++- LogicalProject(x=[$0], y=[$1])
+ +- LogicalTableScan(table=[[default_catalog, default_database, source1]])
+
+LogicalSink(table=[default_catalog.default_database.sink1], fields=[x, y])
++- LogicalProject(x=[$0], y=[$1])
+ +- LogicalTableScan(table=[[default_catalog, default_database, source1]])
+]]>
+ </Resource>
+ <Resource name="optimized exec plan">
+ <![CDATA[
+Sink(table=[default_catalog.default_database.sink1], fields=[x, y])
++- Union(all=[true], union=[x, y])
+ :- TableSourceScan(table=[[default_catalog, default_database, source1]], fields=[x, y])(reuse_id=[1])
+ +- Reused(reference_id=[1])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testSinkReuseWithDifferentFieldNames">
+ <Resource name="ast">
+ <![CDATA[
+LogicalSink(table=[default_catalog.default_database.sink1], fields=[x, y])
++- LogicalProject(x=[$0], y=[$1])
+ +- LogicalTableScan(table=[[default_catalog, default_database, source1]])
+
+LogicalSink(table=[default_catalog.default_database.sink1], fields=[x1, y1])
++- LogicalProject(x1=[$0], y1=[$1])
+ +- LogicalTableScan(table=[[default_catalog, default_database, field_name_change_source]])
+]]>
+ </Resource>
+ <Resource name="optimized exec plan">
+ <![CDATA[
+Sink(table=[default_catalog.default_database.sink1], fields=[x, y])
++- Union(all=[true], union=[x, y])
+ :- TableSourceScan(table=[[default_catalog, default_database, source1]], fields=[x, y])
+ +- TableSourceScan(table=[[default_catalog, default_database, field_name_change_source]], fields=[x1, y1])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testSinkReuseWithOverwrite">
+ <Resource name="ast">
+ <![CDATA[
+LogicalSink(table=[default_catalog.default_database.sink1], fields=[x, y])
++- LogicalProject(x=[$0], y=[$1])
+ +- LogicalTableScan(table=[[default_catalog, default_database, source1]])
+
+LogicalSink(table=[default_catalog.default_database.sink1], fields=[x, y])
++- LogicalProject(x=[$0], y=[$1])
+ +- LogicalTableScan(table=[[default_catalog, default_database, source2]])
+
+LogicalSink(table=[default_catalog.default_database.sink1], fields=[x, y])
++- LogicalProject(x=[$0], y=[$1])
+ +- LogicalTableScan(table=[[default_catalog, default_database, source3]])
+]]>
+ </Resource>
+ <Resource name="optimized exec plan">
+ <![CDATA[
+Sink(table=[default_catalog.default_database.sink1], fields=[x, y])
++- Union(all=[true], union=[x, y])
+ :- TableSourceScan(table=[[default_catalog, default_database, source1]], fields=[x, y])
+ +- TableSourceScan(table=[[default_catalog, default_database, source3]], fields=[x, y])
+
+Sink(table=[default_catalog.default_database.sink1], fields=[x, y])
++- TableSourceScan(table=[[default_catalog, default_database, source2]], fields=[x, y])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testSinkReuseWithHint">
+ <Resource name="ast">
+ <![CDATA[
+LogicalSink(table=[default_catalog.default_database.sink1], fields=[x, y], hints=[[[OPTIONS options:{path=ignore1}]]])
++- LogicalProject(x=[$0], y=[$1])
+ +- LogicalTableScan(table=[[default_catalog, default_database, source1]])
+
+LogicalSink(table=[default_catalog.default_database.sink1], fields=[x, y], hints=[[[OPTIONS options:{path=ignore2}]]])
++- LogicalProject(x=[$0], y=[$1])
+ +- LogicalTableScan(table=[[default_catalog, default_database, source2]])
+
+LogicalSink(table=[default_catalog.default_database.sink1], fields=[x, y], hints=[[[OPTIONS options:{path=ignore1}]]])
++- LogicalProject(x=[$0], y=[$1])
+ +- LogicalTableScan(table=[[default_catalog, default_database, source3]])
+]]>
+ </Resource>
+ <Resource name="optimized exec plan">
+ <![CDATA[
+Sink(table=[default_catalog.default_database.sink1], fields=[x, y], hints=[[[OPTIONS options:{path=ignore1}]]])
++- Union(all=[true], union=[x, y])
+ :- TableSourceScan(table=[[default_catalog, default_database, source1]], fields=[x, y])
+ +- TableSourceScan(table=[[default_catalog, default_database, source3]], fields=[x, y])
+
+Sink(table=[default_catalog.default_database.sink1], fields=[x, y], hints=[[[OPTIONS options:{path=ignore2}]]])
++- TableSourceScan(table=[[default_catalog, default_database, source2]], fields=[x, y])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testSinkReuseWithPartialColumns">
+ <Resource name="ast">
+ <![CDATA[
+LogicalSink(table=[default_catalog.default_database.sink1], targetColumns=[[0]], fields=[x, EXPR$1])
++- LogicalProject(x=[$0], EXPR$1=[null:BIGINT])
+ +- LogicalTableScan(table=[[default_catalog, default_database, source1]])
+
+LogicalSink(table=[default_catalog.default_database.sink1], targetColumns=[[1]], fields=[EXPR$0, y])
++- LogicalProject(EXPR$0=[null:BIGINT], y=[$1])
+ +- LogicalTableScan(table=[[default_catalog, default_database, source1]])
+
+LogicalSink(table=[default_catalog.default_database.sink1], targetColumns=[[0]], fields=[x, EXPR$1])
++- LogicalProject(x=[$0], EXPR$1=[null:BIGINT])
+ +- LogicalTableScan(table=[[default_catalog, default_database, source3]])
+]]>
+ </Resource>
+ <Resource name="optimized exec plan">
+ <![CDATA[
+TableSourceScan(table=[[default_catalog, default_database, source1, project=[x, y], metadata=[]]], fields=[x, y])(reuse_id=[1])
+
+Sink(table=[default_catalog.default_database.sink1], targetColumns=[[0]], fields=[x, EXPR$1])
++- Union(all=[true], union=[x, EXPR$1])
+ :- Calc(select=[x, null:BIGINT AS EXPR$1])
+ : +- Reused(reference_id=[1])
+ +- Calc(select=[x, null:BIGINT AS EXPR$1])
+ +- TableSourceScan(table=[[default_catalog, default_database, source3, project=[x], metadata=[]]], fields=[x])
+
+Sink(table=[default_catalog.default_database.sink1], targetColumns=[[1]], fields=[EXPR$0, y])
++- Calc(select=[null:BIGINT AS EXPR$0, y])
+ +- Reused(reference_id=[1])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testSinkReuseWithTypeCoercionSource">
+ <Resource name="ast">
+ <![CDATA[
+LogicalSink(table=[default_catalog.default_database.sink1], fields=[x, y])
++- LogicalProject(x=[$0], y=[$1])
+ +- LogicalTableScan(table=[[default_catalog, default_database, source1]])
+
+LogicalSink(table=[default_catalog.default_database.sink1], fields=[x, y])
++- LogicalProject(x=[CAST($0):BIGINT], y=[CAST($1):BIGINT])
+ +- LogicalTableScan(table=[[default_catalog, default_database, type_coercion_source]])
+]]>
+ </Resource>
+ <Resource name="optimized exec plan">
+ <![CDATA[
+Sink(table=[default_catalog.default_database.sink1], fields=[x, y])
++- Union(all=[true], union=[x, y])
+ :- TableSourceScan(table=[[default_catalog, default_database, source1]], fields=[x, y])
+ +- Calc(select=[CAST(x AS BIGINT) AS x, CAST(y AS BIGINT) AS y])
+ +- TableSourceScan(table=[[default_catalog, default_database, type_coercion_source]], fields=[x, y])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testSinkReuseWithPartition">
+ <Resource name="ast">
+ <![CDATA[
+LogicalSink(table=[default_catalog.default_database.partitionedSink], fields=[EXPR$0, y])
++- LogicalProject(EXPR$0=[1:BIGINT], y=[$1])
+ +- LogicalTableScan(table=[[default_catalog, default_database, source1]])
+
+LogicalSink(table=[default_catalog.default_database.partitionedSink], fields=[EXPR$0, y])
++- LogicalProject(EXPR$0=[1:BIGINT], y=[$1])
+ +- LogicalTableScan(table=[[default_catalog, default_database, source2]])
+
+LogicalSink(table=[default_catalog.default_database.partitionedSink], fields=[EXPR$0, y])
++- LogicalProject(EXPR$0=[2:BIGINT], y=[$1])
+ +- LogicalTableScan(table=[[default_catalog, default_database, source3]])
+
+LogicalSink(table=[default_catalog.default_database.partitionedSink], fields=[x, y])
++- LogicalProject(x=[$0], y=[$1])
+ +- LogicalTableScan(table=[[default_catalog, default_database, source4]])
+]]>
+ </Resource>
+ <Resource name="optimized exec plan">
+ <![CDATA[
+Sink(table=[default_catalog.default_database.partitionedSink], fields=[EXPR$0, y])
++- Union(all=[true], union=[EXPR$0, y])
+ :- Calc(select=[1 AS EXPR$0, y])
+ : +- TableSourceScan(table=[[default_catalog, default_database, source1, project=[y], metadata=[]]], fields=[y])
+ +- Calc(select=[1 AS EXPR$0, y])
+ +- TableSourceScan(table=[[default_catalog, default_database, source2, project=[y], metadata=[]]], fields=[y])
+
+Sink(table=[default_catalog.default_database.partitionedSink], fields=[EXPR$0, y])
++- Calc(select=[2 AS EXPR$0, y])
+ +- TableSourceScan(table=[[default_catalog, default_database, source3, project=[y], metadata=[]]], fields=[y])
+
+Sink(table=[default_catalog.default_database.partitionedSink], fields=[x, y])
++- Sort(orderBy=[x ASC])
+ +- TableSourceScan(table=[[default_catalog, default_database, source4]], fields=[x, y])
+]]>
+ </Resource>
+ </TestCase>
+</Root>