[FLINK-32001][table] Row-level update should support returning partial columns
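
Row-level update/delete may require only a subset of the table's physical
columns, so RowLevelUpdateSpec and RowLevelDeleteSpec now carry the indices
of the required physical columns, and BatchExecSink prunes the sink's
physical RowType (and the primary-key indices derived from it) accordingly.

For example (see the updated UpdateTableITCase), a connector that only
requires columns a and b for update now receives rows containing just those
columns:

    CREATE TABLE t (
        a int PRIMARY KEY NOT ENFORCED,
        b string not null,
        c double not null
    ) WITH (
        'connector' = 'test-update-delete',
        'required-columns-for-update' = 'a;b',
        ...
    );

    UPDATE t SET b = 'uaa' WHERE a >= 1;  -- the sink sees only columns (a, b)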
diff --git a/flink-table/flink-sql-client/src/test/resources/sql/delete.q b/flink-table/flink-sql-client/src/test/resources/sql/delete.q
index 1604f27..7fcd56d 100644
--- a/flink-table/flink-sql-client/src/test/resources/sql/delete.q
+++ b/flink-table/flink-sql-client/src/test/resources/sql/delete.q
@@ -29,7 +29,7 @@
!info
# create a table first
-CREATE TABLE t (a int, b string, c double)
+CREATE TABLE t (a int PRIMARY KEY NOT ENFORCED, b string, c double)
WITH (
'connector' = 'test-update-delete',
'data-id' = '$VAR_DELETE_TABLE_DATA_ID',
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/connectors/DynamicSinkUtils.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/connectors/DynamicSinkUtils.java
index 441ddcc..42e4f95 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/connectors/DynamicSinkUtils.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/connectors/DynamicSinkUtils.java
@@ -386,36 +386,37 @@
RowLevelModificationScanContext context = RowLevelModificationContextUtils.getScanContext();
SupportsRowLevelDelete.RowLevelDeleteInfo rowLevelDeleteInfo =
supportsRowLevelDelete.applyRowLevelDelete(context);
- sinkAbilitySpecs.add(
- new RowLevelDeleteSpec(rowLevelDeleteInfo.getRowLevelDeleteMode(), context));
if (rowLevelDeleteInfo.getRowLevelDeleteMode()
- == SupportsRowLevelDelete.RowLevelDeleteMode.DELETED_ROWS) {
- // convert the LogicalTableModify node to a rel node representing row-level delete
- return convertToRowLevelDelete(
- tableModify,
- contextResolvedTable,
- rowLevelDeleteInfo,
- tableDebugName,
- dataTypeFactory,
- typeFactory);
- } else if (rowLevelDeleteInfo.getRowLevelDeleteMode()
+ != SupportsRowLevelDelete.RowLevelDeleteMode.DELETED_ROWS
+ && rowLevelDeleteInfo.getRowLevelDeleteMode()
+ != SupportsRowLevelDelete.RowLevelDeleteMode.REMAINING_ROWS) {
+ throw new TableException(
+ "Unknown delete mode: " + rowLevelDeleteInfo.getRowLevelDeleteMode());
+ }
+
+ if (rowLevelDeleteInfo.getRowLevelDeleteMode()
== SupportsRowLevelDelete.RowLevelDeleteMode.REMAINING_ROWS) {
// if it's for remaining rows, convert the predicate in the WHERE clause
// to the negative predicate
convertPredicateToNegative(tableModify);
- // convert the LogicalTableModify node to a rel node representing row-level delete
- return convertToRowLevelDelete(
- tableModify,
- contextResolvedTable,
- rowLevelDeleteInfo,
- tableDebugName,
- dataTypeFactory,
- typeFactory);
- } else {
- throw new TableException(
- "Unknown delete mode: " + rowLevelDeleteInfo.getRowLevelDeleteMode());
}
+
+ // convert the LogicalTableModify node to a RelNode representing row-level delete
+ Tuple2<RelNode, int[]> deleteRelNodeAndRequireIndices =
+ convertToRowLevelDelete(
+ tableModify,
+ contextResolvedTable,
+ rowLevelDeleteInfo,
+ tableDebugName,
+ dataTypeFactory,
+ typeFactory);
+ sinkAbilitySpecs.add(
+ new RowLevelDeleteSpec(
+ rowLevelDeleteInfo.getRowLevelDeleteMode(),
+ context,
+ deleteRelNodeAndRequireIndices.f1));
+ return deleteRelNodeAndRequireIndices.f0;
}
private static RelNode convertUpdate(
@@ -445,16 +446,21 @@
throw new IllegalArgumentException(
"Unknown update mode:" + updateInfo.getRowLevelUpdateMode());
}
+ Tuple2<RelNode, int[]> updateRelNodeAndRequireIndices =
+ convertToRowLevelUpdate(
+ tableModify,
+ contextResolvedTable,
+ updateInfo,
+ tableDebugName,
+ dataTypeFactory,
+ typeFactory);
sinkAbilitySpecs.add(
new RowLevelUpdateSpec(
- updatedColumns, updateInfo.getRowLevelUpdateMode(), context));
- return convertToRowLevelUpdate(
- tableModify,
- contextResolvedTable,
- updateInfo,
- tableDebugName,
- dataTypeFactory,
- typeFactory);
+ updatedColumns,
+ updateInfo.getRowLevelUpdateMode(),
+ context,
+ updateRelNodeAndRequireIndices.f1));
+ return updateRelNodeAndRequireIndices.f0;
}
private static List<Column> getUpdatedColumns(
@@ -469,8 +475,13 @@
return updatedColumns;
}
- /** Convert tableModify node to a rel node representing for row-level delete. */
- private static RelNode convertToRowLevelDelete(
+ /**
+ * Convert tableModify node to a RelNode representing a row-level delete.
+ *
+ * @return a tuple containing the RelNode and the indices of the required physical columns for
+ * row-level delete.
+ */
+ private static Tuple2<RelNode, int[]> convertToRowLevelDelete(
LogicalTableModify tableModify,
ContextResolvedTable contextResolvedTable,
SupportsRowLevelDelete.RowLevelDeleteInfo rowLevelDeleteInfo,
@@ -495,14 +506,25 @@
addExtraMetaCols(
tableModify, tableScan, tableDebugName, metadataColumns, typeFactory);
}
+
// create a project that only selects the required columns for delete
- return projectColumnsForDelete(
- tableModify,
- resolvedSchema,
- colIndexes,
- tableDebugName,
- dataTypeFactory,
- typeFactory);
+ return Tuple2.of(
+ projectColumnsForDelete(
+ tableModify,
+ resolvedSchema,
+ colIndexes,
+ tableDebugName,
+ dataTypeFactory,
+ typeFactory),
+ getPhysicalColumnIndices(colIndexes, resolvedSchema));
+ }
+
+ /** Return the indices from {@code colIndexes} that belong to physical columns. */
+ private static int[] getPhysicalColumnIndices(List<Integer> colIndexes, ResolvedSchema schema) {
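+ // e.g., colIndexes [0, 2, 3] where column 3 is a metadata column yields [0, 2]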
+ return colIndexes.stream()
+ .filter(i -> schema.getColumns().get(i).isPhysical())
+ .mapToInt(i -> i)
+ .toArray();
}
/** Convert the predicate in WHERE clause to the negative predicate. */
@@ -606,8 +628,13 @@
return (LogicalTableScan) relNode;
}
- /** Convert tableModify node to a RelNode representing for row-level update. */
- private static RelNode convertToRowLevelUpdate(
+ /**
+ * Convert tableModify node to a RelNode representing a row-level update.
+ *
+ * @return a tuple containing the RelNode and the indices of the required physical columns for
+ * row-level update.
+ */
+ private static Tuple2<RelNode, int[]> convertToRowLevelUpdate(
LogicalTableModify tableModify,
ContextResolvedTable contextResolvedTable,
SupportsRowLevelUpdate.RowLevelUpdateInfo rowLevelUpdateInfo,
@@ -622,7 +649,7 @@
LogicalTableScan tableScan = getSourceTableScan(tableModify);
Tuple2<List<Integer>, List<MetadataColumn>> colsIndexAndExtraMetaCols =
getRequireColumnsIndexAndExtraMetaCols(tableScan, requiredColumns, resolvedSchema);
- List<Integer> updatedIndexes = colsIndexAndExtraMetaCols.f0;
+ List<Integer> colIndexes = colsIndexAndExtraMetaCols.f0;
List<MetadataColumn> metadataColumns = colsIndexAndExtraMetaCols.f1;
// if the meta column size is greater than 0, we need to modify the underlying
// LogicalTableScan so that it can read the meta columns
@@ -632,16 +659,17 @@
addExtraMetaCols(
tableModify, tableScan, tableDebugName, metadataColumns, typeFactory);
}
-
- return projectColumnsForUpdate(
- tableModify,
- originColsCount,
- resolvedSchema,
- updatedIndexes,
- rowLevelUpdateInfo.getRowLevelUpdateMode(),
- tableDebugName,
- dataTypeFactory,
- typeFactory);
+ return Tuple2.of(
+ projectColumnsForUpdate(
+ tableModify,
+ originColsCount,
+ resolvedSchema,
+ colIndexes,
+ rowLevelUpdateInfo.getRowLevelUpdateMode(),
+ tableDebugName,
+ dataTypeFactory,
+ typeFactory),
+ getPhysicalColumnIndices(colIndexes, resolvedSchema));
}
// create a project that only selects the required columns or expressions for update
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/sink/RowLevelDeleteSpec.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/sink/RowLevelDeleteSpec.java
index cd26a7b..29f87df 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/sink/RowLevelDeleteSpec.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/sink/RowLevelDeleteSpec.java
@@ -33,17 +33,20 @@
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
+import java.util.Arrays;
import java.util.Objects;
/**
* A sub-class of {@link SinkAbilitySpec} that can not only serialize/deserialize the row-level
- * delete mode to/from JSON, but also can delete existing data for {@link
- * org.apache.flink.table.connector.sink.abilities.SupportsRowLevelDelete}.
+ * delete mode & required physical column indices to/from JSON, but also can delete existing data
+ * for {@link org.apache.flink.table.connector.sink.abilities.SupportsRowLevelDelete}.
*/
@JsonIgnoreProperties(ignoreUnknown = true)
@JsonTypeName("RowLevelDelete")
public class RowLevelDeleteSpec implements SinkAbilitySpec {
public static final String FIELD_NAME_ROW_LEVEL_DELETE_MODE = "rowLevelDeleteMode";
+ public static final String FIELD_NAME_REQUIRED_PHYSICAL_COLUMN_INDICES =
+ "requiredPhysicalColumnIndices";
@JsonProperty(FIELD_NAME_ROW_LEVEL_DELETE_MODE)
@Nonnull
@@ -51,13 +54,20 @@
@JsonIgnore @Nullable private final RowLevelModificationScanContext scanContext;
+ @JsonProperty(FIELD_NAME_REQUIRED_PHYSICAL_COLUMN_INDICES)
+ @Nonnull
+ private final int[] requiredPhysicalColumnIndices;
+
@JsonCreator
public RowLevelDeleteSpec(
@JsonProperty(FIELD_NAME_ROW_LEVEL_DELETE_MODE) @Nonnull
SupportsRowLevelDelete.RowLevelDeleteMode rowLevelDeleteMode,
- @Nullable RowLevelModificationScanContext scanContext) {
+ @Nullable RowLevelModificationScanContext scanContext,
+ @JsonProperty(FIELD_NAME_REQUIRED_PHYSICAL_COLUMN_INDICES) @Nonnull
+ int[] requiredPhysicalColumnIndices) {
this.rowLevelDeleteMode = Preconditions.checkNotNull(rowLevelDeleteMode);
this.scanContext = scanContext;
+ this.requiredPhysicalColumnIndices = requiredPhysicalColumnIndices;
}
@Override
@@ -77,6 +87,10 @@
return rowLevelDeleteMode;
}
+ public int[] getRequiredPhysicalColumnIndices() {
+ return requiredPhysicalColumnIndices;
+ }
+
@Override
public boolean equals(Object o) {
if (this == o) {
@@ -87,11 +101,14 @@
}
RowLevelDeleteSpec that = (RowLevelDeleteSpec) o;
return rowLevelDeleteMode == that.rowLevelDeleteMode
- && Objects.equals(scanContext, that.scanContext);
+ && Objects.equals(scanContext, that.scanContext)
+ && Arrays.equals(requiredPhysicalColumnIndices, that.requiredPhysicalColumnIndices);
}
@Override
public int hashCode() {
- return Objects.hash(rowLevelDeleteMode, scanContext);
+ int result = Objects.hash(rowLevelDeleteMode, scanContext);
+ result = 31 * result + Arrays.hashCode(requiredPhysicalColumnIndices);
+ return result;
}
}
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/sink/RowLevelUpdateSpec.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/sink/RowLevelUpdateSpec.java
index d046ab0..17c9176 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/sink/RowLevelUpdateSpec.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/sink/RowLevelUpdateSpec.java
@@ -33,19 +33,22 @@
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
+import java.util.Arrays;
import java.util.List;
import java.util.Objects;
/**
* A sub-class of {@link SinkAbilitySpec} that can not only serialize/deserialize the row-level
- * update mode & columns to/from JSON, but also can update existing data for {@link
- * org.apache.flink.table.connector.sink.abilities.SupportsRowLevelUpdate}.
+ * update mode, columns & required physical column indices to/from JSON, but also can update
+ * existing data for {@link org.apache.flink.table.connector.sink.abilities.SupportsRowLevelUpdate}.
*/
@JsonIgnoreProperties(ignoreUnknown = true)
@JsonTypeName("RowLevelUpdate")
public class RowLevelUpdateSpec implements SinkAbilitySpec {
public static final String FIELD_NAME_UPDATED_COLUMNS = "updatedColumns";
public static final String FIELD_NAME_ROW_LEVEL_UPDATE_MODE = "rowLevelUpdateMode";
+ public static final String FIELD_NAME_REQUIRED_PHYSICAL_COLUMN_INDICES =
+ "requiredPhysicalColumnIndices";
@JsonProperty(FIELD_NAME_UPDATED_COLUMNS)
@Nonnull
@@ -55,6 +58,10 @@
@Nonnull
private final SupportsRowLevelUpdate.RowLevelUpdateMode rowLevelUpdateMode;
+ @JsonProperty(FIELD_NAME_REQUIRED_PHYSICAL_COLUMN_INDICES)
+ @Nonnull
+ private final int[] requiredPhysicalColumnIndices;
+
@JsonIgnore @Nullable private final RowLevelModificationScanContext scanContext;
@JsonCreator
@@ -62,10 +69,13 @@
@JsonProperty(FIELD_NAME_UPDATED_COLUMNS) @Nonnull List<Column> updatedColumns,
@JsonProperty(FIELD_NAME_ROW_LEVEL_UPDATE_MODE) @Nonnull
SupportsRowLevelUpdate.RowLevelUpdateMode rowLevelUpdateMode,
- @Nullable RowLevelModificationScanContext scanContext) {
+ @Nullable RowLevelModificationScanContext scanContext,
+ @JsonProperty(FIELD_NAME_REQUIRED_PHYSICAL_COLUMN_INDICES) @Nonnull
+ int[] requiredPhysicalColumnIndices) {
this.updatedColumns = updatedColumns;
this.rowLevelUpdateMode = rowLevelUpdateMode;
this.scanContext = scanContext;
+ this.requiredPhysicalColumnIndices = requiredPhysicalColumnIndices;
}
@Override
@@ -85,6 +95,10 @@
return rowLevelUpdateMode;
}
+ public int[] getRequiredPhysicalColumnIndices() {
+ return requiredPhysicalColumnIndices;
+ }
+
@Override
public boolean equals(Object o) {
if (this == o) {
@@ -96,11 +110,14 @@
RowLevelUpdateSpec that = (RowLevelUpdateSpec) o;
return Objects.equals(updatedColumns, that.updatedColumns)
&& rowLevelUpdateMode == that.rowLevelUpdateMode
+ && Arrays.equals(requiredPhysicalColumnIndices, that.requiredPhysicalColumnIndices)
&& Objects.equals(scanContext, that.scanContext);
}
@Override
public int hashCode() {
- return Objects.hash(updatedColumns, rowLevelUpdateMode, scanContext);
+ int result = Objects.hash(updatedColumns, rowLevelUpdateMode, scanContext);
+ result = 31 * result + Arrays.hashCode(requiredPhysicalColumnIndices);
+ return result;
}
}
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecSink.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecSink.java
index c906865..520fe12 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecSink.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecSink.java
@@ -20,10 +20,16 @@
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.catalog.UniqueConstraint;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.planner.delegation.PlannerBase;
+import org.apache.flink.table.planner.plan.abilities.sink.RowLevelDeleteSpec;
+import org.apache.flink.table.planner.plan.abilities.sink.RowLevelUpdateSpec;
+import org.apache.flink.table.planner.plan.abilities.sink.SinkAbilitySpec;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeConfig;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeContext;
@@ -31,8 +37,11 @@
import org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecSink;
import org.apache.flink.table.planner.plan.nodes.exec.spec.DynamicTableSinkSpec;
import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+import java.util.ArrayList;
import java.util.Collections;
+import java.util.List;
/**
* Batch {@link ExecNode} to write data into an external sink defined by a {@link
@@ -74,4 +83,61 @@
false,
null);
}
+
+ @Override
+ protected RowType getPhysicalRowType(ResolvedSchema schema) {
+ // a row-level modification may write only partial columns,
+ // so we prune the RowType to get the actual RowType containing
+ // the physical columns to be written
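+ // e.g., for a schema (a INT, b STRING, c DOUBLE) with required physical
+ // column indices [0, 2], the pruned type is ROW<a INT, c DOUBLE>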
+ if (tableSinkSpec.getSinkAbilities() != null) {
+ for (SinkAbilitySpec sinkAbilitySpec : tableSinkSpec.getSinkAbilities()) {
+ if (sinkAbilitySpec instanceof RowLevelUpdateSpec) {
+ RowLevelUpdateSpec rowLevelUpdateSpec = (RowLevelUpdateSpec) sinkAbilitySpec;
+ return getPhysicalRowType(
+ schema, rowLevelUpdateSpec.getRequiredPhysicalColumnIndices());
+ } else if (sinkAbilitySpec instanceof RowLevelDeleteSpec) {
+ RowLevelDeleteSpec rowLevelDeleteSpec = (RowLevelDeleteSpec) sinkAbilitySpec;
+ return getPhysicalRowType(
+ schema, rowLevelDeleteSpec.getRequiredPhysicalColumnIndices());
+ }
+ }
+ }
+ return (RowType) schema.toPhysicalRowDataType().getLogicalType();
+ }
+
+ @Override
+ protected int[] getPrimaryKeyIndices(RowType sinkRowType, ResolvedSchema schema) {
+ if (schema.getPrimaryKey().isPresent()) {
+ UniqueConstraint uniqueConstraint = schema.getPrimaryKey().get();
+ int[] primaryKeyIndices = new int[uniqueConstraint.getColumns().size()];
+ // sinkRowType may not contain the full primary key in case of row-level update
+ // or delete. In that case, return an empty array since the primary key is
+ // incomplete, and we treat such a case as having no primary key.
+ // Note: this may happen if the required columns returned by the
+ // connector don't fully contain the primary keys, which is not recommended.
+ // For example, a table has primary keys a, b, c, but the connector only returns
+ // a, b in method SupportsRowLevelUpdate#applyRowLevelUpdate.
+ for (int i = 0; i < uniqueConstraint.getColumns().size(); i++) {
+ int fieldIndex = sinkRowType.getFieldIndex(uniqueConstraint.getColumns().get(i));
+ if (fieldIndex == -1) {
+ return new int[0];
+ }
+ primaryKeyIndices[i] = fieldIndex;
+ }
+ return primaryKeyIndices;
+ } else {
+ return new int[0];
+ }
+ }
+
+ /** Get the physical row type containing only the columns at the given indices. */
+ private RowType getPhysicalRowType(ResolvedSchema schema, int[] columnIndices) {
+ List<Column> columns = schema.getColumns();
+ List<Column> requiredColumns = new ArrayList<>();
+ for (int columnIndex : columnIndices) {
+ requiredColumns.add(columns.get(columnIndex));
+ }
+ return (RowType) ResolvedSchema.of(requiredColumns).toPhysicalRowDataType().getLogicalType();
+ }
}
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java
index 9cde92d..c3debde 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java
@@ -644,13 +644,13 @@
return InternalTypeInfo.of(getInputEdges().get(0).getOutputType());
}
- private int[] getPrimaryKeyIndices(RowType sinkRowType, ResolvedSchema schema) {
+ protected int[] getPrimaryKeyIndices(RowType sinkRowType, ResolvedSchema schema) {
return schema.getPrimaryKey()
.map(k -> k.getColumns().stream().mapToInt(sinkRowType::getFieldIndex).toArray())
.orElse(new int[0]);
}
- private RowType getPhysicalRowType(ResolvedSchema schema) {
+ protected RowType getPhysicalRowType(ResolvedSchema schema) {
return (RowType) schema.toPhysicalRowDataType().getLogicalType();
}
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestUpdateDeleteTableFactory.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestUpdateDeleteTableFactory.java
index fa38811..2e9c914 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestUpdateDeleteTableFactory.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestUpdateDeleteTableFactory.java
@@ -75,6 +75,7 @@
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.IntStream;
import static org.apache.flink.table.data.RowData.createFieldGetter;
@@ -331,6 +332,7 @@
protected final String dataId;
protected boolean isUpdate;
+ protected int[] requiredColumnIndices;
public SupportsRowLevelUpdateSink(
ObjectIdentifier tableIdentifier,
@@ -383,10 +385,15 @@
new UpdateDataSinkFunction(
dataId,
getPrimaryKeyFieldGetter(
- resolvedCatalogTable.getResolvedSchema()),
+ resolvedCatalogTable.getResolvedSchema(),
+ requiredColumnIndices),
getAllFieldGetter(
resolvedCatalogTable.getResolvedSchema()),
- updateMode))
+ getPartialFieldGetter(
+ resolvedCatalogTable.getResolvedSchema(),
+ requiredColumnIndices),
+ updateMode,
+ requiredColumnIndices))
.setParallelism(1);
}
};
@@ -428,6 +435,8 @@
requireColumnsForUpdate,
resolvedCatalogTable.getResolvedSchema());
}
+ requiredColumnIndices =
+ getRequiredColumnIndexes(resolvedCatalogTable, requiredCols);
return Optional.ofNullable(requiredCols);
}
@@ -450,6 +459,7 @@
private final List<String> requireColumnsForDelete;
private boolean isDelete;
+ protected int[] requiredColumnIndices;
public SupportsRowLevelModificationSink(
ObjectIdentifier tableIdentifier,
@@ -519,6 +529,10 @@
.addSink(
new DeleteDataSinkFunction(
dataId,
+ getPrimaryKeyFieldGetter(
+ resolvedCatalogTable
+ .getResolvedSchema(),
+ requiredColumnIndices),
getAllFieldGetter(
resolvedCatalogTable
.getResolvedSchema()),
@@ -568,6 +582,8 @@
requireColumnsForDelete,
resolvedCatalogTable.getResolvedSchema());
}
+ requiredColumnIndices =
+ getRequiredColumnIndexes(resolvedCatalogTable, requiredCols);
return Optional.ofNullable(requiredCols);
}
@@ -582,7 +598,8 @@
/** The sink for deleting existing data. */
private static class DeleteDataSinkFunction extends RichSinkFunction<RowData> {
private final String dataId;
- private final RowData.FieldGetter[] fieldGetters;
+ private final RowData.FieldGetter[] primaryKeyFieldGetters;
+ private final RowData.FieldGetter[] allFieldGetters;
private final SupportsRowLevelDelete.RowLevelDeleteMode deleteMode;
private transient Collection<RowData> data;
@@ -590,10 +607,12 @@
DeleteDataSinkFunction(
String dataId,
- RowData.FieldGetter[] fieldGetters,
+ RowData.FieldGetter[] primaryKeyFieldGetters,
+ RowData.FieldGetter[] allFieldGetters,
SupportsRowLevelDelete.RowLevelDeleteMode deleteMode) {
this.dataId = dataId;
- this.fieldGetters = fieldGetters;
+ this.primaryKeyFieldGetters = primaryKeyFieldGetters;
+ this.allFieldGetters = allFieldGetters;
this.deleteMode = deleteMode;
}
@@ -620,7 +639,7 @@
String.format(
"The RowKind for the coming rows should be %s in delete mode %s.",
RowKind.DELETE, DELETE_MODE));
- data.removeIf(rowData -> equal(rowData, deletedRow, fieldGetters));
+ data.removeIf(rowData -> equal(rowData, deletedRow, primaryKeyFieldGetters));
}
private void consumeRemainingRows(RowData remainingRow) {
@@ -629,7 +648,12 @@
String.format(
"The RowKind for the coming rows should be %s in delete mode %s.",
RowKind.INSERT, DELETE_MODE));
- newData.add(copyRowData(remainingRow, fieldGetters));
+ // find the row that matches the remaining row
+ for (RowData oldRow : data) {
+ if (equal(oldRow, remainingRow, primaryKeyFieldGetters)) {
+ newData.add(copyRowData(oldRow, allFieldGetters));
+ }
+ }
}
@Override
@@ -720,6 +744,24 @@
}
}
+ private static int[] getRequiredColumnIndexes(
+ ResolvedCatalogTable resolvedCatalogTable, @Nullable List<Column> columns) {
+ if (columns == null) {
+ return IntStream.range(0, resolvedCatalogTable.getResolvedSchema().getColumnCount())
+ .toArray();
+ } else {
+ List<Column> allColumns = resolvedCatalogTable.getResolvedSchema().getColumns();
+ int[] columnIndexes = new int[columns.size()];
+ for (int i = 0; i < columns.size(); i++) {
+ int colIndex = allColumns.indexOf(columns.get(i));
+ if (colIndex != -1) {
+ columnIndexes[i] = colIndex;
+ }
+ }
+ return columnIndexes;
+ }
+ }
+
/**
* Get a list of equal predicate from a list of filter, each contains [column, value]. Return
* Optional.empty() if it contains any non-equal predicate.
@@ -817,7 +859,9 @@
private final String dataId;
private final RowData.FieldGetter[] primaryKeyFieldGetters;
private final RowData.FieldGetter[] allFieldGetters;
+ private final RowData.FieldGetter[] requireColumnFieldGetters;
private final SupportsRowLevelUpdate.RowLevelUpdateMode updateMode;
+ private final int[] requiredColumnIndexes;
private transient RowData[] oldRows;
private transient List<Tuple2<Integer, RowData>> updatedRows;
private transient List<RowData> allNewRows;
@@ -826,11 +870,15 @@
String dataId,
RowData.FieldGetter[] primaryKeyFieldGetters,
RowData.FieldGetter[] allFieldGetters,
- SupportsRowLevelUpdate.RowLevelUpdateMode updateMode) {
+ RowData.FieldGetter[] requireColumnFieldGetters,
+ SupportsRowLevelUpdate.RowLevelUpdateMode updateMode,
+ int[] requiredColumnIndexes) {
this.dataId = dataId;
this.primaryKeyFieldGetters = primaryKeyFieldGetters;
this.updateMode = updateMode;
this.allFieldGetters = allFieldGetters;
+ this.requireColumnFieldGetters = requireColumnFieldGetters;
+ this.requiredColumnIndexes = requiredColumnIndexes;
}
@Override
@@ -858,7 +906,8 @@
for (int i = 0; i < oldRows.length; i++) {
if (equal(oldRows[i], updatedRow, primaryKeyFieldGetters)) {
- updatedRows.add(new Tuple2<>(i, copyRowData(updatedRow, allFieldGetters)));
+ updatedRows.add(
+ new Tuple2<>(i, getUpdatedAfterRowDataWithAllFields(updatedRow)));
}
}
}
@@ -867,7 +916,25 @@
Preconditions.checkArgument(
rowData.getRowKind() == RowKind.INSERT,
"The RowKind for the updated rows should be " + RowKind.INSERT);
- allNewRows.add(copyRowData(rowData, allFieldGetters));
+ allNewRows.add(getUpdatedAfterRowDataWithAllFields(rowData));
+ }
+
+ private RowData getUpdatedAfterRowDataWithAllFields(RowData updateAfterRowData) {
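+ // e.g., with required column indexes [0, 1] (columns a, b), an update-after row
+ // (1, 'uaa') and an old row (1, 'b_1', 2.0) are merged into (1, 'uaa', 2.0)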
+ GenericRowData newRowData = null;
+ // first find the old row to be updated and copy the old values
+ for (RowData oldRow : oldRows) {
+ if (equal(oldRow, updateAfterRowData, primaryKeyFieldGetters)) {
+ newRowData = copyRowData(oldRow, allFieldGetters);
+ }
+ }
+ Preconditions.checkNotNull(newRowData);
+ // then overwrite the required columns with the updated values
+ for (int i = 0; i < requiredColumnIndexes.length; i++) {
+ newRowData.setField(
+ requiredColumnIndexes[i],
+ requireColumnFieldGetters[i].getFieldOrNull(updateAfterRowData));
+ }
+ return newRowData;
}
@Override
@@ -904,15 +971,25 @@
"The scan context should contains the object identifier for row-level modification.");
}
- private static RowData.FieldGetter[] getPrimaryKeyFieldGetter(ResolvedSchema resolvedSchema) {
- int[] indexes = resolvedSchema.getPrimaryKeyIndexes();
- RowData.FieldGetter[] fieldGetters = new RowData.FieldGetter[indexes.length];
+ /**
+ * Get an array of FieldGetter for the primary keys that are also contained in {@code
+ * requiredColumnIndices}.
+ */
+ private static RowData.FieldGetter[] getPrimaryKeyFieldGetter(
+ ResolvedSchema resolvedSchema, int[] requiredColumnIndices) {
+ List<RowData.FieldGetter> fieldGetters = new ArrayList<>();
+ int[] primaryKeyIndices = resolvedSchema.getPrimaryKeyIndexes();
List<DataType> dataTypes = resolvedSchema.getColumnDataTypes();
- for (int i = 0; i < fieldGetters.length; i++) {
- int colIndex = indexes[i];
- fieldGetters[i] = createFieldGetter(dataTypes.get(colIndex).getLogicalType(), colIndex);
+ for (final int primaryKeyIndex : primaryKeyIndices) {
+ // find the primaryKeyIndex in requiredColumnIndices
+ for (int i = 0; i < requiredColumnIndices.length; i++) {
+ if (requiredColumnIndices[i] == primaryKeyIndex) {
+ fieldGetters.add(
+ createFieldGetter(dataTypes.get(primaryKeyIndex).getLogicalType(), i));
+ }
+ }
}
- return fieldGetters;
+ return fieldGetters.toArray(new RowData.FieldGetter[0]);
}
private static RowData.FieldGetter[] getAllFieldGetter(ResolvedSchema resolvedSchema) {
@@ -924,6 +1001,18 @@
return fieldGetters;
}
+ private static RowData.FieldGetter[] getPartialFieldGetter(
+ ResolvedSchema resolvedSchema, int[] partialColumnIndexes) {
+ List<Column> columns = resolvedSchema.getColumns();
+ RowData.FieldGetter[] fieldGetters = new RowData.FieldGetter[partialColumnIndexes.length];
+ for (int i = 0; i < fieldGetters.length; i++) {
+ fieldGetters[i] =
+ createFieldGetter(
+ columns.get(partialColumnIndexes[i]).getDataType().getLogicalType(), i);
+ }
+ return fieldGetters;
+ }
+
private static boolean equal(
RowData value1, RowData value2, RowData.FieldGetter[] fieldGetters) {
for (RowData.FieldGetter fieldGetter : fieldGetters) {
@@ -935,7 +1024,7 @@
return true;
}
- private static RowData copyRowData(RowData rowData, RowData.FieldGetter[] fieldGetters) {
+ private static GenericRowData copyRowData(RowData rowData, RowData.FieldGetter[] fieldGetters) {
Object[] values = new Object[fieldGetters.length];
for (int i = 0; i < fieldGetters.length; i++) {
values[i] = fieldGetters[i].getFieldOrNull(rowData);
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/batch/sql/DeleteTableITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/batch/sql/DeleteTableITCase.java
index b918b7a..21e2907 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/batch/sql/DeleteTableITCase.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/batch/sql/DeleteTableITCase.java
@@ -95,7 +95,8 @@
String dataId = registerData();
tEnv().executeSql(
String.format(
- "CREATE TABLE t (a int, b string, c double) WITH"
+ "CREATE TABLE t (a int PRIMARY KEY NOT ENFORCED,"
+ + " b string, c double) WITH"
+ " ('connector' = 'test-update-delete',"
+ " 'data-id' = '%s',"
+ " 'delete-mode' = '%s',"
@@ -112,12 +113,54 @@
}
@Test
+ public void testRowLevelDeleteWithPartialColumns() throws Exception {
+ String dataId = registerData();
+ tEnv().executeSql(
+ String.format(
+ "CREATE TABLE t"
+ + " (a int PRIMARY KEY NOT ENFORCED,"
+ + " b string not null,"
+ + " c double not null) WITH"
+ + " ('connector' = 'test-update-delete',"
+ + " 'data-id' = '%s',"
+ + " 'delete-mode' = '%s',"
+ + " 'required-columns-for-delete' = 'a;c',"
+ + " 'support-delete-push-down' = 'false'"
+ + ")",
+ dataId, deleteMode));
+ tEnv().executeSql("DELETE FROM t WHERE a > 1").await();
+ List<Row> rows = toRows(tEnv().executeSql("SELECT * FROM t"));
+ assertThat(rows.toString()).isEqualTo("[+I[0, b_0, 0.0], +I[1, b_1, 2.0]]");
+
+ // test delete where the required columns only contain part of the primary key
+ dataId = registerData();
+ tEnv().executeSql(
+ String.format(
+ "CREATE TABLE t1"
+ + " (a int,"
+ + " b string not null,"
+ + " c double not null,"
+ + " PRIMARY KEY (a, c) NOT ENFORCED) WITH"
+ + " ('connector' = 'test-update-delete',"
+ + " 'data-id' = '%s',"
+ + " 'delete-mode' = '%s',"
+ + " 'required-columns-for-delete' = 'a;b',"
+ + " 'support-delete-push-down' = 'false'"
+ + ")",
+ dataId, deleteMode));
+ tEnv().executeSql("DELETE FROM t1 WHERE a > 1").await();
+ rows = toRows(tEnv().executeSql("SELECT * FROM t1"));
+ assertThat(rows.toString()).isEqualTo("[+I[0, b_0, 0.0], +I[1, b_1, 2.0]]");
+ }
+
+ @Test
public void testMixDelete() throws Exception {
// test mixing delete push-down and row-level delete
String dataId = registerData();
tEnv().executeSql(
String.format(
- "CREATE TABLE t (a int, b string, c double) WITH"
+ "CREATE TABLE t (a int PRIMARY KEY NOT ENFORCED,"
+ + " b string, c double) WITH"
+ " ('connector' = 'test-update-delete',"
+ " 'data-id' = '%s',"
+ " 'mix-delete' = 'true')",
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/batch/sql/UpdateTableITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/batch/sql/UpdateTableITCase.java
index 793a67e..b7f5501 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/batch/sql/UpdateTableITCase.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/batch/sql/UpdateTableITCase.java
@@ -86,6 +86,46 @@
}
@Test
+ public void testPartialUpdate() throws Exception {
+ String dataId = registerData();
+ tEnv().executeSql(
+ String.format(
+ "CREATE TABLE t ("
+ + " a int PRIMARY KEY NOT ENFORCED,"
+ + " b string not null,"
+ + " c double not null) WITH"
+ + " ('connector' = 'test-update-delete', "
+ + "'data-id' = '%s',"
+ + " 'required-columns-for-update' = 'a;b', "
+ + " 'update-mode' = '%s')",
+ dataId, updateMode));
+ tEnv().executeSql("UPDATE t SET b = 'uaa' WHERE a >= 1").await();
+ List<String> rows = toSortedResults(tEnv().executeSql("SELECT * FROM t"));
+ assertThat(rows.toString())
+ .isEqualTo("[+I[0, b_0, 0.0], +I[1, uaa, 2.0], +I[2, uaa, 4.0]]");
+
+ // test partial update where the required columns only contain part of the primary key
+ dataId = registerData();
+ tEnv().executeSql(
+ String.format(
+ "CREATE TABLE t1 ("
+ + " a int,"
+ + " b string not null,"
+ + " c double not null,"
+ + " PRIMARY KEY (a, c) NOT ENFORCED"
+ + ") WITH"
+ + " ('connector' = 'test-update-delete', "
+ + "'data-id' = '%s',"
+ + " 'required-columns-for-update' = 'a;b', "
+ + " 'update-mode' = '%s')",
+ dataId, updateMode));
+ tEnv().executeSql("UPDATE t1 SET b = 'uaa' WHERE a >= 1").await();
+ rows = toSortedResults(tEnv().executeSql("SELECT * FROM t1"));
+ assertThat(rows.toString())
+ .isEqualTo("[+I[0, b_0, 0.0], +I[1, uaa, 2.0], +I[2, uaa, 4.0]]");
+ }
+
+ @Test
public void testStatementSetContainUpdateAndInsert() throws Exception {
tEnv().executeSql(
"CREATE TABLE t (a int, b string, c double) WITH"