/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.table.planner.operations;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ExecutionOptions;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.catalog.Catalog;
import org.apache.flink.table.catalog.CatalogManager;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.ContextResolvedTable;
import org.apache.flink.table.catalog.GenericInMemoryCatalog;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.expressions.ResolvedExpression;
import org.apache.flink.table.planner.calcite.FlinkPlannerImpl;
import org.apache.flink.table.planner.catalog.CatalogManagerCalciteSchema;
import org.apache.flink.table.planner.delegation.PlannerContext;
import org.apache.flink.table.planner.factories.TestUpdateDeleteTableFactory;
import org.apache.flink.table.planner.parse.CalciteParser;
import org.apache.flink.table.planner.utils.PlannerMocks;
import org.apache.flink.table.planner.utils.TimestampStringUtils;
import org.apache.flink.table.utils.CatalogManagerMocks;

import org.apache.calcite.rel.RelRoot;
import org.apache.calcite.rel.logical.LogicalTableModify;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.util.TimestampString;
import org.junit.jupiter.api.Test;

import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

import static org.apache.calcite.jdbc.CalciteSchemaBuilder.asRootSchema;
import static org.assertj.core.api.Assertions.assertThat;

/** Tests for {@link DeletePushDownUtils}. */
public class DeletePushDownUtilsTest {
private final TableConfig tableConfig = TableConfig.getDefault();
private final Catalog catalog = new GenericInMemoryCatalog("MockCatalog", "default");
private final CatalogManager catalogManager =
CatalogManagerMocks.preparedCatalogManager()
.defaultCatalog("builtin", catalog)
.config(
Configuration.fromMap(
Collections.singletonMap(
ExecutionOptions.RUNTIME_MODE.key(),
RuntimeExecutionMode.BATCH.name())))
.build();
private final PlannerMocks plannerMocks =
PlannerMocks.newBuilder()
.withBatchMode(true)
.withTableConfig(tableConfig)
.withCatalogManager(catalogManager)
.withRootSchema(
asRootSchema(new CatalogManagerCalciteSchema(catalogManager, false)))
.build();
private final PlannerContext plannerContext = plannerMocks.getPlannerContext();
private final CalciteParser parser = plannerContext.createCalciteParser();
private final FlinkPlannerImpl flinkPlanner = plannerContext.createFlinkPlanner();
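
/**
 * Verifies that {@link DeletePushDownUtils#getDynamicTableSink} returns the sink for a table
 * whose connector creates a {@link DynamicTableSink} and returns empty for a legacy table sink.
 */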
@Test
public void testGetDynamicTableSink() {
// create a table with connector = test-update-delete
Map<String, String> options = new HashMap<>();
options.put("connector", "test-update-delete");
CatalogTable catalogTable = createTestCatalogTable(options);
ObjectIdentifier tableId = ObjectIdentifier.of("builtin", "default", "t");
catalogManager.createTable(catalogTable, tableId, false);
ContextResolvedTable resolvedTable =
ContextResolvedTable.permanent(
tableId, catalog, catalogManager.resolveCatalogTable(catalogTable));
LogicalTableModify tableModify = getTableModifyFromSql("DELETE FROM t");
Optional<DynamicTableSink> optionalDynamicTableSink =
DeletePushDownUtils.getDynamicTableSink(resolvedTable, tableModify, catalogManager);
// verify we can get the dynamic table sink
assertThat(optionalDynamicTableSink).isPresent();
assertThat(optionalDynamicTableSink.get())
.isInstanceOf(TestUpdateDeleteTableFactory.SupportsDeletePushDownSink.class);
// create a table with connector = COLLECTION, which is a legacy table sink
options.put("connector", "COLLECTION");
catalogTable = createTestCatalogTable(options);
tableId = ObjectIdentifier.of("builtin", "default", "t1");
catalogManager.createTable(catalogTable, tableId, false);
resolvedTable =
ContextResolvedTable.permanent(
tableId, catalog, catalogManager.resolveCatalogTable(catalogTable));
tableModify = getTableModifyFromSql("DELETE FROM t1");
optionalDynamicTableSink =
DeletePushDownUtils.getDynamicTableSink(resolvedTable, tableModify, catalogManager);
// verify the optional is empty since the connector creates a legacy TableSink rather than a
// DynamicTableSink
assertThat(optionalDynamicTableSink).isEmpty();
}
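
/**
 * Verifies the filter expressions that {@link DeletePushDownUtils#getResolvedFilterExpressions}
 * resolves from the WHERE clause of a DELETE statement: no filter, a simple conjunction,
 * constant folding, implicit casts, a sub-query (not resolvable), and a timestamp literal.
 */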
@Test
public void testGetResolvedFilterExpressions() {
CatalogTable catalogTable =
CatalogTable.of(
Schema.newBuilder()
.column("f0", DataTypes.INT().notNull())
.column("f1", DataTypes.STRING().nullable())
.column("f2", DataTypes.BIGINT().nullable())
.column("f3", DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE().nullable())
.build(),
null,
Collections.emptyList(),
Collections.emptyMap());
catalogManager.createTable(
catalogTable, ObjectIdentifier.of("builtin", "default", "t"), false);
// verify the result when there is no WHERE clause
LogicalTableModify tableModify = getTableModifyFromSql("DELETE FROM t");
Optional<List<ResolvedExpression>> optionalResolvedExpressions =
DeletePushDownUtils.getResolvedFilterExpressions(tableModify);
verifyExpression(optionalResolvedExpressions, "[]");
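
// a simple conjunction is split into one resolved expression per conjunct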
tableModify = getTableModifyFromSql("DELETE FROM t where f0 = 1 and f1 = '123'");
optionalResolvedExpressions = DeletePushDownUtils.getResolvedFilterExpressions(tableModify);
verifyExpression(optionalResolvedExpressions, "[equals(f0, 1), equals(f1, '123')]");
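
// contradictory predicates (f0 = 7 and f0 < 6) are simplified to a constant false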
tableModify = getTableModifyFromSql("DELETE FROM t where f0 = 1 + 6 and f0 < 6");
optionalResolvedExpressions = DeletePushDownUtils.getResolvedFilterExpressions(tableModify);
assertThat(optionalResolvedExpressions).isPresent();
verifyExpression(optionalResolvedExpressions, "[false]");
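
// comparing the INT column f0 with a BIGINT expression introduces a cast on f0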
tableModify = getTableModifyFromSql("DELETE FROM t where f0 = f2 + 1");
optionalResolvedExpressions = DeletePushDownUtils.getResolvedFilterExpressions(tableModify);
verifyExpression(
optionalResolvedExpressions, "[equals(cast(f0, BIGINT NOT NULL), plus(f2, 1))]");
// resolved filters are not available because the predicate contains a sub-query
tableModify = getTableModifyFromSql("DELETE FROM t where f0 > (select count(1) from t)");
optionalResolvedExpressions = DeletePushDownUtils.getResolvedFilterExpressions(tableModify);
assertThat(optionalResolvedExpressions).isEmpty();
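
// a timestamp literal compared with the TIMESTAMP_LTZ column f3 is resolved to an instant
// in the system default time zone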
String dateTime = "2024-05-13 08:00:00";
tableModify =
getTableModifyFromSql(String.format("DELETE FROM t where f3 > '%s'", dateTime));
LocalDateTime ldt = TimestampStringUtils.toLocalDateTime(new TimestampString(dateTime));
Instant instant = ldt.toInstant(ZoneId.systemDefault().getRules().getOffset(ldt));
optionalResolvedExpressions = DeletePushDownUtils.getResolvedFilterExpressions(tableModify);
assertThat(optionalResolvedExpressions).isPresent();
verifyExpression(
optionalResolvedExpressions,
String.format("[greaterThan(f3, %s)]", instant.toString()));
}
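
/** Creates a catalog table with three columns (f0, f1, f2) and the given connector options. */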
private CatalogTable createTestCatalogTable(Map<String, String> options) {
return CatalogTable.of(
Schema.newBuilder()
.column("f0", DataTypes.INT().notNull())
.column("f1", DataTypes.STRING().nullable())
.column("f2", DataTypes.BIGINT().nullable())
.build(),
null,
Collections.emptyList(),
options);
}
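
/**
 * Parses and validates the given SQL statement and returns the root of the resulting
 * relational tree as a {@link LogicalTableModify}.
 */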
private LogicalTableModify getTableModifyFromSql(String sql) {
SqlNode sqlNode = parser.parse(sql);
final SqlNode validated = flinkPlanner.validate(sqlNode);
RelRoot deleteRelational = flinkPlanner.rel(validated);
return (LogicalTableModify) deleteRelational.rel;
}
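
/** Asserts that the resolved expressions are present and match the expected string form. */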
@SuppressWarnings("OptionalUsedAsFieldOrParameterType")
private void verifyExpression(
Optional<List<ResolvedExpression>> optionalResolvedExpressions, String expected) {
assertThat(optionalResolvedExpressions).isPresent();
assertThat(optionalResolvedExpressions.get().toString()).isEqualTo(expected);
}
}