[FLINK-27384][hive] Fix the modified partitions are missed in temporal table with create-time mode
This closes #20376.
diff --git a/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/HivePartitionFetcherContextBase.java b/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/HivePartitionFetcherContextBase.java
index 41e7718..25a7471 100644
--- a/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/HivePartitionFetcherContextBase.java
+++ b/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/HivePartitionFetcherContextBase.java
@@ -46,7 +46,6 @@
import java.util.List;
import java.util.Map;
import java.util.Properties;
-import java.util.stream.Collectors;
import static org.apache.flink.connector.file.table.DefaultPartTimeExtractor.toMills;
import static org.apache.flink.connector.file.table.FileSystemConnectorOptions.PARTITION_TIME_EXTRACTOR_CLASS;
@@ -75,8 +74,6 @@
protected transient Path tableLocation;
private transient PartitionTimeExtractor extractor;
private transient Table table;
- // remember the map from partition to its create time
- private transient Map<List<String>, Long> partValuesToCreateTime;
public HivePartitionFetcherContextBase(
ObjectPath tablePath,
@@ -119,7 +116,6 @@
extractorPattern,
formatterPattern);
tableLocation = new Path(table.getSd().getLocation());
- partValuesToCreateTime = new HashMap<>();
}
@Override
@@ -137,22 +133,18 @@
}
break;
case CREATE_TIME:
+ Map<List<String>, Long> partValuesToCreateTime = new HashMap<>();
partitionNames =
metaStoreClient.listPartitionNames(
tablePath.getDatabaseName(),
tablePath.getObjectName(),
Short.MAX_VALUE);
- List<String> newNames =
- partitionNames.stream()
- .filter(
- n ->
- !partValuesToCreateTime.containsKey(
- extractPartitionValues(n)))
- .collect(Collectors.toList());
- List<Partition> newPartitions =
+ List<Partition> partitions =
metaStoreClient.getPartitionsByNames(
- tablePath.getDatabaseName(), tablePath.getObjectName(), newNames);
- for (Partition partition : newPartitions) {
+ tablePath.getDatabaseName(),
+ tablePath.getObjectName(),
+ partitionNames);
+ for (Partition partition : partitions) {
partValuesToCreateTime.put(
partition.getValues(), getPartitionCreateTime(partition));
}
@@ -233,9 +225,6 @@
@Override
public void close() throws Exception {
- if (partValuesToCreateTime != null) {
- partValuesToCreateTime.clear();
- }
if (this.metaStoreClient != null) {
this.metaStoreClient.close();
}