[FLINK-32001][table] Row-level update should support returning partial columns
diff --git a/flink-table/flink-sql-client/src/test/resources/sql/delete.q b/flink-table/flink-sql-client/src/test/resources/sql/delete.q
index 1604f27..7fcd56d 100644
--- a/flink-table/flink-sql-client/src/test/resources/sql/delete.q
+++ b/flink-table/flink-sql-client/src/test/resources/sql/delete.q
@@ -29,7 +29,7 @@
 !info
 
 # create a table first
-CREATE TABLE t (a int, b string, c double)
+CREATE TABLE t (a int PRIMARY KEY NOT ENFORCED, b string, c double)
 WITH (
   'connector' = 'test-update-delete',
   'data-id' = '$VAR_DELETE_TABLE_DATA_ID',
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/connectors/DynamicSinkUtils.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/connectors/DynamicSinkUtils.java
index 441ddcc..42e4f95 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/connectors/DynamicSinkUtils.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/connectors/DynamicSinkUtils.java
@@ -386,36 +386,37 @@
         RowLevelModificationScanContext context = RowLevelModificationContextUtils.getScanContext();
         SupportsRowLevelDelete.RowLevelDeleteInfo rowLevelDeleteInfo =
                 supportsRowLevelDelete.applyRowLevelDelete(context);
-        sinkAbilitySpecs.add(
-                new RowLevelDeleteSpec(rowLevelDeleteInfo.getRowLevelDeleteMode(), context));
 
         if (rowLevelDeleteInfo.getRowLevelDeleteMode()
-                == SupportsRowLevelDelete.RowLevelDeleteMode.DELETED_ROWS) {
-            // convert the LogicalTableModify node to a rel node representing row-level delete
-            return convertToRowLevelDelete(
-                    tableModify,
-                    contextResolvedTable,
-                    rowLevelDeleteInfo,
-                    tableDebugName,
-                    dataTypeFactory,
-                    typeFactory);
-        } else if (rowLevelDeleteInfo.getRowLevelDeleteMode()
+                        != SupportsRowLevelDelete.RowLevelDeleteMode.DELETED_ROWS
+                && rowLevelDeleteInfo.getRowLevelDeleteMode()
+                        != SupportsRowLevelDelete.RowLevelDeleteMode.REMAINING_ROWS) {
+            throw new TableException(
+                    "Unknown delete mode: " + rowLevelDeleteInfo.getRowLevelDeleteMode());
+        }
+
+        if (rowLevelDeleteInfo.getRowLevelDeleteMode()
                 == SupportsRowLevelDelete.RowLevelDeleteMode.REMAINING_ROWS) {
             // if it's for remaining row, convert the predicate in where clause
             // to the negative predicate
             convertPredicateToNegative(tableModify);
-            // convert the LogicalTableModify node to a rel node representing row-level delete
-            return convertToRowLevelDelete(
-                    tableModify,
-                    contextResolvedTable,
-                    rowLevelDeleteInfo,
-                    tableDebugName,
-                    dataTypeFactory,
-                    typeFactory);
-        } else {
-            throw new TableException(
-                    "Unknown delete mode: " + rowLevelDeleteInfo.getRowLevelDeleteMode());
         }
+
+        // convert the LogicalTableModify node to a RelNode representing row-level delete
+        Tuple2<RelNode, int[]> deleteRelNodeAndRequireIndices =
+                convertToRowLevelDelete(
+                        tableModify,
+                        contextResolvedTable,
+                        rowLevelDeleteInfo,
+                        tableDebugName,
+                        dataTypeFactory,
+                        typeFactory);
+        sinkAbilitySpecs.add(
+                new RowLevelDeleteSpec(
+                        rowLevelDeleteInfo.getRowLevelDeleteMode(),
+                        context,
+                        deleteRelNodeAndRequireIndices.f1));
+        return deleteRelNodeAndRequireIndices.f0;
     }
 
     private static RelNode convertUpdate(
@@ -445,16 +446,21 @@
             throw new IllegalArgumentException(
                     "Unknown update mode:" + updateInfo.getRowLevelUpdateMode());
         }
+        Tuple2<RelNode, int[]> updateRelNodeAndRequireIndices =
+                convertToRowLevelUpdate(
+                        tableModify,
+                        contextResolvedTable,
+                        updateInfo,
+                        tableDebugName,
+                        dataTypeFactory,
+                        typeFactory);
         sinkAbilitySpecs.add(
                 new RowLevelUpdateSpec(
-                        updatedColumns, updateInfo.getRowLevelUpdateMode(), context));
-        return convertToRowLevelUpdate(
-                tableModify,
-                contextResolvedTable,
-                updateInfo,
-                tableDebugName,
-                dataTypeFactory,
-                typeFactory);
+                        updatedColumns,
+                        updateInfo.getRowLevelUpdateMode(),
+                        context,
+                        updateRelNodeAndRequireIndices.f1));
+        return updateRelNodeAndRequireIndices.f0;
     }
 
     private static List<Column> getUpdatedColumns(
@@ -469,8 +475,13 @@
         return updatedColumns;
     }
 
-    /** Convert tableModify node to a rel node representing for row-level delete. */
-    private static RelNode convertToRowLevelDelete(
+    /**
+     * Convert tableModify node to a RelNode representing for row-level delete.
+     *
+     * @return a tuple contains the RelNode and the index for the required physical columns for
+     *     row-level delete.
+     */
+    private static Tuple2<RelNode, int[]> convertToRowLevelDelete(
             LogicalTableModify tableModify,
             ContextResolvedTable contextResolvedTable,
             SupportsRowLevelDelete.RowLevelDeleteInfo rowLevelDeleteInfo,
@@ -495,14 +506,25 @@
                     addExtraMetaCols(
                             tableModify, tableScan, tableDebugName, metadataColumns, typeFactory);
         }
+
         // create a project only select the required columns for delete
-        return projectColumnsForDelete(
-                tableModify,
-                resolvedSchema,
-                colIndexes,
-                tableDebugName,
-                dataTypeFactory,
-                typeFactory);
+        return Tuple2.of(
+                projectColumnsForDelete(
+                        tableModify,
+                        resolvedSchema,
+                        colIndexes,
+                        tableDebugName,
+                        dataTypeFactory,
+                        typeFactory),
+                getPhysicalColumnIndices(colIndexes, resolvedSchema));
+    }
+
+    /** Return the indices from {@param colIndexes} that belong to physical column. */
+    private static int[] getPhysicalColumnIndices(List<Integer> colIndexes, ResolvedSchema schema) {
+        return colIndexes.stream()
+                .filter(i -> schema.getColumns().get(i).isPhysical())
+                .mapToInt(i -> i)
+                .toArray();
     }
 
     /** Convert the predicate in WHERE clause to the negative predicate. */
@@ -606,8 +628,13 @@
         return (LogicalTableScan) relNode;
     }
 
-    /** Convert tableModify node to a RelNode representing for row-level update. */
-    private static RelNode convertToRowLevelUpdate(
+    /**
+     * Convert tableModify node to a RelNode representing for row-level update.
+     *
+     * @return a tuple contains the RelNode and the index for the required physical columns for
+     *     row-level update.
+     */
+    private static Tuple2<RelNode, int[]> convertToRowLevelUpdate(
             LogicalTableModify tableModify,
             ContextResolvedTable contextResolvedTable,
             SupportsRowLevelUpdate.RowLevelUpdateInfo rowLevelUpdateInfo,
@@ -622,7 +649,7 @@
         LogicalTableScan tableScan = getSourceTableScan(tableModify);
         Tuple2<List<Integer>, List<MetadataColumn>> colsIndexAndExtraMetaCols =
                 getRequireColumnsIndexAndExtraMetaCols(tableScan, requiredColumns, resolvedSchema);
-        List<Integer> updatedIndexes = colsIndexAndExtraMetaCols.f0;
+        List<Integer> colIndexes = colsIndexAndExtraMetaCols.f0;
         List<MetadataColumn> metadataColumns = colsIndexAndExtraMetaCols.f1;
         // if meta columns size is greater than 0, we need to modify the underlying
         // LogicalTableScan to make it can read meta column
@@ -632,16 +659,17 @@
                     addExtraMetaCols(
                             tableModify, tableScan, tableDebugName, metadataColumns, typeFactory);
         }
-
-        return projectColumnsForUpdate(
-                tableModify,
-                originColsCount,
-                resolvedSchema,
-                updatedIndexes,
-                rowLevelUpdateInfo.getRowLevelUpdateMode(),
-                tableDebugName,
-                dataTypeFactory,
-                typeFactory);
+        return Tuple2.of(
+                projectColumnsForUpdate(
+                        tableModify,
+                        originColsCount,
+                        resolvedSchema,
+                        colIndexes,
+                        rowLevelUpdateInfo.getRowLevelUpdateMode(),
+                        tableDebugName,
+                        dataTypeFactory,
+                        typeFactory),
+                getPhysicalColumnIndices(colIndexes, resolvedSchema));
     }
 
     // create a project only select the required column or expression for update
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/sink/RowLevelDeleteSpec.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/sink/RowLevelDeleteSpec.java
index cd26a7b..29f87df 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/sink/RowLevelDeleteSpec.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/sink/RowLevelDeleteSpec.java
@@ -33,17 +33,20 @@
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 
+import java.util.Arrays;
 import java.util.Objects;
 
 /**
  * A sub-class of {@link SinkAbilitySpec} that can not only serialize/deserialize the row-level
- * delete mode to/from JSON, but also can delete existing data for {@link
- * org.apache.flink.table.connector.sink.abilities.SupportsRowLevelDelete}.
+ * delete mode & required physical column indices to/from JSON, but also can delete existing data
+ * for {@link org.apache.flink.table.connector.sink.abilities.SupportsRowLevelDelete}.
  */
 @JsonIgnoreProperties(ignoreUnknown = true)
 @JsonTypeName("RowLevelDelete")
 public class RowLevelDeleteSpec implements SinkAbilitySpec {
     public static final String FIELD_NAME_ROW_LEVEL_DELETE_MODE = "rowLevelDeleteMode";
+    public static final String FIELD_NAME_REQUIRED_PHYSICAL_COLUMN_INDICES =
+            "requiredPhysicalColumnIndices";
 
     @JsonProperty(FIELD_NAME_ROW_LEVEL_DELETE_MODE)
     @Nonnull
@@ -51,13 +54,20 @@
 
     @JsonIgnore @Nullable private final RowLevelModificationScanContext scanContext;
 
+    @JsonProperty(FIELD_NAME_REQUIRED_PHYSICAL_COLUMN_INDICES)
+    @Nonnull
+    private final int[] requiredPhysicalColumnIndices;
+
     @JsonCreator
     public RowLevelDeleteSpec(
             @JsonProperty(FIELD_NAME_ROW_LEVEL_DELETE_MODE) @Nonnull
                     SupportsRowLevelDelete.RowLevelDeleteMode rowLevelDeleteMode,
-            @Nullable RowLevelModificationScanContext scanContext) {
+            @Nullable RowLevelModificationScanContext scanContext,
+            @JsonProperty(FIELD_NAME_REQUIRED_PHYSICAL_COLUMN_INDICES) @Nonnull
+                    int[] requiredPhysicalColumnIndices) {
         this.rowLevelDeleteMode = Preconditions.checkNotNull(rowLevelDeleteMode);
         this.scanContext = scanContext;
+        this.requiredPhysicalColumnIndices = requiredPhysicalColumnIndices;
     }
 
     @Override
@@ -77,6 +87,10 @@
         return rowLevelDeleteMode;
     }
 
+    public int[] getRequiredPhysicalColumnIndices() {
+        return requiredPhysicalColumnIndices;
+    }
+
     @Override
     public boolean equals(Object o) {
         if (this == o) {
@@ -87,11 +101,14 @@
         }
         RowLevelDeleteSpec that = (RowLevelDeleteSpec) o;
         return rowLevelDeleteMode == that.rowLevelDeleteMode
-                && Objects.equals(scanContext, that.scanContext);
+                && Objects.equals(scanContext, that.scanContext)
+                && Arrays.equals(requiredPhysicalColumnIndices, that.requiredPhysicalColumnIndices);
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(rowLevelDeleteMode, scanContext);
+        int result = Objects.hash(rowLevelDeleteMode, scanContext);
+        result = 31 * result + Arrays.hashCode(requiredPhysicalColumnIndices);
+        return result;
     }
 }
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/sink/RowLevelUpdateSpec.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/sink/RowLevelUpdateSpec.java
index d046ab0..17c9176 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/sink/RowLevelUpdateSpec.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/sink/RowLevelUpdateSpec.java
@@ -33,19 +33,22 @@
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 
+import java.util.Arrays;
 import java.util.List;
 import java.util.Objects;
 
 /**
  * A sub-class of {@link SinkAbilitySpec} that can not only serialize/deserialize the row-level
- * update mode & columns to/from JSON, but also can update existing data for {@link
- * org.apache.flink.table.connector.sink.abilities.SupportsRowLevelUpdate}.
+ * update mode, columns & required physical column indices to/from JSON, but also can update
+ * existing data for {@link org.apache.flink.table.connector.sink.abilities.SupportsRowLevelUpdate}.
  */
 @JsonIgnoreProperties(ignoreUnknown = true)
 @JsonTypeName("RowLevelUpdate")
 public class RowLevelUpdateSpec implements SinkAbilitySpec {
     public static final String FIELD_NAME_UPDATED_COLUMNS = "updatedColumns";
     public static final String FIELD_NAME_ROW_LEVEL_UPDATE_MODE = "rowLevelUpdateMode";
+    public static final String FIELD_NAME_REQUIRED_PHYSICAL_COLUMN_INDICES =
+            "requiredPhysicalColumnIndices";
 
     @JsonProperty(FIELD_NAME_UPDATED_COLUMNS)
     @Nonnull
@@ -55,6 +58,10 @@
     @Nonnull
     private final SupportsRowLevelUpdate.RowLevelUpdateMode rowLevelUpdateMode;
 
+    @JsonProperty(FIELD_NAME_REQUIRED_PHYSICAL_COLUMN_INDICES)
+    @Nonnull
+    private final int[] requiredPhysicalColumnIndices;
+
     @JsonIgnore @Nullable private final RowLevelModificationScanContext scanContext;
 
     @JsonCreator
@@ -62,10 +69,13 @@
             @JsonProperty(FIELD_NAME_UPDATED_COLUMNS) @Nonnull List<Column> updatedColumns,
             @JsonProperty(FIELD_NAME_ROW_LEVEL_UPDATE_MODE) @Nonnull
                     SupportsRowLevelUpdate.RowLevelUpdateMode rowLevelUpdateMode,
-            @Nullable RowLevelModificationScanContext scanContext) {
+            @Nullable RowLevelModificationScanContext scanContext,
+            @JsonProperty(FIELD_NAME_REQUIRED_PHYSICAL_COLUMN_INDICES) @Nonnull
+                    int[] requiredPhysicalColumnIndices) {
         this.updatedColumns = updatedColumns;
         this.rowLevelUpdateMode = rowLevelUpdateMode;
         this.scanContext = scanContext;
+        this.requiredPhysicalColumnIndices = requiredPhysicalColumnIndices;
     }
 
     @Override
@@ -85,6 +95,10 @@
         return rowLevelUpdateMode;
     }
 
+    public int[] getRequiredPhysicalColumnIndices() {
+        return requiredPhysicalColumnIndices;
+    }
+
     @Override
     public boolean equals(Object o) {
         if (this == o) {
@@ -96,11 +110,14 @@
         RowLevelUpdateSpec that = (RowLevelUpdateSpec) o;
         return Objects.equals(updatedColumns, that.updatedColumns)
                 && rowLevelUpdateMode == that.rowLevelUpdateMode
+                && Arrays.equals(requiredPhysicalColumnIndices, that.requiredPhysicalColumnIndices)
                 && Objects.equals(scanContext, that.scanContext);
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(updatedColumns, rowLevelUpdateMode, scanContext);
+        int result = Objects.hash(updatedColumns, rowLevelUpdateMode, scanContext);
+        result = 31 * result + Arrays.hashCode(requiredPhysicalColumnIndices);
+        return result;
     }
 }
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecSink.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecSink.java
index c906865..520fe12 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecSink.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecSink.java
@@ -20,10 +20,16 @@
 
 import org.apache.flink.api.dag.Transformation;
 import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.catalog.UniqueConstraint;
 import org.apache.flink.table.connector.ChangelogMode;
 import org.apache.flink.table.connector.sink.DynamicTableSink;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.planner.delegation.PlannerBase;
+import org.apache.flink.table.planner.plan.abilities.sink.RowLevelDeleteSpec;
+import org.apache.flink.table.planner.plan.abilities.sink.RowLevelUpdateSpec;
+import org.apache.flink.table.planner.plan.abilities.sink.SinkAbilitySpec;
 import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
 import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeConfig;
 import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeContext;
@@ -31,8 +37,11 @@
 import org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecSink;
 import org.apache.flink.table.planner.plan.nodes.exec.spec.DynamicTableSinkSpec;
 import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
 
+import java.util.ArrayList;
 import java.util.Collections;
+import java.util.List;
 
 /**
  * Batch {@link ExecNode} to to write data into an external sink defined by a {@link
@@ -74,4 +83,61 @@
                 false,
                 null);
     }
+
+    @Override
+    protected RowType getPhysicalRowType(ResolvedSchema schema) {
+        // row-level modification may only write partial columns,
+        // so we try to prune the RowType to get the real RowType containing
+        // the physical columns to be written
+        if (tableSinkSpec.getSinkAbilities() != null) {
+            for (SinkAbilitySpec sinkAbilitySpec : tableSinkSpec.getSinkAbilities()) {
+                if (sinkAbilitySpec instanceof RowLevelUpdateSpec) {
+                    RowLevelUpdateSpec rowLevelUpdateSpec = (RowLevelUpdateSpec) sinkAbilitySpec;
+                    return getPhysicalRowType(
+                            schema, rowLevelUpdateSpec.getRequiredPhysicalColumnIndices());
+                } else if (sinkAbilitySpec instanceof RowLevelDeleteSpec) {
+                    RowLevelDeleteSpec rowLevelDeleteSpec = (RowLevelDeleteSpec) sinkAbilitySpec;
+                    return getPhysicalRowType(
+                            schema, rowLevelDeleteSpec.getRequiredPhysicalColumnIndices());
+                }
+            }
+        }
+        return (RowType) schema.toPhysicalRowDataType().getLogicalType();
+    }
+
+    @Override
+    protected int[] getPrimaryKeyIndices(RowType sinkRowType, ResolvedSchema schema) {
+        if (schema.getPrimaryKey().isPresent()) {
+            UniqueConstraint uniqueConstraint = schema.getPrimaryKey().get();
+            int[] primaryKeyIndices = new int[uniqueConstraint.getColumns().size()];
+            // SinkRowType may not contain full primary keys in case of row-level update or delete.
+            // In such case, return an empty array since the primary keys are not completed and
+            // we consider such case as no primary keys
+            // Note: this may happen if the required columns returned by
+            // connector don't fully contain the primary keys. But it's not recommended to only
+            // return partial primary keys
+            // For example, a table has primary keys: a, b, c; but the connector only return a, b
+            // in method SupportsRowLevelUpdate#applyRowLevelUpdate.
+            for (int i = 0; i < uniqueConstraint.getColumns().size(); i++) {
+                int fieldIndex = sinkRowType.getFieldIndex(uniqueConstraint.getColumns().get(i));
+                if (fieldIndex == -1) {
+                    return new int[0];
+                }
+                primaryKeyIndices[i] = fieldIndex;
+            }
+            return primaryKeyIndices;
+        } else {
+            return new int[0];
+        }
+    }
+
+    /** Get the physical row type with given column indices. */
+    private RowType getPhysicalRowType(ResolvedSchema schema, int[] columnIndices) {
+        List<Column> columns = schema.getColumns();
+        List<Column> requireColumns = new ArrayList<>();
+        for (int columnIndex : columnIndices) {
+            requireColumns.add(columns.get(columnIndex));
+        }
+        return (RowType) ResolvedSchema.of(requireColumns).toPhysicalRowDataType().getLogicalType();
+    }
 }
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java
index 9cde92d..c3debde 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java
@@ -644,13 +644,13 @@
         return InternalTypeInfo.of(getInputEdges().get(0).getOutputType());
     }
 
-    private int[] getPrimaryKeyIndices(RowType sinkRowType, ResolvedSchema schema) {
+    protected int[] getPrimaryKeyIndices(RowType sinkRowType, ResolvedSchema schema) {
         return schema.getPrimaryKey()
                 .map(k -> k.getColumns().stream().mapToInt(sinkRowType::getFieldIndex).toArray())
                 .orElse(new int[0]);
     }
 
-    private RowType getPhysicalRowType(ResolvedSchema schema) {
+    protected RowType getPhysicalRowType(ResolvedSchema schema) {
         return (RowType) schema.toPhysicalRowDataType().getLogicalType();
     }
 
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestUpdateDeleteTableFactory.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestUpdateDeleteTableFactory.java
index fa38811..2e9c914 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestUpdateDeleteTableFactory.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestUpdateDeleteTableFactory.java
@@ -75,6 +75,7 @@
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.IntStream;
 
 import static org.apache.flink.table.data.RowData.createFieldGetter;
 
@@ -331,6 +332,7 @@
         protected final String dataId;
 
         protected boolean isUpdate;
+        protected int[] requiredColumnIndices;
 
         public SupportsRowLevelUpdateSink(
                 ObjectIdentifier tableIdentifier,
@@ -383,10 +385,15 @@
                                     new UpdateDataSinkFunction(
                                             dataId,
                                             getPrimaryKeyFieldGetter(
-                                                    resolvedCatalogTable.getResolvedSchema()),
+                                                    resolvedCatalogTable.getResolvedSchema(),
+                                                    requiredColumnIndices),
                                             getAllFieldGetter(
                                                     resolvedCatalogTable.getResolvedSchema()),
-                                            updateMode))
+                                            getPartialFieldGetter(
+                                                    resolvedCatalogTable.getResolvedSchema(),
+                                                    requiredColumnIndices),
+                                            updateMode,
+                                            requiredColumnIndices))
                             .setParallelism(1);
                 }
             };
@@ -428,6 +435,8 @@
                                         requireColumnsForUpdate,
                                         resolvedCatalogTable.getResolvedSchema());
                     }
+                    requiredColumnIndices =
+                            getRequiredColumnIndexes(resolvedCatalogTable, requiredCols);
                     return Optional.ofNullable(requiredCols);
                 }
 
@@ -450,6 +459,7 @@
         private final List<String> requireColumnsForDelete;
 
         private boolean isDelete;
+        protected int[] requiredColumnIndices;
 
         public SupportsRowLevelModificationSink(
                 ObjectIdentifier tableIdentifier,
@@ -519,6 +529,10 @@
                                     .addSink(
                                             new DeleteDataSinkFunction(
                                                     dataId,
+                                                    getPrimaryKeyFieldGetter(
+                                                            resolvedCatalogTable
+                                                                    .getResolvedSchema(),
+                                                            requiredColumnIndices),
                                                     getAllFieldGetter(
                                                             resolvedCatalogTable
                                                                     .getResolvedSchema()),
@@ -568,6 +582,8 @@
                                         requireColumnsForDelete,
                                         resolvedCatalogTable.getResolvedSchema());
                     }
+                    requiredColumnIndices =
+                            getRequiredColumnIndexes(resolvedCatalogTable, requiredCols);
                     return Optional.ofNullable(requiredCols);
                 }
 
@@ -582,7 +598,8 @@
     /** The sink for delete existing data. */
     private static class DeleteDataSinkFunction extends RichSinkFunction<RowData> {
         private final String dataId;
-        private final RowData.FieldGetter[] fieldGetters;
+        private final RowData.FieldGetter[] primaryKeyFieldGetters;
+        private final RowData.FieldGetter[] allFieldGetters;
         private final SupportsRowLevelDelete.RowLevelDeleteMode deleteMode;
 
         private transient Collection<RowData> data;
@@ -590,10 +607,12 @@
 
         DeleteDataSinkFunction(
                 String dataId,
-                RowData.FieldGetter[] fieldGetters,
+                RowData.FieldGetter[] primaryKeyFieldGetters,
+                RowData.FieldGetter[] allFieldGetters,
                 SupportsRowLevelDelete.RowLevelDeleteMode deleteMode) {
             this.dataId = dataId;
-            this.fieldGetters = fieldGetters;
+            this.primaryKeyFieldGetters = primaryKeyFieldGetters;
+            this.allFieldGetters = allFieldGetters;
             this.deleteMode = deleteMode;
         }
 
@@ -620,7 +639,7 @@
                     String.format(
                             "The RowKind for the coming rows should be %s in delete mode %s.",
                             RowKind.DELETE, DELETE_MODE));
-            data.removeIf(rowData -> equal(rowData, deletedRow, fieldGetters));
+            data.removeIf(rowData -> equal(rowData, deletedRow, primaryKeyFieldGetters));
         }
 
         private void consumeRemainingRows(RowData remainingRow) {
@@ -629,7 +648,12 @@
                     String.format(
                             "The RowKind for the coming rows should be %s in delete mode %s.",
                             RowKind.INSERT, DELETE_MODE));
-            newData.add(copyRowData(remainingRow, fieldGetters));
+            // find the row that match the remaining row
+            for (RowData oldRow : data) {
+                if (equal(oldRow, remainingRow, primaryKeyFieldGetters)) {
+                    newData.add(copyRowData(oldRow, allFieldGetters));
+                }
+            }
         }
 
         @Override
@@ -720,6 +744,24 @@
         }
     }
 
+    private static int[] getRequiredColumnIndexes(
+            ResolvedCatalogTable resolvedCatalogTable, @Nullable List<Column> columns) {
+        if (columns == null) {
+            return IntStream.range(0, resolvedCatalogTable.getResolvedSchema().getColumnCount())
+                    .toArray();
+        } else {
+            List<Column> allColumns = resolvedCatalogTable.getResolvedSchema().getColumns();
+            int[] columnIndexes = new int[columns.size()];
+            for (int i = 0; i < columns.size(); i++) {
+                int colIndex = allColumns.indexOf(columns.get(i));
+                if (colIndex != -1) {
+                    columnIndexes[i] = colIndex;
+                }
+            }
+            return columnIndexes;
+        }
+    }
+
     /**
      * Get a list of equal predicate from a list of filter, each contains [column, value]. Return
      * Optional.empty() if it contains any non-equal predicate.
@@ -817,7 +859,9 @@
         private final String dataId;
         private final RowData.FieldGetter[] primaryKeyFieldGetters;
         private final RowData.FieldGetter[] allFieldGetters;
+        private final RowData.FieldGetter[] requireColumnFieldGetters;
         private final SupportsRowLevelUpdate.RowLevelUpdateMode updateMode;
+        private final int[] requiredColumnIndexes;
         private transient RowData[] oldRows;
         private transient List<Tuple2<Integer, RowData>> updatedRows;
         private transient List<RowData> allNewRows;
@@ -826,11 +870,15 @@
                 String dataId,
                 RowData.FieldGetter[] primaryKeyFieldGetters,
                 RowData.FieldGetter[] allFieldGetters,
-                SupportsRowLevelUpdate.RowLevelUpdateMode updateMode) {
+                RowData.FieldGetter[] requireColumnFieldGetters,
+                SupportsRowLevelUpdate.RowLevelUpdateMode updateMode,
+                int[] requiredColumnIndexes) {
             this.dataId = dataId;
             this.primaryKeyFieldGetters = primaryKeyFieldGetters;
             this.updateMode = updateMode;
             this.allFieldGetters = allFieldGetters;
+            this.requireColumnFieldGetters = requireColumnFieldGetters;
+            this.requiredColumnIndexes = requiredColumnIndexes;
         }
 
         @Override
@@ -858,7 +906,8 @@
 
             for (int i = 0; i < oldRows.length; i++) {
                 if (equal(oldRows[i], updatedRow, primaryKeyFieldGetters)) {
-                    updatedRows.add(new Tuple2<>(i, copyRowData(updatedRow, allFieldGetters)));
+                    updatedRows.add(
+                            new Tuple2<>(i, getUpdatedAfterRowDataWithAllFields(updatedRow)));
                 }
             }
         }
@@ -867,7 +916,25 @@
             Preconditions.checkArgument(
                     rowData.getRowKind() == RowKind.INSERT,
                     "The RowKind for the updated rows should be " + RowKind.INSERT);
-            allNewRows.add(copyRowData(rowData, allFieldGetters));
+            allNewRows.add(getUpdatedAfterRowDataWithAllFields(rowData));
+        }
+
+        private RowData getUpdatedAfterRowDataWithAllFields(RowData updateAfterRowData) {
+            GenericRowData newRowData = null;
+            // first find the old row to be updated and copy the old values
+            for (RowData oldRow : oldRows) {
+                if (equal(oldRow, updateAfterRowData, primaryKeyFieldGetters)) {
+                    newRowData = copyRowData(oldRow, allFieldGetters);
+                }
+            }
+            Preconditions.checkNotNull(newRowData);
+            // then set the new value after updated
+            for (int i = 0; i < requiredColumnIndexes.length; i++) {
+                newRowData.setField(
+                        requiredColumnIndexes[i],
+                        requireColumnFieldGetters[i].getFieldOrNull(updateAfterRowData));
+            }
+            return newRowData;
         }
 
         @Override
@@ -904,15 +971,25 @@
                 "The scan context should contains the object identifier for row-level modification.");
     }
 
-    private static RowData.FieldGetter[] getPrimaryKeyFieldGetter(ResolvedSchema resolvedSchema) {
-        int[] indexes = resolvedSchema.getPrimaryKeyIndexes();
-        RowData.FieldGetter[] fieldGetters = new RowData.FieldGetter[indexes.length];
+    /**
+     * Get the an array of FieldGetter for the primary keys which are also in {@param
+     * requiredColumnIndices}.
+     */
+    private static RowData.FieldGetter[] getPrimaryKeyFieldGetter(
+            ResolvedSchema resolvedSchema, int[] requiredColumnIndices) {
+        List<RowData.FieldGetter> fieldGetters = new ArrayList<>();
+        int[] primaryKeyIndices = resolvedSchema.getPrimaryKeyIndexes();
         List<DataType> dataTypes = resolvedSchema.getColumnDataTypes();
-        for (int i = 0; i < fieldGetters.length; i++) {
-            int colIndex = indexes[i];
-            fieldGetters[i] = createFieldGetter(dataTypes.get(colIndex).getLogicalType(), colIndex);
+        for (final int primaryKeyIndex : primaryKeyIndices) {
+            // find the primaryKeyIndex in requiredColumnIndices
+            for (int i = 0; i < requiredColumnIndices.length; i++) {
+                if (requiredColumnIndices[i] == primaryKeyIndex) {
+                    fieldGetters.add(
+                            createFieldGetter(dataTypes.get(primaryKeyIndex).getLogicalType(), i));
+                }
+            }
         }
-        return fieldGetters;
+        return fieldGetters.toArray(new RowData.FieldGetter[0]);
     }
 
     private static RowData.FieldGetter[] getAllFieldGetter(ResolvedSchema resolvedSchema) {
@@ -924,6 +1001,18 @@
         return fieldGetters;
     }
 
+    private static RowData.FieldGetter[] getPartialFieldGetter(
+            ResolvedSchema resolvedSchema, int[] partialColumIndexes) {
+        List<Column> columns = resolvedSchema.getColumns();
+        RowData.FieldGetter[] fieldGetters = new RowData.FieldGetter[partialColumIndexes.length];
+        for (int i = 0; i < fieldGetters.length; i++) {
+            fieldGetters[i] =
+                    createFieldGetter(
+                            columns.get(partialColumIndexes[i]).getDataType().getLogicalType(), i);
+        }
+        return fieldGetters;
+    }
+
     private static boolean equal(
             RowData value1, RowData value2, RowData.FieldGetter[] fieldGetters) {
         for (RowData.FieldGetter fieldGetter : fieldGetters) {
@@ -935,7 +1024,7 @@
         return true;
     }
 
-    private static RowData copyRowData(RowData rowData, RowData.FieldGetter[] fieldGetters) {
+    private static GenericRowData copyRowData(RowData rowData, RowData.FieldGetter[] fieldGetters) {
         Object[] values = new Object[fieldGetters.length];
         for (int i = 0; i < fieldGetters.length; i++) {
             values[i] = fieldGetters[i].getFieldOrNull(rowData);
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/batch/sql/DeleteTableITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/batch/sql/DeleteTableITCase.java
index b918b7a..21e2907 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/batch/sql/DeleteTableITCase.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/batch/sql/DeleteTableITCase.java
@@ -95,7 +95,8 @@
         String dataId = registerData();
         tEnv().executeSql(
                         String.format(
-                                "CREATE TABLE t (a int, b string, c double) WITH"
+                                "CREATE TABLE t (a int PRIMARY KEY NOT ENFORCED,"
+                                        + " b string, c double) WITH"
                                         + " ('connector' = 'test-update-delete',"
                                         + " 'data-id' = '%s',"
                                         + " 'delete-mode' = '%s',"
@@ -112,12 +113,54 @@
     }
 
     @Test
+    public void testRowLevelDeleteWithPartitionColumn() throws Exception {
+        String dataId = registerData();
+        tEnv().executeSql(
+                        String.format(
+                                "CREATE TABLE t"
+                                        + " (a int PRIMARY KEY NOT ENFORCED,"
+                                        + " b string not null,"
+                                        + " c double not null) WITH"
+                                        + " ('connector' = 'test-update-delete',"
+                                        + " 'data-id' = '%s',"
+                                        + " 'delete-mode' = '%s',"
+                                        + " 'required-columns-for-delete' = 'a;c',"
+                                        + " 'support-delete-push-down' = 'false'"
+                                        + ")",
+                                dataId, deleteMode));
+        tEnv().executeSql("DELETE FROM t WHERE a > 1").await();
+        List<Row> rows = toRows(tEnv().executeSql("SELECT * FROM t"));
+        assertThat(rows.toString()).isEqualTo("[+I[0, b_0, 0.0], +I[1, b_1, 2.0]]");
+
+        // test delete with requiring partial primary keys
+        dataId = registerData();
+        tEnv().executeSql(
+                        String.format(
+                                "CREATE TABLE t1"
+                                        + " (a int,"
+                                        + " b string not null,"
+                                        + " c double not null,"
+                                        + " PRIMARY KEY (a, c) NOT ENFORCED) WITH"
+                                        + " ('connector' = 'test-update-delete',"
+                                        + " 'data-id' = '%s',"
+                                        + " 'delete-mode' = '%s',"
+                                        + " 'required-columns-for-delete' = 'a;b',"
+                                        + " 'support-delete-push-down' = 'false'"
+                                        + ")",
+                                dataId, deleteMode));
+        tEnv().executeSql("DELETE FROM t1 WHERE a > 1").await();
+        rows = toRows(tEnv().executeSql("SELECT * FROM t1"));
+        assertThat(rows.toString()).isEqualTo("[+I[0, b_0, 0.0], +I[1, b_1, 2.0]]");
+    }
+
+    @Test
     public void testMixDelete() throws Exception {
         // test mix delete push down and row-level delete
         String dataId = registerData();
         tEnv().executeSql(
                         String.format(
-                                "CREATE TABLE t (a int, b string, c double) WITH"
+                                "CREATE TABLE t (a int PRIMARY KEY NOT ENFORCED,"
+                                        + " b string, c double) WITH"
                                         + " ('connector' = 'test-update-delete',"
                                         + " 'data-id' = '%s',"
                                         + " 'mix-delete' = 'true')",
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/batch/sql/UpdateTableITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/batch/sql/UpdateTableITCase.java
index 793a67e..b7f5501 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/batch/sql/UpdateTableITCase.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/batch/sql/UpdateTableITCase.java
@@ -86,6 +86,46 @@
     }
 
     @Test
+    public void testPartialUpdate() throws Exception {
+        String dataId = registerData();
+        tEnv().executeSql(
+                        String.format(
+                                "CREATE TABLE t ("
+                                        + " a int PRIMARY KEY NOT ENFORCED,"
+                                        + " b string not null,"
+                                        + " c double not null) WITH"
+                                        + " ('connector' = 'test-update-delete', "
+                                        + "'data-id' = '%s',"
+                                        + " 'required-columns-for-update' = 'a;b', "
+                                        + " 'update-mode' = '%s')",
+                                dataId, updateMode));
+        tEnv().executeSql("UPDATE t SET b = 'uaa' WHERE a >= 1").await();
+        List<String> rows = toSortedResults(tEnv().executeSql("SELECT * FROM t"));
+        assertThat(rows.toString())
+                .isEqualTo("[+I[0, b_0, 0.0], +I[1, uaa, 2.0], +I[2, uaa, 4.0]]");
+
+        // test partial update with requiring partial primary keys
+        dataId = registerData();
+        tEnv().executeSql(
+                        String.format(
+                                "CREATE TABLE t1 ("
+                                        + " a int,"
+                                        + " b string not null,"
+                                        + " c double not null,"
+                                        + " PRIMARY KEY (a, c) NOT ENFORCED"
+                                        + ") WITH"
+                                        + " ('connector' = 'test-update-delete', "
+                                        + "'data-id' = '%s',"
+                                        + " 'required-columns-for-update' = 'a;b', "
+                                        + " 'update-mode' = '%s')",
+                                dataId, updateMode));
+        tEnv().executeSql("UPDATE t1 SET b = 'uaa' WHERE a >= 1").await();
+        rows = toSortedResults(tEnv().executeSql("SELECT * FROM t1"));
+        assertThat(rows.toString())
+                .isEqualTo("[+I[0, b_0, 0.0], +I[1, uaa, 2.0], +I[2, uaa, 4.0]]");
+    }
+
+    @Test
     public void testStatementSetContainUpdateAndInsert() throws Exception {
         tEnv().executeSql(
                         "CREATE TABLE t (a int, b string, c double) WITH"