[FLINK-29880][hive] Introduce auto compaction for Hive sink in batch mode (#21703)
diff --git a/docs/content.zh/docs/connectors/table/hive/hive_read_write.md b/docs/content.zh/docs/connectors/table/hive/hive_read_write.md
index 033e8bb..c54efe4 100644
--- a/docs/content.zh/docs/connectors/table/hive/hive_read_write.md
+++ b/docs/content.zh/docs/connectors/table/hive/hive_read_write.md
@@ -528,6 +528,69 @@
 **注意:**
 - 只有批模式才支持自动收集统计信息,流模式目前还不支持自动收集统计信息。
 
+### 文件合并
+
+在使用 Flink 写 Hive 表的时候,Flink 也支持自动对小文件进行合并以减少小文件的数量。
+
+#### Stream Mode
+
+流模式下,合并小文件的行为与写 `文件系统` 一样,更多细节请参考 [文件合并]({{< ref "docs/connectors/table/filesystem" >}}#file-compaction)。
+
+#### Batch Mode
+
+在批模式,并且自动合并小文件已经开启的情况下,在结束写 Hive 表后,Flink 会计算每个分区下文件的平均大小,如果文件的平均大小小于用户指定的一个阈值,Flink 则会将这些文件合并成指定大小的文件。下面是文件合并涉及到的参数:
+
+<table class="table table-bordered">
+  <thead>
+    <tr>
+        <th class="text-left" style="width: 25%">Option</th>
+        <th class="text-left" style="width: 8%">Required</th>
+        <th class="text-left" style="width: 8%">Forwarded</th>
+        <th class="text-left" style="width: 7%">Default</th>
+        <th class="text-left" style="width: 10%">Type</th>
+        <th class="text-left" style="width: 42%">Description</th>
+    </tr>
+  </thead>
+  <tbody>
+    <tr>
+        <td><h5>auto-compaction</h5></td>
+        <td>optional</td>
+        <td>no</td>
+        <td style="word-wrap: break-word;">false</td>
+        <td>Boolean</td>
+        <td>是否开启自动合并,数据将会首先被写入临时文件,合并结束后,文件才可见。</td>
+    </tr>
+    <tr>
+        <td><h5>compaction.small-files.avg-size</h5></td>
+        <td>optional</td>
+        <td>yes</td>
+        <td style="word-wrap: break-word;">16MB</td>
+        <td>MemorySize</td>
+        <td>合并文件的阈值,当文件的平均大小小于该阈值,Flink 将对这些文件进行合并。默认值是 16MB。</td>
+    </tr>
+    <tr>
+        <td><h5>compaction.file-size</h5></td>
+        <td>optional</td>
+        <td>yes</td>
+        <td style="word-wrap: break-word;">(none)</td>
+        <td>MemorySize</td>
+        <td>合并文件的目标大小,即期望将文件合并成多大的文件,默认值是 <a href="{{< ref "docs/connectors/table/filesystem" >}}#sink-rolling-policy-file-size">rolling file</a>的大小。</td>
+    </tr>
+    <tr>
+        <td><h5>compaction.parallelism</h5></td>
+        <td>optional</td>
+        <td>no</td>
+        <td style="word-wrap: break-word;">(none)</td>
+        <td>Integer</td>
+        <td>
+        合并文件的并行度。 如果没有设置,它将使用 <a href="{{< ref "docs/connectors/table/filesystem" >}}#sink-parallelism">sink parallelism</a> 作为并行度。
+        当使用了 <a href="{{< ref "docs/deployment/elastic_scaling" >}}#adaptive-batch-scheduler">adaptive batch scheduler</a>, 该并行度可能会很小,导致花费很多时间进行文件合并。
+        在这种情况下, 你可以手动设置该值为一个更大的值。
+        </td>
+    </tr>
+  </tbody>
+</table>
+
 ## 格式
 
 Flink 对 Hive 的集成已经在如下的文件格式进行了测试:
diff --git a/docs/content/docs/connectors/table/hive/hive_read_write.md b/docs/content/docs/connectors/table/hive/hive_read_write.md
index 3636eaf..f02270e 100644
--- a/docs/content/docs/connectors/table/hive/hive_read_write.md
+++ b/docs/content/docs/connectors/table/hive/hive_read_write.md
@@ -558,6 +558,70 @@
 **NOTE:**
 - Only `BATCH` mode supports to auto gather statistic, `STREAMING` mode doesn't support it yet.
 
+### File Compaction
+
+The Hive sink also supports file compactions, which allows applications to reduce the number of files generated while writing into Hive.
+
+#### Stream Mode
+
+In stream mode, the behavior is the same as `FileSystem` sink. Please refer to [File Compaction]({{< ref "docs/connectors/table/filesystem" >}}#file-compaction) for more details.
+
+#### Batch Mode
+
+When it's in batch mode and auto compaction is enabled, after finishing writing files, Flink will calculate the average size of written files for each partition. And if the average size is less than the
+configured threshold, then Flink will try to compact these files to files with a target size. The following are the table's options for file compaction.
+
+<table class="table table-bordered">
+  <thead>
+    <tr>
+        <th class="text-left" style="width: 25%">Option</th>
+        <th class="text-left" style="width: 8%">Required</th>
+        <th class="text-left" style="width: 8%">Forwarded</th>
+        <th class="text-left" style="width: 7%">Default</th>
+        <th class="text-left" style="width: 10%">Type</th>
+        <th class="text-left" style="width: 42%">Description</th>
+    </tr>
+  </thead>
+  <tbody>
+    <tr>
+        <td><h5>auto-compaction</h5></td>
+        <td>optional</td>
+        <td>no</td>
+        <td style="word-wrap: break-word;">false</td>
+        <td>Boolean</td>
+        <td>Whether to enable automatic compaction in Hive sink or not. The data will be written to temporary files first. The temporary files are invisible before compaction.</td>
+    </tr>
+    <tr>
+        <td><h5>compaction.small-files.avg-size</h5></td>
+        <td>optional</td>
+        <td>yes</td>
+        <td style="word-wrap: break-word;">16MB</td>
+        <td>MemorySize</td>
+        <td>The threshold for file compaction. If the average size of the files is less than this value, FLink will then compact these files. the default value is 16MB.</td>
+    </tr>
+    <tr>
+        <td><h5>compaction.file-size</h5></td>
+        <td>optional</td>
+        <td>yes</td>
+        <td style="word-wrap: break-word;">(none)</td>
+        <td>MemorySize</td>
+        <td>The compaction target file size, the default value is the <a href="{{< ref "docs/connectors/table/filesystem" >}}#sink.rolling-policy.file-size">rolling file size</a>.</td>
+    </tr>
+    <tr>
+        <td><h5>compaction.parallelism</h5></td>
+        <td>optional</td>
+        <td>no</td>
+        <td style="word-wrap: break-word;">(none)</td>
+        <td>Integer</td>
+        <td>
+        The parallelism to compact files. If not set, it will use the <a href="{{< ref "docs/connectors/table/filesystem" >}}#sink-parallelism">sink parallelism</a>.
+        When using <a href="{{< ref "docs/deployment/elastic_scaling" >}}#adaptive-batch-scheduler">adaptive batch scheduler</a>, the parallelism of the compact operator deduced by the scheduler may be small, which will cause taking much time to finish compaction.
+        In such a case, please remember to set this option to a bigger value manually.
+        </td>
+    </tr>
+  </tbody>
+</table>
+
 ## Formats
 
 Flink's Hive integration has been tested against the following file formats:
diff --git a/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveDynamicTableFactory.java b/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveDynamicTableFactory.java
index 0647f07..90bb016 100644
--- a/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveDynamicTableFactory.java
+++ b/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveDynamicTableFactory.java
@@ -88,7 +88,7 @@
             return sink;
         }
 
-        final Integer configuredParallelism =
+        final Integer configuredSinkParallelism =
                 Configuration.fromMap(context.getCatalogTable().getOptions())
                         .get(FileSystemConnectorOptions.SINK_PARALLELISM);
         final JobConf jobConf = JobConfUtils.createJobConfWithCredentials(hiveConf);
@@ -97,7 +97,7 @@
                 jobConf,
                 context.getObjectIdentifier(),
                 context.getCatalogTable(),
-                configuredParallelism);
+                configuredSinkParallelism);
     }
 
     @Override
diff --git a/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveOptions.java b/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveOptions.java
index fc74424..636c8bd 100644
--- a/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveOptions.java
+++ b/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveOptions.java
@@ -140,6 +140,15 @@
     public static final ConfigOption<String> SINK_PARTITION_COMMIT_SUCCESS_FILE_NAME =
             FileSystemConnectorOptions.SINK_PARTITION_COMMIT_SUCCESS_FILE_NAME;
 
+    public static final ConfigOption<MemorySize> COMPACT_SMALL_FILES_AVG_SIZE =
+            key("compaction.small-files.avg-size")
+                    .memoryType()
+                    .defaultValue(MemorySize.ofMebiBytes(16))
+                    .withDescription(
+                            "When it's for writing Hive in batch mode and `auto-compaction` is configured to be true, if the average written file size is less this number,"
+                                    + " Flink will start to compact theses files to bigger files with target size which is configured by `compaction.file-size`."
+                                    + " If the `compaction.file-size` is not configured, it will use `sink.rolling-policy.file-size` as the target size.");
+
     public static final ConfigOption<Boolean> TABLE_EXEC_HIVE_SINK_STATISTIC_AUTO_GATHER_ENABLE =
             key("table.exec.hive.sink.statistic-auto-gather.enable")
                     .booleanType()
diff --git a/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSink.java b/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSink.java
index 32825ca..3c03a91 100644
--- a/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSink.java
+++ b/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSink.java
@@ -19,8 +19,8 @@
 package org.apache.flink.connectors.hive;
 
 import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.serialization.BulkWriter;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.connector.file.table.EmptyMetaStoreFactory;
 import org.apache.flink.connector.file.table.FileSystemConnectorOptions;
@@ -30,8 +30,12 @@
 import org.apache.flink.connector.file.table.PartitionCommitPolicy;
 import org.apache.flink.connector.file.table.PartitionCommitPolicyFactory;
 import org.apache.flink.connector.file.table.TableMetaStoreFactory;
+import org.apache.flink.connector.file.table.batch.BatchSink;
+import org.apache.flink.connector.file.table.batch.compact.BatchCompactOperator;
+import org.apache.flink.connector.file.table.batch.compact.BatchFileWriter;
 import org.apache.flink.connector.file.table.stream.PartitionCommitInfo;
 import org.apache.flink.connector.file.table.stream.StreamingSink;
+import org.apache.flink.connector.file.table.stream.compact.CompactMessages.CoordinatorInput;
 import org.apache.flink.connector.file.table.stream.compact.CompactReader;
 import org.apache.flink.connectors.hive.read.HiveCompactReaderFactory;
 import org.apache.flink.connectors.hive.util.HiveConfUtils;
@@ -134,14 +138,14 @@
     private final boolean autoGatherStatistic;
     private final int gatherStatsThreadNum;
 
-    @Nullable private final Integer configuredParallelism;
+    @Nullable private final Integer configuredSinkParallelism;
 
     public HiveTableSink(
             ReadableConfig flinkConf,
             JobConf jobConf,
             ObjectIdentifier identifier,
             CatalogTable table,
-            @Nullable Integer configuredParallelism) {
+            @Nullable Integer configuredSinkParallelism) {
         this(
                 flinkConf.get(HiveOptions.TABLE_EXEC_HIVE_FALLBACK_MAPRED_READER),
                 flinkConf.get(HiveOptions.TABLE_EXEC_HIVE_FALLBACK_MAPRED_WRITER),
@@ -151,7 +155,7 @@
                 jobConf,
                 identifier,
                 table,
-                configuredParallelism);
+                configuredSinkParallelism);
     }
 
     private HiveTableSink(
@@ -163,7 +167,7 @@
             JobConf jobConf,
             ObjectIdentifier identifier,
             CatalogTable table,
-            @Nullable Integer configuredParallelism) {
+            @Nullable Integer configuredSinkParallelism) {
         this.fallbackMappedReader = fallbackMappedReader;
         this.fallbackMappedWriter = fallbackMappedWriter;
         this.dynamicGroupingEnabled = dynamicGroupingEnabled;
@@ -178,7 +182,7 @@
                         "Hive version is not defined");
         hiveShim = HiveShimLoader.loadHiveShim(hiveVersion);
         tableSchema = TableSchemaUtils.getPhysicalSchema(table.getSchema());
-        this.configuredParallelism = configuredParallelism;
+        this.configuredSinkParallelism = configuredSinkParallelism;
         validateAutoGatherStatistic(autoGatherStatistic, catalogTable);
     }
 
@@ -293,10 +297,10 @@
                             .withPartPrefix("part-" + UUID.randomUUID())
                             .withPartSuffix(extension == null ? "" : extension);
 
-            final int parallelism =
-                    Optional.ofNullable(configuredParallelism).orElse(dataStream.getParallelism());
+            final int sinkParallelism =
+                    Optional.ofNullable(configuredSinkParallelism)
+                            .orElse(dataStream.getParallelism());
             if (isBounded) {
-                OutputFileConfig fileNaming = fileNamingBuilder.build();
                 TableMetaStoreFactory msFactory =
                         isInsertDirectory
                                 ? new EmptyMetaStoreFactory(
@@ -325,10 +329,13 @@
                         converter,
                         writerFactory,
                         msFactory,
-                        fileNaming,
+                        fileNamingBuilder,
                         stagingParentDir,
+                        sd,
+                        tableProps,
                         isToLocal,
-                        parallelism);
+                        overwrite,
+                        sinkParallelism);
             } else {
                 if (overwrite) {
                     throw new IllegalStateException("Streaming mode not support overwrite.");
@@ -340,7 +347,7 @@
                         tableProps,
                         writerFactory,
                         fileNamingBuilder,
-                        parallelism);
+                        sinkParallelism);
             }
         } catch (IOException e) {
             throw new FlinkRuntimeException("Failed to create staging dir", e);
@@ -351,7 +358,196 @@
         }
     }
 
-    private DataStreamSink<Row> createBatchSink(
+    private DataStreamSink<?> createBatchSink(
+            DataStream<RowData> dataStream,
+            DataStructureConverter converter,
+            HiveWriterFactory recordWriterFactory,
+            TableMetaStoreFactory metaStoreFactory,
+            OutputFileConfig.OutputFileConfigBuilder fileConfigBuilder,
+            String stagingParentDir,
+            StorageDescriptor sd,
+            Properties tableProps,
+            boolean isToLocal,
+            boolean overwrite,
+            int sinkParallelism)
+            throws IOException {
+        org.apache.flink.configuration.Configuration conf =
+                new org.apache.flink.configuration.Configuration();
+        catalogTable.getOptions().forEach(conf::setString);
+        boolean autoCompaction = conf.getBoolean(FileSystemConnectorOptions.AUTO_COMPACTION);
+        if (autoCompaction) {
+            Optional<Integer> compactParallelismOptional =
+                    conf.getOptional(FileSystemConnectorOptions.COMPACTION_PARALLELISM);
+            int compactParallelism = compactParallelismOptional.orElse(sinkParallelism);
+            return createBatchCompactSink(
+                    dataStream,
+                    converter,
+                    recordWriterFactory,
+                    metaStoreFactory,
+                    fileConfigBuilder
+                            .withPartPrefix(
+                                    BatchCompactOperator.UNCOMPACTED_PREFIX
+                                            + "part-"
+                                            + UUID.randomUUID())
+                            .build(),
+                    stagingParentDir,
+                    sd,
+                    tableProps,
+                    isToLocal,
+                    overwrite,
+                    sinkParallelism,
+                    compactParallelism);
+        } else {
+            return createBatchNoCompactSink(
+                    dataStream,
+                    converter,
+                    recordWriterFactory,
+                    metaStoreFactory,
+                    fileConfigBuilder.build(),
+                    stagingParentDir,
+                    isToLocal,
+                    sinkParallelism);
+        }
+    }
+
+    private DataStreamSink<?> createBatchCompactSink(
+            DataStream<RowData> dataStream,
+            DataStructureConverter converter,
+            HiveWriterFactory recordWriterFactory,
+            TableMetaStoreFactory metaStoreFactory,
+            OutputFileConfig fileNaming,
+            String stagingParentDir,
+            StorageDescriptor sd,
+            Properties tableProps,
+            boolean isToLocal,
+            boolean overwrite,
+            final int sinkParallelism,
+            final int compactParallelism)
+            throws IOException {
+        String[] partitionColumns = getPartitionKeyArray();
+        org.apache.flink.configuration.Configuration conf =
+                new org.apache.flink.configuration.Configuration();
+        catalogTable.getOptions().forEach(conf::setString);
+        HadoopFileSystemFactory fsFactory = fsFactory();
+        org.apache.flink.core.fs.Path tmpPath =
+                new org.apache.flink.core.fs.Path(toStagingDir(stagingParentDir, jobConf));
+
+        PartitionCommitPolicyFactory partitionCommitPolicyFactory =
+                new PartitionCommitPolicyFactory(
+                        conf.get(HiveOptions.SINK_PARTITION_COMMIT_POLICY_KIND),
+                        conf.get(HiveOptions.SINK_PARTITION_COMMIT_POLICY_CLASS),
+                        conf.get(HiveOptions.SINK_PARTITION_COMMIT_SUCCESS_FILE_NAME));
+
+        org.apache.flink.core.fs.Path path = new org.apache.flink.core.fs.Path(sd.getLocation());
+        BucketsBuilder<RowData, String, ? extends BucketsBuilder<RowData, ?, ?>> builder =
+                getBucketsBuilder(path, recordWriterFactory, sd, fileNaming, conf);
+
+        CompactReader.Factory<RowData> readerFactory = createCompactReaderFactory(sd, tableProps);
+
+        HiveOutputFormatFactory outputFormatFactory =
+                new HiveOutputFormatFactory(recordWriterFactory);
+        HiveRowPartitionComputer partitionComputer =
+                new HiveRowPartitionComputer(
+                        hiveShim,
+                        JobConfUtils.getDefaultPartitionName(jobConf),
+                        tableSchema.getFieldNames(),
+                        tableSchema.getFieldDataTypes(),
+                        partitionColumns);
+
+        DataStream<CoordinatorInput> writerDataStream =
+                dataStream
+                        .map(value -> (Row) converter.toExternal(value))
+                        .setParallelism(sinkParallelism)
+                        .transform(
+                                "batch_compact_writer",
+                                TypeInformation.of(CoordinatorInput.class),
+                                new BatchFileWriter<>(
+                                        fsFactory,
+                                        tmpPath,
+                                        partitionColumns,
+                                        dynamicGrouping,
+                                        staticPartitionSpec,
+                                        outputFormatFactory,
+                                        partitionComputer,
+                                        fileNaming))
+                        .setParallelism(sinkParallelism);
+        long compactAverageSize = conf.get(HiveOptions.COMPACT_SMALL_FILES_AVG_SIZE).getBytes();
+
+        long compactTargetSize =
+                conf.getOptional(FileSystemConnectorOptions.COMPACTION_FILE_SIZE)
+                        .orElse(conf.get(SINK_ROLLING_POLICY_FILE_SIZE))
+                        .getBytes();
+
+        return BatchSink.createBatchCompactSink(
+                writerDataStream,
+                builder,
+                readerFactory,
+                fsFactory,
+                metaStoreFactory,
+                partitionCommitPolicyFactory,
+                partitionColumns,
+                staticPartitionSpec,
+                tmpPath,
+                identifier,
+                compactAverageSize,
+                compactTargetSize,
+                isToLocal,
+                overwrite,
+                compactParallelism);
+    }
+
+    private BucketsBuilder<RowData, String, ? extends BucketsBuilder<RowData, ?, ?>>
+            getBucketsBuilder(
+                    org.apache.flink.core.fs.Path path,
+                    HiveWriterFactory recordWriterFactory,
+                    StorageDescriptor sd,
+                    OutputFileConfig fileNaming,
+                    org.apache.flink.configuration.Configuration conf) {
+        HiveRowDataPartitionComputer partComputer =
+                new HiveRowDataPartitionComputer(
+                        hiveShim,
+                        JobConfUtils.getDefaultPartitionName(jobConf),
+                        tableSchema.getFieldNames(),
+                        tableSchema.getFieldDataTypes(),
+                        getPartitionKeyArray());
+        TableBucketAssigner assigner = new TableBucketAssigner(partComputer);
+        HiveRollingPolicy rollingPolicy =
+                new HiveRollingPolicy(
+                        conf.get(SINK_ROLLING_POLICY_FILE_SIZE).getBytes(),
+                        conf.get(SINK_ROLLING_POLICY_ROLLOVER_INTERVAL).toMillis(),
+                        conf.get(SINK_ROLLING_POLICY_INACTIVITY_INTERVAL).toMillis());
+
+        BucketsBuilder<RowData, String, ? extends BucketsBuilder<RowData, ?, ?>> builder;
+        if (fallbackMappedWriter) {
+            builder =
+                    bucketsBuilderForMRWriter(
+                            recordWriterFactory, sd, assigner, rollingPolicy, fileNaming);
+            LOG.info("Hive sink: Use MapReduce RecordWriter writer.");
+        } else {
+            Optional<BulkWriter.Factory<RowData>> bulkFactory =
+                    createBulkWriterFactory(getPartitionKeyArray(), sd);
+            if (bulkFactory.isPresent()) {
+                builder =
+                        StreamingFileSink.forBulkFormat(
+                                        path,
+                                        new FileSystemTableSink.ProjectionBulkFactory(
+                                                bulkFactory.get(), partComputer))
+                                .withBucketAssigner(assigner)
+                                .withRollingPolicy(rollingPolicy)
+                                .withOutputFileConfig(fileNaming);
+                LOG.info("Hive sink: Use native parquet&orc writer.");
+            } else {
+                builder =
+                        bucketsBuilderForMRWriter(
+                                recordWriterFactory, sd, assigner, rollingPolicy, fileNaming);
+                LOG.info(
+                        "Hive sink: Use MapReduce RecordWriter writer because BulkWriter Factory not available.");
+            }
+        }
+        return builder;
+    }
+
+    private DataStreamSink<Row> createBatchNoCompactSink(
             DataStream<RowData> dataStream,
             DataStructureConverter converter,
             HiveWriterFactory recordWriterFactory,
@@ -359,7 +555,7 @@
             OutputFileConfig fileNaming,
             String stagingParentDir,
             boolean isToLocal,
-            final int parallelism)
+            final int sinkParallelism)
             throws IOException {
         org.apache.flink.configuration.Configuration conf =
                 new org.apache.flink.configuration.Configuration();
@@ -389,11 +585,8 @@
                         conf.get(HiveOptions.SINK_PARTITION_COMMIT_POLICY_KIND),
                         conf.get(HiveOptions.SINK_PARTITION_COMMIT_POLICY_CLASS),
                         conf.get(HiveOptions.SINK_PARTITION_COMMIT_SUCCESS_FILE_NAME)));
-        return dataStream
-                .map((MapFunction<RowData, Row>) value -> (Row) converter.toExternal(value))
-                .setParallelism(parallelism)
-                .writeUsingOutputFormat(builder.build())
-                .setParallelism(parallelism);
+        return BatchSink.createBatchNoCompactSink(
+                dataStream, converter, builder.build(), sinkParallelism);
     }
 
     private DataStreamSink<?> createStreamSink(
@@ -419,20 +612,6 @@
                             FileSystemConnectorOptions.SINK_PARTITION_COMMIT_POLICY_KIND.key()));
         }
 
-        HiveRowDataPartitionComputer partComputer =
-                new HiveRowDataPartitionComputer(
-                        hiveShim,
-                        JobConfUtils.getDefaultPartitionName(jobConf),
-                        tableSchema.getFieldNames(),
-                        tableSchema.getFieldDataTypes(),
-                        getPartitionKeyArray());
-        TableBucketAssigner assigner = new TableBucketAssigner(partComputer);
-        HiveRollingPolicy rollingPolicy =
-                new HiveRollingPolicy(
-                        conf.get(SINK_ROLLING_POLICY_FILE_SIZE).getBytes(),
-                        conf.get(SINK_ROLLING_POLICY_ROLLOVER_INTERVAL).toMillis(),
-                        conf.get(SINK_ROLLING_POLICY_INACTIVITY_INTERVAL).toMillis());
-
         boolean autoCompaction = conf.getBoolean(FileSystemConnectorOptions.AUTO_COMPACTION);
         if (autoCompaction) {
             fileNamingBuilder.withPartPrefix(
@@ -441,34 +620,8 @@
         OutputFileConfig outputFileConfig = fileNamingBuilder.build();
 
         org.apache.flink.core.fs.Path path = new org.apache.flink.core.fs.Path(sd.getLocation());
-
-        BucketsBuilder<RowData, String, ? extends BucketsBuilder<RowData, ?, ?>> builder;
-        if (fallbackMappedWriter) {
-            builder =
-                    bucketsBuilderForMRWriter(
-                            recordWriterFactory, sd, assigner, rollingPolicy, outputFileConfig);
-            LOG.info("Hive streaming sink: Use MapReduce RecordWriter writer.");
-        } else {
-            Optional<BulkWriter.Factory<RowData>> bulkFactory =
-                    createBulkWriterFactory(getPartitionKeyArray(), sd);
-            if (bulkFactory.isPresent()) {
-                builder =
-                        StreamingFileSink.forBulkFormat(
-                                        path,
-                                        new FileSystemTableSink.ProjectionBulkFactory(
-                                                bulkFactory.get(), partComputer))
-                                .withBucketAssigner(assigner)
-                                .withRollingPolicy(rollingPolicy)
-                                .withOutputFileConfig(outputFileConfig);
-                LOG.info("Hive streaming sink: Use native parquet&orc writer.");
-            } else {
-                builder =
-                        bucketsBuilderForMRWriter(
-                                recordWriterFactory, sd, assigner, rollingPolicy, outputFileConfig);
-                LOG.info(
-                        "Hive streaming sink: Use MapReduce RecordWriter writer because BulkWriter Factory not available.");
-            }
-        }
+        BucketsBuilder<RowData, String, ? extends BucketsBuilder<RowData, ?, ?>> builder =
+                getBucketsBuilder(path, recordWriterFactory, sd, outputFileConfig, conf);
 
         long bucketCheckInterval = conf.get(SINK_ROLLING_POLICY_CHECK_INTERVAL).toMillis();
 
@@ -652,7 +805,7 @@
                         jobConf,
                         identifier,
                         catalogTable,
-                        configuredParallelism);
+                        configuredSinkParallelism);
         sink.staticPartitionSpec = staticPartitionSpec;
         sink.overwrite = overwrite;
         sink.dynamicGrouping = dynamicGrouping;
diff --git a/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableCompactSinkITCase.java b/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableCompactSinkITCase.java
new file mode 100644
index 0000000..b8ad4eb
--- /dev/null
+++ b/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableCompactSinkITCase.java
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connectors.hive;
+
+import org.apache.flink.table.api.SqlDialect;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.TableResult;
+import org.apache.flink.table.catalog.hive.HiveCatalog;
+import org.apache.flink.table.catalog.hive.HiveTestUtils;
+import org.apache.flink.test.junit5.MiniClusterExtension;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CollectionUtil;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** IT case for Hive table compaction in batch mode. */
+public class HiveTableCompactSinkITCase {
+
+    @RegisterExtension
+    private static final MiniClusterExtension MINI_CLUSTER = new MiniClusterExtension();
+
+    private TableEnvironment tableEnv;
+    private HiveCatalog hiveCatalog;
+    private String warehouse;
+
+    @BeforeEach
+    public void setUp() {
+        hiveCatalog = HiveTestUtils.createHiveCatalog();
+        hiveCatalog.open();
+        warehouse = hiveCatalog.getHiveConf().getVar(HiveConf.ConfVars.METASTOREWAREHOUSE);
+        tableEnv = HiveTestUtils.createTableEnvInBatchMode(SqlDialect.HIVE);
+        tableEnv.registerCatalog(hiveCatalog.getName(), hiveCatalog);
+        tableEnv.useCatalog(hiveCatalog.getName());
+    }
+
+    @AfterEach
+    public void tearDown() {
+        if (hiveCatalog != null) {
+            hiveCatalog.close();
+        }
+    }
+
+    @Test
+    public void testNoCompaction() throws Exception {
+        tableEnv.executeSql(
+                "CREATE TABLE src ("
+                        + " key string,"
+                        + " value string"
+                        + ") TBLPROPERTIES ("
+                        + " 'auto-compaction' = 'true', "
+                        + " 'compaction.small-files.avg-size' = '1b', "
+                        + " 'sink.parallelism' = '4'" // set sink parallelism = 8
+                        + ")");
+        tableEnv.executeSql(
+                        "insert into src values ('k1', 'v1'), ('k2', 'v2'),"
+                                + "('k3', 'v3'), ('k4', 'v4')")
+                .await();
+
+        List<Path> files = listDataFiles(Paths.get(warehouse, "src"));
+        // auto compaction enabled, but the files' average size isn't less than 1b,  so the files
+        // num should be 4
+        assertThat(files).hasSize(4);
+        List<String> result = toSortedResult(tableEnv.executeSql("select * from src"));
+        assertThat(result.toString()).isEqualTo("[+I[k1, v1], +I[k2, v2], +I[k3, v3], +I[k4, v4]]");
+    }
+
+    @Test
+    public void testCompactNonPartitionedTable() throws Exception {
+        tableEnv.executeSql(
+                "CREATE TABLE src ("
+                        + " key string,"
+                        + " value string"
+                        + ") TBLPROPERTIES ("
+                        + " 'auto-compaction' = 'true', "
+                        + " 'sink.parallelism' = '4'" // set sink parallelism = 8
+                        + ")");
+        tableEnv.executeSql(
+                        "insert into src values ('k1', 'v1'), ('k2', 'v2'),"
+                                + "('k3', 'v3'), ('k4', 'v4')")
+                .await();
+
+        // auto compaction is enabled, so all the files should be merged be one file
+        List<Path> files = listDataFiles(Paths.get(warehouse, "src"));
+        assertThat(files).hasSize(1);
+        List<String> result = toSortedResult(tableEnv.executeSql("select * from src"));
+        assertThat(result.toString()).isEqualTo("[+I[k1, v1], +I[k2, v2], +I[k3, v3], +I[k4, v4]]");
+    }
+
+    @Test
+    public void testCompactPartitionedTable() throws Exception {
+        tableEnv.executeSql(
+                "CREATE TABLE src ("
+                        + " key string,"
+                        + " value string"
+                        + ") partitioned by (p1 int,p2 string) TBLPROPERTIES ("
+                        + " 'auto-compaction' = 'true', "
+                        + " 'sink.parallelism' = '8'" // set sink parallelism = 8
+                        + ")");
+
+        // test compaction for static partition
+        tableEnv.executeSql(
+                        "insert into src partition (p1=0,p2='static') values (1,'a'),(2,'b'),(3,'c')")
+                .await();
+        // auto compaction is enabled, so all the files in same partition should be merged be one
+        // file
+        List<Path> files = listDataFiles(Paths.get(warehouse, "src/p1=0/p2=static"));
+        assertThat(files).hasSize(1);
+        // verify the result
+        List<String> result = toSortedResult(tableEnv.executeSql("select * from src"));
+        assertThat(result.toString())
+                .isEqualTo("[+I[1, a, 0, static], +I[2, b, 0, static], +I[3, c, 0, static]]");
+
+        // test compaction for dynamic partition
+        tableEnv.executeSql(
+                        "insert into src partition (p1=0,p2) values (1,'a','d1'),"
+                                + " (2,'b','d2'), (3,'c','d1'), (4,'d','d2')")
+                .await();
+        // auto compaction is enabled, so all the files in same partition should be merged be one
+        // file
+        files = listDataFiles(Paths.get(warehouse, "src/p1=0/p2=d1"));
+        assertThat(files).hasSize(1);
+        files = listDataFiles(Paths.get(warehouse, "src/p1=0/p2=d2"));
+        assertThat(files).hasSize(1);
+        // verify the result
+        result = toSortedResult(tableEnv.executeSql("select * from src"));
+        assertThat(result.toString())
+                .isEqualTo(
+                        "[+I[1, a, 0, d1], +I[1, a, 0, static], +I[2, b, 0, d2],"
+                                + " +I[2, b, 0, static], +I[3, c, 0, d1], +I[3, c, 0, static], +I[4, d, 0, d2]]");
+    }
+
+    @Test
+    public void testConditionalCompact() throws Exception {
+        tableEnv.executeSql(
+                "CREATE TABLE src ("
+                        + " key string,"
+                        + " value string"
+                        + ") partitioned by (p int) TBLPROPERTIES ("
+                        + " 'auto-compaction' = 'true', "
+                        + " 'compaction.small-files.avg-size' = '9b', "
+                        + " 'sink.parallelism' = '4'" // set sink parallelism = 8
+                        + ")");
+
+        tableEnv.executeSql(
+                        "insert into src values ('k1', 'v1', 1), ('k2', 'v2', 1),"
+                                + "('k3', 'v3', 2), ('k4', 'v4', 2), ('k5', 'v5', 1)")
+                .await();
+
+        // one row is 6 bytes, so the partition "p=2" will contain two files, one of which only
+        // contain one row, the average size is 6 bytes. so the files should be merged to one single
+        // file.
+        List<Path> files = listDataFiles(Paths.get(warehouse, "src/p=2"));
+        assertThat(files).hasSize(1);
+
+        // the partition "p=1" will contain two files, one contain two rows, and the other contain
+        // one row. the average size is 9 bytes, so the files shouldn't be merged
+        files = listDataFiles(Paths.get(warehouse, "src/p=1"));
+        assertThat(files).hasSize(2);
+
+        List<String> result = toSortedResult(tableEnv.executeSql("select * from src"));
+        assertThat(result.toString())
+                .isEqualTo(
+                        "[+I[k1, v1, 1], +I[k2, v2, 1], +I[k3, v3, 2], +I[k4, v4, 2], +I[k5, v5, 1]]");
+    }
+
+    private List<Path> listDataFiles(Path path) throws Exception {
+        String defaultSuccessFileName =
+                HiveOptions.SINK_PARTITION_COMMIT_SUCCESS_FILE_NAME.defaultValue();
+        return Files.list(path)
+                .filter(
+                        p ->
+                                !p.toFile().isHidden()
+                                        && !p.toFile().getName().equals(defaultSuccessFileName))
+                .collect(Collectors.toList());
+    }
+
+    private List<String> toSortedResult(TableResult tableResult) {
+        List<Row> rows = CollectionUtil.iteratorToList(tableResult.collect());
+        return rows.stream().map(Row::toString).sorted().collect(Collectors.toList());
+    }
+}