[Improvement] Support filter push-down and projection push-down in DorisSource (#348)

diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/DorisCatalogFactory.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/DorisCatalogFactory.java
index d837395..06bbbc1 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/DorisCatalogFactory.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/DorisCatalogFactory.java
@@ -33,8 +33,6 @@
 import static org.apache.doris.flink.table.DorisConfigOptions.DORIS_DESERIALIZE_ARROW_ASYNC;
 import static org.apache.doris.flink.table.DorisConfigOptions.DORIS_DESERIALIZE_QUEUE_SIZE;
 import static org.apache.doris.flink.table.DorisConfigOptions.DORIS_EXEC_MEM_LIMIT;
-import static org.apache.doris.flink.table.DorisConfigOptions.DORIS_FILTER_QUERY;
-import static org.apache.doris.flink.table.DorisConfigOptions.DORIS_READ_FIELD;
 import static org.apache.doris.flink.table.DorisConfigOptions.DORIS_REQUEST_CONNECT_TIMEOUT_MS;
 import static org.apache.doris.flink.table.DorisConfigOptions.DORIS_REQUEST_QUERY_TIMEOUT_S;
 import static org.apache.doris.flink.table.DorisConfigOptions.DORIS_REQUEST_READ_TIMEOUT_MS;
@@ -86,8 +84,6 @@
         options.add(USERNAME);
         options.add(PASSWORD);
 
-        options.add(DORIS_READ_FIELD);
-        options.add(DORIS_FILTER_QUERY);
         options.add(DORIS_TABLET_SIZE);
         options.add(DORIS_REQUEST_CONNECT_TIMEOUT_MS);
         options.add(DORIS_REQUEST_READ_TIMEOUT_MS);
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/RestService.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/RestService.java
index fcf31c3..5ac139e 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/RestService.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/RestService.java
@@ -605,7 +605,7 @@
         if (!StringUtils.isEmpty(readOptions.getFilterQuery())) {
             sql += " where " + readOptions.getFilterQuery();
         }
-        logger.debug("Query SQL Sending to Doris FE is: '{}'.", sql);
+        logger.info("Query SQL Sending to Doris FE is: '{}'.", sql);
 
         HttpPost httpPost = new HttpPost(getUriStr(options, logger) + QUERY_PLAN);
         String entity = "{\"sql\": \"" + sql + "\"}";
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java
index b91b04b..6d74bf8 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java
@@ -80,18 +80,6 @@
                             "Use automatic redirection of fe without explicitly obtaining the be list");
 
     // source config options
-    public static final ConfigOption<String> DORIS_READ_FIELD =
-            ConfigOptions.key("doris.read.field")
-                    .stringType()
-                    .noDefaultValue()
-                    .withDescription(
-                            "List of column names in the Doris table, separated by commas");
-    public static final ConfigOption<String> DORIS_FILTER_QUERY =
-            ConfigOptions.key("doris.filter.query")
-                    .stringType()
-                    .noDefaultValue()
-                    .withDescription(
-                            "Filter expression of the query, which is transparently transmitted to Doris. Doris uses this expression to complete source-side data filtering");
     public static final ConfigOption<Integer> DORIS_TABLET_SIZE =
             ConfigOptions.key("doris.request.tablet.size")
                     .intType()
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java
index f327b9c..a276696 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java
@@ -45,8 +45,6 @@
 import static org.apache.doris.flink.table.DorisConfigOptions.DORIS_DESERIALIZE_ARROW_ASYNC;
 import static org.apache.doris.flink.table.DorisConfigOptions.DORIS_DESERIALIZE_QUEUE_SIZE;
 import static org.apache.doris.flink.table.DorisConfigOptions.DORIS_EXEC_MEM_LIMIT;
-import static org.apache.doris.flink.table.DorisConfigOptions.DORIS_FILTER_QUERY;
-import static org.apache.doris.flink.table.DorisConfigOptions.DORIS_READ_FIELD;
 import static org.apache.doris.flink.table.DorisConfigOptions.DORIS_REQUEST_CONNECT_TIMEOUT_MS;
 import static org.apache.doris.flink.table.DorisConfigOptions.DORIS_REQUEST_QUERY_TIMEOUT_S;
 import static org.apache.doris.flink.table.DorisConfigOptions.DORIS_REQUEST_READ_TIMEOUT_MS;
@@ -118,8 +116,6 @@
         options.add(JDBC_URL);
         options.add(AUTO_REDIRECT);
 
-        options.add(DORIS_READ_FIELD);
-        options.add(DORIS_FILTER_QUERY);
         options.add(DORIS_TABLET_SIZE);
         options.add(DORIS_REQUEST_CONNECT_TIMEOUT_MS);
         options.add(DORIS_REQUEST_READ_TIMEOUT_MS);
@@ -181,7 +177,8 @@
                 getDorisOptions(helper.getOptions()),
                 getDorisReadOptions(helper.getOptions()),
                 getDorisLookupOptions(helper.getOptions()),
-                physicalSchema);
+                physicalSchema,
+                context.getPhysicalRowDataType());
     }
 
     private DorisOptions getDorisOptions(ReadableConfig readableConfig) {
@@ -205,8 +202,6 @@
         builder.setDeserializeArrowAsync(readableConfig.get(DORIS_DESERIALIZE_ARROW_ASYNC))
                 .setDeserializeQueueSize(readableConfig.get(DORIS_DESERIALIZE_QUEUE_SIZE))
                 .setExecMemLimit(readableConfig.get(DORIS_EXEC_MEM_LIMIT).getBytes())
-                .setFilterQuery(readableConfig.get(DORIS_FILTER_QUERY))
-                .setReadFields(readableConfig.get(DORIS_READ_FIELD))
                 .setRequestQueryTimeoutS(
                         (int) readableConfig.get(DORIS_REQUEST_QUERY_TIMEOUT_S).getSeconds())
                 .setRequestBatchSize(readableConfig.get(DORIS_BATCH_SIZE))
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSource.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSource.java
index 9057f1f..6b09735 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSource.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSource.java
@@ -21,6 +21,7 @@
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.Projection;
 import org.apache.flink.table.connector.source.AsyncTableFunctionProvider;
 import org.apache.flink.table.connector.source.DynamicTableSource;
 import org.apache.flink.table.connector.source.InputFormatProvider;
@@ -31,8 +32,10 @@
 import org.apache.flink.table.connector.source.abilities.SupportsFilterPushDown;
 import org.apache.flink.table.connector.source.abilities.SupportsProjectionPushDown;
 import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.expressions.ResolvedExpression;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.util.StringUtils;
 
 import org.apache.doris.flink.cfg.DorisLookupOptions;
 import org.apache.doris.flink.cfg.DorisOptions;
@@ -45,6 +48,7 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import java.util.stream.Collectors;
@@ -58,23 +62,31 @@
  * SourceFunction} and its {@link DeserializationSchema} for runtime. Both instances are
  * parameterized to return internal data structures (i.e. {@link RowData}).
  */
-public final class DorisDynamicTableSource implements ScanTableSource, LookupTableSource {
+public final class DorisDynamicTableSource
+        implements ScanTableSource,
+                LookupTableSource,
+                SupportsFilterPushDown,
+                SupportsProjectionPushDown {
 
     private static final Logger LOG = LoggerFactory.getLogger(DorisDynamicTableSource.class);
     private final DorisOptions options;
     private final DorisReadOptions readOptions;
     private DorisLookupOptions lookupOptions;
     private TableSchema physicalSchema;
+    private List<String> resolvedFilterQuery = new ArrayList<>();
+    private DataType physicalRowDataType;
 
     public DorisDynamicTableSource(
             DorisOptions options,
             DorisReadOptions readOptions,
             DorisLookupOptions lookupOptions,
-            TableSchema physicalSchema) {
+            TableSchema physicalSchema,
+            DataType physicalRowDataType) {
         this.options = options;
         this.lookupOptions = lookupOptions;
         this.readOptions = readOptions;
         this.physicalSchema = physicalSchema;
+        this.physicalRowDataType = physicalRowDataType;
     }
 
     public DorisDynamicTableSource(
@@ -93,8 +105,11 @@
 
     @Override
     public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderContext) {
+        String filterQuery = resolvedFilterQuery.stream().collect(Collectors.joining(" AND "));
+        readOptions.setFilterQuery(filterQuery);
+        String[] selectFields = DataType.getFieldNames(physicalRowDataType).toArray(new String[0]);
         readOptions.setReadFields(
-                Arrays.stream(physicalSchema.getFieldNames())
+                Arrays.stream(selectFields)
                         .map(item -> String.format("`%s`", item.trim().replace("`", "")))
                         .collect(Collectors.joining(", ")));
 
@@ -114,7 +129,7 @@
                             .setTableIdentifier(options.getTableIdentifier())
                             .setPartitions(dorisPartitions)
                             .setReadOptions(readOptions)
-                            .setRowType((RowType) physicalSchema.toRowDataType().getLogicalType());
+                            .setRowType((RowType) physicalRowDataType.getLogicalType());
             return InputFormatProvider.of(builder.build());
         } else {
             // Read data using the interface of the FLIP-27 specification
@@ -124,10 +139,7 @@
                             .setDorisOptions(options)
                             .setDeserializer(
                                     new RowDataDeserializationSchema(
-                                            (RowType)
-                                                    physicalSchema
-                                                            .toRowDataType()
-                                                            .getLogicalType()))
+                                            (RowType) physicalRowDataType.getLogicalType()))
                             .build();
             return SourceProvider.of(build);
         }
@@ -135,7 +147,6 @@
 
     @Override
     public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext context) {
-        DataType physicalRowDataType = physicalSchema.toRowDataType();
         String[] keyNames = new String[context.getKeys().length];
         int[] keyIndexs = new int[context.getKeys().length];
         for (int i = 0; i < keyNames.length; i++) {
@@ -168,11 +179,43 @@
 
     @Override
     public DynamicTableSource copy() {
-        return new DorisDynamicTableSource(options, readOptions, lookupOptions, physicalSchema);
+        DorisDynamicTableSource newSource =
+                new DorisDynamicTableSource(
+                        options, readOptions, lookupOptions, physicalSchema, physicalRowDataType);
+        newSource.resolvedFilterQuery = new ArrayList<>(this.resolvedFilterQuery);
+        return newSource;
     }
 
     @Override
     public String asSummaryString() {
         return "Doris Table Source";
     }
+
+    @Override
+    public Result applyFilters(List<ResolvedExpression> filters) {
+        List<ResolvedExpression> acceptedFilters = new ArrayList<>();
+        List<ResolvedExpression> remainingFilters = new ArrayList<>();
+
+        DorisExpressionVisitor expressionVisitor = new DorisExpressionVisitor();
+        for (ResolvedExpression filter : filters) {
+            String filterQuery = filter.accept(expressionVisitor);
+            if (!StringUtils.isNullOrWhitespaceOnly(filterQuery)) {
+                acceptedFilters.add(filter);
+                this.resolvedFilterQuery.add(filterQuery);
+            } else {
+                remainingFilters.add(filter);
+            }
+        }
+        return Result.of(acceptedFilters, remainingFilters);
+    }
+
+    @Override
+    public boolean supportsNestedProjection() {
+        return false;
+    }
+
+    @Override
+    public void applyProjection(int[][] projectedFields, DataType producedDataType) {
+        this.physicalRowDataType = Projection.of(projectedFields).project(physicalRowDataType);
+    }
 }
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisExpressionVisitor.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisExpressionVisitor.java
new file mode 100644
index 0000000..3f327fe
--- /dev/null
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisExpressionVisitor.java
@@ -0,0 +1,157 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.table;
+
+import org.apache.flink.table.expressions.CallExpression;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.expressions.ExpressionVisitor;
+import org.apache.flink.table.expressions.FieldReferenceExpression;
+import org.apache.flink.table.expressions.ResolvedExpression;
+import org.apache.flink.table.expressions.TypeLiteralExpression;
+import org.apache.flink.table.expressions.ValueLiteralExpression;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+
+import java.util.List;
+
+/**
+ * Translates Flink filter expressions into Doris SQL predicate text for filter push-down.
+ *
+ * <p>Each {@code visit} method returns the SQL fragment for the visited expression, or
+ * {@code null} when the expression cannot be translated. A {@code null} operand makes the
+ * enclosing call {@code null} as well, so an unsupported sub-expression is never rendered as
+ * the text "null" inside a predicate.
+ */
+public class DorisExpressionVisitor implements ExpressionVisitor<String> {
+
+    /**
+     * Translates the supported comparison, logical, LIKE and null-check calls.
+     *
+     * @param call the call expression to translate
+     * @return the SQL fragment, or {@code null} if the function or one of its operands is not
+     *     supported
+     */
+    @Override
+    public String visit(CallExpression call) {
+        if (BuiltInFunctionDefinitions.EQUALS.equals(call.getFunctionDefinition())) {
+            return combineExpression("=", call.getResolvedChildren());
+        }
+        if (BuiltInFunctionDefinitions.LESS_THAN.equals(call.getFunctionDefinition())) {
+            return combineExpression("<", call.getResolvedChildren());
+        }
+        if (BuiltInFunctionDefinitions.LESS_THAN_OR_EQUAL.equals(call.getFunctionDefinition())) {
+            return combineExpression("<=", call.getResolvedChildren());
+        }
+        if (BuiltInFunctionDefinitions.GREATER_THAN.equals(call.getFunctionDefinition())) {
+            return combineExpression(">", call.getResolvedChildren());
+        }
+        if (BuiltInFunctionDefinitions.GREATER_THAN_OR_EQUAL.equals(call.getFunctionDefinition())) {
+            return combineExpression(">=", call.getResolvedChildren());
+        }
+        if (BuiltInFunctionDefinitions.NOT_EQUALS.equals(call.getFunctionDefinition())) {
+            return combineExpression("<>", call.getResolvedChildren());
+        }
+        if (BuiltInFunctionDefinitions.OR.equals(call.getFunctionDefinition())) {
+            return combineExpression("OR", call.getResolvedChildren());
+        }
+        if (BuiltInFunctionDefinitions.AND.equals(call.getFunctionDefinition())) {
+            return combineExpression("AND", call.getResolvedChildren());
+        }
+        if (BuiltInFunctionDefinitions.LIKE.equals(call.getFunctionDefinition())) {
+            return combineExpression("LIKE", call.getResolvedChildren());
+        }
+        if (BuiltInFunctionDefinitions.IS_NULL.equals(call.getFunctionDefinition())) {
+            return combineLeftExpression("IS NULL", call.getResolvedChildren().get(0));
+        }
+        if (BuiltInFunctionDefinitions.IS_NOT_NULL.equals(call.getFunctionDefinition())) {
+            return combineLeftExpression("IS NOT NULL", call.getResolvedChildren().get(0));
+        }
+        return null;
+    }
+
+    /**
+     * Renders a binary operation as {@code (left operator right)}.
+     *
+     * @param operator the SQL operator or keyword placed between the operands
+     * @param operand the operands of the call; exactly two are expected
+     * @return the SQL fragment, or {@code null} if there are not exactly two operands or either
+     *     operand cannot be translated
+     */
+    private String combineExpression(String operator, List<ResolvedExpression> operand) {
+        // Calls with any other number of operands cannot be expressed by this template; report
+        // them as untranslatable rather than silently dropping operands.
+        if (operand.size() != 2) {
+            return null;
+        }
+        String left = operand.get(0).accept(this);
+        String right = operand.get(1).accept(this);
+        if (left == null || right == null) {
+            return null;
+        }
+        return String.format("(%s %s %s)", left, operator, right);
+    }
+
+    /**
+     * Renders a postfix operation as {@code (operand operator)}.
+     *
+     * @param operator the SQL keyword placed after the operand
+     * @param operand the single operand of the call
+     * @return the SQL fragment, or {@code null} if the operand cannot be translated
+     */
+    private String combineLeftExpression(String operator, ResolvedExpression operand) {
+        String left = operand.accept(this);
+        if (left == null) {
+            return null;
+        }
+        return String.format("(%s %s)", left, operator);
+    }
+
+    /**
+     * Renders a literal using its string form, wrapping DATE and TIMESTAMP literals in single
+     * quotes.
+     */
+    @Override
+    public String visit(ValueLiteralExpression valueLiteral) {
+        LogicalTypeRoot typeRoot = valueLiteral.getOutputDataType().getLogicalType().getTypeRoot();
+        if (typeRoot.equals(LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE)
+                || typeRoot.equals(LogicalTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE)
+                || typeRoot.equals(LogicalTypeRoot.TIMESTAMP_WITH_TIME_ZONE)
+                || typeRoot.equals(LogicalTypeRoot.DATE)) {
+            return "'" + valueLiteral + "'";
+        }
+        return valueLiteral.toString();
+    }
+
+    /** Renders a field reference as its field name. */
+    @Override
+    public String visit(FieldReferenceExpression fieldReference) {
+        return fieldReference.getName();
+    }
+
+    /** Renders a type literal as the string form of its output data type. */
+    @Override
+    public String visit(TypeLiteralExpression typeLiteral) {
+        return typeLiteral.getOutputDataType().toString();
+    }
+
+    /** Any other expression kind is not supported and yields {@code null}. */
+    @Override
+    public String visit(Expression expression) {
+        return null;
+    }
+}
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/table/DorisDynamicTableSourceTest.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/table/DorisDynamicTableSourceTest.java
index ecc08f2..290e193 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/table/DorisDynamicTableSourceTest.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/table/DorisDynamicTableSourceTest.java
@@ -26,6 +26,7 @@
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.runtime.connector.source.ScanRuntimeProviderContext;
 
+import org.apache.doris.flink.cfg.DorisLookupOptions;
 import org.apache.doris.flink.cfg.DorisReadOptions;
 import org.apache.doris.flink.sink.OptionUtils;
 import org.apache.doris.flink.source.DorisSource;
@@ -48,7 +49,9 @@
                 new DorisDynamicTableSource(
                         OptionUtils.buildDorisOptions(),
                         builder.build(),
-                        TableSchema.fromResolvedSchema(FactoryMocks.SCHEMA));
+                        DorisLookupOptions.builder().build(),
+                        TableSchema.fromResolvedSchema(FactoryMocks.SCHEMA),
+                        FactoryMocks.PHYSICAL_DATA_TYPE);
         ScanTableSource.ScanRuntimeProvider provider =
                 actualDorisSource.getScanRuntimeProvider(ScanRuntimeProviderContext.INSTANCE);
         assertDorisSource(provider);
@@ -60,7 +63,9 @@
                 new DorisDynamicTableSource(
                         OptionUtils.buildDorisOptions(),
                         OptionUtils.buildDorisReadOptions(),
-                        TableSchema.fromResolvedSchema(FactoryMocks.SCHEMA));
+                        DorisLookupOptions.builder().build(),
+                        TableSchema.fromResolvedSchema(FactoryMocks.SCHEMA),
+                        FactoryMocks.PHYSICAL_DATA_TYPE);
         ScanTableSource.ScanRuntimeProvider provider =
                 actualDorisSource.getScanRuntimeProvider(ScanRuntimeProviderContext.INSTANCE);
         assertDorisSource(provider);