[Improve]Compatible with previous filter query options (#373)
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java
index 63e550a..77349a6 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java
@@ -80,6 +80,24 @@
"Use automatic redirection of fe without explicitly obtaining the be list");
// source config options
+ // This is compatible with the previous writing method.
+ // Some expressions may not be pushed down by FlinkSQL.
+ @Deprecated
+ public static final ConfigOption<String> DORIS_FILTER_QUERY =
+ ConfigOptions.key("doris.filter.query")
+ .stringType()
+ .noDefaultValue()
+ .withDescription(
+ "Filter expression of the query, which is transparently transmitted to Doris. Doris uses this expression to complete source-side data filtering");
+
+ @Deprecated
+ public static final ConfigOption<String> DORIS_READ_FIELD =
+ ConfigOptions.key("doris.read.field")
+ .stringType()
+ .noDefaultValue()
+ .withDescription(
+ "List of column names in the Doris table, separated by commas");
+
public static final ConfigOption<Integer> DORIS_TABLET_SIZE =
ConfigOptions.key("doris.request.tablet.size")
.intType()
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java
index a276696..b198ca3 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java
@@ -45,6 +45,8 @@
import static org.apache.doris.flink.table.DorisConfigOptions.DORIS_DESERIALIZE_ARROW_ASYNC;
import static org.apache.doris.flink.table.DorisConfigOptions.DORIS_DESERIALIZE_QUEUE_SIZE;
import static org.apache.doris.flink.table.DorisConfigOptions.DORIS_EXEC_MEM_LIMIT;
+import static org.apache.doris.flink.table.DorisConfigOptions.DORIS_FILTER_QUERY;
+import static org.apache.doris.flink.table.DorisConfigOptions.DORIS_READ_FIELD;
import static org.apache.doris.flink.table.DorisConfigOptions.DORIS_REQUEST_CONNECT_TIMEOUT_MS;
import static org.apache.doris.flink.table.DorisConfigOptions.DORIS_REQUEST_QUERY_TIMEOUT_S;
import static org.apache.doris.flink.table.DorisConfigOptions.DORIS_REQUEST_READ_TIMEOUT_MS;
@@ -116,6 +118,8 @@
options.add(JDBC_URL);
options.add(AUTO_REDIRECT);
+ options.add(DORIS_READ_FIELD);
+ options.add(DORIS_FILTER_QUERY);
options.add(DORIS_TABLET_SIZE);
options.add(DORIS_REQUEST_CONNECT_TIMEOUT_MS);
options.add(DORIS_REQUEST_READ_TIMEOUT_MS);
@@ -202,6 +206,8 @@
builder.setDeserializeArrowAsync(readableConfig.get(DORIS_DESERIALIZE_ARROW_ASYNC))
.setDeserializeQueueSize(readableConfig.get(DORIS_DESERIALIZE_QUEUE_SIZE))
.setExecMemLimit(readableConfig.get(DORIS_EXEC_MEM_LIMIT).getBytes())
+ .setFilterQuery(readableConfig.get(DORIS_FILTER_QUERY))
+ .setReadFields(readableConfig.get(DORIS_READ_FIELD))
.setRequestQueryTimeoutS(
(int) readableConfig.get(DORIS_REQUEST_QUERY_TIMEOUT_S).getSeconds())
.setRequestBatchSize(readableConfig.get(DORIS_BATCH_SIZE))
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSource.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSource.java
index b6cd1a7..32851d4 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSource.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSource.java
@@ -95,13 +95,18 @@
@Override
public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderContext) {
- String filterQuery = resolvedFilterQuery.stream().collect(Collectors.joining(" AND "));
- readOptions.setFilterQuery(filterQuery);
- String[] selectFields = DataType.getFieldNames(physicalRowDataType).toArray(new String[0]);
- readOptions.setReadFields(
- Arrays.stream(selectFields)
- .map(item -> String.format("`%s`", item.trim().replace("`", "")))
- .collect(Collectors.joining(", ")));
+ if (StringUtils.isNullOrWhitespaceOnly(readOptions.getFilterQuery())) {
+ String filterQuery = resolvedFilterQuery.stream().collect(Collectors.joining(" AND "));
+ readOptions.setFilterQuery(filterQuery);
+ }
+ if (StringUtils.isNullOrWhitespaceOnly(readOptions.getReadFields())) {
+ String[] selectFields =
+ DataType.getFieldNames(physicalRowDataType).toArray(new String[0]);
+ readOptions.setReadFields(
+ Arrays.stream(selectFields)
+ .map(item -> String.format("`%s`", item.trim().replace("`", "")))
+ .collect(Collectors.joining(", ")));
+ }
if (readOptions.getUseOldApi()) {
List<PartitionDefinition> dorisPartitions;