[Improvement] SupportsFilterPushDown and ProjectionPushDown in DorisSource (#348)
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/DorisCatalogFactory.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/DorisCatalogFactory.java
index d837395..06bbbc1 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/DorisCatalogFactory.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/DorisCatalogFactory.java
@@ -33,8 +33,6 @@
import static org.apache.doris.flink.table.DorisConfigOptions.DORIS_DESERIALIZE_ARROW_ASYNC;
import static org.apache.doris.flink.table.DorisConfigOptions.DORIS_DESERIALIZE_QUEUE_SIZE;
import static org.apache.doris.flink.table.DorisConfigOptions.DORIS_EXEC_MEM_LIMIT;
-import static org.apache.doris.flink.table.DorisConfigOptions.DORIS_FILTER_QUERY;
-import static org.apache.doris.flink.table.DorisConfigOptions.DORIS_READ_FIELD;
import static org.apache.doris.flink.table.DorisConfigOptions.DORIS_REQUEST_CONNECT_TIMEOUT_MS;
import static org.apache.doris.flink.table.DorisConfigOptions.DORIS_REQUEST_QUERY_TIMEOUT_S;
import static org.apache.doris.flink.table.DorisConfigOptions.DORIS_REQUEST_READ_TIMEOUT_MS;
@@ -86,8 +84,6 @@
options.add(USERNAME);
options.add(PASSWORD);
- options.add(DORIS_READ_FIELD);
- options.add(DORIS_FILTER_QUERY);
options.add(DORIS_TABLET_SIZE);
options.add(DORIS_REQUEST_CONNECT_TIMEOUT_MS);
options.add(DORIS_REQUEST_READ_TIMEOUT_MS);
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/RestService.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/RestService.java
index fcf31c3..5ac139e 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/RestService.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/RestService.java
@@ -605,7 +605,7 @@
if (!StringUtils.isEmpty(readOptions.getFilterQuery())) {
sql += " where " + readOptions.getFilterQuery();
}
- logger.debug("Query SQL Sending to Doris FE is: '{}'.", sql);
+ logger.info("Query SQL Sending to Doris FE is: '{}'.", sql);
HttpPost httpPost = new HttpPost(getUriStr(options, logger) + QUERY_PLAN);
String entity = "{\"sql\": \"" + sql + "\"}";
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java
index b91b04b..6d74bf8 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java
@@ -80,18 +80,6 @@
"Use automatic redirection of fe without explicitly obtaining the be list");
// source config options
- public static final ConfigOption<String> DORIS_READ_FIELD =
- ConfigOptions.key("doris.read.field")
- .stringType()
- .noDefaultValue()
- .withDescription(
- "List of column names in the Doris table, separated by commas");
- public static final ConfigOption<String> DORIS_FILTER_QUERY =
- ConfigOptions.key("doris.filter.query")
- .stringType()
- .noDefaultValue()
- .withDescription(
- "Filter expression of the query, which is transparently transmitted to Doris. Doris uses this expression to complete source-side data filtering");
public static final ConfigOption<Integer> DORIS_TABLET_SIZE =
ConfigOptions.key("doris.request.tablet.size")
.intType()
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java
index f327b9c..a276696 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java
@@ -45,8 +45,6 @@
import static org.apache.doris.flink.table.DorisConfigOptions.DORIS_DESERIALIZE_ARROW_ASYNC;
import static org.apache.doris.flink.table.DorisConfigOptions.DORIS_DESERIALIZE_QUEUE_SIZE;
import static org.apache.doris.flink.table.DorisConfigOptions.DORIS_EXEC_MEM_LIMIT;
-import static org.apache.doris.flink.table.DorisConfigOptions.DORIS_FILTER_QUERY;
-import static org.apache.doris.flink.table.DorisConfigOptions.DORIS_READ_FIELD;
import static org.apache.doris.flink.table.DorisConfigOptions.DORIS_REQUEST_CONNECT_TIMEOUT_MS;
import static org.apache.doris.flink.table.DorisConfigOptions.DORIS_REQUEST_QUERY_TIMEOUT_S;
import static org.apache.doris.flink.table.DorisConfigOptions.DORIS_REQUEST_READ_TIMEOUT_MS;
@@ -118,8 +116,6 @@
options.add(JDBC_URL);
options.add(AUTO_REDIRECT);
- options.add(DORIS_READ_FIELD);
- options.add(DORIS_FILTER_QUERY);
options.add(DORIS_TABLET_SIZE);
options.add(DORIS_REQUEST_CONNECT_TIMEOUT_MS);
options.add(DORIS_REQUEST_READ_TIMEOUT_MS);
@@ -181,7 +177,8 @@
getDorisOptions(helper.getOptions()),
getDorisReadOptions(helper.getOptions()),
getDorisLookupOptions(helper.getOptions()),
- physicalSchema);
+ physicalSchema,
+ context.getPhysicalRowDataType());
}
private DorisOptions getDorisOptions(ReadableConfig readableConfig) {
@@ -205,8 +202,6 @@
builder.setDeserializeArrowAsync(readableConfig.get(DORIS_DESERIALIZE_ARROW_ASYNC))
.setDeserializeQueueSize(readableConfig.get(DORIS_DESERIALIZE_QUEUE_SIZE))
.setExecMemLimit(readableConfig.get(DORIS_EXEC_MEM_LIMIT).getBytes())
- .setFilterQuery(readableConfig.get(DORIS_FILTER_QUERY))
- .setReadFields(readableConfig.get(DORIS_READ_FIELD))
.setRequestQueryTimeoutS(
(int) readableConfig.get(DORIS_REQUEST_QUERY_TIMEOUT_S).getSeconds())
.setRequestBatchSize(readableConfig.get(DORIS_BATCH_SIZE))
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSource.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSource.java
index 9057f1f..6b09735 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSource.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSource.java
@@ -21,6 +21,7 @@
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.Projection;
import org.apache.flink.table.connector.source.AsyncTableFunctionProvider;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.connector.source.InputFormatProvider;
@@ -31,8 +32,10 @@
import org.apache.flink.table.connector.source.abilities.SupportsFilterPushDown;
import org.apache.flink.table.connector.source.abilities.SupportsProjectionPushDown;
import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.expressions.ResolvedExpression;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.util.StringUtils;
import org.apache.doris.flink.cfg.DorisLookupOptions;
import org.apache.doris.flink.cfg.DorisOptions;
@@ -45,6 +48,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
@@ -58,23 +62,31 @@
* SourceFunction} and its {@link DeserializationSchema} for runtime. Both instances are
* parameterized to return internal data structures (i.e. {@link RowData}).
*/
-public final class DorisDynamicTableSource implements ScanTableSource, LookupTableSource {
+public final class DorisDynamicTableSource
+ implements ScanTableSource,
+ LookupTableSource,
+ SupportsFilterPushDown,
+ SupportsProjectionPushDown {
private static final Logger LOG = LoggerFactory.getLogger(DorisDynamicTableSource.class);
private final DorisOptions options;
private final DorisReadOptions readOptions;
private DorisLookupOptions lookupOptions;
private TableSchema physicalSchema;
+ private List<String> resolvedFilterQuery = new ArrayList<>();
+ private DataType physicalRowDataType;
public DorisDynamicTableSource(
DorisOptions options,
DorisReadOptions readOptions,
DorisLookupOptions lookupOptions,
- TableSchema physicalSchema) {
+ TableSchema physicalSchema,
+ DataType physicalRowDataType) {
this.options = options;
this.lookupOptions = lookupOptions;
this.readOptions = readOptions;
this.physicalSchema = physicalSchema;
+ this.physicalRowDataType = physicalRowDataType;
}
public DorisDynamicTableSource(
@@ -93,8 +105,11 @@
@Override
public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderContext) {
+ String filterQuery = resolvedFilterQuery.stream().collect(Collectors.joining(" AND "));
+ readOptions.setFilterQuery(filterQuery);
+ String[] selectFields = DataType.getFieldNames(physicalRowDataType).toArray(new String[0]);
readOptions.setReadFields(
- Arrays.stream(physicalSchema.getFieldNames())
+ Arrays.stream(selectFields)
.map(item -> String.format("`%s`", item.trim().replace("`", "")))
.collect(Collectors.joining(", ")));
@@ -114,7 +129,7 @@
.setTableIdentifier(options.getTableIdentifier())
.setPartitions(dorisPartitions)
.setReadOptions(readOptions)
- .setRowType((RowType) physicalSchema.toRowDataType().getLogicalType());
+ .setRowType((RowType) physicalRowDataType.getLogicalType());
return InputFormatProvider.of(builder.build());
} else {
// Read data using the interface of the FLIP-27 specification
@@ -124,10 +139,7 @@
.setDorisOptions(options)
.setDeserializer(
new RowDataDeserializationSchema(
- (RowType)
- physicalSchema
- .toRowDataType()
- .getLogicalType()))
+ (RowType) physicalRowDataType.getLogicalType()))
.build();
return SourceProvider.of(build);
}
@@ -135,7 +147,6 @@
@Override
public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext context) {
- DataType physicalRowDataType = physicalSchema.toRowDataType();
String[] keyNames = new String[context.getKeys().length];
int[] keyIndexs = new int[context.getKeys().length];
for (int i = 0; i < keyNames.length; i++) {
@@ -168,11 +179,43 @@
@Override
public DynamicTableSource copy() {
- return new DorisDynamicTableSource(options, readOptions, lookupOptions, physicalSchema);
+        // Build the duplicate from the current (possibly already projected) row type and give
+        // it its own list of pushed-down predicates so the two sources do not share that list.
+        // NOTE(review): options, readOptions and lookupOptions are passed by reference, and
+        // getScanRuntimeProvider mutates readOptions - confirm that sharing is intended.
+        DorisDynamicTableSource duplicate =
+                new DorisDynamicTableSource(
+                        this.options,
+                        this.readOptions,
+                        this.lookupOptions,
+                        this.physicalSchema,
+                        this.physicalRowDataType);
+        duplicate.resolvedFilterQuery = new ArrayList<>(this.resolvedFilterQuery);
+        return duplicate;
}
@Override
public String asSummaryString() {
return "Doris Table Source";
}
+
+    /**
+     * Splits the offered filters into those that can be evaluated by Doris and those that must
+     * stay in Flink.
+     *
+     * <p>Each filter is translated with a {@link DorisExpressionVisitor}. A filter whose
+     * translation is neither {@code null} nor blank is accepted and its SQL text is appended to
+     * {@code resolvedFilterQuery}; every other filter is reported back as remaining.
+     *
+     * @param filters the candidate filters offered for push-down
+     * @return the accepted filters together with the filters left for Flink to evaluate
+     */
+    @Override
+    public Result applyFilters(List<ResolvedExpression> filters) {
+        final DorisExpressionVisitor visitor = new DorisExpressionVisitor();
+        final List<ResolvedExpression> pushedDown = new ArrayList<>();
+        final List<ResolvedExpression> keptInFlink = new ArrayList<>();
+
+        for (ResolvedExpression candidate : filters) {
+            String dorisPredicate = candidate.accept(visitor);
+            if (StringUtils.isNullOrWhitespaceOnly(dorisPredicate)) {
+                // No usable translation: leave this filter for Flink.
+                keptInFlink.add(candidate);
+                continue;
+            }
+            pushedDown.add(candidate);
+            this.resolvedFilterQuery.add(dorisPredicate);
+        }
+        return Result.of(pushedDown, keptInFlink);
+    }
+
+    /**
+     * Reports whether projections into nested fields can be pushed into this source.
+     *
+     * @return always {@code false}
+     */
+    @Override
+    public boolean supportsNestedProjection() {
+        return false;
+    }
+
+    /**
+     * Narrows the row type produced by this source to the projected fields.
+     *
+     * <p>The projection is applied to the current {@code physicalRowDataType} and the result
+     * replaces it; {@code producedDataType} is not consulted.
+     *
+     * @param projectedFields index paths of the fields to keep
+     * @param producedDataType the data type after projection (unused here)
+     */
+    @Override
+    public void applyProjection(int[][] projectedFields, DataType producedDataType) {
+        Projection projection = Projection.of(projectedFields);
+        this.physicalRowDataType = projection.project(this.physicalRowDataType);
+    }
}
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisExpressionVisitor.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisExpressionVisitor.java
new file mode 100644
index 0000000..3f327fe
--- /dev/null
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisExpressionVisitor.java
@@ -0,0 +1,109 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.table;
+
+import org.apache.flink.table.expressions.CallExpression;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.expressions.ExpressionVisitor;
+import org.apache.flink.table.expressions.FieldReferenceExpression;
+import org.apache.flink.table.expressions.ResolvedExpression;
+import org.apache.flink.table.expressions.TypeLiteralExpression;
+import org.apache.flink.table.expressions.ValueLiteralExpression;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+
+import java.util.List;
+
+/**
+ * Translates Flink filter expressions into SQL predicate text.
+ *
+ * <p>Every {@code visit} method returns the SQL fragment for the visited expression, or {@code
+ * null} when the expression cannot be translated. A {@code null} from any operand makes the
+ * whole enclosing expression {@code null}, so a partially translatable filter is never turned
+ * into SQL.
+ */
+public class DorisExpressionVisitor implements ExpressionVisitor<String> {
+
+    /**
+     * Translates a function call.
+     *
+     * <p>Handled functions: the comparisons {@code =, <, <=, >, >=, <>}, {@code OR}, {@code AND},
+     * {@code LIKE}, {@code IS NULL} and {@code IS NOT NULL}.
+     *
+     * @param call the call expression to translate
+     * @return the parenthesised SQL text, or {@code null} if the function or one of its operands
+     *     is not supported
+     */
+    @Override
+    public String visit(CallExpression call) {
+        if (BuiltInFunctionDefinitions.EQUALS.equals(call.getFunctionDefinition())) {
+            return combineExpression("=", call.getResolvedChildren());
+        }
+        if (BuiltInFunctionDefinitions.LESS_THAN.equals(call.getFunctionDefinition())) {
+            return combineExpression("<", call.getResolvedChildren());
+        }
+        if (BuiltInFunctionDefinitions.LESS_THAN_OR_EQUAL.equals(call.getFunctionDefinition())) {
+            return combineExpression("<=", call.getResolvedChildren());
+        }
+        if (BuiltInFunctionDefinitions.GREATER_THAN.equals(call.getFunctionDefinition())) {
+            return combineExpression(">", call.getResolvedChildren());
+        }
+        if (BuiltInFunctionDefinitions.GREATER_THAN_OR_EQUAL.equals(call.getFunctionDefinition())) {
+            return combineExpression(">=", call.getResolvedChildren());
+        }
+        if (BuiltInFunctionDefinitions.NOT_EQUALS.equals(call.getFunctionDefinition())) {
+            return combineExpression("<>", call.getResolvedChildren());
+        }
+        if (BuiltInFunctionDefinitions.OR.equals(call.getFunctionDefinition())) {
+            return combineExpression("OR", call.getResolvedChildren());
+        }
+        if (BuiltInFunctionDefinitions.AND.equals(call.getFunctionDefinition())) {
+            return combineExpression("AND", call.getResolvedChildren());
+        }
+        if (BuiltInFunctionDefinitions.LIKE.equals(call.getFunctionDefinition())) {
+            return combineExpression("LIKE", call.getResolvedChildren());
+        }
+        if (BuiltInFunctionDefinitions.IS_NULL.equals(call.getFunctionDefinition())) {
+            return combineLeftExpression("IS NULL", call.getResolvedChildren().get(0));
+        }
+        if (BuiltInFunctionDefinitions.IS_NOT_NULL.equals(call.getFunctionDefinition())) {
+            return combineLeftExpression("IS NOT NULL", call.getResolvedChildren().get(0));
+        }
+        // Any other function is not translated.
+        return null;
+    }
+
+    /**
+     * Renders a binary expression as {@code (left operator right)}.
+     *
+     * @param operator the SQL operator placed between the operands
+     * @param operand the operands of the call
+     * @return the SQL text, or {@code null} if there are not exactly two operands or either
+     *     operand cannot be translated
+     */
+    private String combineExpression(String operator, List<ResolvedExpression> operand) {
+        // Only the "left op right" shape can be rendered. A call with a different number of
+        // operands (e.g. LIKE carrying an extra escape operand) would lose part of its meaning
+        // if the surplus operands were dropped, so it is declined instead.
+        if (operand.size() != 2) {
+            return null;
+        }
+        String left = operand.get(0).accept(this);
+        String right = operand.get(1).accept(this);
+        // String.format would print an untranslatable operand as the literal text "null" and
+        // yield a non-blank but wrong predicate such as "(null = 1)".
+        if (left == null || right == null) {
+            return null;
+        }
+        return String.format("(%s %s %s)", left, operator, right);
+    }
+
+    /**
+     * Renders a postfix expression as {@code (operand operator)}.
+     *
+     * @param operator the SQL postfix operator, e.g. {@code IS NULL}
+     * @param operand the single operand of the call
+     * @return the SQL text, or {@code null} if the operand cannot be translated
+     */
+    private String combineLeftExpression(String operator, ResolvedExpression operand) {
+        String left = operand.accept(this);
+        // Without this check an untranslatable operand would become "(null IS NULL)".
+        if (left == null) {
+            return null;
+        }
+        return String.format("(%s %s)", left, operator);
+    }
+
+    /**
+     * Translates a literal value.
+     *
+     * <p>DATE and the three TIMESTAMP type roots are wrapped in single quotes; every other
+     * literal is rendered with its own {@code toString()}.
+     *
+     * @param valueLiteral the literal to translate
+     * @return the SQL text of the literal
+     */
+    @Override
+    public String visit(ValueLiteralExpression valueLiteral) {
+        LogicalTypeRoot typeRoot = valueLiteral.getOutputDataType().getLogicalType().getTypeRoot();
+        if (typeRoot.equals(LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE)
+                || typeRoot.equals(LogicalTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE)
+                || typeRoot.equals(LogicalTypeRoot.TIMESTAMP_WITH_TIME_ZONE)
+                || typeRoot.equals(LogicalTypeRoot.DATE)) {
+            return "'" + valueLiteral + "'";
+        }
+        return valueLiteral.toString();
+    }
+
+    /**
+     * Translates a column reference to its field name.
+     *
+     * @param fieldReference the referenced field
+     * @return the field name exactly as reported by the expression (not quoted)
+     */
+    @Override
+    public String visit(FieldReferenceExpression fieldReference) {
+        return fieldReference.getName();
+    }
+
+    /**
+     * Translates a type literal to the string form of its data type.
+     *
+     * @param typeLiteral the type literal to translate
+     * @return the result of {@code toString()} on the literal's output data type
+     */
+    @Override
+    public String visit(TypeLiteralExpression typeLiteral) {
+        return typeLiteral.getOutputDataType().toString();
+    }
+
+    /**
+     * Fallback for every expression kind without a dedicated {@code visit} overload.
+     *
+     * @param expression the unsupported expression
+     * @return always {@code null}, marking the expression as not translatable
+     */
+    @Override
+    public String visit(Expression expression) {
+        return null;
+    }
+}
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/table/DorisDynamicTableSourceTest.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/table/DorisDynamicTableSourceTest.java
index ecc08f2..290e193 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/table/DorisDynamicTableSourceTest.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/table/DorisDynamicTableSourceTest.java
@@ -26,6 +26,7 @@
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.connector.source.ScanRuntimeProviderContext;
+import org.apache.doris.flink.cfg.DorisLookupOptions;
import org.apache.doris.flink.cfg.DorisReadOptions;
import org.apache.doris.flink.sink.OptionUtils;
import org.apache.doris.flink.source.DorisSource;
@@ -48,7 +49,9 @@
new DorisDynamicTableSource(
OptionUtils.buildDorisOptions(),
builder.build(),
- TableSchema.fromResolvedSchema(FactoryMocks.SCHEMA));
+ DorisLookupOptions.builder().build(),
+ TableSchema.fromResolvedSchema(FactoryMocks.SCHEMA),
+ FactoryMocks.PHYSICAL_DATA_TYPE);
ScanTableSource.ScanRuntimeProvider provider =
actualDorisSource.getScanRuntimeProvider(ScanRuntimeProviderContext.INSTANCE);
assertDorisSource(provider);
@@ -60,7 +63,9 @@
new DorisDynamicTableSource(
OptionUtils.buildDorisOptions(),
OptionUtils.buildDorisReadOptions(),
- TableSchema.fromResolvedSchema(FactoryMocks.SCHEMA));
+ DorisLookupOptions.builder().build(),
+ TableSchema.fromResolvedSchema(FactoryMocks.SCHEMA),
+ FactoryMocks.PHYSICAL_DATA_TYPE);
ScanTableSource.ScanRuntimeProvider provider =
actualDorisSource.getScanRuntimeProvider(ScanRuntimeProviderContext.INSTANCE);
assertDorisSource(provider);