[Improve]Compatible with previous filter query options (#373)

diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java
index 63e550a..77349a6 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java
@@ -80,6 +80,24 @@
                             "Use automatic redirection of fe without explicitly obtaining the be list");
 
     // source config options
+    // This is compatible with the previous writing method.
+    // Some expressions may not be pushed down by FlinkSQL.
+    @Deprecated
+    public static final ConfigOption<String> DORIS_FILTER_QUERY =
+            ConfigOptions.key("doris.filter.query")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Filter expression of the query, which is transparently transmitted to Doris. Doris uses this expression to complete source-side data filtering");
+
+    @Deprecated
+    public static final ConfigOption<String> DORIS_READ_FIELD =
+            ConfigOptions.key("doris.read.field")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "List of column names in the Doris table, separated by commas");
+
     public static final ConfigOption<Integer> DORIS_TABLET_SIZE =
             ConfigOptions.key("doris.request.tablet.size")
                     .intType()
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java
index a276696..b198ca3 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java
@@ -45,6 +45,8 @@
 import static org.apache.doris.flink.table.DorisConfigOptions.DORIS_DESERIALIZE_ARROW_ASYNC;
 import static org.apache.doris.flink.table.DorisConfigOptions.DORIS_DESERIALIZE_QUEUE_SIZE;
 import static org.apache.doris.flink.table.DorisConfigOptions.DORIS_EXEC_MEM_LIMIT;
+import static org.apache.doris.flink.table.DorisConfigOptions.DORIS_FILTER_QUERY;
+import static org.apache.doris.flink.table.DorisConfigOptions.DORIS_READ_FIELD;
 import static org.apache.doris.flink.table.DorisConfigOptions.DORIS_REQUEST_CONNECT_TIMEOUT_MS;
 import static org.apache.doris.flink.table.DorisConfigOptions.DORIS_REQUEST_QUERY_TIMEOUT_S;
 import static org.apache.doris.flink.table.DorisConfigOptions.DORIS_REQUEST_READ_TIMEOUT_MS;
@@ -116,6 +118,8 @@
         options.add(JDBC_URL);
         options.add(AUTO_REDIRECT);
 
+        options.add(DORIS_READ_FIELD);
+        options.add(DORIS_FILTER_QUERY);
         options.add(DORIS_TABLET_SIZE);
         options.add(DORIS_REQUEST_CONNECT_TIMEOUT_MS);
         options.add(DORIS_REQUEST_READ_TIMEOUT_MS);
@@ -202,6 +206,8 @@
         builder.setDeserializeArrowAsync(readableConfig.get(DORIS_DESERIALIZE_ARROW_ASYNC))
                 .setDeserializeQueueSize(readableConfig.get(DORIS_DESERIALIZE_QUEUE_SIZE))
                 .setExecMemLimit(readableConfig.get(DORIS_EXEC_MEM_LIMIT).getBytes())
+                .setFilterQuery(readableConfig.get(DORIS_FILTER_QUERY))
+                .setReadFields(readableConfig.get(DORIS_READ_FIELD))
                 .setRequestQueryTimeoutS(
                         (int) readableConfig.get(DORIS_REQUEST_QUERY_TIMEOUT_S).getSeconds())
                 .setRequestBatchSize(readableConfig.get(DORIS_BATCH_SIZE))
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSource.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSource.java
index b6cd1a7..32851d4 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSource.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSource.java
@@ -95,13 +95,18 @@
 
     @Override
     public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderContext) {
-        String filterQuery = resolvedFilterQuery.stream().collect(Collectors.joining(" AND "));
-        readOptions.setFilterQuery(filterQuery);
-        String[] selectFields = DataType.getFieldNames(physicalRowDataType).toArray(new String[0]);
-        readOptions.setReadFields(
-                Arrays.stream(selectFields)
-                        .map(item -> String.format("`%s`", item.trim().replace("`", "")))
-                        .collect(Collectors.joining(", ")));
+        if (StringUtils.isNullOrWhitespaceOnly(readOptions.getFilterQuery())) {
+            String filterQuery = resolvedFilterQuery.stream().collect(Collectors.joining(" AND "));
+            readOptions.setFilterQuery(filterQuery);
+        }
+        if (StringUtils.isNullOrWhitespaceOnly(readOptions.getReadFields())) {
+            String[] selectFields =
+                    DataType.getFieldNames(physicalRowDataType).toArray(new String[0]);
+            readOptions.setReadFields(
+                    Arrays.stream(selectFields)
+                            .map(item -> String.format("`%s`", item.trim().replace("`", "")))
+                            .collect(Collectors.joining(", ")));
+        }
 
         if (readOptions.getUseOldApi()) {
             List<PartitionDefinition> dorisPartitions;