[hotfix][connector-hive] Avoid serializing TableConfig
Pass around only the resolved `threadNum`, the single option the serialized
classes actually read, instead of the whole `TableConfig`, so that serializing
those classes no longer drags the `TableConfig` along with them. The
non-serialized `HiveSourceBuilder` and `HiveTableSink` likewise read their
boolean fallback options eagerly at construction time instead of keeping a
reference to the config.
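For reference, a minimal standalone sketch of the pattern the patch applies
(illustrative names; a plain Map stands in for Flink's ReadableConfig, and the
fallback value is made up): the option is resolved once where the config is
available, and only the resulting primitive lives in the serializable class.

import java.io.Serializable;
import java.util.Map;

final class SplitEnumeratorSketch implements Serializable {
    private static final long serialVersionUID = 1L;

    // A primitive field serializes trivially; holding the whole config here
    // would pull it into the serialized form of this class.
    private final int threadNum;

    SplitEnumeratorSketch(int threadNum) {
        if (threadNum < 1) {
            throw new IllegalArgumentException("thread num cannot be less than 1");
        }
        this.threadNum = threadNum;
    }

    int threadNum() {
        return threadNum;
    }

    // Resolves the option eagerly; the config itself never reaches the
    // enumerator. The "3" fallback is illustrative, not the real default.
    static SplitEnumeratorSketch fromConfig(Map<String, String> config) {
        int threadNum =
                Integer.parseInt(
                        config.getOrDefault(
                                "table.exec.hive.load-partition-splits.thread-num", "3"));
        return new SplitEnumeratorSketch(threadNum);
    }
}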
diff --git a/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/ContinuousHiveSplitEnumerator.java b/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/ContinuousHiveSplitEnumerator.java
index 983faac..a061b27 100644
--- a/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/ContinuousHiveSplitEnumerator.java
+++ b/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/ContinuousHiveSplitEnumerator.java
@@ -23,7 +23,6 @@
import org.apache.flink.api.connector.source.SplitEnumerator;
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.connector.file.src.FileSourceSplit;
import org.apache.flink.connector.file.src.PendingSplitsCheckpoint;
import org.apache.flink.connector.file.src.assigners.FileSplitAssigner;
@@ -78,7 +77,7 @@
Collection<List<String>> seenPartitionsSinceOffset,
FileSplitAssigner splitAssigner,
long discoveryInterval,
- ReadableConfig flinkConf,
+ int threadNum,
JobConf jobConf,
ObjectPath tablePath,
ContinuousPartitionFetcher<Partition, T> fetcher,
@@ -95,7 +94,7 @@
currentReadOffset,
seenPartitionsSinceOffset,
tablePath,
- flinkConf,
+ threadNum,
jobConf,
fetcher,
fetcherContext);
@@ -188,7 +187,7 @@
private final Set<List<String>> seenPartitionsSinceOffset;
private final ObjectPath tablePath;
- private final ReadableConfig flinkConf;
+ private final int threadNum;
private final JobConf jobConf;
private final ContinuousPartitionFetcher<Partition, T> fetcher;
private final HiveContinuousPartitionContext<Partition, T> fetcherContext;
@@ -197,14 +196,14 @@
T currentReadOffset,
Collection<List<String>> seenPartitionsSinceOffset,
ObjectPath tablePath,
- ReadableConfig flinkConf,
+ int threadNum,
JobConf jobConf,
ContinuousPartitionFetcher<Partition, T> fetcher,
HiveContinuousPartitionContext<Partition, T> fetcherContext) {
this.currentReadOffset = currentReadOffset;
this.seenPartitionsSinceOffset = new HashSet<>(seenPartitionsSinceOffset);
this.tablePath = tablePath;
- this.flinkConf = flinkConf;
+ this.threadNum = threadNum;
this.jobConf = jobConf;
this.fetcher = fetcher;
this.fetcherContext = fetcherContext;
@@ -244,7 +243,7 @@
0,
Collections.singletonList(
fetcherContext.toHiveTablePartition(partition)),
- flinkConf,
+ threadNum,
jobConf));
}
}
diff --git a/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveLookupTableSource.java b/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveLookupTableSource.java
index 488ea1b..e68072a 100644
--- a/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveLookupTableSource.java
+++ b/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveLookupTableSource.java
@@ -242,7 +242,7 @@
PartitionReader<HiveTablePartition, RowData> partitionReader =
new HiveInputFormatPartitionReader(
- flinkConf,
+ flinkConf.get(HiveOptions.TABLE_EXEC_HIVE_LOAD_PARTITION_SPLITS_THREAD_NUM),
jobConf,
hiveVersion,
tablePath,
diff --git a/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveSource.java b/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveSource.java
index 26c4ac5..d7f5209 100644
--- a/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveSource.java
+++ b/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveSource.java
@@ -22,7 +22,6 @@
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.connector.source.SplitEnumerator;
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
-import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.connector.file.src.AbstractFileSource;
import org.apache.flink.connector.file.src.ContinuousEnumerationSettings;
import org.apache.flink.connector.file.src.PendingSplitsCheckpoint;
@@ -60,7 +59,7 @@
private static final long serialVersionUID = 1L;
- private final ReadableConfig flinkConf;
+ private final int threadNum;
private final JobConfWrapper jobConfWrapper;
private final List<String> partitionKeys;
private final ContinuousPartitionFetcher<Partition, ?> fetcher;
@@ -73,7 +72,7 @@
FileSplitAssigner.Provider splitAssigner,
BulkFormat<T, HiveSourceSplit> readerFormat,
@Nullable ContinuousEnumerationSettings continuousEnumerationSettings,
- ReadableConfig flinkConf,
+ int threadNum,
JobConf jobConf,
ObjectPath tablePath,
List<String> partitionKeys,
@@ -86,10 +85,10 @@
readerFormat,
continuousEnumerationSettings);
Preconditions.checkArgument(
- flinkConf.get(HiveOptions.TABLE_EXEC_HIVE_LOAD_PARTITION_SPLITS_THREAD_NUM) >= 1,
+ threadNum >= 1,
HiveOptions.TABLE_EXEC_HIVE_LOAD_PARTITION_SPLITS_THREAD_NUM.key()
+ " cannot be less than 1");
- this.flinkConf = flinkConf;
+ this.threadNum = threadNum;
this.jobConfWrapper = new JobConfWrapper(jobConf);
this.tablePath = tablePath;
this.partitionKeys = partitionKeys;
@@ -164,7 +163,7 @@
seenPartitions,
getAssignerFactory().create(new ArrayList<>(splits)),
getContinuousEnumerationSettings().getDiscoveryInterval().toMillis(),
- flinkConf,
+ threadNum,
jobConfWrapper.conf(),
tablePath,
fetcher,
diff --git a/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveSourceBuilder.java b/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveSourceBuilder.java
index 25f7ed7..75b6e74 100644
--- a/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveSourceBuilder.java
+++ b/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveSourceBuilder.java
@@ -68,6 +68,7 @@
import static org.apache.flink.connectors.hive.HiveOptions.STREAMING_SOURCE_MONITOR_INTERVAL;
import static org.apache.flink.connectors.hive.HiveOptions.STREAMING_SOURCE_PARTITION_INCLUDE;
import static org.apache.flink.connectors.hive.HiveOptions.STREAMING_SOURCE_PARTITION_ORDER;
+import static org.apache.flink.connectors.hive.HiveOptions.TABLE_EXEC_HIVE_FALLBACK_MAPRED_READER;
import static org.apache.flink.table.catalog.hive.util.HiveTableUtil.checkAcidTable;
import static org.apache.flink.util.Preconditions.checkArgument;
@@ -78,7 +79,8 @@
private static final Duration DEFAULT_SCAN_MONITOR_INTERVAL = Duration.ofMinutes(1L);
private final JobConf jobConf;
- private final ReadableConfig flinkConf;
+ private final int threadNum;
+ private final boolean fallbackMappedReader;
private final ObjectPath tablePath;
private final Map<String, String> tableOptions;
@@ -110,7 +112,9 @@
@Nonnull String tableName,
@Nonnull Map<String, String> tableOptions) {
this.jobConf = jobConf;
- this.flinkConf = flinkConf;
+ this.threadNum =
+ flinkConf.get(HiveOptions.TABLE_EXEC_HIVE_LOAD_PARTITION_SPLITS_THREAD_NUM);
+ this.fallbackMappedReader = flinkConf.get(TABLE_EXEC_HIVE_FALLBACK_MAPRED_READER);
this.tablePath = new ObjectPath(dbName, tableName);
this.hiveVersion = hiveVersion == null ? HiveShimLoader.getHiveVersion() : hiveVersion;
HiveConf hiveConf = HiveConfUtils.create(jobConf);
@@ -147,7 +151,9 @@
@Nullable String hiveVersion,
@Nonnull CatalogTable catalogTable) {
this.jobConf = jobConf;
- this.flinkConf = flinkConf;
+ this.threadNum =
+ flinkConf.get(HiveOptions.TABLE_EXEC_HIVE_LOAD_PARTITION_SPLITS_THREAD_NUM);
+ this.fallbackMappedReader = flinkConf.get(TABLE_EXEC_HIVE_FALLBACK_MAPRED_READER);
this.tablePath = tablePath;
this.hiveVersion = hiveVersion == null ? HiveShimLoader.getHiveVersion() : hiveVersion;
this.fullSchema = catalogTable.getSchema();
@@ -231,12 +237,12 @@
new Path[1],
new HiveSourceFileEnumerator.Provider(
partitions != null ? partitions : Collections.emptyList(),
- flinkConf,
+ threadNum,
new JobConfWrapper(jobConf)),
splitAssigner,
bulkFormat,
continuousSourceSettings,
- flinkConf,
+ threadNum,
jobConf,
tablePath,
partitionKeys,
@@ -318,7 +324,7 @@
fullSchema.getFieldDataTypes(),
hiveVersion,
getProducedRowType(),
- flinkConf.get(HiveOptions.TABLE_EXEC_HIVE_FALLBACK_MAPRED_READER)),
+ fallbackMappedReader),
limit);
}
}
diff --git a/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveSourceFileEnumerator.java b/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveSourceFileEnumerator.java
index 4227c36..dd0fe98 100644
--- a/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveSourceFileEnumerator.java
+++ b/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveSourceFileEnumerator.java
@@ -18,7 +18,6 @@
package org.apache.flink.connectors.hive;
-import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.connector.file.src.FileSourceSplit;
import org.apache.flink.connector.file.src.enumerate.FileEnumerator;
import org.apache.flink.connectors.hive.read.HiveSourceSplit;
@@ -45,33 +44,27 @@
// For non-partition hive table, partitions only contains one partition which partitionValues is
// empty.
private final List<HiveTablePartition> partitions;
- private final ReadableConfig flinkConf;
+ private final int threadNum;
private final JobConf jobConf;
public HiveSourceFileEnumerator(
- List<HiveTablePartition> partitions, ReadableConfig flinkConf, JobConf jobConf) {
+ List<HiveTablePartition> partitions, int threadNum, JobConf jobConf) {
this.partitions = partitions;
- this.flinkConf = flinkConf;
+ this.threadNum = threadNum;
this.jobConf = jobConf;
}
@Override
public Collection<FileSourceSplit> enumerateSplits(Path[] paths, int minDesiredSplits)
throws IOException {
- return new ArrayList<>(createInputSplits(minDesiredSplits, partitions, flinkConf, jobConf));
+ return new ArrayList<>(createInputSplits(minDesiredSplits, partitions, threadNum, jobConf));
}
public static List<HiveSourceSplit> createInputSplits(
- int minNumSplits,
- List<HiveTablePartition> partitions,
- ReadableConfig flinkConf,
- JobConf jobConf)
+ int minNumSplits, List<HiveTablePartition> partitions, int threadNum, JobConf jobConf)
throws IOException {
List<HiveSourceSplit> hiveSplits = new ArrayList<>();
- try (MRSplitsGetter splitsGetter =
- new MRSplitsGetter(
- flinkConf.get(
- HiveOptions.TABLE_EXEC_HIVE_LOAD_PARTITION_SPLITS_THREAD_NUM))) {
+ try (MRSplitsGetter splitsGetter = new MRSplitsGetter(threadNum)) {
for (HiveTablePartitionSplits partitionSplits :
splitsGetter.getHiveTablePartitionMRSplits(minNumSplits, partitions, jobConf)) {
HiveTablePartition partition = partitionSplits.getHiveTablePartition();
@@ -109,21 +102,19 @@
private static final long serialVersionUID = 1L;
private final List<HiveTablePartition> partitions;
- private final ReadableConfig flinkConf;
+ private final int threadNum;
private final JobConfWrapper jobConfWrapper;
public Provider(
- List<HiveTablePartition> partitions,
- ReadableConfig flinkConf,
- JobConfWrapper jobConfWrapper) {
+ List<HiveTablePartition> partitions, int threadNum, JobConfWrapper jobConfWrapper) {
this.partitions = partitions;
- this.flinkConf = flinkConf;
+ this.threadNum = threadNum;
this.jobConfWrapper = jobConfWrapper;
}
@Override
public FileEnumerator create() {
- return new HiveSourceFileEnumerator(partitions, flinkConf, jobConfWrapper.conf());
+ return new HiveSourceFileEnumerator(partitions, threadNum, jobConfWrapper.conf());
}
}
}
diff --git a/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSink.java b/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSink.java
index 710c890..4316670 100644
--- a/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSink.java
+++ b/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSink.java
@@ -108,7 +108,8 @@
private static final Logger LOG = LoggerFactory.getLogger(HiveTableSink.class);
- private final ReadableConfig flinkConf;
+ private final boolean fallbackMappedReader;
+ private final boolean fallbackMappedWriter;
private final JobConf jobConf;
private final CatalogTable catalogTable;
private final ObjectIdentifier identifier;
@@ -128,7 +129,24 @@
ObjectIdentifier identifier,
CatalogTable table,
@Nullable Integer configuredParallelism) {
- this.flinkConf = flinkConf;
+ this(
+ flinkConf.get(HiveOptions.TABLE_EXEC_HIVE_FALLBACK_MAPRED_READER),
+ flinkConf.get(HiveOptions.TABLE_EXEC_HIVE_FALLBACK_MAPRED_WRITER),
+ jobConf,
+ identifier,
+ table,
+ configuredParallelism);
+ }
+
+ private HiveTableSink(
+ boolean fallbackMappedReader,
+ boolean fallbackMappedWriter,
+ JobConf jobConf,
+ ObjectIdentifier identifier,
+ CatalogTable table,
+ @Nullable Integer configuredParallelism) {
+ this.fallbackMappedReader = fallbackMappedReader;
+ this.fallbackMappedWriter = fallbackMappedWriter;
this.jobConf = jobConf;
this.identifier = identifier;
this.catalogTable = table;
@@ -298,7 +316,7 @@
org.apache.flink.core.fs.Path path = new org.apache.flink.core.fs.Path(sd.getLocation());
BucketsBuilder<RowData, String, ? extends BucketsBuilder<RowData, ?, ?>> builder;
- if (flinkConf.get(HiveOptions.TABLE_EXEC_HIVE_FALLBACK_MAPRED_WRITER)) {
+ if (fallbackMappedWriter) {
builder =
bucketsBuilderForMRWriter(
recordWriterFactory, sd, assigner, rollingPolicy, outputFileConfig);
@@ -377,7 +395,7 @@
catalogTable,
hiveVersion,
(RowType) tableSchema.toRowDataType().getLogicalType(),
- flinkConf.get(HiveOptions.TABLE_EXEC_HIVE_FALLBACK_MAPRED_READER));
+ fallbackMappedReader);
}
private HiveTableMetaStoreFactory msFactory() {
@@ -487,7 +505,12 @@
public DynamicTableSink copy() {
HiveTableSink sink =
new HiveTableSink(
- flinkConf, jobConf, identifier, catalogTable, configuredParallelism);
+ fallbackMappedReader,
+ fallbackMappedWriter,
+ jobConf,
+ identifier,
+ catalogTable,
+ configuredParallelism);
sink.staticPartitionSpec = staticPartitionSpec;
sink.overwrite = overwrite;
sink.dynamicGrouping = dynamicGrouping;
diff --git a/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSource.java b/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSource.java
index d7ee578..cc5d639 100644
--- a/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSource.java
+++ b/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSource.java
@@ -146,6 +146,8 @@
catalogTable.getPartitionKeys(),
remainingPartitions);
+ int threadNum =
+ flinkConf.get(HiveOptions.TABLE_EXEC_HIVE_LOAD_PARTITION_SPLITS_THREAD_NUM);
int parallelism =
new HiveParallelismInference(tablePath, flinkConf)
.infer(
@@ -156,7 +158,7 @@
HiveSourceFileEnumerator.createInputSplits(
0,
hivePartitionsToRead,
- flinkConf,
+ threadNum,
jobConf)
.size())
.limit(limit);
diff --git a/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/HiveInputFormatPartitionReader.java b/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/HiveInputFormatPartitionReader.java
index 534c623..2493942 100644
--- a/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/HiveInputFormatPartitionReader.java
+++ b/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/HiveInputFormatPartitionReader.java
@@ -18,7 +18,6 @@
package org.apache.flink.connectors.hive.read;
-import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.connector.file.table.PartitionReader;
import org.apache.flink.connectors.hive.HiveTablePartition;
import org.apache.flink.connectors.hive.JobConfWrapper;
@@ -36,7 +35,7 @@
implements PartitionReader<HiveTablePartition, RowData> {
private static final long serialVersionUID = 1L;
- private final ReadableConfig flinkConf;
+ private final int threadNum;
private final JobConfWrapper jobConfWrapper;
private final String hiveVersion;
protected final ObjectPath tablePath;
@@ -51,7 +50,7 @@
private transient int readingSplitId;
public HiveInputFormatPartitionReader(
- ReadableConfig flinkConf,
+ int threadNum,
JobConf jobConf,
String hiveVersion,
ObjectPath tablePath,
@@ -60,7 +59,7 @@
List<String> partitionKeys,
int[] selectedFields,
boolean useMapRedReader) {
- this.flinkConf = flinkConf;
+ this.threadNum = threadNum;
this.jobConfWrapper = new JobConfWrapper(jobConf);
this.hiveVersion = hiveVersion;
this.tablePath = tablePath;
@@ -75,7 +74,7 @@
public void open(List<HiveTablePartition> partitions) throws IOException {
hiveTableInputFormat =
new HiveTableInputFormat(
- this.flinkConf,
+ this.threadNum,
this.jobConfWrapper.conf(),
this.partitionKeys,
this.fieldTypes,
diff --git a/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/HiveTableInputFormat.java b/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/HiveTableInputFormat.java
index 5297812..eadd114 100644
--- a/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/HiveTableInputFormat.java
+++ b/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/HiveTableInputFormat.java
@@ -23,8 +23,6 @@
import org.apache.flink.api.common.io.LocatableInputSplitAssigner;
import org.apache.flink.api.common.io.statistics.BaseStatistics;
import org.apache.flink.api.java.hadoop.common.HadoopInputFormatCommonBase;
-import org.apache.flink.configuration.ReadableConfig;
-import org.apache.flink.connectors.hive.HiveOptions;
import org.apache.flink.connectors.hive.HiveTablePartition;
import org.apache.flink.connectors.hive.HiveTablePartitionSplits;
import org.apache.flink.connectors.hive.JobConfWrapper;
@@ -69,7 +67,7 @@
private static final String SCHEMA_EVOLUTION_COLUMNS = "schema.evolution.columns";
private static final String SCHEMA_EVOLUTION_COLUMNS_TYPES = "schema.evolution.columns.types";
- private final ReadableConfig flinkConf;
+ private final int threadNum;
private final JobConfWrapper jobConf;
@@ -98,7 +96,7 @@
@VisibleForTesting protected transient SplitReader reader;
public HiveTableInputFormat(
- ReadableConfig flinkConf,
+ int threadNum,
JobConf jobConf,
List<String> partitionKeys,
DataType[] fieldTypes,
@@ -109,7 +107,7 @@
boolean useMapRedReader,
List<HiveTablePartition> partitions) {
super(jobConf.getCredentials());
- this.flinkConf = flinkConf;
+ this.threadNum = threadNum;
this.jobConf = new JobConfWrapper(new JobConf(jobConf));
this.partitionKeys = partitionKeys;
this.fieldTypes = fieldTypes;
@@ -317,21 +315,15 @@
@Override
public HiveTableInputSplit[] createInputSplits(int minNumSplits) throws IOException {
- return createInputSplits(minNumSplits, partitions, flinkConf, jobConf.conf());
+ return createInputSplits(minNumSplits, partitions, threadNum, jobConf.conf());
}
public static HiveTableInputSplit[] createInputSplits(
- int minNumSplits,
- List<HiveTablePartition> partitions,
- ReadableConfig flinkConf,
- JobConf jobConf)
+ int minNumSplits, List<HiveTablePartition> partitions, int threadNum, JobConf jobConf)
throws IOException {
List<HiveTableInputSplit> hiveSplits = new ArrayList<>();
int splitNum = 0;
- try (MRSplitsGetter splitsGetter =
- new MRSplitsGetter(
- flinkConf.get(
- HiveOptions.TABLE_EXEC_HIVE_LOAD_PARTITION_SPLITS_THREAD_NUM))) {
+ try (MRSplitsGetter splitsGetter = new MRSplitsGetter(threadNum)) {
for (HiveTablePartitionSplits partitionSplits :
splitsGetter.getHiveTablePartitionMRSplits(minNumSplits, partitions, jobConf)) {
for (InputSplit inputSplit : partitionSplits.getInputSplits()) {
diff --git a/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/PartitionMonitorTest.java b/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/PartitionMonitorTest.java
index f366f10..0816092 100644
--- a/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/PartitionMonitorTest.java
+++ b/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/PartitionMonitorTest.java
@@ -182,7 +182,8 @@
0L,
seenPartitionsSinceOffset,
tablePath,
- configuration,
+ configuration.get(
+ HiveOptions.TABLE_EXEC_HIVE_LOAD_PARTITION_SPLITS_THREAD_NUM),
jobConf,
continuousPartitionFetcher,
fetcherContext);
diff --git a/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/read/HiveInputFormatPartitionReaderITCase.java b/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/read/HiveInputFormatPartitionReaderITCase.java
index bc8b971..aac9007 100644
--- a/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/read/HiveInputFormatPartitionReaderITCase.java
+++ b/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/read/HiveInputFormatPartitionReaderITCase.java
@@ -17,7 +17,7 @@
package org.apache.flink.connectors.hive.read;
-import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connectors.hive.HiveOptions;
import org.apache.flink.connectors.hive.HiveTablePartition;
import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.TableEnvironment;
@@ -62,7 +62,7 @@
// create partition reader
HiveInputFormatPartitionReader partitionReader =
new HiveInputFormatPartitionReader(
- new Configuration(),
+ HiveOptions.TABLE_EXEC_HIVE_LOAD_PARTITION_SPLITS_THREAD_NUM.defaultValue(),
new JobConf(hiveCatalog.getHiveConf()),
hiveCatalog.getHiveVersion(),
tablePath,