[FLINK-37543][table-planner] Support sink reuse in batch mode (#26379)
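
Introduce a new batch-only optimizer option, table.optimizer.reuse-sink-enabled
(default: false). When it is enabled together with
table.optimizer.reuse-sub-plan-enabled, the planner merges duplicated sinks in a
statement set: sinks with the same digest, sink ability specs and input trait set
are collapsed into a single sink whose input is a union of the original sink inputs.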

diff --git a/docs/layouts/shortcodes/generated/optimizer_config_configuration.html b/docs/layouts/shortcodes/generated/optimizer_config_configuration.html
index 2127bb6..de4a77d 100644
--- a/docs/layouts/shortcodes/generated/optimizer_config_configuration.html
+++ b/docs/layouts/shortcodes/generated/optimizer_config_configuration.html
@@ -84,6 +84,12 @@
             <td>When true, the optimizer will try to find out duplicated sub-plans by digest to build optimize blocks (a.k.a. common sub-graphs). Each optimize block will be optimized independently.</td>
         </tr>
         <tr>
+            <td><h5>table.optimizer.reuse-sink-enabled</h5><br> <span class="label label-primary">Batch</span></td>
+            <td style="word-wrap: break-word;">false</td>
+            <td>Boolean</td>
+            <td>When true, the optimizer will try to find out duplicated table sinks and reuse them. This works only when table.optimizer.reuse-sub-plan-enabled is true.</td>
+        </tr>
+        <tr>
             <td><h5>table.optimizer.reuse-source-enabled</h5><br> <span class="label label-primary">Batch</span> <span class="label label-primary">Streaming</span></td>
             <td style="word-wrap: break-word;">true</td>
             <td>Boolean</td>
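
A hedged sketch of enabling the new option programmatically (the batch `TableEnvironment` setup is illustrative; the option itself and its batch-only scope come from this patch):

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.config.OptimizerConfigOptions;

public class EnableSinkReuse {
    public static void main(String[] args) {
        // Sink reuse is batch-only, so create a batch TableEnvironment.
        TableEnvironment tEnv =
                TableEnvironment.create(
                        EnvironmentSettings.newInstance().inBatchMode().build());
        // Opt in explicitly; the option defaults to false and only takes effect
        // while table.optimizer.reuse-sub-plan-enabled (true by default) is on.
        tEnv.getConfig().set(OptimizerConfigOptions.TABLE_OPTIMIZER_REUSE_SINK_ENABLED, true);
    }
}
```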
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/OptimizerConfigOptions.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/OptimizerConfigOptions.java
index 6f58eda..075188f 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/OptimizerConfigOptions.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/OptimizerConfigOptions.java
@@ -105,6 +105,17 @@
                                     + TABLE_OPTIMIZER_REUSE_SUB_PLAN_ENABLED.key()
                                     + " is true.");
 
+    @Documentation.TableOption(execMode = Documentation.ExecMode.BATCH)
+    public static final ConfigOption<Boolean> TABLE_OPTIMIZER_REUSE_SINK_ENABLED =
+            key("table.optimizer.reuse-sink-enabled")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription(
+                            "When it is true, the optimizer will try to find out duplicated table sinks and "
+                                    + "reuse them. This works only when "
+                                    + TABLE_OPTIMIZER_REUSE_SUB_PLAN_ENABLED.key()
+                                    + " is true.");
+
     @Documentation.TableOption(execMode = Documentation.ExecMode.BATCH_STREAMING)
     public static final ConfigOption<Boolean> TABLE_OPTIMIZER_SOURCE_REPORT_STATISTICS_ENABLED =
             key("table.optimizer.source.report-statistics-enabled")
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/reuse/SinkReuser.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/reuse/SinkReuser.java
new file mode 100644
index 0000000..b5da0be
--- /dev/null
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/reuse/SinkReuser.java
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.reuse;
+
+import org.apache.flink.table.planner.plan.abilities.sink.SinkAbilitySpec;
+import org.apache.flink.table.planner.plan.nodes.calcite.Sink;
+import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalSink;
+import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalUnion;
+import org.apache.flink.table.planner.plan.utils.RelExplainUtil;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Union;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.IdentityHashMap;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * Checks whether the plan contains duplicate sinks that can be reused. If so, the duplicates are
+ * merged into a single sink. Only table sinks with the same digest, ability specs and input trait
+ * set can be reused. This is an optimization so that we do not need to process multiple sinks
+ * that actually represent the same destination table.
+ *
+ * <p>This optimization only applies to STATEMENT SET clauses with multiple INSERT INTO statements.
+ *
+ * <p>Examples in SQL look like:
+ *
+ * <pre>{@code
+ * BEGIN STATEMENT SET;
+ * INSERT INTO sink1 SELECT * FROM source1;
+ * INSERT INTO sink1 SELECT * FROM source2;
+ * INSERT INTO sink2 SELECT * FROM source3;
+ * END;
+ * }</pre>
+ *
+ * <p>The plan is as follows:
+ *
+ * <pre>{@code
+ * TableScan1 -- Sink1
+ * TableScan2 -- Sink1
+ * TableScan3 -- Sink2
+ * }</pre>
+ *
+ * <p>After reuse, the plan is changed as follows:
+ *
+ * <pre>{@code
+ * TableScan1 --\
+ *               Union -- Sink1
+ * TableScan2 --/
+ *
+ * TableScan3 -- Sink2
+ * }</pre>
+ */
+public class SinkReuser {
+
+    public List<RelNode> reuseDuplicatedSink(List<RelNode> relNodes) {
+        // Find all sinks
+        List<Sink> allSinkNodes =
+                relNodes.stream()
+                        .filter(node -> node instanceof Sink)
+                        .map(node -> (Sink) node)
+                        .collect(Collectors.toList());
+        List<ReusableSinkGroup> reusableSinkGroups = groupReusableSink(allSinkNodes);
+
+        Set<Sink> reusedSinkNodes = reuseSinkAndAddUnion(reusableSinkGroups);
+
+        // Keep non-sink roots and the reused sinks; the merged duplicates are dropped
+        return relNodes.stream()
+                .filter(root -> !(root instanceof Sink) || reusedSinkNodes.contains(root))
+                .collect(Collectors.toList());
+    }
+
+    private Set<Sink> reuseSinkAndAddUnion(List<ReusableSinkGroup> reusableSinkGroups) {
+        final Set<Sink> reusedSinkNodes = Collections.newSetFromMap(new IdentityHashMap<>());
+        reusableSinkGroups.forEach(
+                group -> {
+                    List<Sink> originalSinks = group.originalSinks;
+                    if (originalSinks.size() <= 1) {
+                        Preconditions.checkState(originalSinks.size() == 1);
+                        reusedSinkNodes.add(originalSinks.get(0));
+                        return;
+                    }
+                    List<RelNode> allSinkInputs = new ArrayList<>();
+                    for (Sink sinkNode : originalSinks) {
+                        allSinkInputs.add(sinkNode.getInput());
+                    }
+
+                    // Use the first sink node as the final reused sink node
+                    Sink reusedSink = originalSinks.get(0);
+
+                    // Union all inputs of the duplicated sinks so that the
+                    // reused sink consumes them through a single input edge.
+                    Union unionForReusedSinks =
+                            new BatchPhysicalUnion(
+                                    reusedSink.getCluster(),
+                                    group.inputTraitSet,
+                                    allSinkInputs,
+                                    true,
+                                    // the sink row type equals its input row type
+                                    reusedSink.getRowType());
+
+                    reusedSink.replaceInput(0, unionForReusedSinks);
+                    reusedSinkNodes.add(reusedSink);
+                });
+        return reusedSinkNodes;
+    }
+
+    /**
+     * Groups sinks that can be reused with each other.
+     *
+     * @param allSinkNodes all sink nodes in the plan.
+     * @return a list containing all sink groups.
+     */
+    private List<ReusableSinkGroup> groupReusableSink(List<Sink> allSinkNodes) {
+        List<ReusableSinkGroup> reusableSinkGroups = new ArrayList<>();
+
+        for (Sink currentSinkNode : allSinkNodes) {
+            Optional<ReusableSinkGroup> targetGroup =
+                    reusableSinkGroups.stream()
+                            .filter(
+                                    reusableSinkGroup ->
+                                            reusableSinkGroup.canBeReused(currentSinkNode))
+                            .findFirst();
+
+            if (targetGroup.isPresent()) {
+                targetGroup.get().originalSinks.add(currentSinkNode);
+            } else {
+                // If the current sink cannot be reused with any existing groups, create a new
+                // group.
+                reusableSinkGroups.add(new ReusableSinkGroup(currentSinkNode));
+            }
+        }
+        return reusableSinkGroups;
+    }
+
+    private String getDigest(Sink sink) {
+        List<String> digest = new ArrayList<>();
+        digest.add(sink.contextResolvedTable().getIdentifier().asSummaryString());
+
+        int[][] targetColumns = sink.targetColumns();
+        if (targetColumns != null && targetColumns.length > 0) {
+            digest.add(
+                    "targetColumns=["
+                            + Arrays.stream(targetColumns)
+                                    .map(Arrays::toString)
+                                    .collect(Collectors.joining(","))
+                            + "]");
+        }
+
+        String fieldTypes =
+                sink.getRowType().getFieldList().stream()
+                        .map(f -> f.getType().toString())
+                        .collect(Collectors.joining(", "));
+        digest.add("fieldTypes=[" + fieldTypes + "]");
+        if (!sink.hints().isEmpty()) {
+            digest.add("hints=" + RelExplainUtil.hintsToString(sink.hints()));
+        }
+
+        return digest.toString();
+    }
+
+    private class ReusableSinkGroup {
+        private final List<Sink> originalSinks = new ArrayList<>();
+
+        private final SinkAbilitySpec[] sinkAbilitySpecs;
+
+        private final RelTraitSet inputTraitSet;
+
+        private final String digest;
+
+        ReusableSinkGroup(Sink sink) {
+            this.originalSinks.add(sink);
+            this.sinkAbilitySpecs = ((BatchPhysicalSink) sink).abilitySpecs();
+            this.inputTraitSet = sink.getInput().getTraitSet();
+            this.digest = getDigest(sink);
+        }
+
+        public boolean canBeReused(Sink sinkNode) {
+            String currentSinkDigest = getDigest(sinkNode);
+            SinkAbilitySpec[] currentSinkSpecs = ((BatchPhysicalSink) sinkNode).abilitySpecs();
+            RelTraitSet currentInputTraitSet = sinkNode.getInput().getTraitSet();
+
+            // Only table sink with the same digest, specs and input trait set can be reused
+            return this.digest.equals(currentSinkDigest)
+                    && Arrays.equals(this.sinkAbilitySpecs, currentSinkSpecs)
+                    && this.inputTraitSet.equals(currentInputTraitSet);
+        }
+    }
+}
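
To illustrate the end-to-end effect, a minimal sketch under stated assumptions (the `datagen`/`blackhole` connectors and table names are illustrative; the single-Sink-over-Union plan shape is what the new `SinkReuser` produces, as the Javadoc above describes):

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.StatementSet;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.config.OptimizerConfigOptions;

public class SinkReuseExample {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(
                        EnvironmentSettings.newInstance().inBatchMode().build());
        tEnv.getConfig().set(OptimizerConfigOptions.TABLE_OPTIMIZER_REUSE_SINK_ENABLED, true);

        // Bounded sources so the job is valid in batch mode.
        tEnv.executeSql(
                "CREATE TABLE source1 (x BIGINT, y BIGINT) WITH "
                        + "('connector' = 'datagen', 'number-of-rows' = '10')");
        tEnv.executeSql(
                "CREATE TABLE source2 (x BIGINT, y BIGINT) WITH "
                        + "('connector' = 'datagen', 'number-of-rows' = '10')");
        tEnv.executeSql(
                "CREATE TABLE sink1 (x BIGINT, y BIGINT) WITH ('connector' = 'blackhole')");

        StatementSet stmtSet = tEnv.createStatementSet();
        stmtSet.addInsertSql("INSERT INTO sink1 SELECT * FROM source1");
        stmtSet.addInsertSql("INSERT INTO sink1 SELECT * FROM source2");
        // With reuse enabled, the optimized plan contains a single
        // Sink(table=[...sink1]) fed by Union(all=[true]).
        System.out.println(stmtSet.explain());
    }
}
```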
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala
index 03975fe6..da4f0b2 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala
@@ -393,7 +393,8 @@
   @VisibleForTesting
   private[flink] def optimize(relNodes: Seq[RelNode]): Seq[RelNode] = {
     val optimizedRelNodes = getOptimizer.optimize(relNodes)
-    require(optimizedRelNodes.size == relNodes.size)
+    // Sink reuse may merge duplicated sinks, so fewer nodes can come back than went in.
+    require(optimizedRelNodes.size <= relNodes.size)
     optimizedRelNodes
   }
 
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalSink.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalSink.scala
index 8080ec1..9645c15 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalSink.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalSink.scala
@@ -42,7 +42,7 @@
     contextResolvedTable: ContextResolvedTable,
     tableSink: DynamicTableSink,
     targetColumns: Array[Array[Int]],
-    abilitySpecs: Array[SinkAbilitySpec])
+    val abilitySpecs: Array[SinkAbilitySpec])
   extends Sink(cluster, traitSet, inputRel, hints, targetColumns, contextResolvedTable, tableSink)
   with BatchPhysicalRel {
 
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/reuse/SubplanReuser.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/reuse/SubplanReuser.scala
index fd607d9..e0e4848 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/reuse/SubplanReuser.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/reuse/SubplanReuser.scala
@@ -63,11 +63,20 @@
     val tableSourceReuseEnabled =
       tableConfig.get(OptimizerConfigOptions.TABLE_OPTIMIZER_REUSE_SOURCE_ENABLED)
 
+    val tableSinkReuseEnabled =
+      tableConfig.get(OptimizerConfigOptions.TABLE_OPTIMIZER_REUSE_SINK_ENABLED)
+
     var newRels = rels
     if (tableSourceReuseEnabled) {
       newRels = new ScanReuser(flinkContext, flinkTypeFactory).reuseDuplicatedScan(rels)
     }
 
+    // Reuse duplicated sinks before building the sub-plan reuse context so that
+    // the inputs of the new union nodes can still be deduplicated below.
+    if (tableSinkReuseEnabled && flinkContext.isBatchMode) {
+      newRels = new SinkReuser().reuseDuplicatedSink(newRels)
+    }
+
     val context = new SubplanReuseContext(tableSourceReuseEnabled, newRels: _*)
     val reuseShuttle = new SubplanReuseShuttle(context)
     newRels.map(_.accept(reuseShuttle))
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/batch/sql/BatchSinkReuseTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/batch/sql/BatchSinkReuseTest.java
new file mode 100644
index 0000000..643cd79
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/batch/sql/BatchSinkReuseTest.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.batch.sql;
+
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.planner.plan.common.SinkReuseTestBase;
+import org.apache.flink.table.planner.plan.reuse.SinkReuser;
+import org.apache.flink.table.planner.utils.TableTestUtil;
+
+/** Tests for {@link SinkReuser} in batch mode. */
+public class BatchSinkReuseTest extends SinkReuseTestBase {
+    @Override
+    protected TableTestUtil getTableTestUtil(TableConfig tableConfig) {
+        return batchTestUtil(tableConfig);
+    }
+}
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/common/SinkReuseTestBase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/common/SinkReuseTestBase.java
new file mode 100644
index 0000000..912e0bc
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/common/SinkReuseTestBase.java
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.common;
+
+import org.apache.flink.table.api.StatementSet;
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.api.config.OptimizerConfigOptions;
+import org.apache.flink.table.planner.plan.reuse.SinkReuser;
+import org.apache.flink.table.planner.utils.TableTestBase;
+import org.apache.flink.table.planner.utils.TableTestUtil;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+/** Base test class for {@link SinkReuser}. */
+public abstract class SinkReuseTestBase extends TableTestBase {
+    protected TableTestUtil util;
+
+    @BeforeEach
+    void setup() {
+        TableConfig tableConfig = TableConfig.getDefault();
+        tableConfig.set(OptimizerConfigOptions.TABLE_OPTIMIZER_REUSE_SUB_PLAN_ENABLED, true);
+        tableConfig.set(OptimizerConfigOptions.TABLE_OPTIMIZER_REUSE_SINK_ENABLED, true);
+        util = getTableTestUtil(tableConfig);
+
+        util.tableEnv()
+                .executeSql(
+                        "CREATE TABLE sink1 (\n"
+                                + "  x BIGINT,\n"
+                                + "  y BIGINT\n"
+                                + ")  WITH (\n"
+                                + " 'connector' = 'filesystem',\n"
+                                + " 'format' = 'test-format',\n"
+                                + " 'test-format.delimiter' = ',',\n"
+                                + "'path' = 'ignore'\n"
+                                + ")");
+
+        util.tableEnv()
+                .executeSql(
+                        "CREATE TABLE sink2 (\n"
+                                + "  x BIGINT,\n"
+                                + "  y BIGINT\n"
+                                + ")  WITH (\n"
+                                + " 'connector' = 'values'\n"
+                                + ")");
+
+        util.tableEnv()
+                .executeSql(
+                        "CREATE TABLE partitionedSink (\n"
+                                + "  a BIGINT,\n"
+                                + "  b BIGINT\n"
+                                + ")  PARTITIONED BY(`a`) WITH (\n"
+                                + " 'connector' = 'values'\n"
+                                + ")");
+
+        util.tableEnv()
+                .executeSql(
+                        "CREATE TABLE source1 (\n"
+                                + "  x BIGINT,\n"
+                                + "  y BIGINT\n"
+                                + ")  WITH (\n"
+                                + " 'connector' = 'values',\n"
+                                + " 'disable-lookup' = 'true',\n"
+                                + " 'bounded' = 'true'\n"
+                                + ")");
+
+        util.tableEnv()
+                .executeSql(
+                        "CREATE TABLE source2 (\n"
+                                + "  x BIGINT,\n"
+                                + "  y BIGINT\n"
+                                + ")  WITH (\n"
+                                + " 'connector' = 'values',\n"
+                                + " 'disable-lookup' = 'true',\n"
+                                + " 'bounded' = 'true'\n"
+                                + ")");
+        util.tableEnv()
+                .executeSql(
+                        "CREATE TABLE source3 (\n"
+                                + "  x BIGINT,\n"
+                                + "  y BIGINT\n"
+                                + ")  WITH (\n"
+                                + " 'connector' = 'values',\n"
+                                + " 'disable-lookup' = 'true',\n"
+                                + " 'bounded' = 'true'\n"
+                                + ")");
+
+        util.tableEnv()
+                .executeSql(
+                        "CREATE TABLE source4 (\n"
+                                + "  x BIGINT,\n"
+                                + "  y BIGINT\n"
+                                + ")  WITH (\n"
+                                + " 'connector' = 'values',\n"
+                                + " 'disable-lookup' = 'true',\n"
+                                + " 'bounded' = 'true'\n"
+                                + ")");
+        util.tableEnv()
+                .executeSql(
+                        "CREATE TABLE filed_name_change_source (\n"
+                                + "  x1 BIGINT,\n"
+                                + "  y1 BIGINT\n"
+                                + ")  WITH (\n"
+                                + " 'connector' = 'values',\n"
+                                + " 'disable-lookup' = 'true',\n"
+                                + " 'bounded' = 'true'\n"
+                                + ")");
+
+        util.tableEnv()
+                .executeSql(
+                        "CREATE TABLE type_coercion_source (\n"
+                                + "  x INT,\n"
+                                + "  y INT\n"
+                                + ")  WITH (\n"
+                                + " 'connector' = 'values',\n"
+                                + " 'disable-lookup' = 'true',\n"
+                                + " 'bounded' = 'true'\n"
+                                + ")");
+    }
+
+    @Test
+    public void testSinkReuse() {
+        StatementSet statementSet = util.tableEnv().createStatementSet();
+        statementSet.addInsertSql("INSERT INTO sink1 (SELECT * FROM source1)");
+        statementSet.addInsertSql("INSERT INTO sink1 (SELECT * FROM source2)");
+        statementSet.addInsertSql("INSERT INTO sink2 (SELECT * FROM source3)");
+        statementSet.addInsertSql("INSERT INTO sink2 (SELECT * FROM source4)");
+        util.verifyExecPlan(statementSet);
+    }
+
+    @Test
+    public void testSinkReuseFromSameSource() {
+        StatementSet statementSet = util.tableEnv().createStatementSet();
+        statementSet.addInsertSql("INSERT INTO sink1 (SELECT * FROM source1)");
+        statementSet.addInsertSql("INSERT INTO sink1 (SELECT * FROM source1)");
+        util.verifyExecPlan(statementSet);
+    }
+
+    @Test
+    public void testSinkReuseWithPartialColumns() {
+        StatementSet statementSet = util.tableEnv().createStatementSet();
+        statementSet.addInsertSql("INSERT INTO sink1(`x`) (SELECT x FROM source1)");
+        statementSet.addInsertSql("INSERT INTO sink1(`y`) (SELECT y FROM source1)");
+        statementSet.addInsertSql("INSERT INTO sink1(`x`) (SELECT x FROM source3)");
+        util.verifyExecPlan(statementSet);
+    }
+
+    @Test
+    public void testSinkReuseWithOverwrite() {
+        StatementSet statementSet = util.tableEnv().createStatementSet();
+        statementSet.addInsertSql("INSERT OVERWRITE sink1 (SELECT * FROM source1)");
+        statementSet.addInsertSql("INSERT INTO sink1 (SELECT * FROM source2)");
+        statementSet.addInsertSql("INSERT OVERWRITE sink1 (SELECT * FROM source3)");
+        util.verifyExecPlan(statementSet);
+    }
+
+    @Test
+    public void testSinkReuseWithPartition() {
+        StatementSet statementSet = util.tableEnv().createStatementSet();
+        statementSet.addInsertSql(
+                "INSERT INTO partitionedSink PARTITION(a = 1) (SELECT y FROM source1)");
+        statementSet.addInsertSql(
+                "INSERT INTO partitionedSink PARTITION(a = 1) (SELECT y FROM source2)");
+        statementSet.addInsertSql(
+                "INSERT INTO partitionedSink PARTITION(a = 2) (SELECT y FROM source3)");
+        statementSet.addInsertSql("INSERT INTO partitionedSink (SELECT * FROM source4)");
+        util.verifyExecPlan(statementSet);
+    }
+
+    @Test
+    public void testSinkReuseWithDifferentFieldNames() {
+        StatementSet statementSet = util.tableEnv().createStatementSet();
+        statementSet.addInsertSql("INSERT INTO sink1 (SELECT x, y FROM source1)");
+        statementSet.addInsertSql(
+                "INSERT INTO sink1 (SELECT x1, y1 FROM filed_name_change_source)");
+        util.verifyExecPlan(statementSet);
+    }
+
+    @Test
+    public void testSinkReuseWithHint() {
+        StatementSet statementSet = util.tableEnv().createStatementSet();
+        statementSet.addInsertSql(
+                "INSERT INTO sink1 /*+ OPTIONS('path' = 'ignore1') */ (SELECT * FROM source1)");
+        statementSet.addInsertSql(
+                "INSERT INTO sink1 /*+ OPTIONS('path' = 'ignore2') */ (SELECT * FROM source2)");
+        statementSet.addInsertSql(
+                "INSERT INTO sink1 /*+ OPTIONS('path' = 'ignore1') */ (SELECT * FROM source3)");
+        util.verifyExecPlan(statementSet);
+    }
+
+    @Test
+    public void testSinkReuseWithTypeCoercionSource() {
+        StatementSet statementSet = util.tableEnv().createStatementSet();
+        statementSet.addInsertSql("INSERT INTO sink1 (SELECT * FROM source1)");
+        statementSet.addInsertSql("INSERT INTO sink1 (SELECT * FROM type_coercion_source)");
+        util.verifyExecPlan(statementSet);
+    }
+
+    protected abstract TableTestUtil getTableTestUtil(TableConfig tableConfig);
+}
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/common/sql/SinkReuseITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/common/sql/SinkReuseITCase.java
new file mode 100644
index 0000000..900bfec
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/common/sql/SinkReuseITCase.java
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.runtime.common.sql;
+
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.StatementSet;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.api.config.OptimizerConfigOptions;
+import org.apache.flink.table.api.internal.TableEnvironmentImpl;
+import org.apache.flink.table.planner.factories.TestValuesTableFactory;
+import org.apache.flink.test.util.AbstractTestBase;
+import org.apache.flink.testutils.junit.extensions.parameterized.Parameter;
+import org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension;
+import org.apache.flink.testutils.junit.extensions.parameterized.Parameters;
+import org.apache.flink.types.Row;
+
+import org.junit.jupiter.api.TestTemplate;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** IT case for reusing duplicated table sinks in statement sets. */
+@ExtendWith(ParameterizedTestExtension.class)
+public class SinkReuseITCase extends AbstractTestBase {
+    @Parameter public Boolean isBatch;
+
+    @Parameters(name = "isBatch: {0}")
+    public static Collection<Boolean> parameters() {
+        return List.of(true); // sink reuse is currently supported only in batch mode
+    }
+
+    TableEnvironment tEnv;
+
+    void setup(boolean isBatch) throws Exception {
+        EnvironmentSettings settings;
+        if (isBatch) {
+            settings = EnvironmentSettings.newInstance().inBatchMode().build();
+            tEnv = TableEnvironmentImpl.create(settings);
+        } else {
+            settings = EnvironmentSettings.newInstance().inStreamingMode().build();
+            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+            env.setParallelism(4);
+            env.getConfig().enableObjectReuse();
+            tEnv = StreamTableEnvironment.create(env, settings);
+        }
+
+        tEnv.getConfig()
+                .getConfiguration()
+                .set(OptimizerConfigOptions.TABLE_OPTIMIZER_REUSE_SINK_ENABLED, true);
+
+        String dataId1 =
+                TestValuesTableFactory.registerData(
+                        Arrays.asList(Row.of(1, 1.1d, "Tom"), Row.of(2, 1.2d, "Jerry")));
+
+        String dataId2 =
+                TestValuesTableFactory.registerData(
+                        Arrays.asList(Row.of(1, 2.1d, "Alice"), Row.of(2, 2.2d, "Bob")));
+
+        String dataId3 =
+                TestValuesTableFactory.registerData(
+                        Arrays.asList(Row.of(1, 3.1d, "Jack"), Row.of(2, 3.2d, "Rose")));
+
+        createSourceTable("source1", getSourceOptions(dataId1));
+        createSourceTable("source2", getSourceOptions(dataId2));
+        createSourceTable("source3", getSourceOptions(dataId3));
+
+        createSinkTable("sink1", getSinkOptions());
+        createSinkTable("sink2", getSinkOptions());
+    }
+
+    @TestTemplate
+    public void testSinkMergeFromSameSource() throws Exception {
+        setup(isBatch);
+        StatementSet statementSet = tEnv.createStatementSet();
+        statementSet.addInsertSql("INSERT INTO sink1 SELECT * FROM source1");
+        statementSet.addInsertSql("INSERT INTO sink1 SELECT * FROM source1");
+        statementSet.execute().await();
+
+        List<String> sink1Result = TestValuesTableFactory.getResultsAsStrings("sink1");
+        List<String> sink1Expected =
+                Arrays.asList(
+                        "+I[1, 1.1, Tom]",
+                        "+I[2, 1.2, Jerry]",
+                        "+I[1, 1.1, Tom]",
+                        "+I[2, 1.2, Jerry]");
+        assertResult(sink1Expected, sink1Result);
+    }
+
+    @TestTemplate
+    public void testMergeSink() throws Exception {
+        setup(isBatch);
+        StatementSet statementSet = tEnv.createStatementSet();
+        statementSet.addInsertSql("INSERT INTO sink1 SELECT * FROM source1");
+        statementSet.addInsertSql("INSERT INTO sink1 SELECT * FROM source2");
+        statementSet.addInsertSql("INSERT INTO sink2 SELECT * FROM source3");
+        statementSet.execute().await();
+
+        List<String> sink1Result = TestValuesTableFactory.getResultsAsStrings("sink1");
+        List<String> sink2Result = TestValuesTableFactory.getResultsAsStrings("sink2");
+
+        List<String> sink1Expected =
+                Arrays.asList(
+                        "+I[1, 1.1, Tom]",
+                        "+I[2, 1.2, Jerry]",
+                        "+I[1, 2.1, Alice]",
+                        "+I[2, 2.2, Bob]");
+        List<String> sink2Expected = Arrays.asList("+I[1, 3.1, Jack]", "+I[2, 3.2, Rose]");
+
+        assertResult(sink1Expected, sink1Result);
+        assertResult(sink2Expected, sink2Result);
+    }
+
+    private Map<String, String> getSourceOptions(String dataId) {
+        Map<String, String> sourceOptions = new HashMap<>();
+        sourceOptions.put("connector", "values");
+        sourceOptions.put("bounded", "true");
+        sourceOptions.put("data-id", dataId);
+        return sourceOptions;
+    }
+
+    private Map<String, String> getSinkOptions() {
+        Map<String, String> sinkOptions = new HashMap<>();
+        sinkOptions.put("connector", "values");
+        sinkOptions.put("bounded", "true");
+        return sinkOptions;
+    }
+
+    private void createSinkTable(String tableName, Map<String, String> options) {
+        String ddl =
+                String.format(
+                        "CREATE TABLE `%s` (\n"
+                                + "  a int,\n"
+                                + "  b double,\n"
+                                + "  c string\n"
+                                + ")  WITH (\n"
+                                + " %s \n"
+                                + ")",
+                        tableName, makeWithOptions(options));
+        tEnv.executeSql(ddl);
+    }
+
+    private void createSourceTable(String tableName, Map<String, String> options) {
+        String ddl =
+                String.format(
+                        "CREATE TABLE `%s` (\n"
+                                + "  a int,\n"
+                                + "  b double,\n"
+                                + "  c string,\n"
+                                + "  PRIMARY KEY (a) NOT ENFORCED\n"
+                                + ")  WITH (\n"
+                                + " %s \n"
+                                + ")",
+                        tableName, makeWithOptions(options));
+        tEnv.executeSql(ddl);
+    }
+
+    private String makeWithOptions(Map<String, String> options) {
+        return options.keySet().stream()
+                .map(key -> String.format("  '%s' = '%s'", key, options.get(key)))
+                .collect(Collectors.joining(",\n"));
+    }
+
+    private void assertResult(List<String> expected, List<String> actual) {
+        Collections.sort(expected);
+        Collections.sort(actual);
+        assertThat(actual).isEqualTo(expected);
+    }
+}
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/BatchSinkReuseTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/BatchSinkReuseTest.xml
new file mode 100644
index 0000000..f3c3392
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/BatchSinkReuseTest.xml
@@ -0,0 +1,245 @@
+<?xml version="1.0" ?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to you under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+<Root>
+	<TestCase name="testSinkReuse">
+		<Resource name="ast">
+			<![CDATA[
+LogicalSink(table=[default_catalog.default_database.sink1], fields=[x, y])
++- LogicalProject(x=[$0], y=[$1])
+   +- LogicalTableScan(table=[[default_catalog, default_database, source1]])
+
+LogicalSink(table=[default_catalog.default_database.sink1], fields=[x, y])
++- LogicalProject(x=[$0], y=[$1])
+   +- LogicalTableScan(table=[[default_catalog, default_database, source2]])
+
+LogicalSink(table=[default_catalog.default_database.sink2], fields=[x, y])
++- LogicalProject(x=[$0], y=[$1])
+   +- LogicalTableScan(table=[[default_catalog, default_database, source3]])
+
+LogicalSink(table=[default_catalog.default_database.sink2], fields=[x, y])
++- LogicalProject(x=[$0], y=[$1])
+   +- LogicalTableScan(table=[[default_catalog, default_database, source4]])
+]]>
+		</Resource>
+		<Resource name="optimized exec plan">
+			<![CDATA[
+Sink(table=[default_catalog.default_database.sink1], fields=[x, y])
++- Union(all=[true], union=[x, y])
+   :- TableSourceScan(table=[[default_catalog, default_database, source1]], fields=[x, y])
+   +- TableSourceScan(table=[[default_catalog, default_database, source2]], fields=[x, y])
+
+Sink(table=[default_catalog.default_database.sink2], fields=[x, y])
++- Union(all=[true], union=[x, y])
+   :- TableSourceScan(table=[[default_catalog, default_database, source3]], fields=[x, y])
+   +- TableSourceScan(table=[[default_catalog, default_database, source4]], fields=[x, y])
+]]>
+		</Resource>
+	</TestCase>
+	<TestCase name="testSinkReuseFromSameSource">
+		<Resource name="ast">
+			<![CDATA[
+LogicalSink(table=[default_catalog.default_database.sink1], fields=[x, y])
++- LogicalProject(x=[$0], y=[$1])
+   +- LogicalTableScan(table=[[default_catalog, default_database, source1]])
+
+LogicalSink(table=[default_catalog.default_database.sink1], fields=[x, y])
++- LogicalProject(x=[$0], y=[$1])
+   +- LogicalTableScan(table=[[default_catalog, default_database, source1]])
+]]>
+		</Resource>
+		<Resource name="optimized exec plan">
+			<![CDATA[
+Sink(table=[default_catalog.default_database.sink1], fields=[x, y])
++- Union(all=[true], union=[x, y])
+   :- TableSourceScan(table=[[default_catalog, default_database, source1]], fields=[x, y])(reuse_id=[1])
+   +- Reused(reference_id=[1])
+]]>
+		</Resource>
+	</TestCase>
+	<TestCase name="testSinkReuseWithDifferentFieldNames">
+		<Resource name="ast">
+			<![CDATA[
+LogicalSink(table=[default_catalog.default_database.sink1], fields=[x, y])
++- LogicalProject(x=[$0], y=[$1])
+   +- LogicalTableScan(table=[[default_catalog, default_database, source1]])
+
+LogicalSink(table=[default_catalog.default_database.sink1], fields=[x1, y1])
++- LogicalProject(x1=[$0], y1=[$1])
+   +- LogicalTableScan(table=[[default_catalog, default_database, field_name_change_source]])
+]]>
+		</Resource>
+		<Resource name="optimized exec plan">
+			<![CDATA[
+Sink(table=[default_catalog.default_database.sink1], fields=[x, y])
++- Union(all=[true], union=[x, y])
+   :- TableSourceScan(table=[[default_catalog, default_database, source1]], fields=[x, y])
+   +- TableSourceScan(table=[[default_catalog, default_database, field_name_change_source]], fields=[x1, y1])
+]]>
+		</Resource>
+	</TestCase>
+	<TestCase name="testSinkReuseWithOverwrite">
+		<Resource name="ast">
+			<![CDATA[
+LogicalSink(table=[default_catalog.default_database.sink1], fields=[x, y])
++- LogicalProject(x=[$0], y=[$1])
+   +- LogicalTableScan(table=[[default_catalog, default_database, source1]])
+
+LogicalSink(table=[default_catalog.default_database.sink1], fields=[x, y])
++- LogicalProject(x=[$0], y=[$1])
+   +- LogicalTableScan(table=[[default_catalog, default_database, source2]])
+
+LogicalSink(table=[default_catalog.default_database.sink1], fields=[x, y])
++- LogicalProject(x=[$0], y=[$1])
+   +- LogicalTableScan(table=[[default_catalog, default_database, source3]])
+]]>
+		</Resource>
+		<Resource name="optimized exec plan">
+			<![CDATA[
+Sink(table=[default_catalog.default_database.sink1], fields=[x, y])
++- Union(all=[true], union=[x, y])
+   :- TableSourceScan(table=[[default_catalog, default_database, source1]], fields=[x, y])
+   +- TableSourceScan(table=[[default_catalog, default_database, source3]], fields=[x, y])
+
+Sink(table=[default_catalog.default_database.sink1], fields=[x, y])
++- TableSourceScan(table=[[default_catalog, default_database, source2]], fields=[x, y])
+]]>
+		</Resource>
+	</TestCase>
+	<TestCase name="testSinkReuseWithHint">
+		<Resource name="ast">
+			<![CDATA[
+LogicalSink(table=[default_catalog.default_database.sink1], fields=[x, y], hints=[[[OPTIONS options:{path=ignore1}]]])
++- LogicalProject(x=[$0], y=[$1])
+   +- LogicalTableScan(table=[[default_catalog, default_database, source1]])
+
+LogicalSink(table=[default_catalog.default_database.sink1], fields=[x, y], hints=[[[OPTIONS options:{path=ignore2}]]])
++- LogicalProject(x=[$0], y=[$1])
+   +- LogicalTableScan(table=[[default_catalog, default_database, source2]])
+
+LogicalSink(table=[default_catalog.default_database.sink1], fields=[x, y], hints=[[[OPTIONS options:{path=ignore1}]]])
++- LogicalProject(x=[$0], y=[$1])
+   +- LogicalTableScan(table=[[default_catalog, default_database, source3]])
+]]>
+		</Resource>
+		<Resource name="optimized exec plan">
+			<![CDATA[
+Sink(table=[default_catalog.default_database.sink1], fields=[x, y], hints=[[[OPTIONS options:{path=ignore1}]]])
++- Union(all=[true], union=[x, y])
+   :- TableSourceScan(table=[[default_catalog, default_database, source1]], fields=[x, y])
+   +- TableSourceScan(table=[[default_catalog, default_database, source3]], fields=[x, y])
+
+Sink(table=[default_catalog.default_database.sink1], fields=[x, y], hints=[[[OPTIONS options:{path=ignore2}]]])
++- TableSourceScan(table=[[default_catalog, default_database, source2]], fields=[x, y])
+]]>
+		</Resource>
+	</TestCase>
+	<TestCase name="testSinkReuseWithPartialColumns">
+		<Resource name="ast">
+			<![CDATA[
+LogicalSink(table=[default_catalog.default_database.sink1], targetColumns=[[0]], fields=[x, EXPR$1])
++- LogicalProject(x=[$0], EXPR$1=[null:BIGINT])
+   +- LogicalTableScan(table=[[default_catalog, default_database, source1]])
+
+LogicalSink(table=[default_catalog.default_database.sink1], targetColumns=[[1]], fields=[EXPR$0, y])
++- LogicalProject(EXPR$0=[null:BIGINT], y=[$1])
+   +- LogicalTableScan(table=[[default_catalog, default_database, source1]])
+
+LogicalSink(table=[default_catalog.default_database.sink1], targetColumns=[[0]], fields=[x, EXPR$1])
++- LogicalProject(x=[$0], EXPR$1=[null:BIGINT])
+   +- LogicalTableScan(table=[[default_catalog, default_database, source3]])
+]]>
+		</Resource>
+		<Resource name="optimized exec plan">
+			<![CDATA[
+TableSourceScan(table=[[default_catalog, default_database, source1, project=[x, y], metadata=[]]], fields=[x, y])(reuse_id=[1])
+
+Sink(table=[default_catalog.default_database.sink1], targetColumns=[[0]], fields=[x, EXPR$1])
++- Union(all=[true], union=[x, EXPR$1])
+   :- Calc(select=[x, null:BIGINT AS EXPR$1])
+   :  +- Reused(reference_id=[1])
+   +- Calc(select=[x, null:BIGINT AS EXPR$1])
+      +- TableSourceScan(table=[[default_catalog, default_database, source3, project=[x], metadata=[]]], fields=[x])
+
+Sink(table=[default_catalog.default_database.sink1], targetColumns=[[1]], fields=[EXPR$0, y])
++- Calc(select=[null:BIGINT AS EXPR$0, y])
+   +- Reused(reference_id=[1])
+]]>
+		</Resource>
+	</TestCase>
+	<TestCase name="testSinkReuseWithTypeCoercionSource">
+		<Resource name="ast">
+			<![CDATA[
+LogicalSink(table=[default_catalog.default_database.sink1], fields=[x, y])
++- LogicalProject(x=[$0], y=[$1])
+   +- LogicalTableScan(table=[[default_catalog, default_database, source1]])
+
+LogicalSink(table=[default_catalog.default_database.sink1], fields=[x, y])
++- LogicalProject(x=[CAST($0):BIGINT], y=[CAST($1):BIGINT])
+   +- LogicalTableScan(table=[[default_catalog, default_database, type_coercion_source]])
+]]>
+		</Resource>
+		<Resource name="optimized exec plan">
+			<![CDATA[
+Sink(table=[default_catalog.default_database.sink1], fields=[x, y])
++- Union(all=[true], union=[x, y])
+   :- TableSourceScan(table=[[default_catalog, default_database, source1]], fields=[x, y])
+   +- Calc(select=[CAST(x AS BIGINT) AS x, CAST(y AS BIGINT) AS y])
+      +- TableSourceScan(table=[[default_catalog, default_database, type_coercion_source]], fields=[x, y])
+]]>
+		</Resource>
+	</TestCase>
+	<TestCase name="testSinkReuseWithPartition">
+		<Resource name="ast">
+			<![CDATA[
+LogicalSink(table=[default_catalog.default_database.partitionedSink], fields=[EXPR$0, y])
++- LogicalProject(EXPR$0=[1:BIGINT], y=[$1])
+   +- LogicalTableScan(table=[[default_catalog, default_database, source1]])
+
+LogicalSink(table=[default_catalog.default_database.partitionedSink], fields=[EXPR$0, y])
++- LogicalProject(EXPR$0=[1:BIGINT], y=[$1])
+   +- LogicalTableScan(table=[[default_catalog, default_database, source2]])
+
+LogicalSink(table=[default_catalog.default_database.partitionedSink], fields=[EXPR$0, y])
++- LogicalProject(EXPR$0=[2:BIGINT], y=[$1])
+   +- LogicalTableScan(table=[[default_catalog, default_database, source3]])
+
+LogicalSink(table=[default_catalog.default_database.partitionedSink], fields=[x, y])
++- LogicalProject(x=[$0], y=[$1])
+   +- LogicalTableScan(table=[[default_catalog, default_database, source4]])
+]]>
+		</Resource>
+		<Resource name="optimized exec plan">
+			<![CDATA[
+Sink(table=[default_catalog.default_database.partitionedSink], fields=[EXPR$0, y])
++- Union(all=[true], union=[EXPR$0, y])
+   :- Calc(select=[1 AS EXPR$0, y])
+   :  +- TableSourceScan(table=[[default_catalog, default_database, source1, project=[y], metadata=[]]], fields=[y])
+   +- Calc(select=[1 AS EXPR$0, y])
+      +- TableSourceScan(table=[[default_catalog, default_database, source2, project=[y], metadata=[]]], fields=[y])
+
+Sink(table=[default_catalog.default_database.partitionedSink], fields=[EXPR$0, y])
++- Calc(select=[2 AS EXPR$0, y])
+   +- TableSourceScan(table=[[default_catalog, default_database, source3, project=[y], metadata=[]]], fields=[y])
+
+Sink(table=[default_catalog.default_database.partitionedSink], fields=[x, y])
++- Sort(orderBy=[x ASC])
+   +- TableSourceScan(table=[[default_catalog, default_database, source4]], fields=[x, y])
+]]>
+		</Resource>
+	</TestCase>
+</Root>