[hotfix][connector-hive] Avoid serializing TableConfig

Use and pass around only `threadNum` which is the only option read,
instead of the whole `TableConfig`, to prevent the relevant classes
that are serialized from trying to serialize also the `TableConfig`.
diff --git a/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/ContinuousHiveSplitEnumerator.java b/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/ContinuousHiveSplitEnumerator.java
index 983faac..a061b27 100644
--- a/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/ContinuousHiveSplitEnumerator.java
+++ b/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/ContinuousHiveSplitEnumerator.java
@@ -23,7 +23,6 @@
 import org.apache.flink.api.connector.source.SplitEnumerator;
 import org.apache.flink.api.connector.source.SplitEnumeratorContext;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.connector.file.src.FileSourceSplit;
 import org.apache.flink.connector.file.src.PendingSplitsCheckpoint;
 import org.apache.flink.connector.file.src.assigners.FileSplitAssigner;
@@ -78,7 +77,7 @@
             Collection<List<String>> seenPartitionsSinceOffset,
             FileSplitAssigner splitAssigner,
             long discoveryInterval,
-            ReadableConfig flinkConf,
+            int threadNum,
             JobConf jobConf,
             ObjectPath tablePath,
             ContinuousPartitionFetcher<Partition, T> fetcher,
@@ -95,7 +94,7 @@
                         currentReadOffset,
                         seenPartitionsSinceOffset,
                         tablePath,
-                        flinkConf,
+                        threadNum,
                         jobConf,
                         fetcher,
                         fetcherContext);
@@ -188,7 +187,7 @@
         private final Set<List<String>> seenPartitionsSinceOffset;
 
         private final ObjectPath tablePath;
-        private final ReadableConfig flinkConf;
+        private final int threadNum;
         private final JobConf jobConf;
         private final ContinuousPartitionFetcher<Partition, T> fetcher;
         private final HiveContinuousPartitionContext<Partition, T> fetcherContext;
@@ -197,14 +196,14 @@
                 T currentReadOffset,
                 Collection<List<String>> seenPartitionsSinceOffset,
                 ObjectPath tablePath,
-                ReadableConfig flinkConf,
+                int threadNum,
                 JobConf jobConf,
                 ContinuousPartitionFetcher<Partition, T> fetcher,
                 HiveContinuousPartitionContext<Partition, T> fetcherContext) {
             this.currentReadOffset = currentReadOffset;
             this.seenPartitionsSinceOffset = new HashSet<>(seenPartitionsSinceOffset);
             this.tablePath = tablePath;
-            this.flinkConf = flinkConf;
+            this.threadNum = threadNum;
             this.jobConf = jobConf;
             this.fetcher = fetcher;
             this.fetcherContext = fetcherContext;
@@ -244,7 +243,7 @@
                                     0,
                                     Collections.singletonList(
                                             fetcherContext.toHiveTablePartition(partition)),
-                                    flinkConf,
+                                    threadNum,
                                     jobConf));
                 }
             }
diff --git a/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveLookupTableSource.java b/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveLookupTableSource.java
index 488ea1b..e68072a 100644
--- a/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveLookupTableSource.java
+++ b/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveLookupTableSource.java
@@ -242,7 +242,7 @@
 
         PartitionReader<HiveTablePartition, RowData> partitionReader =
                 new HiveInputFormatPartitionReader(
-                        flinkConf,
+                        flinkConf.get(HiveOptions.TABLE_EXEC_HIVE_LOAD_PARTITION_SPLITS_THREAD_NUM),
                         jobConf,
                         hiveVersion,
                         tablePath,
diff --git a/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveSource.java b/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveSource.java
index 26c4ac5..d7f5209 100644
--- a/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveSource.java
+++ b/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveSource.java
@@ -22,7 +22,6 @@
 import org.apache.flink.api.connector.source.Boundedness;
 import org.apache.flink.api.connector.source.SplitEnumerator;
 import org.apache.flink.api.connector.source.SplitEnumeratorContext;
-import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.connector.file.src.AbstractFileSource;
 import org.apache.flink.connector.file.src.ContinuousEnumerationSettings;
 import org.apache.flink.connector.file.src.PendingSplitsCheckpoint;
@@ -60,7 +59,7 @@
 
     private static final long serialVersionUID = 1L;
 
-    private final ReadableConfig flinkConf;
+    private final int threadNum;
     private final JobConfWrapper jobConfWrapper;
     private final List<String> partitionKeys;
     private final ContinuousPartitionFetcher<Partition, ?> fetcher;
@@ -73,7 +72,7 @@
             FileSplitAssigner.Provider splitAssigner,
             BulkFormat<T, HiveSourceSplit> readerFormat,
             @Nullable ContinuousEnumerationSettings continuousEnumerationSettings,
-            ReadableConfig flinkConf,
+            int threadNum,
             JobConf jobConf,
             ObjectPath tablePath,
             List<String> partitionKeys,
@@ -86,10 +85,10 @@
                 readerFormat,
                 continuousEnumerationSettings);
         Preconditions.checkArgument(
-                flinkConf.get(HiveOptions.TABLE_EXEC_HIVE_LOAD_PARTITION_SPLITS_THREAD_NUM) >= 1,
+                threadNum >= 1,
                 HiveOptions.TABLE_EXEC_HIVE_LOAD_PARTITION_SPLITS_THREAD_NUM.key()
                         + " cannot be less than 1");
-        this.flinkConf = flinkConf;
+        this.threadNum = threadNum;
         this.jobConfWrapper = new JobConfWrapper(jobConf);
         this.tablePath = tablePath;
         this.partitionKeys = partitionKeys;
@@ -164,7 +163,7 @@
                 seenPartitions,
                 getAssignerFactory().create(new ArrayList<>(splits)),
                 getContinuousEnumerationSettings().getDiscoveryInterval().toMillis(),
-                flinkConf,
+                threadNum,
                 jobConfWrapper.conf(),
                 tablePath,
                 fetcher,
diff --git a/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveSourceBuilder.java b/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveSourceBuilder.java
index 25f7ed7..75b6e74 100644
--- a/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveSourceBuilder.java
+++ b/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveSourceBuilder.java
@@ -68,6 +68,7 @@
 import static org.apache.flink.connectors.hive.HiveOptions.STREAMING_SOURCE_MONITOR_INTERVAL;
 import static org.apache.flink.connectors.hive.HiveOptions.STREAMING_SOURCE_PARTITION_INCLUDE;
 import static org.apache.flink.connectors.hive.HiveOptions.STREAMING_SOURCE_PARTITION_ORDER;
+import static org.apache.flink.connectors.hive.HiveOptions.TABLE_EXEC_HIVE_FALLBACK_MAPRED_READER;
 import static org.apache.flink.table.catalog.hive.util.HiveTableUtil.checkAcidTable;
 import static org.apache.flink.util.Preconditions.checkArgument;
 
@@ -78,7 +79,8 @@
     private static final Duration DEFAULT_SCAN_MONITOR_INTERVAL = Duration.ofMinutes(1L);
 
     private final JobConf jobConf;
-    private final ReadableConfig flinkConf;
+    private final int threadNum;
+    private final boolean fallbackMappedReader;
 
     private final ObjectPath tablePath;
     private final Map<String, String> tableOptions;
@@ -110,7 +112,9 @@
             @Nonnull String tableName,
             @Nonnull Map<String, String> tableOptions) {
         this.jobConf = jobConf;
-        this.flinkConf = flinkConf;
+        this.threadNum =
+                flinkConf.get(HiveOptions.TABLE_EXEC_HIVE_LOAD_PARTITION_SPLITS_THREAD_NUM);
+        this.fallbackMappedReader = flinkConf.get(TABLE_EXEC_HIVE_FALLBACK_MAPRED_READER);
         this.tablePath = new ObjectPath(dbName, tableName);
         this.hiveVersion = hiveVersion == null ? HiveShimLoader.getHiveVersion() : hiveVersion;
         HiveConf hiveConf = HiveConfUtils.create(jobConf);
@@ -147,7 +151,9 @@
             @Nullable String hiveVersion,
             @Nonnull CatalogTable catalogTable) {
         this.jobConf = jobConf;
-        this.flinkConf = flinkConf;
+        this.threadNum =
+                flinkConf.get(HiveOptions.TABLE_EXEC_HIVE_LOAD_PARTITION_SPLITS_THREAD_NUM);
+        this.fallbackMappedReader = flinkConf.get(TABLE_EXEC_HIVE_FALLBACK_MAPRED_READER);
         this.tablePath = tablePath;
         this.hiveVersion = hiveVersion == null ? HiveShimLoader.getHiveVersion() : hiveVersion;
         this.fullSchema = catalogTable.getSchema();
@@ -231,12 +237,12 @@
                 new Path[1],
                 new HiveSourceFileEnumerator.Provider(
                         partitions != null ? partitions : Collections.emptyList(),
-                        flinkConf,
+                        threadNum,
                         new JobConfWrapper(jobConf)),
                 splitAssigner,
                 bulkFormat,
                 continuousSourceSettings,
-                flinkConf,
+                threadNum,
                 jobConf,
                 tablePath,
                 partitionKeys,
@@ -318,7 +324,7 @@
                         fullSchema.getFieldDataTypes(),
                         hiveVersion,
                         getProducedRowType(),
-                        flinkConf.get(HiveOptions.TABLE_EXEC_HIVE_FALLBACK_MAPRED_READER)),
+                        fallbackMappedReader),
                 limit);
     }
 }
diff --git a/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveSourceFileEnumerator.java b/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveSourceFileEnumerator.java
index 4227c36..dd0fe98 100644
--- a/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveSourceFileEnumerator.java
+++ b/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveSourceFileEnumerator.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.connectors.hive;
 
-import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.connector.file.src.FileSourceSplit;
 import org.apache.flink.connector.file.src.enumerate.FileEnumerator;
 import org.apache.flink.connectors.hive.read.HiveSourceSplit;
@@ -45,33 +44,27 @@
     // For non-partition hive table, partitions only contains one partition which partitionValues is
     // empty.
     private final List<HiveTablePartition> partitions;
-    private final ReadableConfig flinkConf;
+    private final int threadNum;
     private final JobConf jobConf;
 
     public HiveSourceFileEnumerator(
-            List<HiveTablePartition> partitions, ReadableConfig flinkConf, JobConf jobConf) {
+            List<HiveTablePartition> partitions, int threadNum, JobConf jobConf) {
         this.partitions = partitions;
-        this.flinkConf = flinkConf;
+        this.threadNum = threadNum;
         this.jobConf = jobConf;
     }
 
     @Override
     public Collection<FileSourceSplit> enumerateSplits(Path[] paths, int minDesiredSplits)
             throws IOException {
-        return new ArrayList<>(createInputSplits(minDesiredSplits, partitions, flinkConf, jobConf));
+        return new ArrayList<>(createInputSplits(minDesiredSplits, partitions, threadNum, jobConf));
     }
 
     public static List<HiveSourceSplit> createInputSplits(
-            int minNumSplits,
-            List<HiveTablePartition> partitions,
-            ReadableConfig flinkConf,
-            JobConf jobConf)
+            int minNumSplits, List<HiveTablePartition> partitions, int threadNum, JobConf jobConf)
             throws IOException {
         List<HiveSourceSplit> hiveSplits = new ArrayList<>();
-        try (MRSplitsGetter splitsGetter =
-                new MRSplitsGetter(
-                        flinkConf.get(
-                                HiveOptions.TABLE_EXEC_HIVE_LOAD_PARTITION_SPLITS_THREAD_NUM))) {
+        try (MRSplitsGetter splitsGetter = new MRSplitsGetter(threadNum)) {
             for (HiveTablePartitionSplits partitionSplits :
                     splitsGetter.getHiveTablePartitionMRSplits(minNumSplits, partitions, jobConf)) {
                 HiveTablePartition partition = partitionSplits.getHiveTablePartition();
@@ -109,21 +102,19 @@
         private static final long serialVersionUID = 1L;
 
         private final List<HiveTablePartition> partitions;
-        private final ReadableConfig flinkConf;
+        private final int threadNum;
         private final JobConfWrapper jobConfWrapper;
 
         public Provider(
-                List<HiveTablePartition> partitions,
-                ReadableConfig flinkConf,
-                JobConfWrapper jobConfWrapper) {
+                List<HiveTablePartition> partitions, int threadNum, JobConfWrapper jobConfWrapper) {
             this.partitions = partitions;
-            this.flinkConf = flinkConf;
+            this.threadNum = threadNum;
             this.jobConfWrapper = jobConfWrapper;
         }
 
         @Override
         public FileEnumerator create() {
-            return new HiveSourceFileEnumerator(partitions, flinkConf, jobConfWrapper.conf());
+            return new HiveSourceFileEnumerator(partitions, threadNum, jobConfWrapper.conf());
         }
     }
 }
diff --git a/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSink.java b/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSink.java
index 710c890..4316670 100644
--- a/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSink.java
+++ b/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSink.java
@@ -108,7 +108,8 @@
 
     private static final Logger LOG = LoggerFactory.getLogger(HiveTableSink.class);
 
-    private final ReadableConfig flinkConf;
+    private final boolean fallbackMappedReader;
+    private final boolean fallbackMappedWriter;
     private final JobConf jobConf;
     private final CatalogTable catalogTable;
     private final ObjectIdentifier identifier;
@@ -128,7 +129,24 @@
             ObjectIdentifier identifier,
             CatalogTable table,
             @Nullable Integer configuredParallelism) {
-        this.flinkConf = flinkConf;
+        this(
+                flinkConf.get(HiveOptions.TABLE_EXEC_HIVE_FALLBACK_MAPRED_READER),
+                flinkConf.get(HiveOptions.TABLE_EXEC_HIVE_FALLBACK_MAPRED_WRITER),
+                jobConf,
+                identifier,
+                table,
+                configuredParallelism);
+    }
+
+    private HiveTableSink(
+            boolean fallbackMappedReader,
+            boolean fallbackMappedWriter,
+            JobConf jobConf,
+            ObjectIdentifier identifier,
+            CatalogTable table,
+            @Nullable Integer configuredParallelism) {
+        this.fallbackMappedReader = fallbackMappedReader;
+        this.fallbackMappedWriter = fallbackMappedWriter;
         this.jobConf = jobConf;
         this.identifier = identifier;
         this.catalogTable = table;
@@ -298,7 +316,7 @@
         org.apache.flink.core.fs.Path path = new org.apache.flink.core.fs.Path(sd.getLocation());
 
         BucketsBuilder<RowData, String, ? extends BucketsBuilder<RowData, ?, ?>> builder;
-        if (flinkConf.get(HiveOptions.TABLE_EXEC_HIVE_FALLBACK_MAPRED_WRITER)) {
+        if (fallbackMappedWriter) {
             builder =
                     bucketsBuilderForMRWriter(
                             recordWriterFactory, sd, assigner, rollingPolicy, outputFileConfig);
@@ -377,7 +395,7 @@
                 catalogTable,
                 hiveVersion,
                 (RowType) tableSchema.toRowDataType().getLogicalType(),
-                flinkConf.get(HiveOptions.TABLE_EXEC_HIVE_FALLBACK_MAPRED_READER));
+                fallbackMappedReader);
     }
 
     private HiveTableMetaStoreFactory msFactory() {
@@ -487,7 +505,12 @@
     public DynamicTableSink copy() {
         HiveTableSink sink =
                 new HiveTableSink(
-                        flinkConf, jobConf, identifier, catalogTable, configuredParallelism);
+                        fallbackMappedReader,
+                        fallbackMappedWriter,
+                        jobConf,
+                        identifier,
+                        catalogTable,
+                        configuredParallelism);
         sink.staticPartitionSpec = staticPartitionSpec;
         sink.overwrite = overwrite;
         sink.dynamicGrouping = dynamicGrouping;
diff --git a/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSource.java b/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSource.java
index d7ee578..cc5d639 100644
--- a/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSource.java
+++ b/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSource.java
@@ -146,6 +146,8 @@
                             catalogTable.getPartitionKeys(),
                             remainingPartitions);
 
+            int threadNum =
+                    flinkConf.get(HiveOptions.TABLE_EXEC_HIVE_LOAD_PARTITION_SPLITS_THREAD_NUM);
             int parallelism =
                     new HiveParallelismInference(tablePath, flinkConf)
                             .infer(
@@ -156,7 +158,7 @@
                                             HiveSourceFileEnumerator.createInputSplits(
                                                             0,
                                                             hivePartitionsToRead,
-                                                            flinkConf,
+                                                            threadNum,
                                                             jobConf)
                                                     .size())
                             .limit(limit);
diff --git a/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/HiveInputFormatPartitionReader.java b/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/HiveInputFormatPartitionReader.java
index 534c623..2493942 100644
--- a/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/HiveInputFormatPartitionReader.java
+++ b/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/HiveInputFormatPartitionReader.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.connectors.hive.read;
 
-import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.connector.file.table.PartitionReader;
 import org.apache.flink.connectors.hive.HiveTablePartition;
 import org.apache.flink.connectors.hive.JobConfWrapper;
@@ -36,7 +35,7 @@
         implements PartitionReader<HiveTablePartition, RowData> {
 
     private static final long serialVersionUID = 1L;
-    private final ReadableConfig flinkConf;
+    private final int threadNum;
     private final JobConfWrapper jobConfWrapper;
     private final String hiveVersion;
     protected final ObjectPath tablePath;
@@ -51,7 +50,7 @@
     private transient int readingSplitId;
 
     public HiveInputFormatPartitionReader(
-            ReadableConfig flinkConf,
+            int threadNum,
             JobConf jobConf,
             String hiveVersion,
             ObjectPath tablePath,
@@ -60,7 +59,7 @@
             List<String> partitionKeys,
             int[] selectedFields,
             boolean useMapRedReader) {
-        this.flinkConf = flinkConf;
+        this.threadNum = threadNum;
         this.jobConfWrapper = new JobConfWrapper(jobConf);
         this.hiveVersion = hiveVersion;
         this.tablePath = tablePath;
@@ -75,7 +74,7 @@
     public void open(List<HiveTablePartition> partitions) throws IOException {
         hiveTableInputFormat =
                 new HiveTableInputFormat(
-                        this.flinkConf,
+                        this.threadNum,
                         this.jobConfWrapper.conf(),
                         this.partitionKeys,
                         this.fieldTypes,
diff --git a/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/HiveTableInputFormat.java b/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/HiveTableInputFormat.java
index 5297812..eadd114 100644
--- a/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/HiveTableInputFormat.java
+++ b/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/HiveTableInputFormat.java
@@ -23,8 +23,6 @@
 import org.apache.flink.api.common.io.LocatableInputSplitAssigner;
 import org.apache.flink.api.common.io.statistics.BaseStatistics;
 import org.apache.flink.api.java.hadoop.common.HadoopInputFormatCommonBase;
-import org.apache.flink.configuration.ReadableConfig;
-import org.apache.flink.connectors.hive.HiveOptions;
 import org.apache.flink.connectors.hive.HiveTablePartition;
 import org.apache.flink.connectors.hive.HiveTablePartitionSplits;
 import org.apache.flink.connectors.hive.JobConfWrapper;
@@ -69,7 +67,7 @@
     private static final String SCHEMA_EVOLUTION_COLUMNS = "schema.evolution.columns";
     private static final String SCHEMA_EVOLUTION_COLUMNS_TYPES = "schema.evolution.columns.types";
 
-    private final ReadableConfig flinkConf;
+    private final int threadNum;
 
     private final JobConfWrapper jobConf;
 
@@ -98,7 +96,7 @@
     @VisibleForTesting protected transient SplitReader reader;
 
     public HiveTableInputFormat(
-            ReadableConfig flinkConf,
+            int threadNum,
             JobConf jobConf,
             List<String> partitionKeys,
             DataType[] fieldTypes,
@@ -109,7 +107,7 @@
             boolean useMapRedReader,
             List<HiveTablePartition> partitions) {
         super(jobConf.getCredentials());
-        this.flinkConf = flinkConf;
+        this.threadNum = threadNum;
         this.jobConf = new JobConfWrapper(new JobConf(jobConf));
         this.partitionKeys = partitionKeys;
         this.fieldTypes = fieldTypes;
@@ -317,21 +315,15 @@
 
     @Override
     public HiveTableInputSplit[] createInputSplits(int minNumSplits) throws IOException {
-        return createInputSplits(minNumSplits, partitions, flinkConf, jobConf.conf());
+        return createInputSplits(minNumSplits, partitions, threadNum, jobConf.conf());
     }
 
     public static HiveTableInputSplit[] createInputSplits(
-            int minNumSplits,
-            List<HiveTablePartition> partitions,
-            ReadableConfig flinkConf,
-            JobConf jobConf)
+            int minNumSplits, List<HiveTablePartition> partitions, int threadNum, JobConf jobConf)
             throws IOException {
         List<HiveTableInputSplit> hiveSplits = new ArrayList<>();
         int splitNum = 0;
-        try (MRSplitsGetter splitsGetter =
-                new MRSplitsGetter(
-                        flinkConf.get(
-                                HiveOptions.TABLE_EXEC_HIVE_LOAD_PARTITION_SPLITS_THREAD_NUM))) {
+        try (MRSplitsGetter splitsGetter = new MRSplitsGetter(threadNum)) {
             for (HiveTablePartitionSplits partitionSplits :
                     splitsGetter.getHiveTablePartitionMRSplits(minNumSplits, partitions, jobConf)) {
                 for (InputSplit inputSplit : partitionSplits.getInputSplits()) {
diff --git a/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/PartitionMonitorTest.java b/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/PartitionMonitorTest.java
index f366f10..0816092 100644
--- a/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/PartitionMonitorTest.java
+++ b/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/PartitionMonitorTest.java
@@ -182,7 +182,8 @@
                         0L,
                         seenPartitionsSinceOffset,
                         tablePath,
-                        configuration,
+                        configuration.get(
+                                HiveOptions.TABLE_EXEC_HIVE_LOAD_PARTITION_SPLITS_THREAD_NUM),
                         jobConf,
                         continuousPartitionFetcher,
                         fetcherContext);
diff --git a/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/read/HiveInputFormatPartitionReaderITCase.java b/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/read/HiveInputFormatPartitionReaderITCase.java
index bc8b971..aac9007 100644
--- a/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/read/HiveInputFormatPartitionReaderITCase.java
+++ b/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/read/HiveInputFormatPartitionReaderITCase.java
@@ -17,7 +17,7 @@
 
 package org.apache.flink.connectors.hive.read;
 
-import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connectors.hive.HiveOptions;
 import org.apache.flink.connectors.hive.HiveTablePartition;
 import org.apache.flink.table.api.SqlDialect;
 import org.apache.flink.table.api.TableEnvironment;
@@ -62,7 +62,7 @@
         // create partition reader
         HiveInputFormatPartitionReader partitionReader =
                 new HiveInputFormatPartitionReader(
-                        new Configuration(),
+                        HiveOptions.TABLE_EXEC_HIVE_LOAD_PARTITION_SPLITS_THREAD_NUM.defaultValue(),
                         new JobConf(hiveCatalog.getHiveConf()),
                         hiveCatalog.getHiveVersion(),
                         tablePath,