[INLONG-8092][Sort] Support all database and multiple tables transmission for Hive (#8096)

diff --git a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/Constants.java b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/Constants.java
index e04c8b1..84cf2c5 100644
--- a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/Constants.java
+++ b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/Constants.java
@@ -17,6 +17,7 @@
 
 package org.apache.inlong.sort.base;
 
+import org.apache.inlong.sort.base.sink.PartitionPolicy;
 import org.apache.inlong.sort.base.sink.SchemaUpdateExceptionPolicy;
 
 import org.apache.flink.configuration.ConfigOption;
@@ -261,6 +262,20 @@
                     .withDescription("Because spark do not support iceberg data type: `timestamp without time zone` and"
                             + "`time`, so type conversions must be mapped to types supported by spark.");
 
+    public static final ConfigOption<PartitionPolicy> SINK_PARTITION_POLICY =
+            ConfigOptions.key("sink.partition.policy")
+                    .enumType(PartitionPolicy.class)
+                    .defaultValue(PartitionPolicy.PROC_TIME)
+                    .withDescription("The policy for partitioning the table.");
+
+    public static final ConfigOption<String> SOURCE_PARTITION_FIELD_NAME =
+            ConfigOptions.key("source.partition.field.name")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "The field name in the source generic raw data; the table is partitioned dynamically by the value of this field. "
+                                    + "Regular expressions are supported to match multiple fields in the source generic raw data.");
+
     // ========================================= dirty configuration =========================================
     public static final String DIRTY_PREFIX = "dirty.";
 
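
A minimal sketch, not part of the patch, of how a sink factory could read the two new options from a table's options map; the helper class, its caller and the options map are assumptions for illustration only.

    import java.util.Map;
    import org.apache.flink.configuration.Configuration;
    import org.apache.inlong.sort.base.Constants;
    import org.apache.inlong.sort.base.sink.PartitionPolicy;

    class NewHivePartitionOptionsSketch {
        static void inspect(Map<String, String> tableOptions) {
            Configuration conf = Configuration.fromMap(tableOptions);
            // "sink.partition.policy", defaults to PROC_TIME when absent
            PartitionPolicy policy = conf.get(Constants.SINK_PARTITION_POLICY);
            // "source.partition.field.name", no default; may be a regex matching several fields
            String sourceField = conf.get(Constants.SOURCE_PARTITION_FIELD_NAME);
            System.out.println(policy + " / " + sourceField);
        }
    }
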
diff --git a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/sink/PartitionPolicy.java b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/sink/PartitionPolicy.java
new file mode 100644
index 0000000..eb9eaf4
--- /dev/null
+++ b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/sink/PartitionPolicy.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.base.sink;
+
+public enum PartitionPolicy {
+
+    PROC_TIME("partition the table by Flink processing time"),
+    ASSIGN_FIELD("partition the table by assigned fields"),
+    SOURCE_DATE_FIELD("partition the table by a date or timestamp field from the source generic data"),
+    NONE("do not partition the table");
+
+    private final String description;
+
+    PartitionPolicy(String description) {
+        this.description = description;
+    }
+
+    public String getDescription() {
+        return description;
+    }
+}
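
As an illustration of how these constants might be consumed when deriving a partition value; only the enum itself comes from this patch, the helper below and its inputs are hypothetical.

    import java.time.LocalDate;
    import java.util.Map;

    final class PartitionValueSketch {
        // Hypothetical helper: pick the raw value a partition would be derived from.
        static Object partitionSource(PartitionPolicy policy, Map<String, Object> record, String field) {
            switch (policy) {
                case PROC_TIME:
                    return LocalDate.now();      // processing time of the Flink task
                case ASSIGN_FIELD:
                case SOURCE_DATE_FIELD:
                    return record.get(field);    // value taken from the source record
                case NONE:
                default:
                    return null;                 // table is written unpartitioned
            }
        }
    }
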
diff --git a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/hive/pom.xml b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/hive/pom.xml
index aff44ad..ff588fb 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/hive/pom.xml
+++ b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/hive/pom.xml
@@ -31,8 +31,135 @@
 
     <properties>
         <inlong.root.dir>${project.parent.parent.parent.parent.parent.basedir}</inlong.root.dir>
+        <hive3x.version>3.1.3</hive3x.version>
+        <hive2x.version>2.2.0</hive2x.version>
+        <orc.core.version>1.4.3</orc.core.version>
+        <aircompressor.version>0.8</aircompressor.version>
     </properties>
 
+    <dependencyManagement>
+        <dependencies>
+            <dependency>
+                <groupId>org.apache.hive</groupId>
+                <artifactId>hive-exec</artifactId>
+                <exclusions>
+                    <exclusion>
+                        <groupId>org.apache.hive</groupId>
+                        <artifactId>hive-vector-code-gen</artifactId>
+                    </exclusion>
+                    <exclusion>
+                        <groupId>org.apache.hive</groupId>
+                        <artifactId>hive-shims</artifactId>
+                    </exclusion>
+                    <exclusion>
+                        <groupId>commons-codec</groupId>
+                        <artifactId>commons-codec</artifactId>
+                    </exclusion>
+                    <exclusion>
+                        <groupId>commons-httpclient</groupId>
+                        <artifactId>commons-httpclient</artifactId>
+                    </exclusion>
+                    <exclusion>
+                        <groupId>org.apache.logging.log4j</groupId>
+                        <artifactId>log4j-slf4j-impl</artifactId>
+                    </exclusion>
+                    <exclusion>
+                        <groupId>org.apache.logging.log4j</groupId>
+                        <artifactId>log4j-1.2-api</artifactId>
+                    </exclusion>
+                    <exclusion>
+                        <groupId>org.antlr</groupId>
+                        <artifactId>antlr-runtime</artifactId>
+                    </exclusion>
+                    <exclusion>
+                        <groupId>org.antlr</groupId>
+                        <artifactId>ST4</artifactId>
+                    </exclusion>
+                    <exclusion>
+                        <groupId>org.apache.ant</groupId>
+                        <artifactId>ant</artifactId>
+                    </exclusion>
+                    <exclusion>
+                        <groupId>org.apache.commons</groupId>
+                        <artifactId>commons-compress</artifactId>
+                    </exclusion>
+                    <exclusion>
+                        <groupId>org.apache.ivy</groupId>
+                        <artifactId>ivy</artifactId>
+                    </exclusion>
+                    <exclusion>
+                        <groupId>org.apache.zookeeper</groupId>
+                        <artifactId>zookeeper</artifactId>
+                    </exclusion>
+                    <exclusion>
+                        <groupId>org.apache.curator</groupId>
+                        <artifactId>apache-curator</artifactId>
+                    </exclusion>
+                    <exclusion>
+                        <groupId>org.apache.curator</groupId>
+                        <artifactId>curator-framework</artifactId>
+                    </exclusion>
+                    <exclusion>
+                        <groupId>org.codehaus.groovy</groupId>
+                        <artifactId>groovy-all</artifactId>
+                    </exclusion>
+                    <exclusion>
+                        <groupId>org.apache.calcite</groupId>
+                        <artifactId>calcite-core</artifactId>
+                    </exclusion>
+                    <exclusion>
+                        <groupId>org.apache.calcite</groupId>
+                        <artifactId>calcite-druid</artifactId>
+                    </exclusion>
+                    <exclusion>
+                        <groupId>org.apache.calcite.avatica</groupId>
+                        <artifactId>avatica</artifactId>
+                    </exclusion>
+                    <exclusion>
+                        <groupId>org.apache.calcite</groupId>
+                        <artifactId>calcite-avatica</artifactId>
+                    </exclusion>
+                    <exclusion>
+                        <groupId>com.google.code.gson</groupId>
+                        <artifactId>gson</artifactId>
+                    </exclusion>
+                    <exclusion>
+                        <groupId>stax</groupId>
+                        <artifactId>stax-api</artifactId>
+                    </exclusion>
+                    <exclusion>
+                        <groupId>com.google.guava</groupId>
+                        <artifactId>guava</artifactId>
+                    </exclusion>
+                    <exclusion>
+                        <groupId>log4j</groupId>
+                        <artifactId>log4j</artifactId>
+                    </exclusion>
+                    <exclusion>
+                        <groupId>log4j</groupId>
+                        <artifactId>apache-log4j-extras</artifactId>
+                    </exclusion>
+                    <exclusion>
+                        <groupId>org.slf4j</groupId>
+                        <artifactId>slf4j-log4j12</artifactId>
+                    </exclusion>
+                    <exclusion>
+                        <groupId>commons-io</groupId>
+                        <artifactId>commons-io</artifactId>
+                    </exclusion>
+                    <exclusion>
+                        <groupId>org.slf4j</groupId>
+                        <artifactId>slf4j-api</artifactId>
+                    </exclusion>
+                    <exclusion>
+                        <groupId>commons-lang</groupId>
+                        <artifactId>commons-lang</artifactId>
+                    </exclusion>
+                </exclusions>
+            </dependency>
+        </dependencies>
+    </dependencyManagement>
+
     <dependencies>
         <dependency>
             <groupId>org.apache.inlong</groupId>
@@ -50,6 +177,10 @@
                     <groupId>org.slf4j</groupId>
                     <artifactId>slf4j-api</artifactId>
                 </exclusion>
+                <exclusion>
+                    <groupId>commons-lang</groupId>
+                    <artifactId>commons-lang</artifactId>
+                </exclusion>
             </exclusions>
         </dependency>
 
@@ -59,121 +190,6 @@
         </dependency>
 
         <dependency>
-            <groupId>org.apache.hive</groupId>
-            <artifactId>hive-exec</artifactId>
-            <exclusions>
-                <exclusion>
-                    <groupId>org.apache.hive</groupId>
-                    <artifactId>hive-vector-code-gen</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>org.apache.hive</groupId>
-                    <artifactId>hive-shims</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>commons-codec</groupId>
-                    <artifactId>commons-codec</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>commons-httpclient</groupId>
-                    <artifactId>commons-httpclient</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>org.apache.logging.log4j</groupId>
-                    <artifactId>log4j-slf4j-impl</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>org.apache.logging.log4j</groupId>
-                    <artifactId>log4j-1.2-api</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>org.antlr</groupId>
-                    <artifactId>antlr-runtime</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>org.antlr</groupId>
-                    <artifactId>ST4</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>org.apache.ant</groupId>
-                    <artifactId>ant</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>org.apache.commons</groupId>
-                    <artifactId>commons-compress</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>org.apache.ivy</groupId>
-                    <artifactId>ivy</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>org.apache.zookeeper</groupId>
-                    <artifactId>zookeeper</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>org.apache.curator</groupId>
-                    <artifactId>apache-curator</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>org.apache.curator</groupId>
-                    <artifactId>curator-framework</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>org.codehaus.groovy</groupId>
-                    <artifactId>groovy-all</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>org.apache.calcite</groupId>
-                    <artifactId>calcite-core</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>org.apache.calcite</groupId>
-                    <artifactId>calcite-druid</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>org.apache.calcite.avatica</groupId>
-                    <artifactId>avatica</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>org.apache.calcite</groupId>
-                    <artifactId>calcite-avatica</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>com.google.code.gson</groupId>
-                    <artifactId>gson</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>stax</groupId>
-                    <artifactId>stax-api</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>com.google.guava</groupId>
-                    <artifactId>guava</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>log4j</groupId>
-                    <artifactId>log4j</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>log4j</groupId>
-                    <artifactId>apache-log4j-extras</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>org.slf4j</groupId>
-                    <artifactId>slf4j-log4j12</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>commons-io</groupId>
-                    <artifactId>commons-io</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>org.slf4j</groupId>
-                    <artifactId>slf4j-api</artifactId>
-                </exclusion>
-            </exclusions>
-        </dependency>
-
-        <dependency>
             <groupId>org.apache.hadoop</groupId>
             <artifactId>hadoop-mapreduce-client-core</artifactId>
             <exclusions>
@@ -209,9 +225,12 @@
                     <groupId>org.slf4j</groupId>
                     <artifactId>slf4j-api</artifactId>
                 </exclusion>
+                <exclusion>
+                    <groupId>commons-lang</groupId>
+                    <artifactId>commons-lang</artifactId>
+                </exclusion>
             </exclusions>
         </dependency>
-
     </dependencies>
 
     <build>
@@ -235,6 +254,14 @@
                                         <include>META-INF/services/org.apache.flink.table.factories.Factory</include>
                                     </includes>
                                 </filter>
+                                <filter>
+                                    <artifact>*:*</artifact>
+                                    <excludes>
+                                        <exclude>META-INF/*.SF</exclude>
+                                        <exclude>META-INF/*.DSA</exclude>
+                                        <exclude>META-INF/*.RSA</exclude>
+                                    </excludes>
+                                </filter>
                             </filters>
                         </configuration>
                     </execution>
@@ -242,4 +269,48 @@
             </plugin>
         </plugins>
     </build>
+
+    <profiles>
+        <profile>
+            <id>hive3</id>
+            <activation>
+                <activeByDefault>true</activeByDefault>
+            </activation>
+            <dependencies>
+                <dependency>
+                    <groupId>org.apache.hive</groupId>
+                    <artifactId>hive-exec</artifactId>
+                    <version>${hive3x.version}</version>
+                </dependency>
+            </dependencies>
+        </profile>
+        <profile>
+            <id>hive2</id>
+            <dependencies>
+                <dependency>
+                    <groupId>org.apache.hive</groupId>
+                    <artifactId>hive-exec</artifactId>
+                    <version>${hive2x.version}</version>
+                </dependency>
+                <!-- https://nightlies.apache.org/flink/flink-docs-release-1.12/dev/table/connectors/hive/ -->
+                <!-- ORC dependencies, required by the ORC vectorized optimizations -->
+                <dependency>
+                    <groupId>org.apache.orc</groupId>
+                    <artifactId>orc-core</artifactId>
+                    <version>${orc.core.version}</version>
+                    <exclusions>
+                        <exclusion>
+                            <groupId>org.slf4j</groupId>
+                            <artifactId>slf4j-api</artifactId>
+                        </exclusion>
+                    </exclusions>
+                </dependency>
+                <dependency>
+                    <groupId>io.airlift</groupId>
+                    <artifactId>aircompressor</artifactId>
+                    <version>${aircompressor.version}</version>
+                </dependency>
+            </dependencies>
+        </profile>
+    </profiles>
 </project>
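
With the two profiles above, the connector builds against Hive ${hive3x.version} by default; selecting the `hive2` profile on the Maven command line (for example `mvn clean package -Phive2`) switches to Hive ${hive2x.version} and additionally pulls in orc-core and aircompressor, which the linked Flink documentation lists as required for ORC vectorized reads on older Hive versions.
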
diff --git a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/HiveBulkWriterFactory.java b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/HiveBulkWriterFactory.java
new file mode 100644
index 0000000..89aa2d6
--- /dev/null
+++ b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/HiveBulkWriterFactory.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.hive;
+
+import org.apache.inlong.sort.hive.filesystem.InLongHadoopPathBasedBulkWriter;
+
+import org.apache.flink.formats.hadoop.bulk.HadoopPathBasedBulkWriter;
+import org.apache.flink.table.data.RowData;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
+import org.apache.hadoop.io.Writable;
+
+import java.io.IOException;
+import java.util.function.Function;
+
+/**
+ * Hive bulk writer factory for a path-based bulk file writer that writes to a specific Hadoop path.
+ */
+public class HiveBulkWriterFactory implements HadoopPathBasedBulkWriter.Factory<RowData> {
+
+    private static final long serialVersionUID = 1L;
+    private final HiveWriterFactory factory;
+
+    public HiveBulkWriterFactory(HiveWriterFactory factory) {
+        this.factory = factory;
+    }
+
+    public HiveWriterFactory getFactory() {
+        return factory;
+    }
+
+    @Override
+    public HadoopPathBasedBulkWriter<RowData> create(Path targetPath, Path inProgressPath) throws IOException {
+        FileSinkOperator.RecordWriter recordWriter = factory.createRecordWriter(inProgressPath);
+        Function<RowData, Writable> rowConverter = factory.createRowDataConverter();
+        FileSystem fs = FileSystem.get(inProgressPath.toUri(), factory.getJobConf());
+        return new InLongHadoopPathBasedBulkWriter(recordWriter, rowConverter, fs, inProgressPath);
+    }
+}
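
A hedged usage sketch: the streaming file sink's bucket writer asks this factory for one bulk writer per in-progress file. The paths and the surrounding driver class below are placeholders, not part of the patch.

    import org.apache.flink.formats.hadoop.bulk.HadoopPathBasedBulkWriter;
    import org.apache.flink.table.data.RowData;
    import org.apache.hadoop.fs.Path;
    import org.apache.inlong.sort.hive.HiveBulkWriterFactory;

    class BulkWriterUsageSketch {
        static void write(HiveBulkWriterFactory factory, RowData row) throws Exception {
            Path target = new Path("/warehouse/db.db/tbl/pt=2023-01-01/part-0");                // illustrative
            Path inProgress = new Path("/warehouse/db.db/tbl/pt=2023-01-01/.part-0.inprogress"); // illustrative
            HadoopPathBasedBulkWriter<RowData> writer = factory.create(target, inProgress);
            writer.addElement(row);  // converted to a Writable and handed to the Hive RecordWriter
            writer.finish();         // closes the underlying RecordWriter
        }
    }
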
diff --git a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/HiveOptions.java b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/HiveOptions.java
index 15d80d9..492c453 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/HiveOptions.java
+++ b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/HiveOptions.java
@@ -66,4 +66,35 @@
                     .booleanType()
                     .defaultValue(false)
                     .withDescription("Regard upsert delete as insert kind.");
+
+    public static final ConfigOption<String> SINK_PARTITION_NAME =
+            ConfigOptions.key("sink.partition.name")
+                    .stringType()
+                    .defaultValue("pt")
+                    .withDescription("The default partition name used when creating a new Hive table.");
+
+    public static final ConfigOption<Integer> HIVE_SCHEMA_SCAN_INTERVAL =
+            ConfigOptions.key("sink.schema.scan.interval")
+                    .intType()
+                    .defaultValue(10)
+                    .withDescription("The interval in milliseconds at which to check whether the source table schema has changed.");
+
+    public static final ConfigOption<String> HIVE_STORAGE_INPUT_FORMAT =
+            ConfigOptions.key("hive.storage.input.format")
+                    .stringType()
+                    .defaultValue("org.apache.hadoop.mapred.TextInputFormat")
+                    .withDescription("The input format of the storage descriptor.");
+
+    public static final ConfigOption<String> HIVE_STORAGE_OUTPUT_FORMAT =
+            ConfigOptions.key("hive.storage.output.format")
+                    .stringType()
+                    .defaultValue("org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat")
+                    .withDescription("The output format of the storage descriptor.");
+
+    public static final ConfigOption<String> HIVE_STORAGE_SERIALIZATION_LIB =
+            ConfigOptions.key("hive.storage.serialization.lib")
+                    .stringType()
+                    .defaultValue("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe")
+                    .withDescription("The serialization library of the storage descriptor.");
+
 }
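
A small sketch of the values these options resolve to when nothing is configured; the empty Configuration and the printing class are for illustration only.

    import org.apache.flink.configuration.Configuration;
    import org.apache.inlong.sort.hive.HiveOptions;

    class HiveOptionDefaultsSketch {
        static void printDefaults() {
            Configuration conf = new Configuration();                                  // nothing set, defaults apply
            System.out.println(conf.get(HiveOptions.SINK_PARTITION_NAME));             // "pt"
            System.out.println(conf.get(HiveOptions.HIVE_SCHEMA_SCAN_INTERVAL));       // 10
            System.out.println(conf.get(HiveOptions.HIVE_STORAGE_INPUT_FORMAT));       // TextInputFormat
            System.out.println(conf.get(HiveOptions.HIVE_STORAGE_SERIALIZATION_LIB));  // LazySimpleSerDe
        }
    }
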
diff --git a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/HiveOutputFormatFactory.java b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/HiveOutputFormatFactory.java
new file mode 100644
index 0000000..f37740f
--- /dev/null
+++ b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/HiveOutputFormatFactory.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.hive;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem;
+import org.apache.flink.table.filesystem.OutputFormatFactory;
+import org.apache.flink.types.Row;
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter;
+import org.apache.hadoop.io.Writable;
+
+import java.io.IOException;
+import java.util.function.Function;
+
+/** Hive {@link OutputFormatFactory} that uses a {@link RecordWriter} to write records. */
+public class HiveOutputFormatFactory implements OutputFormatFactory<Row> {
+
+    private static final long serialVersionUID = 2L;
+
+    private final HiveWriterFactory factory;
+
+    public HiveOutputFormatFactory(HiveWriterFactory factory) {
+        this.factory = factory;
+    }
+
+    @Override
+    public HiveOutputFormat createOutputFormat(Path path) {
+        return new HiveOutputFormat(
+                factory.createRecordWriter(HadoopFileSystem.toHadoopPath(path)),
+                factory.createRowConverter());
+    }
+
+    private class HiveOutputFormat implements org.apache.flink.api.common.io.OutputFormat<Row> {
+
+        private final RecordWriter recordWriter;
+        private final Function<Row, Writable> rowConverter;
+
+        private HiveOutputFormat(RecordWriter recordWriter, Function<Row, Writable> rowConverter) {
+            this.recordWriter = recordWriter;
+            this.rowConverter = rowConverter;
+        }
+
+        @Override
+        public void configure(Configuration parameters) {
+        }
+
+        @Override
+        public void open(int taskNumber, int numTasks) {
+        }
+
+        @Override
+        public void writeRecord(Row record) throws IOException {
+            recordWriter.write(rowConverter.apply(record));
+        }
+
+        @Override
+        public void close() throws IOException {
+            recordWriter.close(false);
+        }
+    }
+}
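
A hedged sketch of how the factory is typically driven by the file system sink; the path, the row and the driver class are placeholders.

    import org.apache.flink.api.common.io.OutputFormat;
    import org.apache.flink.core.fs.Path;
    import org.apache.flink.table.filesystem.OutputFormatFactory;
    import org.apache.flink.types.Row;
    import org.apache.inlong.sort.hive.HiveOutputFormatFactory;

    class OutputFormatUsageSketch {
        static void writeOne(HiveOutputFormatFactory hiveFactory, Row row) throws Exception {
            OutputFormatFactory<Row> factory = hiveFactory;
            OutputFormat<Row> format = factory.createOutputFormat(new Path("/tmp/hive-out/part-0"));
            format.open(0, 1);        // task 0 of 1
            format.writeRecord(row);  // converted to a Writable by the row converter
            format.close();           // closes the underlying Hive RecordWriter (abort = false)
        }
    }
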
diff --git a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/HiveRowDataPartitionComputer.java b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/HiveRowDataPartitionComputer.java
index f38a415..15de4c6 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/HiveRowDataPartitionComputer.java
+++ b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/HiveRowDataPartitionComputer.java
@@ -17,19 +17,41 @@
 
 package org.apache.inlong.sort.hive;
 
+import org.apache.inlong.sort.base.format.DynamicSchemaFormatFactory;
+import org.apache.inlong.sort.base.format.JsonDynamicSchemaFormat;
+import org.apache.inlong.sort.base.sink.PartitionPolicy;
+import org.apache.inlong.sort.hive.util.CacheHolder;
+import org.apache.inlong.sort.hive.util.HiveTableUtil;
+
 import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.flink.table.catalog.ObjectIdentifier;
 import org.apache.flink.table.catalog.hive.client.HiveShim;
+import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.data.util.DataFormatConverters;
 import org.apache.flink.table.filesystem.RowDataPartitionComputer;
 import org.apache.flink.table.functions.hive.conversion.HiveInspectors;
 import org.apache.flink.table.functions.hive.conversion.HiveObjectConversion;
 import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.table.types.utils.TypeConversions;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.mapred.JobConf;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.Arrays;
+import java.util.HashMap;
 import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_DATABASE_PATTERN;
+import static org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_ENABLE;
+import static org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_FORMAT;
+import static org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_TABLE_PATTERN;
 
 /**
  * A {@link RowDataPartitionComputer} that converts Flink objects to Hive objects before computing
@@ -37,16 +59,46 @@
  */
 public class HiveRowDataPartitionComputer extends RowDataPartitionComputer {
 
+    private static final Logger LOG = LoggerFactory.getLogger(HiveRowDataPartitionComputer.class);
+
     private final DataFormatConverters.DataFormatConverter[] partitionConverters;
     private final HiveObjectConversion[] hiveObjectConversions;
 
+    private transient JsonDynamicSchemaFormat jsonFormat;
+    private final boolean sinkMultipleEnable;
+    private final String sinkMultipleFormat;
+    private final String databasePattern;
+    private final String tablePattern;
+    private final HiveShim hiveShim;
+
+    private final String hiveVersion;
+    private final String inputFormat;
+    private final String outputFormat;
+    private final String serializationLib;
+
+    private final PartitionPolicy partitionPolicy;
+
+    private final String partitionField;
+
+    private final String timePattern;
+
     public HiveRowDataPartitionComputer(
+            JobConf jobConf,
             HiveShim hiveShim,
+            String hiveVersion,
             String defaultPartValue,
             String[] columnNames,
             DataType[] columnTypes,
-            String[] partitionColumns) {
+            String[] partitionColumns,
+            PartitionPolicy partitionPolicy,
+            String partitionField,
+            String timePattern,
+            String inputFormat,
+            String outputFormat,
+            String serializationLib) {
         super(defaultPartValue, columnNames, columnTypes, partitionColumns);
+        this.hiveShim = hiveShim;
+        this.hiveVersion = hiveVersion;
         this.partitionConverters =
                 Arrays.stream(partitionTypes)
                         .map(TypeConversions::fromLogicalToDataType)
@@ -60,20 +112,104 @@
                     HiveInspectors.getConversion(
                             objectInspector, partColType.getLogicalType(), hiveShim);
         }
+        this.sinkMultipleEnable = Boolean.parseBoolean(jobConf.get(SINK_MULTIPLE_ENABLE.key(), "false"));
+        this.sinkMultipleFormat = jobConf.get(SINK_MULTIPLE_FORMAT.key());
+        this.databasePattern = jobConf.get(SINK_MULTIPLE_DATABASE_PATTERN.key());
+        this.tablePattern = jobConf.get(SINK_MULTIPLE_TABLE_PATTERN.key());
+        this.partitionPolicy = partitionPolicy;
+        this.partitionField = partitionField;
+        this.timePattern = timePattern;
+        this.inputFormat = inputFormat;
+        this.outputFormat = outputFormat;
+        this.serializationLib = serializationLib;
     }
 
     @Override
     public LinkedHashMap<String, String> generatePartValues(RowData in) {
         LinkedHashMap<String, String> partSpec = new LinkedHashMap<>();
 
-        for (int i = 0; i < partitionIndexes.length; i++) {
-            Object field = partitionConverters[i].toExternal(in, partitionIndexes[i]);
-            String partitionValue =
-                    field != null ? hiveObjectConversions[i].toHiveObject(field).toString() : null;
-            if (StringUtils.isEmpty(partitionValue)) {
-                partitionValue = defaultPartValue;
+        if (sinkMultipleEnable) {
+            GenericRowData rowData = (GenericRowData) in;
+            JsonNode rootNode;
+            try {
+                if (jsonFormat == null) {
+                    jsonFormat = (JsonDynamicSchemaFormat) DynamicSchemaFormatFactory.getFormat(sinkMultipleFormat);
+                }
+                rootNode = jsonFormat.deserialize((byte[]) rowData.getField(0));
+
+                String databaseName = jsonFormat.parse(rootNode, databasePattern);
+                String tableName = jsonFormat.parse(rootNode, tablePattern);
+                List<Map<String, Object>> physicalDataList = HiveTableUtil.jsonNode2Map(
+                        jsonFormat.getPhysicalData(rootNode));
+
+                List<String> pkListStr = jsonFormat.extractPrimaryKeyNames(rootNode);
+                RowType schema = jsonFormat.extractSchema(rootNode, pkListStr);
+
+                Map<String, Object> rawData = physicalDataList.get(0);
+                ObjectIdentifier identifier = HiveTableUtil.createObjectIdentifier(databaseName, tableName);
+
+                HashMap<ObjectIdentifier, Long> ignoreWritingTableMap = CacheHolder.getIgnoreWritingTableMap();
+                // ignore writing data into this table
+                if (ignoreWritingTableMap.containsKey(identifier)) {
+                    return partSpec;
+                }
+
+                HiveWriterFactory hiveWriterFactory = HiveTableUtil.getWriterFactory(hiveShim, hiveVersion, identifier);
+                if (hiveWriterFactory == null) {
+                    HiveTableUtil.createTable(databaseName, tableName, schema, partitionPolicy, hiveVersion,
+                            inputFormat, outputFormat, serializationLib);
+                    hiveWriterFactory = HiveTableUtil.getWriterFactory(hiveShim, hiveVersion, identifier);
+                }
+
+                String[] partitionColumns = hiveWriterFactory.getPartitionColumns();
+                if (partitionColumns.length == 0) {
+                    // non partition table
+                    return partSpec;
+                }
+
+                String[] columnNames = hiveWriterFactory.getAllColumns();
+                DataType[] allTypes = hiveWriterFactory.getAllTypes();
+                List<String> columnList = Arrays.asList(columnNames);
+                int[] partitionIndexes = Arrays.stream(partitionColumns).mapToInt(columnList::indexOf).toArray();
+
+                boolean replaceLineBreak = hiveWriterFactory.getStorageDescriptor().getInputFormat()
+                        .contains("TextInputFormat");
+                Pair<GenericRowData, Integer> pair = HiveTableUtil.getRowData(rawData, columnNames, allTypes,
+                        replaceLineBreak);
+                GenericRowData genericRowData = pair.getLeft();
+
+                Object field;
+                for (int i = 0; i < partitionIndexes.length; i++) {
+                    int fieldIndex = partitionIndexes[i];
+                    field = genericRowData.getField(fieldIndex);
+                    DataType type = allTypes[fieldIndex];
+                    ObjectInspector objectInspector = HiveInspectors.getObjectInspector(type);
+                    HiveObjectConversion hiveConversion = HiveInspectors.getConversion(objectInspector,
+                            type.getLogicalType(), hiveShim);
+                    String partitionValue = field != null ? String.valueOf(hiveConversion.toHiveObject(field)) : null;
+                    if (partitionValue == null) {
+                        field = HiveTableUtil.getDefaultPartitionValue(rawData, schema, partitionPolicy, partitionField,
+                                timePattern);
+                        partitionValue = field != null ? String.valueOf(hiveConversion.toHiveObject(field)) : null;
+                    }
+                    if (StringUtils.isEmpty(partitionValue)) {
+                        partitionValue = defaultPartValue;
+                    }
+                    partSpec.put(partitionColumns[i], partitionValue);
+                }
+            } catch (Exception e) {
+                // Do not throw the exception, just log it, so that HadoopPathBasedPartFileWriter can decide whether to archive the dirty data
+                LOG.error("Generate partition values error", e);
             }
-            partSpec.put(partitionColumns[i], partitionValue);
+        } else {
+            for (int i = 0; i < partitionIndexes.length; i++) {
+                Object field = partitionConverters[i].toExternal(in, partitionIndexes[i]);
+                String partitionValue = field != null ? hiveObjectConversions[i].toHiveObject(field).toString() : null;
+                if (StringUtils.isEmpty(partitionValue)) {
+                    partitionValue = defaultPartValue;
+                }
+                partSpec.put(partitionColumns[i], partitionValue);
+            }
         }
         return partSpec;
     }
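
For the multiple-table path, the incoming RowData carries the raw serialized source record in field 0. A hedged sketch of the call follows; the byte payload, its JSON layout and the resulting partition spec are illustrative assumptions, not taken from this patch.

    import java.nio.charset.StandardCharsets;
    import java.util.LinkedHashMap;
    import org.apache.flink.table.data.GenericRowData;
    import org.apache.inlong.sort.hive.HiveRowDataPartitionComputer;

    class PartitionComputerUsageSketch {
        static LinkedHashMap<String, String> compute(HiveRowDataPartitionComputer computer) {
            // Illustrative payload; the real bytes come from the configured dynamic schema format.
            byte[] payload = "{\"database\":\"db\",\"table\":\"tbl\",\"data\":[{\"id\":1}]}"
                    .getBytes(StandardCharsets.UTF_8);
            GenericRowData raw = new GenericRowData(1);
            raw.setField(0, payload);
            return computer.generatePartValues(raw);  // e.g. {pt=2023-05-30} for a date-based policy
        }
    }
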
diff --git a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/HiveTableMetaStoreFactory.java b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/HiveTableMetaStoreFactory.java
index 9e433c6..3baef7f 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/HiveTableMetaStoreFactory.java
+++ b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/HiveTableMetaStoreFactory.java
@@ -48,7 +48,7 @@
     private final String database;
     private final String tableName;
 
-    HiveTableMetaStoreFactory(JobConf conf, String hiveVersion, String database, String tableName) {
+    public HiveTableMetaStoreFactory(JobConf conf, String hiveVersion, String database, String tableName) {
         this.conf = new JobConfWrapper(conf);
         this.hiveVersion = hiveVersion;
         this.database = database;
@@ -60,7 +60,15 @@
         return new HiveTableMetaStore();
     }
 
-    private class HiveTableMetaStore implements TableMetaStore {
+    public String getDatabase() {
+        return database;
+    }
+
+    public String getTableName() {
+        return tableName;
+    }
+
+    public class HiveTableMetaStore implements TableMetaStore {
 
         private HiveMetastoreClientWrapper client;
         private StorageDescriptor sd;
diff --git a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/HiveTableSink.java b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/HiveTableSink.java
index f3c235d..b9792d4 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/HiveTableSink.java
+++ b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/HiveTableSink.java
@@ -19,6 +19,9 @@
 
 import org.apache.inlong.sort.base.dirty.DirtyOptions;
 import org.apache.inlong.sort.base.dirty.sink.DirtySink;
+import org.apache.inlong.sort.base.sink.PartitionPolicy;
+import org.apache.inlong.sort.base.sink.SchemaUpdateExceptionPolicy;
+import org.apache.inlong.sort.hive.filesystem.HadoopPathBasedBulkFormatBuilder;
 import org.apache.inlong.sort.hive.filesystem.StreamingSink;
 
 import org.apache.flink.api.common.functions.MapFunction;
@@ -28,15 +31,11 @@
 import org.apache.flink.connectors.hive.read.HiveCompactReaderFactory;
 import org.apache.flink.connectors.hive.util.HiveConfUtils;
 import org.apache.flink.connectors.hive.util.JobConfUtils;
-import org.apache.flink.connectors.hive.write.HiveBulkWriterFactory;
-import org.apache.flink.connectors.hive.write.HiveOutputFormatFactory;
-import org.apache.flink.connectors.hive.write.HiveWriterFactory;
 import org.apache.flink.hive.shaded.formats.parquet.row.ParquetRowDataBuilder;
 import org.apache.flink.orc.OrcSplitReaderUtil;
 import org.apache.flink.orc.writer.ThreadLocalClassLoaderConfiguration;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSink;
-import org.apache.flink.streaming.api.functions.sink.filesystem.HadoopPathBasedBulkFormatBuilder;
 import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig;
 import org.apache.flink.streaming.api.functions.sink.filesystem.PartFileInfo;
 import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
@@ -75,9 +74,11 @@
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.SerDeInfo;
 import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat;
 import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.orc.TypeDescription;
@@ -127,6 +128,15 @@
     private final DirtyOptions dirtyOptions;
     private @Nullable final DirtySink<Object> dirtySink;
 
+    private final SchemaUpdateExceptionPolicy schemaUpdatePolicy;
+    private final PartitionPolicy partitionPolicy;
+    private final String partitionField;
+    private final String timePattern;
+    private final boolean sinkMultipleEnable;
+    private final String inputFormat;
+    private final String outputFormat;
+    private final String serializationLib;
+
     public HiveTableSink(
             ReadableConfig flinkConf,
             JobConf jobConf,
@@ -136,7 +146,15 @@
             final String inlongMetric,
             final String auditHostAndPorts,
             DirtyOptions dirtyOptions,
-            @Nullable DirtySink<Object> dirtySink) {
+            @Nullable DirtySink<Object> dirtySink,
+            SchemaUpdateExceptionPolicy schemaUpdatePolicy,
+            PartitionPolicy partitionPolicy,
+            String partitionField,
+            String timePattern,
+            boolean sinkMultipleEnable,
+            String inputFormat,
+            String outputFormat,
+            String serializationLib) {
         this.flinkConf = flinkConf;
         this.jobConf = jobConf;
         this.identifier = identifier;
@@ -152,6 +170,14 @@
         this.auditHostAndPorts = auditHostAndPorts;
         this.dirtyOptions = dirtyOptions;
         this.dirtySink = dirtySink;
+        this.schemaUpdatePolicy = schemaUpdatePolicy;
+        this.partitionPolicy = partitionPolicy;
+        this.partitionField = partitionField;
+        this.timePattern = timePattern;
+        this.sinkMultipleEnable = sinkMultipleEnable;
+        this.inputFormat = inputFormat;
+        this.outputFormat = outputFormat;
+        this.serializationLib = serializationLib;
     }
 
     @Override
@@ -172,23 +198,36 @@
         try (HiveMetastoreClientWrapper client =
                 HiveMetastoreClientFactory.create(HiveConfUtils.create(jobConf), hiveVersion)) {
 
-            Table table = client.getTable(dbName, identifier.getObjectName());
-            StorageDescriptor sd = table.getSd();
-
-            Class hiveOutputFormatClz =
-                    hiveShim.getHiveOutputFormatClass(Class.forName(sd.getOutputFormat()));
+            StorageDescriptor sd;
+            Properties tableProps = new Properties();
+            Class hiveOutputFormatClz;
             boolean isCompressed =
                     jobConf.getBoolean(HiveConf.ConfVars.COMPRESSRESULT.varname, false);
-            HiveWriterFactory writerFactory =
-                    new HiveWriterFactory(
-                            jobConf,
-                            hiveOutputFormatClz,
-                            sd.getSerdeInfo(),
-                            tableSchema,
-                            getPartitionKeyArray(),
-                            HiveReflectionUtils.getTableMetadata(hiveShim, table),
-                            hiveShim,
-                            isCompressed);
+            if (sinkMultipleEnable) {
+                sd = new StorageDescriptor();
+                SerDeInfo serDeInfo = new SerDeInfo();
+                serDeInfo.setSerializationLib(this.serializationLib);
+                sd.setSerdeInfo(serDeInfo);
+                String defaultFs = jobConf.get("fs.defaultFS", "");
+                sd.setLocation(defaultFs + "/tmp");
+                hiveOutputFormatClz = hiveShim.getHiveOutputFormatClass(HiveIgnoreKeyTextOutputFormat.class);
+            } else {
+                Table table = client.getTable(dbName, identifier.getObjectName());
+                sd = table.getSd();
+                tableProps = HiveReflectionUtils.getTableMetadata(hiveShim, table);
+                hiveOutputFormatClz = hiveShim.getHiveOutputFormatClass(Class.forName(sd.getOutputFormat()));
+            }
+            HiveWriterFactory writerFactory = new HiveWriterFactory(
+                    jobConf,
+                    hiveOutputFormatClz,
+                    sd,
+                    tableSchema,
+                    getPartitionKeyArray(),
+                    tableProps,
+                    hiveShim,
+                    isCompressed,
+                    sinkMultipleEnable);
+
             String extension =
                     Utilities.getFileExtension(
                             jobConf,
@@ -204,14 +243,12 @@
                     Optional.ofNullable(configuredParallelism).orElse(dataStream.getParallelism());
             if (isBounded) {
                 OutputFileConfig fileNaming = fileNamingBuilder.build();
-                return createBatchSink(
-                        dataStream, converter, sd, writerFactory, fileNaming, parallelism);
+                return createBatchSink(dataStream, converter, sd, writerFactory, fileNaming, parallelism);
             } else {
                 if (overwrite) {
                     throw new IllegalStateException("Streaming mode not support overwrite.");
                 }
 
-                Properties tableProps = HiveReflectionUtils.getTableMetadata(hiveShim, table);
                 return createStreamSink(
                         dataStream, sd, tableProps, writerFactory, fileNamingBuilder, parallelism);
             }
@@ -278,13 +315,22 @@
                             identifier, FileSystemOptions.SINK_PARTITION_COMMIT_POLICY_KIND.key()));
         }
 
-        HiveRowDataPartitionComputer partComputer =
-                new HiveRowDataPartitionComputer(
-                        hiveShim,
-                        JobConfUtils.getDefaultPartitionName(jobConf),
-                        tableSchema.getFieldNames(),
-                        tableSchema.getFieldDataTypes(),
-                        getPartitionKeyArray());
+        HiveRowDataPartitionComputer partComputer =
+                new HiveRowDataPartitionComputer(
+                jobConf,
+                hiveShim,
+                hiveVersion,
+                JobConfUtils.getDefaultPartitionName(jobConf),
+                tableSchema.getFieldNames(),
+                tableSchema.getFieldDataTypes(),
+                getPartitionKeyArray(),
+                partitionPolicy,
+                partitionField,
+                timePattern,
+                inputFormat,
+                outputFormat,
+                serializationLib);
+
         TableBucketAssigner assigner = new TableBucketAssigner(partComputer);
         HiveRollingPolicy rollingPolicy =
                 new HiveRollingPolicy(
@@ -387,9 +433,7 @@
         if (dbName == null) {
             dbName = identifier.getDatabaseName();
         }
-        LOG.info("11 ### dbName is " + dbName);
-        return new HiveTableMetaStoreFactory(
-                jobConf, hiveVersion, dbName, identifier.getObjectName());
+        return new HiveTableMetaStoreFactory(jobConf, hiveVersion, dbName, identifier.getObjectName());
     }
 
     private HadoopFileSystemFactory fsFactory() {
@@ -403,8 +447,9 @@
             HiveRollingPolicy rollingPolicy,
             OutputFileConfig outputFileConfig) {
         HiveBulkWriterFactory hadoopBulkFactory = new HiveBulkWriterFactory(recordWriterFactory);
-        return new HadoopPathBasedBulkFormatBuilder<>(
-                new Path(sd.getLocation()), hadoopBulkFactory, jobConf, assigner)
+        return new HadoopPathBasedBulkFormatBuilder<>(new Path(sd.getLocation()), hadoopBulkFactory, jobConf, assigner,
+                dirtyOptions, dirtySink, schemaUpdatePolicy, partitionPolicy, hiveShim, hiveVersion,
+                sinkMultipleEnable, inputFormat, outputFormat, serializationLib)
                         .withRollingPolicy(rollingPolicy)
                         .withOutputFileConfig(outputFileConfig);
     }
@@ -461,6 +506,10 @@
     }
 
     private List<String> getPartitionKeys() {
+        /*
+         * if (table != null) { return
+         * table.getPartitionKeys().stream().map(FieldSchema::getName).collect(Collectors.toList()); }
+         */
         return catalogTable.getPartitionKeys();
     }
 
@@ -507,7 +556,15 @@
                         inlongMetric,
                         auditHostAndPorts,
                         dirtyOptions,
-                        dirtySink);
+                        dirtySink,
+                        schemaUpdatePolicy,
+                        partitionPolicy,
+                        partitionField,
+                        timePattern,
+                        sinkMultipleEnable,
+                        inputFormat,
+                        outputFormat,
+                        serializationLib);
         sink.staticPartitionSpec = staticPartitionSpec;
         sink.overwrite = overwrite;
         sink.dynamicGrouping = dynamicGrouping;
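
To summarize the sinkMultipleEnable branch in consumeDataStream: since no single catalog table exists up front, a placeholder StorageDescriptor is built from the configured SerDe and the cluster's default filesystem, and the real per-table descriptors are resolved later per record. A minimal standalone sketch of that branch, using the same metastore API types as the patch; the helper class and its parameters are illustrative.

    import org.apache.hadoop.hive.metastore.api.SerDeInfo;
    import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
    import org.apache.hadoop.mapred.JobConf;

    class MultiTableDescriptorSketch {
        static StorageDescriptor placeholder(JobConf jobConf, String serializationLib) {
            StorageDescriptor sd = new StorageDescriptor();
            SerDeInfo serDeInfo = new SerDeInfo();
            serDeInfo.setSerializationLib(serializationLib);               // LazySimpleSerDe by default
            sd.setSerdeInfo(serDeInfo);
            sd.setLocation(jobConf.get("fs.defaultFS", "") + "/tmp");      // temporary location only
            return sd;
        }
    }
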
diff --git a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/HiveWriterFactory.java b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/HiveWriterFactory.java
new file mode 100644
index 0000000..75b8e0d
--- /dev/null
+++ b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/HiveWriterFactory.java
@@ -0,0 +1,251 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.hive;
+
+import org.apache.flink.connectors.hive.CachedSerializedValue;
+import org.apache.flink.connectors.hive.FlinkHiveException;
+import org.apache.flink.connectors.hive.JobConfWrapper;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.catalog.hive.client.HiveShim;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.util.DataFormatConverters;
+import org.apache.flink.table.data.util.DataFormatConverters.DataFormatConverter;
+import org.apache.flink.table.functions.hive.conversion.HiveInspectors;
+import org.apache.flink.table.functions.hive.conversion.HiveObjectConversion;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.StringUtils;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter;
+import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
+import org.apache.hadoop.hive.serde2.Deserializer;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.SerDeUtils;
+import org.apache.hadoop.hive.serde2.Serializer;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.SequenceFileOutputFormat;
+import org.apache.hadoop.util.ReflectionUtils;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Properties;
+import java.util.function.Function;
+
+/**
+ * Factory for creating {@link RecordWriter} and converters for writing.
+ */
+public class HiveWriterFactory implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    private final Class hiveOutputFormatClz;
+
+    private final CachedSerializedValue<StorageDescriptor> storageDescriptor;
+
+    private final String[] allColumns;
+
+    private final DataType[] allTypes;
+
+    private final String[] partitionColumns;
+
+    private final Properties tableProperties;
+
+    private final JobConfWrapper confWrapper;
+
+    private final HiveShim hiveShim;
+
+    private final boolean isCompressed;
+
+    // SerDe in Hive-1.2.1 and Hive-2.3.4 can be of different classes, make sure to use a common
+    // base class
+    private transient Serializer recordSerDe;
+
+    /**
+     * Field number excluding partition fields.
+     */
+    private transient int formatFields;
+
+    // to convert Flink object to Hive object
+    private transient HiveObjectConversion[] hiveConversions;
+
+    private transient DataFormatConverter[] converters;
+
+    // StructObjectInspector represents the hive row structure.
+    private transient StructObjectInspector formatInspector;
+
+    private transient boolean initialized;
+
+    private final boolean sinkMultipleEnable;
+
+    public HiveWriterFactory(JobConf jobConf,
+            Class hiveOutputFormatClz,
+            StorageDescriptor storageDescriptor,
+            TableSchema schema,
+            String[] partitionColumns,
+            Properties tableProperties,
+            HiveShim hiveShim,
+            boolean isCompressed,
+            boolean sinkMultipleEnable) {
+        Preconditions.checkArgument(HiveOutputFormat.class.isAssignableFrom(hiveOutputFormatClz),
+                "The output format should be an instance of HiveOutputFormat");
+        this.confWrapper = new JobConfWrapper(jobConf);
+        this.hiveOutputFormatClz = hiveOutputFormatClz;
+        try {
+            this.storageDescriptor = new CachedSerializedValue<>(storageDescriptor);
+        } catch (IOException e) {
+            throw new FlinkHiveException("Failed to serialize SerDeInfo", e);
+        }
+        this.allColumns = schema.getFieldNames();
+        this.allTypes = schema.getFieldDataTypes();
+        this.partitionColumns = partitionColumns;
+        this.tableProperties = tableProperties;
+        this.hiveShim = hiveShim;
+        this.isCompressed = isCompressed;
+        this.sinkMultipleEnable = sinkMultipleEnable;
+    }
+
+    /**
+     * Create a {@link RecordWriter} from path.
+     */
+    public RecordWriter createRecordWriter(Path path) {
+        try {
+            checkInitialize();
+            JobConf conf = new JobConf(confWrapper.conf());
+
+            if (isCompressed) {
+                String codecStr = conf.get(HiveConf.ConfVars.COMPRESSINTERMEDIATECODEC.varname);
+                if (!StringUtils.isNullOrWhitespaceOnly(codecStr)) {
+                    // noinspection unchecked
+                    Class<? extends CompressionCodec> codec = (Class<? extends CompressionCodec>) Class.forName(
+                            codecStr, true, Thread.currentThread().getContextClassLoader());
+                    FileOutputFormat.setOutputCompressorClass(conf, codec);
+                }
+                String typeStr = conf.get(HiveConf.ConfVars.COMPRESSINTERMEDIATETYPE.varname);
+                if (!StringUtils.isNullOrWhitespaceOnly(typeStr)) {
+                    SequenceFile.CompressionType style = SequenceFile.CompressionType.valueOf(typeStr);
+                    SequenceFileOutputFormat.setOutputCompressionType(conf, style);
+                }
+            }
+
+            return hiveShim.getHiveRecordWriter(conf, hiveOutputFormatClz, recordSerDe.getSerializedClass(),
+                    isCompressed, tableProperties, path);
+        } catch (Exception e) {
+            throw new FlinkHiveException(e);
+        }
+    }
+
+    public JobConf getJobConf() {
+        return confWrapper.conf();
+    }
+
+    public StorageDescriptor getStorageDescriptor() throws IOException, ClassNotFoundException {
+        return storageDescriptor.deserializeValue();
+    }
+
+    public String[] getAllColumns() {
+        return allColumns;
+    }
+
+    public DataType[] getAllTypes() {
+        return allTypes;
+    }
+
+    public String[] getPartitionColumns() {
+        return partitionColumns;
+    }
+
+    public void checkInitialize() throws Exception {
+        if (initialized) {
+            return;
+        }
+
+        JobConf jobConf = confWrapper.conf();
+        Object serdeLib = Class.forName(getStorageDescriptor().getSerdeInfo().getSerializationLib()).newInstance();
+        Preconditions.checkArgument(serdeLib instanceof Serializer && serdeLib instanceof Deserializer,
+                "Expect a SerDe lib implementing both Serializer and Deserializer, but actually got "
+                        + serdeLib.getClass().getName());
+        this.recordSerDe = (Serializer) serdeLib;
+        ReflectionUtils.setConf(recordSerDe, jobConf);
+
+        // TODO: support partition properties, for now assume they're the same as table properties
+        SerDeUtils.initializeSerDe((Deserializer) recordSerDe, jobConf, tableProperties, null);
+
+        this.formatFields = allColumns.length - partitionColumns.length;
+        this.hiveConversions = new HiveObjectConversion[formatFields];
+        this.converters = new DataFormatConverter[formatFields];
+        List<ObjectInspector> objectInspectors = new ArrayList<>(hiveConversions.length);
+        for (int i = 0; i < formatFields; i++) {
+            DataType type = allTypes[i];
+            ObjectInspector objectInspector = HiveInspectors.getObjectInspector(type);
+            objectInspectors.add(objectInspector);
+            hiveConversions[i] = HiveInspectors.getConversion(objectInspector, type.getLogicalType(), hiveShim);
+            converters[i] = DataFormatConverters.getConverterForDataType(type);
+        }
+
+        this.formatInspector = ObjectInspectorFactory.getStandardStructObjectInspector(
+                Arrays.asList(allColumns).subList(0, formatFields), objectInspectors);
+        this.initialized = true;
+    }
+
+    public Function<Row, Writable> createRowConverter() {
+        return row -> {
+            List<Object> fields = new ArrayList<>(formatFields);
+            for (int i = 0; i < formatFields; i++) {
+                fields.add(hiveConversions[i].toHiveObject(row.getField(i)));
+            }
+            return serialize(fields);
+        };
+    }
+
+    public Function<RowData, Writable> createRowDataConverter() {
+        return row -> {
+            List<Object> fields = new ArrayList<>(formatFields);
+            for (int i = 0; i < formatFields; i++) {
+                if (sinkMultipleEnable) {
+                    GenericRowData rowData = (GenericRowData) row;
+                    fields.add(hiveConversions[i].toHiveObject(rowData.getField(i)));
+                } else {
+                    fields.add(hiveConversions[i].toHiveObject(converters[i].toExternal(row, i)));
+                }
+            }
+            return serialize(fields);
+        };
+    }
+
+    private Writable serialize(List<Object> fields) {
+        try {
+            return recordSerDe.serialize(fields, formatInspector);
+        } catch (SerDeException e) {
+            throw new FlinkHiveException(e);
+        }
+    }
+}
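
A minimal usage sketch of the factory above (illustrative only, not part of the patch; the `factory`, `path`, and sample row are assumptions). It creates a Hive RecordWriter for a target file, converts a Flink RowData into a Hive Writable, and writes a single row:

    static void writeOneRow(HiveWriterFactory factory, Path path) throws Exception {
        // createRecordWriter() initializes the SerDe and object inspectors on first use
        FileSinkOperator.RecordWriter recordWriter = factory.createRecordWriter(path);
        Function<RowData, Writable> converter = factory.createRowDataConverter();
        // assume the table has a single STRING column
        RowData row = GenericRowData.of(StringData.fromString("hello"));
        recordWriter.write(converter.apply(row));
        recordWriter.close(false); // flush and close normally
    }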
diff --git a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/filesystem/AbstractStreamingWriter.java b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/filesystem/AbstractStreamingWriter.java
index c3b408c..ecae4e23 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/filesystem/AbstractStreamingWriter.java
+++ b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/filesystem/AbstractStreamingWriter.java
@@ -17,15 +17,12 @@
 
 package org.apache.inlong.sort.hive.filesystem;
 
-import org.apache.inlong.sort.base.dirty.DirtyData;
 import org.apache.inlong.sort.base.dirty.DirtyOptions;
-import org.apache.inlong.sort.base.dirty.DirtyType;
 import org.apache.inlong.sort.base.dirty.sink.DirtySink;
 import org.apache.inlong.sort.base.metric.MetricOption;
 import org.apache.inlong.sort.base.metric.MetricOption.RegisteredMetric;
 import org.apache.inlong.sort.base.metric.MetricState;
-import org.apache.inlong.sort.base.metric.SinkMetricData;
-import org.apache.inlong.sort.base.util.CalculateObjectSizeUtils;
+import org.apache.inlong.sort.base.metric.sub.SinkTableMetricData;
 import org.apache.inlong.sort.base.util.MetricStateUtils;
 
 import org.apache.flink.api.common.state.ListState;
@@ -52,8 +49,6 @@
 
 import javax.annotation.Nullable;
 
-import java.io.IOException;
-
 import static org.apache.inlong.sort.base.Constants.DIRTY_BYTES_OUT;
 import static org.apache.inlong.sort.base.Constants.DIRTY_RECORDS_OUT;
 import static org.apache.inlong.sort.base.Constants.INLONG_METRIC_STATE_NAME;
@@ -83,7 +78,6 @@
     private @Nullable final String auditHostAndPorts;
     private final DirtyOptions dirtyOptions;
     private @Nullable final DirtySink<Object> dirtySink;
-
     // --------------------------- runtime fields -----------------------------
 
     private transient Buckets<IN, String> buckets;
@@ -93,13 +87,10 @@
     private transient long currentWatermark;
 
     @Nullable
-    private transient SinkMetricData metricData;
+    private transient SinkTableMetricData metricData;
     private transient ListState<MetricState> metricStateListState;
     private transient MetricState metricState;
 
-    private Long dataSize = 0L;
-    private Long rowSize = 0L;
-
     public AbstractStreamingWriter(
             long bucketCheckInterval,
             StreamingFileSink.BucketsBuilder<IN, String, ? extends StreamingFileSink.BucketsBuilder<IN, String, ?>> bucketsBuilder,
@@ -141,11 +132,6 @@
     protected void commitUpToCheckpoint(long checkpointId) throws Exception {
         try {
             helper.commitUpToCheckpoint(checkpointId);
-            if (metricData != null) {
-                metricData.invoke(rowSize, dataSize);
-            }
-            rowSize = 0L;
-            dataSize = 0L;
         } catch (Exception e) {
             LOG.error("hive sink commitUpToCheckpoint occurs error.", e);
             throw e;
@@ -155,18 +141,6 @@
     @Override
     public void open() throws Exception {
         super.open();
-        MetricOption metricOption = MetricOption.builder()
-                .withInlongLabels(inlongMetric)
-                .withAuditAddress(auditHostAndPorts)
-                .withInitRecords(metricState != null ? metricState.getMetricValue(NUM_RECORDS_OUT) : 0L)
-                .withInitBytes(metricState != null ? metricState.getMetricValue(NUM_BYTES_OUT) : 0L)
-                .withInitDirtyRecords(metricState != null ? metricState.getMetricValue(DIRTY_RECORDS_OUT) : 0L)
-                .withInitDirtyBytes(metricState != null ? metricState.getMetricValue(DIRTY_BYTES_OUT) : 0L)
-                .withRegisterMetric(RegisteredMetric.ALL)
-                .build();
-        if (metricOption != null) {
-            metricData = new SinkMetricData(metricOption, getRuntimeContext().getMetricGroup());
-        }
         if (dirtySink != null) {
             dirtySink.open(new Configuration());
         }
@@ -175,6 +149,39 @@
     @Override
     public void initializeState(StateInitializationContext context) throws Exception {
         super.initializeState(context);
+
+        // init metric state
+        if (this.inlongMetric != null) {
+            this.metricStateListState = context.getOperatorStateStore().getUnionListState(
+                    new ListStateDescriptor<>(
+                            INLONG_METRIC_STATE_NAME, TypeInformation.of(new TypeHint<MetricState>() {
+                            })));
+        }
+
+        if (context.isRestored()) {
+            metricState = MetricStateUtils.restoreMetricState(metricStateListState,
+                    getRuntimeContext().getIndexOfThisSubtask(), getRuntimeContext().getNumberOfParallelSubtasks());
+
+        }
+
+        MetricOption metricOption = MetricOption.builder().withInlongLabels(inlongMetric)
+                .withAuditAddress(auditHostAndPorts)
+                .withInitRecords(metricState != null ? metricState.getMetricValue(NUM_RECORDS_OUT) : 0L)
+                .withInitBytes(metricState != null ? metricState.getMetricValue(NUM_BYTES_OUT) : 0L)
+                .withInitDirtyRecords(metricState != null ? metricState.getMetricValue(DIRTY_RECORDS_OUT) : 0L)
+                .withInitDirtyBytes(metricState != null ? metricState.getMetricValue(DIRTY_BYTES_OUT) : 0L)
+                .withRegisterMetric(RegisteredMetric.ALL).build();
+
+        if (metricOption != null) {
+            metricData = new SinkTableMetricData(metricOption, getRuntimeContext().getMetricGroup());
+            metricData.registerSubMetricsGroup(metricState);
+            if (this.bucketsBuilder instanceof HadoopPathBasedBulkFormatBuilder) {
+                ((HadoopPathBasedBulkFormatBuilder) this.bucketsBuilder).setMetricData(metricData);
+            }
+        }
+
+        // metricData must be initialized before the buckets are created, so that HadoopPathBasedPartFileWriter
+        // can use it to report metrics.
         buckets = bucketsBuilder.createBuckets(getRuntimeContext().getIndexOfThisSubtask());
 
         // Set listener before the initialization of Buckets.
@@ -203,18 +210,6 @@
                         bucketCheckInterval);
 
         currentWatermark = Long.MIN_VALUE;
-
-        // init metric state
-        if (this.inlongMetric != null) {
-            this.metricStateListState = context.getOperatorStateStore().getUnionListState(
-                    new ListStateDescriptor<>(
-                            INLONG_METRIC_STATE_NAME, TypeInformation.of(new TypeHint<MetricState>() {
-                            })));
-        }
-        if (context.isRestored()) {
-            metricState = MetricStateUtils.restoreMetricState(metricStateListState,
-                    getRuntimeContext().getIndexOfThisSubtask(), getRuntimeContext().getNumberOfParallelSubtasks());
-        }
     }
 
     @Override
@@ -235,44 +230,11 @@
 
     @Override
     public void processElement(StreamRecord<IN> element) throws Exception {
-        try {
-            helper.onElement(
-                    element.getValue(),
-                    getProcessingTimeService().getCurrentProcessingTime(),
-                    element.hasTimestamp() ? element.getTimestamp() : null,
-                    currentWatermark);
-            rowSize = rowSize + 1;
-            if (element.getValue() != null) {
-                dataSize = dataSize + CalculateObjectSizeUtils.getDataSize(element.getValue());
-            }
-        } catch (IOException e) {
-            throw e;
-        } catch (Exception e) {
-            LOGGER.error("StreamingWriter write failed", e);
-            if (!dirtyOptions.ignoreDirty()) {
-                throw new RuntimeException(e);
-            }
-            if (metricData != null) {
-                metricData.invokeDirtyWithEstimate(element.getValue());
-            }
-            if (dirtySink != null) {
-                DirtyData.Builder<Object> builder = DirtyData.builder();
-                try {
-                    builder.setData(element.getValue())
-                            .setDirtyType(DirtyType.UNDEFINED)
-                            .setLabels(dirtyOptions.getLabels())
-                            .setLogTag(dirtyOptions.getLogTag())
-                            .setDirtyMessage(e.getMessage())
-                            .setIdentifier(dirtyOptions.getIdentifier());
-                    dirtySink.invoke(builder.build());
-                } catch (Exception ex) {
-                    if (!dirtyOptions.ignoreSideOutputErrors()) {
-                        throw new RuntimeException(ex);
-                    }
-                    LOGGER.warn("Dirty sink failed", ex);
-                }
-            }
-        }
+        helper.onElement(
+                element.getValue(),
+                getProcessingTimeService().getCurrentProcessingTime(),
+                element.hasTimestamp() ? element.getTimestamp() : null,
+                currentWatermark);
     }
 
     @Override
diff --git a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/filesystem/DefaultHadoopFileCommitterFactory.java b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/filesystem/DefaultHadoopFileCommitterFactory.java
new file mode 100644
index 0000000..644b9a1
--- /dev/null
+++ b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/filesystem/DefaultHadoopFileCommitterFactory.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.hive.filesystem;
+
+import org.apache.flink.formats.hadoop.bulk.HadoopFileCommitter;
+import org.apache.flink.formats.hadoop.bulk.HadoopFileCommitterFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+
+import java.io.IOException;
+
+/**
+ * The default Hadoop file committer factory, which always uses {@link HadoopRenameFileCommitter}.
+ */
+public class DefaultHadoopFileCommitterFactory implements HadoopFileCommitterFactory {
+
+    private static final long serialVersionUID = 1L;
+
+    private final boolean sinkMultipleEnable;
+
+    public DefaultHadoopFileCommitterFactory(boolean sinkMultipleEnable) {
+        this.sinkMultipleEnable = sinkMultipleEnable;
+    }
+
+    @Override
+    public HadoopFileCommitter create(Configuration configuration, Path targetFilePath) throws IOException {
+        return new HadoopRenameFileCommitter(configuration, targetFilePath, sinkMultipleEnable);
+    }
+
+    @Override
+    public HadoopFileCommitter recoverForCommit(Configuration configuration, Path targetFilePath, Path tempFilePath) {
+        return new HadoopRenameFileCommitter(configuration, targetFilePath, tempFilePath, sinkMultipleEnable);
+    }
+}
\ No newline at end of file
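
A short sketch of how this factory might be exercised (illustrative; the configuration and sample path are assumptions): create() prepares a rename-based committer for a fresh part file, and the writer produces data at the committer's temporary path before the final rename:

    HadoopFileCommitterFactory committerFactory = new DefaultHadoopFileCommitterFactory(true);
    HadoopFileCommitter committer =
            committerFactory.create(new Configuration(), new Path("/warehouse/db.db/t/part-0-0"));
    Path tempFile = committer.getTempFilePath(); // where the in-progress data should be written
    // ... write and close the part file at tempFile ...
    committer.preCommit();
    committer.commit(); // rename the temporary file to the target path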
diff --git a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/filesystem/HadoopPathBasedBulkFormatBuilder.java b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/filesystem/HadoopPathBasedBulkFormatBuilder.java
new file mode 100644
index 0000000..9fb3791
--- /dev/null
+++ b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/filesystem/HadoopPathBasedBulkFormatBuilder.java
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.hive.filesystem;
+
+import org.apache.inlong.sort.base.dirty.DirtyOptions;
+import org.apache.inlong.sort.base.dirty.sink.DirtySink;
+import org.apache.inlong.sort.base.metric.sub.SinkTableMetricData;
+import org.apache.inlong.sort.base.sink.PartitionPolicy;
+import org.apache.inlong.sort.base.sink.SchemaUpdateExceptionPolicy;
+
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.formats.hadoop.bulk.HadoopFileCommitterFactory;
+import org.apache.flink.formats.hadoop.bulk.HadoopPathBasedBulkWriter;
+import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner;
+import org.apache.flink.streaming.api.functions.sink.filesystem.BucketFactory;
+import org.apache.flink.streaming.api.functions.sink.filesystem.BucketWriter;
+import org.apache.flink.streaming.api.functions.sink.filesystem.Buckets;
+import org.apache.flink.streaming.api.functions.sink.filesystem.DefaultBucketFactoryImpl;
+import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig;
+import org.apache.flink.streaming.api.functions.sink.filesystem.SerializableConfiguration;
+import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
+import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.CheckpointRollingPolicy;
+import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.OnCheckpointRollingPolicy;
+import org.apache.flink.table.catalog.hive.client.HiveShim;
+import org.apache.flink.util.Preconditions;
+import org.apache.hadoop.conf.Configuration;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+
+/** Buckets builder to create buckets that use {@link HadoopPathBasedPartFileWriter}. */
+public class HadoopPathBasedBulkFormatBuilder<IN, BucketID, T extends HadoopPathBasedBulkFormatBuilder<IN, BucketID, T>>
+        extends
+            StreamingFileSink.BucketsBuilder<IN, BucketID, T> {
+
+    private static final long serialVersionUID = 1L;
+
+    private final Path basePath;
+
+    private final HadoopPathBasedBulkWriter.Factory<IN> writerFactory;
+
+    private final HadoopFileCommitterFactory fileCommitterFactory;
+
+    private SerializableConfiguration serializableConfiguration;
+
+    private BucketAssigner<IN, BucketID> bucketAssigner;
+
+    private CheckpointRollingPolicy<IN, BucketID> rollingPolicy;
+
+    private BucketFactory<IN, BucketID> bucketFactory;
+
+    private OutputFileConfig outputFileConfig;
+
+    @Nullable
+    private transient SinkTableMetricData metricData;
+
+    private final DirtyOptions dirtyOptions;
+
+    private final @Nullable DirtySink<Object> dirtySink;
+
+    private final SchemaUpdateExceptionPolicy schemaUpdatePolicy;
+    private final PartitionPolicy partitionPolicy;
+    private final HiveShim hiveShim;
+    private final String hiveVersion;
+    private final String inputFormat;
+    private final String outputFormat;
+    private final String serializationLib;
+
+    public HadoopPathBasedBulkFormatBuilder(
+            org.apache.hadoop.fs.Path basePath,
+            HadoopPathBasedBulkWriter.Factory<IN> writerFactory,
+            Configuration configuration,
+            BucketAssigner<IN, BucketID> assigner,
+            DirtyOptions dirtyOptions,
+            @Nullable DirtySink<Object> dirtySink,
+            SchemaUpdateExceptionPolicy schemaUpdatePolicy,
+            PartitionPolicy partitionPolicy,
+            HiveShim hiveShim,
+            String hiveVersion,
+            boolean sinkMultipleEnable,
+            String inputFormat,
+            String outputFormat,
+            String serializationLib) {
+        this(
+                basePath,
+                writerFactory,
+                new DefaultHadoopFileCommitterFactory(sinkMultipleEnable),
+                configuration,
+                assigner,
+                OnCheckpointRollingPolicy.build(),
+                new DefaultBucketFactoryImpl<>(),
+                OutputFileConfig.builder().build(),
+                dirtyOptions,
+                dirtySink,
+                schemaUpdatePolicy,
+                partitionPolicy,
+                hiveShim,
+                hiveVersion,
+                inputFormat,
+                outputFormat,
+                serializationLib);
+    }
+
+    public HadoopPathBasedBulkFormatBuilder(
+            org.apache.hadoop.fs.Path basePath,
+            HadoopPathBasedBulkWriter.Factory<IN> writerFactory,
+            HadoopFileCommitterFactory fileCommitterFactory,
+            Configuration configuration,
+            BucketAssigner<IN, BucketID> assigner,
+            CheckpointRollingPolicy<IN, BucketID> policy,
+            BucketFactory<IN, BucketID> bucketFactory,
+            OutputFileConfig outputFileConfig,
+            DirtyOptions dirtyOptions,
+            @Nullable DirtySink<Object> dirtySink,
+            SchemaUpdateExceptionPolicy schemaUpdatePolicy,
+            PartitionPolicy partitionPolicy,
+            HiveShim hiveShim,
+            String hiveVersion,
+            String inputFormat,
+            String outputFormat,
+            String serializationLib) {
+
+        this.basePath = new Path(Preconditions.checkNotNull(basePath).toString());
+        this.writerFactory = writerFactory;
+        this.fileCommitterFactory = fileCommitterFactory;
+        this.serializableConfiguration = new SerializableConfiguration(configuration);
+        this.bucketAssigner = Preconditions.checkNotNull(assigner);
+        this.rollingPolicy = Preconditions.checkNotNull(policy);
+        this.bucketFactory = Preconditions.checkNotNull(bucketFactory);
+        this.outputFileConfig = Preconditions.checkNotNull(outputFileConfig);
+        this.dirtyOptions = dirtyOptions;
+        this.dirtySink = dirtySink;
+        this.schemaUpdatePolicy = schemaUpdatePolicy;
+        this.partitionPolicy = partitionPolicy;
+        this.hiveShim = hiveShim;
+        this.hiveVersion = hiveVersion;
+        this.inputFormat = inputFormat;
+        this.outputFormat = outputFormat;
+        this.serializationLib = serializationLib;
+    }
+
+    public T withBucketAssigner(BucketAssigner<IN, BucketID> assigner) {
+        this.bucketAssigner = Preconditions.checkNotNull(assigner);
+        return self();
+    }
+
+    public T withRollingPolicy(CheckpointRollingPolicy<IN, BucketID> rollingPolicy) {
+        this.rollingPolicy = Preconditions.checkNotNull(rollingPolicy);
+        return self();
+    }
+
+    public T withBucketFactory(BucketFactory<IN, BucketID> factory) {
+        this.bucketFactory = Preconditions.checkNotNull(factory);
+        return self();
+    }
+
+    public T withOutputFileConfig(OutputFileConfig outputFileConfig) {
+        this.outputFileConfig = outputFileConfig;
+        return self();
+    }
+
+    public T withConfiguration(Configuration configuration) {
+        this.serializableConfiguration = new SerializableConfiguration(configuration);
+        return self();
+    }
+
+    @Override
+    public BucketWriter<IN, BucketID> createBucketWriter() {
+        return new HadoopPathBasedPartFileWriter.HadoopPathBasedBucketWriter<>(
+                serializableConfiguration.getConfiguration(),
+                writerFactory,
+                fileCommitterFactory,
+                metricData,
+                dirtyOptions,
+                dirtySink,
+                schemaUpdatePolicy,
+                partitionPolicy,
+                hiveShim,
+                hiveVersion,
+                inputFormat,
+                outputFormat,
+                serializationLib);
+    }
+
+    @Override
+    public Buckets<IN, BucketID> createBuckets(int subtaskIndex) throws IOException {
+        return new Buckets<>(
+                basePath,
+                bucketAssigner,
+                bucketFactory,
+                createBucketWriter(),
+                rollingPolicy,
+                subtaskIndex,
+                outputFileConfig);
+    }
+
+    public void setMetricData(@Nullable SinkTableMetricData metricData) {
+        this.metricData = metricData;
+    }
+
+}
\ No newline at end of file
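
To show how the builder above is wired into the writer (a sketch; `metricData` and `subtaskIndex` come from the surrounding operator and are assumptions here): the streaming writer sets its SinkTableMetricData on the builder before creating the buckets, so the bucket writer created by createBucketWriter() can report per-table metrics:

    static Buckets<RowData, String> buildBuckets(
            HadoopPathBasedBulkFormatBuilder<RowData, String, ?> builder,
            SinkTableMetricData metricData,
            int subtaskIndex) throws IOException {
        // must happen before createBuckets(), which calls createBucketWriter() internally
        builder.setMetricData(metricData);
        return builder.createBuckets(subtaskIndex);
    }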
diff --git a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/filesystem/HadoopPathBasedPartFileWriter.java b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/filesystem/HadoopPathBasedPartFileWriter.java
new file mode 100644
index 0000000..de1d87c
--- /dev/null
+++ b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/filesystem/HadoopPathBasedPartFileWriter.java
@@ -0,0 +1,816 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.hive.filesystem;
+
+import org.apache.inlong.sort.base.dirty.DirtyOptions;
+import org.apache.inlong.sort.base.dirty.DirtySinkHelper;
+import org.apache.inlong.sort.base.dirty.DirtyType;
+import org.apache.inlong.sort.base.dirty.sink.DirtySink;
+import org.apache.inlong.sort.base.format.DynamicSchemaFormatFactory;
+import org.apache.inlong.sort.base.format.JsonDynamicSchemaFormat;
+import org.apache.inlong.sort.base.metric.sub.SinkTableMetricData;
+import org.apache.inlong.sort.base.sink.PartitionPolicy;
+import org.apache.inlong.sort.base.sink.SchemaUpdateExceptionPolicy;
+import org.apache.inlong.sort.hive.HiveBulkWriterFactory;
+import org.apache.inlong.sort.hive.HiveWriterFactory;
+import org.apache.inlong.sort.hive.util.CacheHolder;
+import org.apache.inlong.sort.hive.util.HiveTableUtil;
+
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.commons.lang3.tuple.ImmutableTriple;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.commons.lang3.tuple.Triple;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.connectors.hive.FlinkHiveException;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.formats.hadoop.bulk.HadoopFileCommitter;
+import org.apache.flink.formats.hadoop.bulk.HadoopFileCommitterFactory;
+import org.apache.flink.formats.hadoop.bulk.HadoopPathBasedBulkWriter;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.flink.streaming.api.functions.sink.filesystem.AbstractPartFileWriter;
+import org.apache.flink.streaming.api.functions.sink.filesystem.BucketWriter;
+import org.apache.flink.streaming.api.functions.sink.filesystem.InProgressFileWriter;
+import org.apache.flink.streaming.api.functions.sink.filesystem.WriterProperties;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.catalog.hive.client.HiveShim;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.binary.BinaryRowData;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.JobConf;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+
+import static org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_DATABASE_PATTERN;
+import static org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_ENABLE;
+import static org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_FORMAT;
+import static org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_TABLE_PATTERN;
+import static org.apache.inlong.sort.hive.HiveOptions.HIVE_SCHEMA_SCAN_INTERVAL;
+
+/**
+ * The part-file writer that writes to the specified hadoop path.
+ */
+public class HadoopPathBasedPartFileWriter<IN, BucketID> extends AbstractPartFileWriter<IN, BucketID> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(HadoopPathBasedPartFileWriter.class);
+
+    private final InLongHadoopPathBasedBulkWriter writer;
+
+    private final HadoopFileCommitter fileCommitter;
+
+    private BucketID bucketID;
+    private Path targetPath;
+    private Path inProgressPath;
+    private final HiveShim hiveShim;
+    private final String hiveVersion;
+
+    private final boolean sinkMultipleEnable;
+    private final String sinkMultipleFormat;
+
+    private final String databasePattern;
+    private final String tablePattern;
+
+    private final SchemaUpdateExceptionPolicy schemaUpdatePolicy;
+
+    private final PartitionPolicy partitionPolicy;
+
+    private final String inputFormat;
+
+    private final String outputFormat;
+
+    private final String serializationLib;
+
+    private transient JsonDynamicSchemaFormat jsonFormat;
+
+    @Nullable
+    private final transient SinkTableMetricData metricData;
+
+    private final DirtyOptions dirtyOptions;
+
+    private final @Nullable DirtySink<Object> dirtySink;
+
+    public HadoopPathBasedPartFileWriter(final BucketID bucketID,
+            Path targetPath,
+            Path inProgressPath,
+            InLongHadoopPathBasedBulkWriter writer,
+            HadoopFileCommitter fileCommitter,
+            long createTime,
+            HiveShim hiveShim,
+            String hiveVersion,
+            boolean sinkMultipleEnable,
+            String sinkMultipleFormat,
+            String databasePattern,
+            String tablePattern,
+            @Nullable SinkTableMetricData metricData,
+            DirtyOptions dirtyOptions,
+            @Nullable DirtySink<Object> dirtySink,
+            SchemaUpdateExceptionPolicy schemaUpdatePolicy,
+            PartitionPolicy partitionPolicy,
+            String inputFormat,
+            String outputFormat,
+            String serializationLib) {
+        super(bucketID, createTime);
+
+        this.bucketID = bucketID;
+        this.targetPath = targetPath;
+        this.inProgressPath = inProgressPath;
+        this.writer = writer;
+        this.fileCommitter = fileCommitter;
+        this.hiveShim = hiveShim;
+        this.hiveVersion = hiveVersion;
+        this.sinkMultipleEnable = sinkMultipleEnable;
+        this.sinkMultipleFormat = sinkMultipleFormat;
+        this.databasePattern = databasePattern;
+        this.tablePattern = tablePattern;
+        this.metricData = metricData;
+        this.schemaUpdatePolicy = schemaUpdatePolicy;
+        this.dirtyOptions = dirtyOptions;
+        this.dirtySink = dirtySink;
+        this.partitionPolicy = partitionPolicy;
+        this.inputFormat = inputFormat;
+        this.outputFormat = outputFormat;
+        this.serializationLib = serializationLib;
+    }
+
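+    /**
+     * Write one element to the in-progress file.
+     *
+     * <p>In multiple-sink mode the element is a {@link GenericRowData} whose first field carries the raw JSON
+     * bytes: the database and table names are parsed with the configured patterns, the {@link HiveWriterFactory}
+     * for that table is looked up (creating the Hive table if it does not exist), the record writer and row
+     * converter are switched to the table's in-progress file, and every physical record is converted and written.
+     * In single-table mode the element is written directly. Failures are handled according to the configured
+     * {@link SchemaUpdateExceptionPolicy}.
+     */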
+    @Override
+    public void write(IN element, long currentTime) {
+        String databaseName = null;
+        String tableName = null;
+        int recordNum = 1;
+        int recordSize = 0;
+        JsonNode rootNode = null;
+        ObjectIdentifier identifier = null;
+        HashMap<ObjectIdentifier, Long> ignoreWritingTableMap = CacheHolder.getIgnoreWritingTableMap();
+        try {
+            if (sinkMultipleEnable) {
+                GenericRowData rowData = (GenericRowData) element;
+                if (jsonFormat == null) {
+                    jsonFormat = (JsonDynamicSchemaFormat) DynamicSchemaFormatFactory.getFormat(sinkMultipleFormat);
+                }
+                byte[] rawData = (byte[]) rowData.getField(0);
+                rootNode = jsonFormat.deserialize(rawData);
+                LOG.debug("root node: {}", rootNode);
+                boolean isDDL = jsonFormat.extractDDLFlag(rootNode);
+                if (isDDL) {
+                    // Ignore DDL changes for now
+                    return;
+                }
+                databaseName = jsonFormat.parse(rootNode, databasePattern);
+                tableName = jsonFormat.parse(rootNode, tablePattern);
+
+                List<Map<String, Object>> physicalDataList = HiveTableUtil.jsonNode2Map(
+                        jsonFormat.getPhysicalData(rootNode));
+                recordNum = physicalDataList.size();
+
+                identifier = HiveTableUtil.createObjectIdentifier(databaseName, tableName);
+
+                // ignore writing data into this table
+                if (ignoreWritingTableMap.containsKey(identifier)) {
+                    return;
+                }
+
+                List<String> pkListStr = jsonFormat.extractPrimaryKeyNames(rootNode);
+                RowType schema = jsonFormat.extractSchema(rootNode, pkListStr);
+                HiveWriterFactory writerFactory = getHiveWriterFactory(identifier, schema, hiveVersion);
+
+                // parse the real location of hive table
+                Path inProgressFilePath = getInProgressPath(writerFactory);
+
+                LOG.debug("in progress file path: {}", inProgressFilePath);
+                Pair<RecordWriter, Function<RowData, Writable>> pair = getRecordWriterAndRowConverter(writerFactory,
+                        inProgressFilePath);
+                // reset record writer and row converter
+                writer.setRecordWriter(pair.getLeft());
+                writer.setRowConverter(pair.getRight());
+                writer.setInProgressPath(inProgressFilePath);
+
+                boolean replaceLineBreak = writerFactory.getStorageDescriptor().getInputFormat()
+                        .contains("TextInputFormat");
+
+                for (Map<String, Object> record : physicalDataList) {
+                    // check and alter hive table if schema has changed
+                    boolean changed = checkSchema(identifier, writerFactory, schema);
+                    if (changed) {
+                        // remove cache and reload hive writer factory
+                        CacheHolder.getFactoryMap().remove(identifier);
+                        writerFactory = HiveTableUtil.getWriterFactory(hiveShim, hiveVersion, identifier);
+                        assert writerFactory != null;
+                        FileSinkOperator.RecordWriter recordWriter = writerFactory.createRecordWriter(
+                                inProgressFilePath);
+                        Function<RowData, Writable> rowConverter = writerFactory.createRowDataConverter();
+                        writer.setRecordWriter(recordWriter);
+                        writer.setRowConverter(rowConverter);
+                        CacheHolder.getRecordWriterHashMap().put(inProgressFilePath, recordWriter);
+                        CacheHolder.getRowConverterHashMap().put(inProgressFilePath, rowConverter);
+                    }
+
+                    LOG.debug("record: {}", record);
+                    LOG.debug("columns : {}", Arrays.deepToString(writerFactory.getAllColumns()));
+                    LOG.debug("types: {}", Arrays.deepToString(writerFactory.getAllTypes()));
+                    Pair<GenericRowData, Integer> rowDataPair = HiveTableUtil.getRowData(record,
+                            writerFactory.getAllColumns(), writerFactory.getAllTypes(), replaceLineBreak);
+                    GenericRowData genericRowData = rowDataPair.getLeft();
+                    recordSize += rowDataPair.getRight();
+                    LOG.debug("generic row data: {}", genericRowData);
+                    writer.addElement(genericRowData);
+                }
+                if (metricData != null) {
+                    metricData.outputMetrics(databaseName, tableName, recordNum, recordSize);
+                }
+            } else {
+                RowData data = (RowData) element;
+                writer.addElement(data);
+                if (metricData != null) {
+                    if (data instanceof BinaryRowData) {
+                        // mysql cdc sends BinaryRowData
+                        metricData.invoke(1, ((BinaryRowData) data).getSizeInBytes());
+                    } else {
+                        // oracle cdc sends GenericRowData
+                        metricData.invoke(1, data.toString().getBytes(StandardCharsets.UTF_8).length);
+                    }
+                }
+            }
+        } catch (Exception e) {
+            if (schemaUpdatePolicy == null || SchemaUpdateExceptionPolicy.THROW_WITH_STOP == schemaUpdatePolicy) {
+                throw new FlinkHiveException("Failed to write data", e);
+            } else if (SchemaUpdateExceptionPolicy.STOP_PARTIAL == schemaUpdatePolicy) {
+                if (identifier != null) {
+                    ignoreWritingTableMap.put(identifier, 1L);
+                }
+            } else if (SchemaUpdateExceptionPolicy.LOG_WITH_IGNORE == schemaUpdatePolicy) {
+                handleDirtyData(databaseName, tableName, recordNum, recordSize, rootNode, jsonFormat, e);
+            }
+            LOG.error("Failed to write data", e);
+        }
+        markWrite(currentTime);
+    }
+
+    /**
+     * upload dirty data metrics and write dirty data
+     *
+     * @param databaseName database name
+     * @param tableName table name
+     * @param recordNum record num
+     * @param recordSize record byte size
+     * @param data raw data
+     * @param jsonFormat json formatter for formatting raw data
+     * @param e exception
+     */
+    private void handleDirtyData(String databaseName,
+            String tableName,
+            int recordNum,
+            int recordSize,
+            JsonNode data,
+            JsonDynamicSchemaFormat jsonFormat,
+            Exception e) {
+        // upload metrics for dirty data
+        if (null != metricData) {
+            if (sinkMultipleEnable) {
+                metricData.outputDirtyMetrics(databaseName, tableName, recordNum, recordSize);
+            } else {
+                metricData.invokeDirty(recordNum, recordSize);
+            }
+        }
+
+        if (!dirtyOptions.ignoreDirty()) {
+            return;
+        }
+        if (data == null || jsonFormat == null) {
+            return;
+        }
+        Triple<String, String, String> triple = getDirtyLabelTagAndIdentity(data, jsonFormat);
+        String label = triple.getLeft();
+        String tag = triple.getMiddle();
+        String identify = triple.getRight();
+        if (label == null || tag == null || identify == null) {
+            LOG.warn("dirty label or tag or identify is null, ignore dirty data writing");
+            return;
+        }
+        // archive dirty data
+        DirtySinkHelper<Object> dirtySinkHelper = new DirtySinkHelper<>(dirtyOptions, dirtySink);
+        List<Map<String, Object>> physicalDataList = HiveTableUtil.jsonNode2Map(jsonFormat.getPhysicalData(data));
+        for (Map<String, Object> record : physicalDataList) {
+            JsonNode jsonNode = HiveTableUtil.object2JsonNode(record);
+            dirtySinkHelper.invoke(jsonNode, DirtyType.BATCH_LOAD_ERROR, label, tag, identify, e);
+        }
+    }
+
+    /**
+     * parse dirty label, log tag and identifier
+     *
+     * @param data raw data
+     * @param jsonFormat json formatter
+     * @return dirty label, log tag and identifier
+     */
+    private Triple<String, String, String> getDirtyLabelTagAndIdentity(JsonNode data,
+            JsonDynamicSchemaFormat jsonFormat) {
+        String dirtyLabel = null;
+        String dirtyLogTag = null;
+        String dirtyIdentify = null;
+        try {
+            if (dirtyOptions.ignoreDirty()) {
+                if (dirtyOptions.getLabels() != null) {
+                    dirtyLabel = jsonFormat.parse(data,
+                            DirtySinkHelper.regexReplace(dirtyOptions.getLabels(), DirtyType.BATCH_LOAD_ERROR, null));
+                }
+                if (dirtyOptions.getLogTag() != null) {
+                    dirtyLogTag = jsonFormat.parse(data,
+                            DirtySinkHelper.regexReplace(dirtyOptions.getLogTag(), DirtyType.BATCH_LOAD_ERROR, null));
+                }
+                if (dirtyOptions.getIdentifier() != null) {
+                    dirtyIdentify = jsonFormat.parse(data,
+                            DirtySinkHelper.regexReplace(dirtyOptions.getIdentifier(), DirtyType.BATCH_LOAD_ERROR,
+                                    null));
+                }
+            }
+        } catch (Exception e) {
+            LOG.warn("Parse dirty options failed. {}", ExceptionUtils.stringifyException(e));
+        }
+        return new ImmutableTriple<>(dirtyLabel, dirtyLogTag, dirtyIdentify);
+    }
+
+    /**
+     * get the hive writer factory, creating the hive table automatically if it does not exist
+     *
+     * @param identifier hive database and table name
+     * @param schema hive fields with flink types
+     * @param hiveVersion hive version
+     * @return hive writer factory
+     */
+    private HiveWriterFactory getHiveWriterFactory(ObjectIdentifier identifier, RowType schema, String hiveVersion) {
+        HiveWriterFactory writerFactory = HiveTableUtil.getWriterFactory(hiveShim, hiveVersion, identifier);
+        if (writerFactory == null) {
+            // hive table may not exist, auto create
+            HiveTableUtil.createTable(identifier.getDatabaseName(), identifier.getObjectName(), schema, partitionPolicy,
+                    hiveVersion, inputFormat, outputFormat, serializationLib);
+            writerFactory = HiveTableUtil.getWriterFactory(hiveShim, hiveVersion, identifier);
+        }
+        return writerFactory;
+    }
+
+    /**
+     * get the temporary (in-progress) hdfs path for data writing, derived from the hive table location
+     *
+     * @param writerFactory hive writer factory
+     * @return the in-progress hdfs path for writing data
+     */
+    private Path getInProgressPath(HiveWriterFactory writerFactory) throws IOException, ClassNotFoundException {
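+        // Illustrative example (paths are hypothetical): with table location "hdfs://nn/warehouse/db.db/tbl",
+        // targetPath "/flink/tmp/part-0-0" and inProgressPath name ".part-0-0.inprogress.uuid", the part after
+        // "/tmp/" ("part-0-0") is re-rooted under the table location and the returned in-progress path becomes
+        // "hdfs://nn/warehouse/db.db/tbl/.part-0-0.inprogress.uuid".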
+        String location = writerFactory.getStorageDescriptor().getLocation();
+        String path = targetPath.toUri().getPath();
+        path = path.substring(path.indexOf("/tmp/") + 5);
+        Path targetPath = new Path(location + "/" + path);
+        return new Path(targetPath.getParent() + "/" + inProgressPath.getName());
+    }
+
+    /**
+     * get hive record writer and row converter
+     *
+     * @param writerFactory hive writer factory
+     * @param inProgressFilePath temp hdfs file path for writing data
+     * @return pair of hive record writer and row converter objects
+     */
+    private Pair<RecordWriter, Function<RowData, Writable>> getRecordWriterAndRowConverter(
+            HiveWriterFactory writerFactory, Path inProgressFilePath) {
+        FileSinkOperator.RecordWriter recordWriter;
+        Function<RowData, Writable> rowConverter;
+        if (!CacheHolder.getRecordWriterHashMap().containsKey(inProgressFilePath)) {
+            recordWriter = writerFactory.createRecordWriter(inProgressFilePath);
+            rowConverter = writerFactory.createRowDataConverter();
+            CacheHolder.getRecordWriterHashMap().put(inProgressFilePath, recordWriter);
+            CacheHolder.getRowConverterHashMap().put(inProgressFilePath, rowConverter);
+        } else {
+            recordWriter = CacheHolder.getRecordWriterHashMap().get(inProgressFilePath);
+            rowConverter = CacheHolder.getRowConverterHashMap().get(inProgressFilePath);
+        }
+        return new ImmutablePair<>(recordWriter, rowConverter);
+    }
+
+    /**
+     * check whether the source table schema has changed and alter the hive table if necessary
+     *
+     * @param identifier hive database name and table name
+     * @param writerFactory hive writer factory
+     * @param schema hive fields with flink types
+     * @return whether the schema has changed
+     */
+    private boolean checkSchema(ObjectIdentifier identifier, HiveWriterFactory writerFactory, RowType schema) {
+        HashMap<ObjectIdentifier, Long> schemaCheckTimeMap = CacheHolder.getSchemaCheckTimeMap();
+        long lastUpdate = schemaCheckTimeMap.getOrDefault(identifier, -1L);
+        // check the schema at most once every `HIVE_SCHEMA_SCAN_INTERVAL` milliseconds
+        int scanSchemaInterval = Integer.parseInt(writerFactory.getJobConf()
+                .get(HIVE_SCHEMA_SCAN_INTERVAL.key(), HIVE_SCHEMA_SCAN_INTERVAL.defaultValue() + ""));
+        boolean changed = false;
+        if (System.currentTimeMillis() - lastUpdate >= scanSchemaInterval) {
+            changed = HiveTableUtil.changeSchema(schema, writerFactory.getAllColumns(),
+                    writerFactory.getAllTypes(), identifier.getDatabaseName(), identifier.getObjectName(), hiveVersion);
+            schemaCheckTimeMap.put(identifier, System.currentTimeMillis());
+        }
+        return changed;
+    }
+
+    @Override
+    public InProgressFileRecoverable persist() {
+        throw new UnsupportedOperationException("The path based writers do not support persisting");
+    }
+
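+    /**
+     * Finish the in-progress file(s) and prepare them for commit.
+     *
+     * <p>In multiple-sink mode every cached record writer whose in-progress file name matches this writer's is
+     * flushed and finished, its cache entries are removed, and a {@link HadoopRenameFileCommitter} (target name
+     * derived by stripping the leading dot and the ".inprogress" suffix) is cached for the later commit phase.
+     * In single-table mode the single writer is flushed and finished and the file committer is pre-committed.
+     */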
+    @Override
+    public PendingFileRecoverable closeForCommit() throws IOException {
+        if (sinkMultipleEnable) {
+            LOG.info("record writer cache {}", CacheHolder.getRecordWriterHashMap());
+            Iterator<Path> iterator = CacheHolder.getRecordWriterHashMap().keySet().iterator();
+            while (iterator.hasNext()) {
+                Path inProgressFilePath = iterator.next();
+                // one flink batch may write many hive tables whose writers share the same in-progress file name
+                if (inProgressFilePath.getName().equals(this.inProgressPath.getName())) {
+                    FileSinkOperator.RecordWriter recordWriter = CacheHolder.getRecordWriterHashMap()
+                            .get(inProgressFilePath);
+                    writer.setRecordWriter(recordWriter);
+                    writer.flush();
+                    writer.finish();
+                    // clear cache
+                    iterator.remove();
+                    CacheHolder.getRowConverterHashMap().remove(inProgressFilePath);
+
+                    // parse the target location of hive table
+                    String tmpFileName = inProgressFilePath.getName();
+                    String targetPathName = tmpFileName.substring(1, tmpFileName.lastIndexOf(".inprogress"));
+                    Path targetPath = new Path(inProgressFilePath.getParent() + "/" + targetPathName);
+
+                    LOG.info("file committer target path {}, in progress file {}", targetPath, inProgressFilePath);
+                    HadoopRenameFileCommitter committer = new HadoopRenameFileCommitter(
+                            ((HadoopRenameFileCommitter) fileCommitter).getConfiguration(),
+                            targetPath,
+                            inProgressFilePath,
+                            true);
+                    CacheHolder.getFileCommitterHashMap().put(inProgressFilePath, committer);
+                }
+            }
+        } else {
+            writer.flush();
+            writer.finish();
+            fileCommitter.preCommit();
+        }
+        return new HadoopPathBasedPendingFile(fileCommitter, getSize()).getRecoverable();
+    }
+
+    @Override
+    public void dispose() {
+        writer.dispose();
+    }
+
+    @Override
+    public long getSize() throws IOException {
+        return writer.getSize();
+    }
+
+    static class HadoopPathBasedPendingFile implements BucketWriter.PendingFile {
+
+        private final HadoopFileCommitter fileCommitter;
+
+        private final long fileSize;
+
+        public HadoopPathBasedPendingFile(HadoopFileCommitter fileCommitter, long fileSize) {
+            this.fileCommitter = fileCommitter;
+            this.fileSize = fileSize;
+        }
+
+        @Override
+        public void commit() throws IOException {
+            fileCommitter.commit();
+        }
+
+        @Override
+        public void commitAfterRecovery() throws IOException {
+            fileCommitter.commitAfterRecovery();
+        }
+
+        public PendingFileRecoverable getRecoverable() {
+            return new HadoopPathBasedPendingFileRecoverable(fileCommitter.getTargetFilePath(),
+                    fileCommitter.getTempFilePath(), fileSize);
+        }
+    }
+
+    @VisibleForTesting
+    static class HadoopPathBasedPendingFileRecoverable implements PendingFileRecoverable {
+
+        private final Path targetFilePath;
+
+        private final Path tempFilePath;
+
+        private final long fileSize;
+
+        @Deprecated
+        // Retained for compatibility
+        public HadoopPathBasedPendingFileRecoverable(Path targetFilePath, Path tempFilePath) {
+            this.targetFilePath = targetFilePath;
+            this.tempFilePath = tempFilePath;
+            this.fileSize = -1L;
+        }
+
+        public HadoopPathBasedPendingFileRecoverable(Path targetFilePath, Path tempFilePath, long fileSize) {
+            this.targetFilePath = targetFilePath;
+            this.tempFilePath = tempFilePath;
+            this.fileSize = fileSize;
+        }
+
+        public Path getTargetFilePath() {
+            return targetFilePath;
+        }
+
+        public Path getTempFilePath() {
+            return tempFilePath;
+        }
+
+        public org.apache.flink.core.fs.Path getPath() {
+            return new org.apache.flink.core.fs.Path(targetFilePath.toString());
+        }
+
+        public long getSize() {
+            return fileSize;
+        }
+    }
+
+    @VisibleForTesting
+    static class HadoopPathBasedPendingFileRecoverableSerializer
+            implements
+                SimpleVersionedSerializer<PendingFileRecoverable> {
+
+        static final HadoopPathBasedPendingFileRecoverableSerializer INSTANCE =
+                new HadoopPathBasedPendingFileRecoverableSerializer();
+
+        private static final Charset CHARSET = StandardCharsets.UTF_8;
+
+        private static final int MAGIC_NUMBER = 0x2c853c90;
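+
+        // Version 2 layout (little-endian):
+        //   int  magic number (0x2c853c90)
+        //   int  target path length, followed by the UTF-8 target path bytes
+        //   int  temp path length, followed by the UTF-8 temp path bytes
+        //   long file size in bytes
+        // Version 1 omits the trailing file size.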
+
+        @Override
+        public int getVersion() {
+            return 2;
+        }
+
+        @Override
+        public byte[] serialize(PendingFileRecoverable pendingFileRecoverable) {
+            if (!(pendingFileRecoverable instanceof HadoopPathBasedPartFileWriter.HadoopPathBasedPendingFileRecoverable)) {
+                throw new UnsupportedOperationException("Only HadoopPathBasedPendingFileRecoverable is supported.");
+            }
+
+            HadoopPathBasedPendingFileRecoverable hadoopRecoverable =
+                    (HadoopPathBasedPendingFileRecoverable) pendingFileRecoverable;
+            Path path = hadoopRecoverable.getTargetFilePath();
+            Path inProgressPath = hadoopRecoverable.getTempFilePath();
+
+            byte[] pathBytes = path.toUri().toString().getBytes(CHARSET);
+            byte[] inProgressBytes = inProgressPath.toUri().toString().getBytes(CHARSET);
+
+            byte[] targetBytes = new byte[12 + pathBytes.length + inProgressBytes.length + Long.BYTES];
+            ByteBuffer bb = ByteBuffer.wrap(targetBytes).order(ByteOrder.LITTLE_ENDIAN);
+            bb.putInt(MAGIC_NUMBER);
+            bb.putInt(pathBytes.length);
+            bb.put(pathBytes);
+            bb.putInt(inProgressBytes.length);
+            bb.put(inProgressBytes);
+            bb.putLong(hadoopRecoverable.getSize());
+
+            return targetBytes;
+        }
+
+        @Override
+        public HadoopPathBasedPendingFileRecoverable deserialize(int version, byte[] serialized) throws IOException {
+            switch (version) {
+                case 1:
+                    return deserializeV1(serialized);
+                case 2:
+                    return deserializeV2(serialized);
+                default:
+                    throw new IOException("Unrecognized version or corrupt state: " + version);
+            }
+        }
+
+        private HadoopPathBasedPendingFileRecoverable deserializeV1(byte[] serialized) throws IOException {
+            final ByteBuffer bb = ByteBuffer.wrap(serialized).order(ByteOrder.LITTLE_ENDIAN);
+
+            if (bb.getInt() != MAGIC_NUMBER) {
+                throw new IOException("Corrupt data: Unexpected magic number.");
+            }
+
+            byte[] targetFilePathBytes = new byte[bb.getInt()];
+            bb.get(targetFilePathBytes);
+            String targetFilePath = new String(targetFilePathBytes, CHARSET);
+
+            byte[] tempFilePathBytes = new byte[bb.getInt()];
+            bb.get(tempFilePathBytes);
+            String tempFilePath = new String(tempFilePathBytes, CHARSET);
+
+            return new HadoopPathBasedPendingFileRecoverable(new Path(targetFilePath), new Path(tempFilePath));
+        }
+
+        private HadoopPathBasedPendingFileRecoverable deserializeV2(byte[] serialized) throws IOException {
+            final ByteBuffer bb = ByteBuffer.wrap(serialized).order(ByteOrder.LITTLE_ENDIAN);
+
+            if (bb.getInt() != MAGIC_NUMBER) {
+                throw new IOException("Corrupt data: Unexpected magic number.");
+            }
+
+            byte[] targetFilePathBytes = new byte[bb.getInt()];
+            bb.get(targetFilePathBytes);
+            String targetFilePath = new String(targetFilePathBytes, CHARSET);
+
+            byte[] tempFilePathBytes = new byte[bb.getInt()];
+            bb.get(tempFilePathBytes);
+            String tempFilePath = new String(tempFilePathBytes, CHARSET);
+
+            long fileSize = bb.getLong();
+
+            return new HadoopPathBasedPendingFileRecoverable(new Path(targetFilePath), new Path(tempFilePath),
+                    fileSize);
+        }
+    }
+
+    private static class UnsupportedInProgressFileRecoverableSerializable
+            implements
+                SimpleVersionedSerializer<InProgressFileRecoverable> {
+
+        static final UnsupportedInProgressFileRecoverableSerializable INSTANCE =
+                new UnsupportedInProgressFileRecoverableSerializable();
+
+        @Override
+        public int getVersion() {
+            throw new UnsupportedOperationException("Persists the path-based part file write is not supported");
+        }
+
+        @Override
+        public byte[] serialize(InProgressFileRecoverable obj) {
+            throw new UnsupportedOperationException("Persisting the path-based part file write is not supported");
+        }
+
+        @Override
+        public InProgressFileRecoverable deserialize(int version, byte[] serialized) {
+            throw new UnsupportedOperationException("Persisting the path-based part file write is not supported");
+        }
+    }
+
+    /**
+     * Bucket writer that creates {@link HadoopPathBasedPartFileWriter}s. It does not support
+     * snapshotting in-progress files. For pending files, it stores the target path, the staging
+     * file path and the written size in the state.
+     */
+    public static class HadoopPathBasedBucketWriter<IN, BucketID> implements BucketWriter<IN, BucketID> {
+
+        private final Configuration configuration;
+
+        private final HadoopPathBasedBulkWriter.Factory<IN> bulkWriterFactory;
+
+        private final HadoopFileCommitterFactory fileCommitterFactory;
+
+        private final HiveWriterFactory hiveWriterFactory;
+
+        @Nullable
+        private final transient SinkTableMetricData metricData;
+
+        private final DirtyOptions dirtyOptions;
+
+        private final @Nullable DirtySink<Object> dirtySink;
+
+        private final SchemaUpdateExceptionPolicy schemaUpdatePolicy;
+
+        private final PartitionPolicy partitionPolicy;
+
+        private final String inputFormat;
+
+        private final String outputFormat;
+
+        private final String serializationLib;
+
+        private final HiveShim hiveShim;
+        private final String hiveVersion;
+
+        public HadoopPathBasedBucketWriter(Configuration configuration,
+                HadoopPathBasedBulkWriter.Factory<IN> bulkWriterFactory,
+                HadoopFileCommitterFactory fileCommitterFactory, @Nullable SinkTableMetricData metricData,
+                DirtyOptions dirtyOptions, @Nullable DirtySink<Object> dirtySink,
+                SchemaUpdateExceptionPolicy schemaUpdatePolicy,
+                PartitionPolicy partitionPolicy,
+                HiveShim hiveShim,
+                String hiveVersion,
+                String inputFormat,
+                String outputFormat,
+                String serializationLib) {
+            this.configuration = configuration;
+            this.bulkWriterFactory = bulkWriterFactory;
+            this.hiveWriterFactory = ((HiveBulkWriterFactory) this.bulkWriterFactory).getFactory();
+            this.fileCommitterFactory = fileCommitterFactory;
+            this.metricData = metricData;
+            this.dirtyOptions = dirtyOptions;
+            this.dirtySink = dirtySink;
+            this.schemaUpdatePolicy = schemaUpdatePolicy;
+            this.partitionPolicy = partitionPolicy;
+            this.hiveShim = hiveShim;
+            this.hiveVersion = hiveVersion;
+            this.inputFormat = inputFormat;
+            this.outputFormat = outputFormat;
+            this.serializationLib = serializationLib;
+        }
+
+        @Override
+        public HadoopPathBasedPartFileWriter<IN, BucketID> openNewInProgressFile(BucketID bucketID,
+                org.apache.flink.core.fs.Path flinkPath, long creationTime) throws IOException {
+
+            Path path = new Path(flinkPath.toUri());
+            HadoopFileCommitter fileCommitter = fileCommitterFactory.create(configuration, path);
+            Path inProgressFilePath = fileCommitter.getTempFilePath();
+
+            InLongHadoopPathBasedBulkWriter writer = (InLongHadoopPathBasedBulkWriter) bulkWriterFactory.create(path,
+                    inProgressFilePath);
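+            // Whole-database sync settings are carried in the Hive JobConf, so every newly opened part file
+            // knows whether multi-table routing is enabled and how to resolve database and table names.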
+            JobConf jobConf = hiveWriterFactory.getJobConf();
+            boolean sinkMultipleEnable = Boolean.parseBoolean(jobConf.get(SINK_MULTIPLE_ENABLE.key(), "false"));
+            String sinkMultipleFormat = jobConf.get(SINK_MULTIPLE_FORMAT.key());
+            String databasePattern = jobConf.get(SINK_MULTIPLE_DATABASE_PATTERN.key());
+            String tablePattern = jobConf.get(SINK_MULTIPLE_TABLE_PATTERN.key());
+            return new HadoopPathBasedPartFileWriter<>(bucketID,
+                    path,
+                    inProgressFilePath,
+                    writer,
+                    fileCommitter,
+                    creationTime,
+                    hiveShim,
+                    hiveVersion,
+                    sinkMultipleEnable,
+                    sinkMultipleFormat,
+                    databasePattern,
+                    tablePattern,
+                    metricData,
+                    dirtyOptions,
+                    dirtySink,
+                    schemaUpdatePolicy,
+                    partitionPolicy,
+                    inputFormat,
+                    outputFormat,
+                    serializationLib);
+        }
+
+        @Override
+        public PendingFile recoverPendingFile(PendingFileRecoverable pendingFileRecoverable) throws IOException {
+            if (!(pendingFileRecoverable instanceof HadoopPathBasedPartFileWriter.HadoopPathBasedPendingFileRecoverable)) {
+                throw new UnsupportedOperationException("Only HadoopPathBasedPendingFileRecoverable is supported.");
+            }
+
+            HadoopPathBasedPendingFileRecoverable hadoopRecoverable =
+                    (HadoopPathBasedPendingFileRecoverable) pendingFileRecoverable;
+            return new HadoopPathBasedPendingFile(
+                    fileCommitterFactory.recoverForCommit(configuration, hadoopRecoverable.getTargetFilePath(),
+                            hadoopRecoverable.getTempFilePath()),
+                    hadoopRecoverable.getSize());
+        }
+
+        @Override
+        public WriterProperties getProperties() {
+            return new WriterProperties(UnsupportedInProgressFileRecoverableSerializable.INSTANCE,
+                    HadoopPathBasedPendingFileRecoverableSerializer.INSTANCE, false);
+        }
+
+        @Override
+        public InProgressFileWriter<IN, BucketID> resumeInProgressFileFrom(BucketID bucketID,
+                InProgressFileRecoverable inProgressFileSnapshot, long creationTime) {
+
+            throw new UnsupportedOperationException("Resume is not supported");
+        }
+
+        @Override
+        public boolean cleanupInProgressFileRecoverable(InProgressFileRecoverable inProgressFileRecoverable) {
+            return false;
+        }
+    }
+}
\ No newline at end of file
diff --git a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/filesystem/HadoopRenameFileCommitter.java b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/filesystem/HadoopRenameFileCommitter.java
new file mode 100644
index 0000000..6d4816e
--- /dev/null
+++ b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/filesystem/HadoopRenameFileCommitter.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.hive.filesystem;
+
+import org.apache.inlong.sort.hive.util.CacheHolder;
+
+import org.apache.flink.formats.hadoop.bulk.HadoopFileCommitter;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.UUID;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * The Hadoop file committer that directly renames the in-progress file to the target file. For
+ * file systems like S3, renaming may lead to additional copies.
+ */
+public class HadoopRenameFileCommitter implements HadoopFileCommitter {
+
+    private static final Logger LOG = LoggerFactory.getLogger(HadoopRenameFileCommitter.class);
+
+    private final Configuration configuration;
+
+    private Path targetFilePath;
+
+    private Path tempFilePath;
+
+    private boolean sinkMultipleEnable;
+
+    public HadoopRenameFileCommitter(Configuration configuration,
+            Path targetFilePath,
+            boolean sinkMultipleEnable)
+            throws IOException {
+        this.configuration = configuration;
+        this.targetFilePath = targetFilePath;
+        this.tempFilePath = generateTempFilePath();
+        this.sinkMultipleEnable = sinkMultipleEnable;
+    }
+
+    public HadoopRenameFileCommitter(Configuration configuration,
+            Path targetFilePath,
+            Path inProgressPath,
+            boolean sinkMultipleEnable) {
+        this.configuration = configuration;
+        this.targetFilePath = targetFilePath;
+        this.tempFilePath = inProgressPath;
+        this.sinkMultipleEnable = sinkMultipleEnable;
+    }
+
+    @Override
+    public Path getTargetFilePath() {
+        return targetFilePath;
+    }
+
+    @Override
+    public Path getTempFilePath() {
+        return tempFilePath;
+    }
+
+    public Configuration getConfiguration() {
+        return configuration;
+    }
+
+    @Override
+    public void preCommit() {
+        // Do nothing.
+    }
+
+    @Override
+    public void commit() throws IOException {
+        if (sinkMultipleEnable) {
+            commitMultiple(true);
+        } else {
+            rename(true);
+        }
+    }
+
+    @Override
+    public void commitAfterRecovery() throws IOException {
+        if (sinkMultipleEnable) {
+            commitMultiple(false);
+        } else {
+            rename(false);
+        }
+    }
+
+    private void commitMultiple(boolean assertFileExists) throws IOException {
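+        // A single commit may cover files written for several tables: rename every cached committer
+        // whose temp file name matches this one, then evict it from the cache.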
+        LOG.info("file committer cache {}", CacheHolder.getFileCommitterHashMap());
+        Iterator<Path> iterator = CacheHolder.getFileCommitterHashMap().keySet().iterator();
+        while (iterator.hasNext()) {
+            Path path = iterator.next();
+            if (path.getName().equals(tempFilePath.getName())) {
+                HadoopRenameFileCommitter committer = CacheHolder.getFileCommitterHashMap().get(path);
+                committer.rename(assertFileExists);
+                iterator.remove();
+            }
+        }
+    }
+
+    private void rename(boolean assertFileExists) throws IOException {
+        FileSystem fileSystem = FileSystem.get(targetFilePath.toUri(), configuration);
+
+        if (!fileSystem.exists(tempFilePath)) {
+            if (assertFileExists) {
+                throw new IOException(
+                        String.format("In-progress file (%s) does not exist.", tempFilePath));
+            } else {
+                // Bypass the re-commit if the source file does not exist.
+                // TODO: in the future we may also need to check if the target file exists.
+                return;
+            }
+        }
+
+        try {
+            // If file exists, it will be overwritten.
+            fileSystem.rename(tempFilePath, targetFilePath);
+        } catch (IOException e) {
+            throw new IOException(
+                    String.format(
+                            "Could not commit file from %s to %s", tempFilePath, targetFilePath),
+                    e);
+        }
+    }
+
+    private Path generateTempFilePath() throws IOException {
+        checkArgument(targetFilePath.isAbsolute(), "Target file must be absolute");
+
+        FileSystem fileSystem = FileSystem.get(targetFilePath.toUri(), configuration);
+
+        Path parent = targetFilePath.getParent();
+        String name = targetFilePath.getName();
+
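+        // Keep generating hidden '.<name>.inprogress.<uuid>' candidates until one does not exist yet.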
+        while (true) {
+            Path candidate =
+                    new Path(parent, "." + name + ".inprogress." + UUID.randomUUID());
+            if (!fileSystem.exists(candidate)) {
+                return candidate;
+            }
+        }
+    }
+}
\ No newline at end of file
diff --git a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/filesystem/InLongHadoopPathBasedBulkWriter.java b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/filesystem/InLongHadoopPathBasedBulkWriter.java
new file mode 100644
index 0000000..57dd7d5
--- /dev/null
+++ b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/filesystem/InLongHadoopPathBasedBulkWriter.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.hive.filesystem;
+
+import org.apache.flink.formats.hadoop.bulk.HadoopPathBasedBulkWriter;
+import org.apache.flink.table.data.RowData;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter;
+import org.apache.hadoop.io.Writable;
+
+import java.io.IOException;
+import java.util.function.Function;
+
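+/**
+ * A {@link HadoopPathBasedBulkWriter} that delegates to a Hive {@link RecordWriter}. The record writer,
+ * the row converter and the in-progress path are mutable so they can be replaced at runtime, e.g. when the
+ * target table of the incoming records changes in whole-database sync.
+ */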
+public class InLongHadoopPathBasedBulkWriter implements HadoopPathBasedBulkWriter<RowData> {
+
+    private FileSinkOperator.RecordWriter recordWriter;
+    private Function<RowData, Writable> rowConverter;
+    private FileSystem fs;
+    private Path inProgressPath;
+
+    public InLongHadoopPathBasedBulkWriter(FileSinkOperator.RecordWriter recordWriter,
+            Function<RowData, Writable> rowConverter,
+            FileSystem fs,
+            Path inProgressPath) {
+        this.recordWriter = recordWriter;
+        this.rowConverter = rowConverter;
+        this.fs = fs;
+        this.inProgressPath = inProgressPath;
+    }
+
+    public void setRecordWriter(RecordWriter recordWriter) {
+        this.recordWriter = recordWriter;
+    }
+
+    public void setInProgressPath(Path inProgressPath) throws IOException {
+        this.inProgressPath = inProgressPath;
+        if (this.fs.getScheme().equals("file")) {
+            // update fs with hdfs scheme
+            this.fs = FileSystem.get(inProgressPath.toUri(), fs.getConf());
+        }
+    }
+
+    public void setRowConverter(Function<RowData, Writable> rowConverter) {
+        this.rowConverter = rowConverter;
+    }
+
+    @Override
+    public long getSize() throws IOException {
+        // The in-progress file may not have been created yet, due to lazy writer
+        // initialization or data buffering.
+        return fs.exists(inProgressPath) ? fs.getFileStatus(inProgressPath).getLen() : 0;
+    }
+
+    @Override
+    public void dispose() {
+        // close silently.
+        try {
+            recordWriter.close(true);
+        } catch (IOException ignored) {
+        }
+    }
+
+    @Override
+    public void addElement(RowData element) throws IOException {
+        recordWriter.write(rowConverter.apply(element));
+    }
+
+    @Override
+    public void flush() {
+    }
+
+    @Override
+    public void finish() throws IOException {
+        recordWriter.close(false);
+    }
+}
diff --git a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/filesystem/PartitionCommitter.java b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/filesystem/PartitionCommitter.java
new file mode 100644
index 0000000..cbd897f
--- /dev/null
+++ b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/filesystem/PartitionCommitter.java
@@ -0,0 +1,363 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.hive.filesystem;
+
+import org.apache.inlong.sort.hive.HiveTableMetaStoreFactory;
+import org.apache.inlong.sort.hive.table.HiveTableInlongFactory;
+
+import org.apache.commons.lang.time.StopWatch;
+import org.apache.flink.api.common.state.CheckpointListener;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.catalog.hive.client.HiveMetastoreClientFactory;
+import org.apache.flink.table.catalog.hive.client.HiveMetastoreClientWrapper;
+import org.apache.flink.table.catalog.hive.factories.HiveCatalogFactoryOptions;
+import org.apache.flink.table.filesystem.EmptyMetaStoreFactory;
+import org.apache.flink.table.filesystem.FileSystemFactory;
+import org.apache.flink.table.filesystem.MetastoreCommitPolicy;
+import org.apache.flink.table.filesystem.PartitionCommitPolicy;
+import org.apache.flink.table.filesystem.SuccessFileCommitPolicy;
+import org.apache.flink.table.filesystem.TableMetaStoreFactory;
+import org.apache.flink.table.filesystem.stream.PartitionCommitInfo;
+import org.apache.flink.table.filesystem.stream.PartitionCommitTrigger;
+import org.apache.flink.table.filesystem.stream.TaskTracker;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.mapred.JobConf;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.table.filesystem.FileSystemOptions.SINK_PARTITION_COMMIT_POLICY_CLASS;
+import static org.apache.flink.table.filesystem.FileSystemOptions.SINK_PARTITION_COMMIT_POLICY_KIND;
+import static org.apache.flink.table.filesystem.FileSystemOptions.SINK_PARTITION_COMMIT_SUCCESS_FILE_NAME;
+import static org.apache.flink.table.utils.PartitionPathUtils.extractPartitionSpecFromPath;
+import static org.apache.flink.table.utils.PartitionPathUtils.generatePartitionPath;
+import static org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_ENABLE;
+
+/**
+ * Committer operator for partitions. This is a single (non-parallel) task. It collects all the
+ * partition information sent from upstream and triggers the partition submission decision once it
+ * has collected the partitions from all tasks of a checkpoint.
+ *
+ * <p>NOTE: It processes records only after the checkpoint completes successfully, i.e. it receives
+ * records from upstream via {@link CheckpointListener#notifyCheckpointComplete}.
+ *
+ * <p>Processing steps: 1. Partitions are sent from upstream; add them to the trigger. 2. The {@link
+ * TaskTracker} reports that it has received partition data from all tasks of a checkpoint.
+ * 3. Extract committable partitions from the {@link PartitionCommitTrigger}. 4. Use the {@link
+ * PartitionCommitPolicy} chain to commit the partitions.
+ */
+public class PartitionCommitter extends AbstractStreamOperator<Void>
+        implements
+            OneInputStreamOperator<PartitionCommitInfo, Void> {
+
+    /**
+     * Partitions whose HDFS files were modified within the last HDFS_FILES_MODIFIED_THRESHOLD milliseconds are committed.
+     */
+    private static final long HDFS_FILES_MODIFIED_THRESHOLD = 5 * 60 * 1000L;
+
+    private static final long serialVersionUID = 1L;
+
+    private final Configuration conf;
+
+    private Path locationPath;
+
+    private final ObjectIdentifier tableIdentifier;
+
+    private final List<String> partitionKeys;
+
+    private final TableMetaStoreFactory metaStoreFactory;
+
+    private final FileSystemFactory fsFactory;
+
+    private transient PartitionCommitTrigger trigger;
+
+    private transient TaskTracker taskTracker;
+
+    private transient long currentWatermark;
+
+    private transient List<PartitionCommitPolicy> policies;
+
+    private final String hiveVersion;
+
+    private final boolean sinkMultipleEnable;
+
+    public PartitionCommitter(
+            Path locationPath,
+            ObjectIdentifier tableIdentifier,
+            List<String> partitionKeys,
+            TableMetaStoreFactory metaStoreFactory,
+            FileSystemFactory fsFactory,
+            Configuration conf) {
+        this.locationPath = locationPath;
+        this.tableIdentifier = tableIdentifier;
+        this.partitionKeys = partitionKeys;
+        this.metaStoreFactory = metaStoreFactory;
+        this.fsFactory = fsFactory;
+        this.conf = conf;
+        PartitionCommitPolicy.validatePolicyChain(
+                metaStoreFactory instanceof EmptyMetaStoreFactory,
+                conf.get(SINK_PARTITION_COMMIT_POLICY_KIND));
+        this.hiveVersion = conf.get(HiveCatalogFactoryOptions.HIVE_VERSION);
+        this.sinkMultipleEnable = conf.get(SINK_MULTIPLE_ENABLE);
+    }
+
+    @Override
+    public void initializeState(StateInitializationContext context) throws Exception {
+        super.initializeState(context);
+        this.currentWatermark = Long.MIN_VALUE;
+        this.trigger =
+                PartitionCommitTrigger.create(
+                        context.isRestored(),
+                        context.getOperatorStateStore(),
+                        conf,
+                        getUserCodeClassloader(),
+                        partitionKeys,
+                        getProcessingTimeService());
+        this.policies =
+                PartitionCommitPolicy.createPolicyChain(
+                        getUserCodeClassloader(),
+                        conf.get(SINK_PARTITION_COMMIT_POLICY_KIND),
+                        conf.get(SINK_PARTITION_COMMIT_POLICY_CLASS),
+                        conf.get(SINK_PARTITION_COMMIT_SUCCESS_FILE_NAME),
+                        () -> {
+                            try {
+                                return fsFactory.create(locationPath.toUri());
+                            } catch (IOException e) {
+                                throw new RuntimeException(e);
+                            }
+                        });
+    }
+
+    @Override
+    public void processElement(StreamRecord<PartitionCommitInfo> element) throws Exception {
+        PartitionCommitInfo message = element.getValue();
+        for (String partition : message.getPartitions()) {
+            trigger.addPartition(partition);
+        }
+
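+        // Lazily create the tracker with the upstream parallelism; commit as soon as all tasks of this checkpoint have reported.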
+        if (taskTracker == null) {
+            taskTracker = new TaskTracker(message.getNumberOfTasks());
+        }
+        boolean needCommit = taskTracker.add(message.getCheckpointId(), message.getTaskId());
+        if (needCommit) {
+            commitPartitions(message.getCheckpointId());
+        }
+    }
+
+    private void commitPartitions(long checkpointId) throws Exception {
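+        // A checkpoint id of Long.MAX_VALUE marks the end of input; otherwise only partitions committable for this checkpoint are fetched.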
+        List<String> partitions =
+                checkpointId == Long.MAX_VALUE ? trigger.endInput() : trigger.committablePartitions(checkpointId);
+        if (partitions.isEmpty()) {
+            return;
+        }
+
+        if (sinkMultipleEnable) {
+            StopWatch stopWatch = new StopWatch();
+            stopWatch.start();
+            HiveConf hiveConf = HiveTableInlongFactory.getHiveConf();
+            try (HiveMetastoreClientWrapper client = HiveMetastoreClientFactory.create(hiveConf, hiveVersion)) {
+                // We do not know which tables need to be committed, so list all tables.
+                // If there are many databases and tables, this may become a performance bottleneck.
+                List<String> dbs = client.getAllDatabases().stream().filter(db -> !db.equalsIgnoreCase("default"))
+                        .collect(Collectors.toList());
+                // hdfs path directory where hive db locates
+                Path dbPath;
+                List<String> partitionColumns = new ArrayList<>();
+                // HashMap<String, Table> tableCache = new HashMap<>(16);
+                FileSystem fs;
+                for (String db : dbs) {
+                    Database database = client.getDatabase(db);
+                    dbPath = new Path(database.getLocationUri());
+                    try {
+                        List<String> tables = client.getAllTables(db);
+                        for (String tableName : tables) {
+                            if (partitionColumns.size() == 0) {
+                                Table table = client.getTable(db, tableName);
+                                // tableCache.put(db + "." + tableName, table);
+                                // all tables must share the same partition field names
+                                partitionColumns = table.getPartitionKeys().stream().map(FieldSchema::getName)
+                                        .collect(Collectors.toList());
+                                if (partitionColumns.size() == 0) {
+                                    continue;
+                                }
+                            }
+
+                            // reset partitionKeys for MetastoreCommitPolicy
+                            partitionKeys.clear();
+                            partitionKeys.addAll(partitionColumns);
+
+                            for (String partition : partitions) {
+                                LinkedHashMap<String, String> partSpec = extractPartitionSpecFromPath(
+                                        new Path(partition));
+                                // All tables' HDFS files must be under the same warehouse path, e.g.
+                                // hdfs://0.0.0.0:9020/user/hive/warehouse.
+                                // Otherwise the table's partitions will not be committed.
+                                Path tablePath = new Path(dbPath, tableName);
+                                Path partitionPath = new Path(tablePath, generatePartitionPath(partSpec));
+
+                                fs = fsFactory.create(partitionPath.toUri());
+                                if (!fs.exists(partitionPath)) {
+                                    // partition path does not exist
+                                    continue;
+                                    // if (!fs.exists(tablePath)) {
+                                    // LOG.debug("Table path {} not exist, load table from metastore",
+                                    // partitionPath.getParent());
+                                    // // table location maybe at other storage
+                                    // Table table = tableCache.get(db + "." + tableName);
+                                    // if (table == null) {
+                                    // table = client.getTable(db, tableName);
+                                    // tableCache.put(db + "." + tableName, table);
+                                    // }
+                                    // partitionPath = new Path(table.getSd().getLocation(),
+                                    // generatePartitionPath(partSpec));
+                                    // fs = fsFactory.create(partitionPath.toUri());
+                                    // if (!fs.exists(partitionPath)) {
+                                    // // partition path not exist
+                                    // continue;
+                                    // }
+                                    // // reset partitionKeys for MetastoreCommitPolicy partitionKeys.clear();
+                                    // partitionKeys.addAll(table.getPartitionKeys().stream().map(FieldSchema::getName)
+                                    // .collect(Collectors.toList()));
+                                    // }
+                                }
+
+                                if (stopWatch.getStartTime()
+                                        - fs.getFileStatus(partitionPath)
+                                                .getModificationTime() > HDFS_FILES_MODIFIED_THRESHOLD) {
+                                    // last modified more than HDFS_FILES_MODIFIED_THRESHOLD millis ago, no need to commit
+                                    continue;
+                                }
+
+                                // reset locationPath for MetastoreCommitPolicy
+                                locationPath = partitionPath;
+
+                                LOG.info("Partition {} of table {}.{} is ready to be committed", partSpec, db,
+                                        tableName);
+
+                                HiveTableMetaStoreFactory factory = new HiveTableMetaStoreFactory(
+                                        new JobConf(HiveTableInlongFactory.getHiveConf()), hiveVersion, db, tableName);
+                                try (TableMetaStoreFactory.TableMetaStore metaStore = factory.createTableMetaStore()) {
+                                    PartitionCommitPolicy.Context context = new CommitPolicyContextImpl(
+                                            new ArrayList<>(partSpec.values()), partitionPath);
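+                                    // Bind the metastore policy to this table's metastore and rebuild the success-file
+                                    // policy with the file system of the current partition path.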
+                                    for (PartitionCommitPolicy policy : policies) {
+                                        if (policy instanceof MetastoreCommitPolicy) {
+                                            ((MetastoreCommitPolicy) policy).setMetastore(metaStore);
+                                        } else if (policy instanceof SuccessFileCommitPolicy) {
+                                            String successFileName = conf.get(SINK_PARTITION_COMMIT_SUCCESS_FILE_NAME);
+                                            policy = new SuccessFileCommitPolicy(successFileName, fs);
+                                        }
+                                        policy.commit(context);
+                                    }
+                                }
+                            }
+                        }
+                    } catch (Exception e) {
+                        LOG.error("Committing partitions failed", e);
+                        break;
+                    }
+                }
+            }
+            stopWatch.stop();
+            LOG.info("Committing partitions took {} ms", stopWatch.getTime());
+        } else {
+            try (TableMetaStoreFactory.TableMetaStore metaStore = metaStoreFactory.createTableMetaStore()) {
+                for (String partition : partitions) {
+                    LinkedHashMap<String, String> partSpec = extractPartitionSpecFromPath(new Path(partition));
+                    LOG.info("Partition {} of table {} is ready to be committed", partSpec, tableIdentifier);
+                    Path path = new Path(locationPath, generatePartitionPath(partSpec));
+                    PartitionCommitPolicy.Context context = new CommitPolicyContextImpl(
+                            new ArrayList<>(partSpec.values()), path);
+                    for (PartitionCommitPolicy policy : policies) {
+                        if (policy instanceof MetastoreCommitPolicy) {
+                            ((MetastoreCommitPolicy) policy).setMetastore(metaStore);
+                        }
+                        policy.commit(context);
+                    }
+                }
+            }
+        }
+    }
+
+    @Override
+    public void processWatermark(Watermark mark) throws Exception {
+        super.processWatermark(mark);
+        this.currentWatermark = mark.getTimestamp();
+    }
+
+    @Override
+    public void snapshotState(StateSnapshotContext context) throws Exception {
+        super.snapshotState(context);
+        trigger.snapshotState(context.getCheckpointId(), currentWatermark);
+    }
+
+    private class CommitPolicyContextImpl implements PartitionCommitPolicy.Context {
+
+        private final List<String> partitionValues;
+        private final Path partitionPath;
+
+        private CommitPolicyContextImpl(List<String> partitionValues, Path partitionPath) {
+            this.partitionValues = partitionValues;
+            this.partitionPath = partitionPath;
+        }
+
+        @Override
+        public String catalogName() {
+            return tableIdentifier.getCatalogName();
+        }
+
+        @Override
+        public String databaseName() {
+            return tableIdentifier.getDatabaseName();
+        }
+
+        @Override
+        public String tableName() {
+            return tableIdentifier.getObjectName();
+        }
+
+        @Override
+        public List<String> partitionKeys() {
+            return partitionKeys;
+        }
+
+        @Override
+        public List<String> partitionValues() {
+            return partitionValues;
+        }
+
+        @Override
+        public Path partitionPath() {
+            return partitionPath;
+        }
+    }
+}
diff --git a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/filesystem/StreamingSink.java b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/filesystem/StreamingSink.java
index 447b4bb..d90d463 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/filesystem/StreamingSink.java
+++ b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/filesystem/StreamingSink.java
@@ -35,7 +35,6 @@
 import org.apache.flink.table.filesystem.FileSystemFactory;
 import org.apache.flink.table.filesystem.TableMetaStoreFactory;
 import org.apache.flink.table.filesystem.stream.PartitionCommitInfo;
-import org.apache.flink.table.filesystem.stream.PartitionCommitter;
 import org.apache.flink.table.filesystem.stream.compact.CompactBucketWriter;
 import org.apache.flink.table.filesystem.stream.compact.CompactCoordinator;
 import org.apache.flink.table.filesystem.stream.compact.CompactMessages.CoordinatorInput;
@@ -152,10 +151,9 @@
             FileSystemFactory fsFactory,
             Configuration options) {
         DataStream<?> stream = writer;
-        if (partitionKeys.size() > 0 && options.contains(SINK_PARTITION_COMMIT_POLICY_KIND)) {
-            PartitionCommitter committer =
-                    new PartitionCommitter(
-                            locationPath, identifier, partitionKeys, msFactory, fsFactory, options);
+        if (options.contains(SINK_PARTITION_COMMIT_POLICY_KIND)) {
+            PartitionCommitter committer = new PartitionCommitter(locationPath, identifier, partitionKeys, msFactory,
+                    fsFactory, options);
             stream =
                     writer.transform(
                             PartitionCommitter.class.getSimpleName(), Types.VOID, committer)
@@ -165,4 +163,5 @@
 
         return stream.addSink(new DiscardingSink<>()).name("end").setParallelism(1);
     }
+
 }
diff --git a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/table/HiveTableInlongFactory.java b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/table/HiveTableInlongFactory.java
index 8a8bf15..d846334 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/table/HiveTableInlongFactory.java
+++ b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/table/HiveTableInlongFactory.java
@@ -20,6 +20,8 @@
 import org.apache.inlong.sort.base.dirty.DirtyOptions;
 import org.apache.inlong.sort.base.dirty.sink.DirtySink;
 import org.apache.inlong.sort.base.dirty.utils.DirtySinkFactoryUtils;
+import org.apache.inlong.sort.base.sink.PartitionPolicy;
+import org.apache.inlong.sort.base.sink.SchemaUpdateExceptionPolicy;
 import org.apache.inlong.sort.hive.HiveTableSink;
 
 import com.google.common.base.Preconditions;
@@ -50,27 +52,30 @@
 import static org.apache.flink.table.catalog.hive.factories.HiveCatalogFactoryOptions.HIVE_CONF_DIR;
 import static org.apache.flink.table.catalog.hive.factories.HiveCatalogFactoryOptions.HIVE_VERSION;
 import static org.apache.flink.table.factories.FactoryUtil.PROPERTY_VERSION;
+import static org.apache.flink.table.filesystem.FileSystemOptions.PARTITION_TIME_EXTRACTOR_TIMESTAMP_PATTERN;
 import static org.apache.flink.table.filesystem.FileSystemOptions.STREAMING_SOURCE_ENABLE;
 import static org.apache.flink.table.filesystem.FileSystemOptions.STREAMING_SOURCE_PARTITION_INCLUDE;
 import static org.apache.inlong.sort.base.Constants.AUDIT_KEYS;
 import static org.apache.inlong.sort.base.Constants.INLONG_AUDIT;
 import static org.apache.inlong.sort.base.Constants.INLONG_METRIC;
+import static org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_DATABASE_PATTERN;
+import static org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_ENABLE;
+import static org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_FORMAT;
+import static org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_SCHEMA_UPDATE_POLICY;
+import static org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_TABLE_PATTERN;
+import static org.apache.inlong.sort.base.Constants.SINK_PARTITION_POLICY;
+import static org.apache.inlong.sort.base.Constants.SOURCE_PARTITION_FIELD_NAME;
 import static org.apache.inlong.sort.hive.HiveOptions.HIVE_DATABASE;
+import static org.apache.inlong.sort.hive.HiveOptions.HIVE_STORAGE_INPUT_FORMAT;
+import static org.apache.inlong.sort.hive.HiveOptions.HIVE_STORAGE_OUTPUT_FORMAT;
+import static org.apache.inlong.sort.hive.HiveOptions.HIVE_STORAGE_SERIALIZATION_LIB;
 
 /**
  * DynamicTableSourceFactory for hive table source
  */
 public class HiveTableInlongFactory implements DynamicTableSourceFactory, DynamicTableSinkFactory {
 
-    private final HiveConf hiveConf;
-
-    public HiveTableInlongFactory() {
-        this.hiveConf = new HiveConf();
-    }
-
-    public HiveTableInlongFactory(HiveConf hiveConf) {
-        this.hiveConf = hiveConf;
-    }
+    private static final HiveConf hiveConf = new HiveConf();
 
     @Override
     public String factoryIdentifier() {
@@ -94,26 +99,39 @@
         options.add(INLONG_METRIC);
         options.add(INLONG_AUDIT);
         options.add(AUDIT_KEYS);
+        options.add(SINK_MULTIPLE_ENABLE);
+        options.add(SINK_MULTIPLE_DATABASE_PATTERN);
+        options.add(SINK_MULTIPLE_TABLE_PATTERN);
+        options.add(SINK_MULTIPLE_FORMAT);
+        options.add(SINK_MULTIPLE_SCHEMA_UPDATE_POLICY);
+        options.add(SOURCE_PARTITION_FIELD_NAME);
         return options;
     }
 
     @Override
     public DynamicTableSink createDynamicTableSink(Context context) {
+        final FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
         final boolean isHiveTable = HiveCatalog.isHiveTable(context.getCatalogTable().getOptions());
         Map<String, String> options = context.getCatalogTable().getOptions();
         // temporary table doesn't have the IS_GENERIC flag but we still consider it generic
         if (isHiveTable) {
             updateHiveConf(options);
             // new HiveValidator().validate(properties);
-            Integer configuredParallelism =
-                    Configuration.fromMap(context.getCatalogTable().getOptions())
-                            .get(FileSystemOptions.SINK_PARALLELISM);
-            final String inlongMetric = context.getCatalogTable().getOptions()
-                    .getOrDefault(INLONG_METRIC.key(), INLONG_METRIC.defaultValue());
-            final String auditHostAndPorts = context.getCatalogTable().getOptions()
-                    .getOrDefault(INLONG_AUDIT.key(), INLONG_AUDIT.defaultValue());
+            Integer configuredParallelism = helper.getOptions().get(FileSystemOptions.SINK_PARALLELISM);
+            final String inlongMetric = helper.getOptions().get(INLONG_METRIC);
+            final String auditHostAndPorts = helper.getOptions().get(INLONG_AUDIT);
             final DirtyOptions dirtyOptions = DirtyOptions.fromConfig(Configuration.fromMap(options));
             final DirtySink<Object> dirtySink = DirtySinkFactoryUtils.createDirtySink(context, dirtyOptions);
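+            // The options below drive whole-database sync: schema-update policy, partition policy and field,
+            // and the storage formats used when tables are created on the fly.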
+            SchemaUpdateExceptionPolicy schemaUpdatePolicy = helper.getOptions()
+                    .get(SINK_MULTIPLE_SCHEMA_UPDATE_POLICY);
+            PartitionPolicy partitionPolicy = helper.getOptions().get(SINK_PARTITION_POLICY);
+            String partitionField = helper.getOptions().get(SOURCE_PARTITION_FIELD_NAME);
+            String timestampPattern = helper.getOptions().getOptional(PARTITION_TIME_EXTRACTOR_TIMESTAMP_PATTERN)
+                    .orElse("yyyy-MM-dd");
+            boolean sinkMultipleEnable = helper.getOptions().get(SINK_MULTIPLE_ENABLE);
+            String inputFormat = helper.getOptions().get(HIVE_STORAGE_INPUT_FORMAT);
+            String outputFormat = helper.getOptions().get(HIVE_STORAGE_OUTPUT_FORMAT);
+            String serializationLib = helper.getOptions().get(HIVE_STORAGE_SERIALIZATION_LIB);
             return new HiveTableSink(
                     context.getConfiguration(),
                     new JobConf(hiveConf),
@@ -123,7 +141,15 @@
                     inlongMetric,
                     auditHostAndPorts,
                     dirtyOptions,
-                    dirtySink);
+                    dirtySink,
+                    schemaUpdatePolicy,
+                    partitionPolicy,
+                    partitionField,
+                    timestampPattern,
+                    sinkMultipleEnable,
+                    inputFormat,
+                    outputFormat,
+                    serializationLib);
         } else {
             return FactoryUtil.createTableSink(
                     null, // we already in the factory of catalog
@@ -194,4 +220,8 @@
             hiveConf.set(entry.getKey(), entry.getValue());
         }
     }
+
+    public static HiveConf getHiveConf() {
+        return hiveConf;
+    }
 }
diff --git a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/util/CacheHolder.java b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/util/CacheHolder.java
new file mode 100644
index 0000000..ceb9c8b
--- /dev/null
+++ b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/util/CacheHolder.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.hive.util;
+
+import org.apache.inlong.sort.hive.HiveWriterFactory;
+import org.apache.inlong.sort.hive.filesystem.HadoopRenameFileCommitter;
+
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.data.RowData;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter;
+import org.apache.hadoop.io.Writable;
+
+import java.util.HashMap;
+import java.util.function.Function;
+
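+/**
+ * Process-wide static caches shared by the Hive multi-table sink: writer factories, schema-check times and
+ * ignored tables keyed by table identifier; record writers, file committers and row converters keyed by
+ * in-progress file path.
+ */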
+public class CacheHolder {
+
+    /**
+     * hive writer factory cache
+     */
+    private static final HashMap<ObjectIdentifier, HiveWriterFactory> factoryMap = new HashMap<>(16);
+    /**
+     * hive record writer cache
+     */
+    private static final HashMap<Path, FileSinkOperator.RecordWriter> recordWriterHashMap = new HashMap<>(16);
+    /**
+     * hive hdfs file committer cache
+     */
+    private static final HashMap<Path, HadoopRenameFileCommitter> fileCommitterHashMap = new HashMap<>(16);
+    /**
+     * hive row converter cache
+     */
+    private static final HashMap<Path, Function<RowData, Writable>> rowConverterHashMap = new HashMap<>(16);
+    /**
+     * hive schema check time cache
+     */
+    private static final HashMap<ObjectIdentifier, Long> schemaCheckTimeMap = new HashMap<>(16);
+    /**
+     * tables for which writing is skipped after an exception when the schema update policy is STOP_PARTIAL
+     */
+    private static final HashMap<ObjectIdentifier, Long> ignoreWritingTableMap = new HashMap<>(16);
+
+    public static HashMap<Path, RecordWriter> getRecordWriterHashMap() {
+        return recordWriterHashMap;
+    }
+
+    public static HashMap<Path, Function<RowData, Writable>> getRowConverterHashMap() {
+        return rowConverterHashMap;
+    }
+
+    public static HashMap<ObjectIdentifier, HiveWriterFactory> getFactoryMap() {
+        return factoryMap;
+    }
+
+    public static HashMap<Path, HadoopRenameFileCommitter> getFileCommitterHashMap() {
+        return fileCommitterHashMap;
+    }
+
+    public static HashMap<ObjectIdentifier, Long> getSchemaCheckTimeMap() {
+        return schemaCheckTimeMap;
+    }
+
+    public static HashMap<ObjectIdentifier, Long> getIgnoreWritingTableMap() {
+        return ignoreWritingTableMap;
+    }
+
+}
diff --git a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/util/HiveTableUtil.java b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/util/HiveTableUtil.java
new file mode 100644
index 0000000..188a8dc
--- /dev/null
+++ b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/util/HiveTableUtil.java
@@ -0,0 +1,594 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.hive.util;
+
+import org.apache.inlong.sort.base.sink.PartitionPolicy;
+import org.apache.inlong.sort.hive.HiveWriterFactory;
+import org.apache.inlong.sort.hive.table.HiveTableInlongFactory;
+
+import org.apache.commons.collections.map.CaseInsensitiveMap;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.flink.connectors.hive.FlinkHiveException;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.type.TypeReference;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+import org.apache.flink.table.catalog.hive.client.HiveMetastoreClientFactory;
+import org.apache.flink.table.catalog.hive.client.HiveMetastoreClientWrapper;
+import org.apache.flink.table.catalog.hive.client.HiveShim;
+import org.apache.flink.table.catalog.hive.util.HiveReflectionUtils;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.types.AtomicDataType;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.RowType.RowField;
+import org.apache.flink.table.types.logical.TimestampType;
+import org.apache.flink.table.types.logical.ZonedTimestampType;
+import org.apache.flink.table.types.logical.utils.LogicalTypeParser;
+import org.apache.flink.types.RowKind;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.api.EnvironmentContext;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.SerDeInfo;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.reflect.Field;
+import java.math.BigDecimal;
+import java.nio.charset.StandardCharsets;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.time.ZonedDateTime;
+import java.time.format.DateTimeFormatter;
+import java.time.format.DateTimeParseException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.regex.Pattern;
+
+import static org.apache.flink.table.types.logical.LogicalTypeRoot.DATE;
+import static org.apache.flink.table.types.logical.LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE;
+import static org.apache.flink.table.types.logical.LogicalTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE;
+import static org.apache.flink.table.types.logical.LogicalTypeRoot.TIMESTAMP_WITH_TIME_ZONE;
+import static org.apache.inlong.sort.hive.HiveOptions.SINK_PARTITION_NAME;
+
+/**
+ * Utility class for listing or creating hive tables.
+ */
+public class HiveTableUtil {
+
+    private static final Logger LOG = LoggerFactory.getLogger(HiveTableUtil.class);
+    private static final String DEFAULT_PARTITION_DATE_FIELD = "[(create)(update)].*[(date)(time)]";
+    private static final ObjectMapper objectMapper = new ObjectMapper();
+
+    public static boolean changeSchema(RowType schema,
+            String[] hiveColumns,
+            DataType[] hiveTypes,
+            String databaseName,
+            String tableName,
+            String hiveVersion) {
+        boolean changed = false;
+
+        Map<String, LogicalType> flinkTypeMap = new HashMap<>();
+        for (RowField field : schema.getFields()) {
+            // lowercase the field name, as Oracle field names are uppercase
+            flinkTypeMap.put(field.getName().toLowerCase(), field.getType());
+        }
+
+        List<String> columnsFromData = new ArrayList<>(flinkTypeMap.keySet());
+        List<String> columnsFromHive = Arrays.asList(hiveColumns);
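+        // After the removal below, columnsFromData only contains fields present in the source data but missing in the Hive table.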
+        columnsFromData.removeAll(columnsFromHive);
+        // add new field
+        for (String fieldName : columnsFromData) {
+            boolean result = alterTable(databaseName, tableName, fieldName, flinkTypeMap.get(fieldName), hiveVersion);
+            if (result) {
+                changed = true;
+            }
+        }
+        // modify field type
+        for (int i = 0; i < hiveColumns.length; i++) {
+            if (!flinkTypeMap.containsKey(hiveColumns[i])) {
+                // The hive table has a field which the source table does not have; ignore it,
+                // as dropping fields dynamically is not supported yet.
+                continue;
+            }
+            LogicalType flinkType = flinkTypeMap.get(hiveColumns[i]);
+            if (hiveTypes[i].getLogicalType().getTypeRoot() != flinkType.getTypeRoot()) {
+                // field type changed
+                boolean result = alterTable(databaseName, tableName, hiveColumns[i], flinkType, hiveVersion);
+                if (result) {
+                    changed = true;
+                }
+            }
+        }
+        return changed;
+    }
+
+    public static boolean alterTable(String databaseName, String tableName, String fieldName, LogicalType type,
+            String hiveVersion) {
+        HiveConf hiveConf = HiveTableInlongFactory.getHiveConf();
+        try (HiveMetastoreClientWrapper client = HiveMetastoreClientFactory.create(hiveConf, hiveVersion)) {
+            FieldSchema schema = new FieldSchema(fieldName, flinkType2HiveType(type), "");
+
+            Table table = client.getTable(databaseName, tableName);
+            ObjectIdentifier identifier = createObjectIdentifier(databaseName, tableName);
+            List<FieldSchema> fieldSchemaList = getTableFields(client, identifier);
+            // remove partition keys
+            fieldSchemaList.removeAll(table.getPartitionKeys());
+            boolean alter = false;
+            boolean exist = false;
+            for (FieldSchema fieldSchema : fieldSchemaList) {
+                if (fieldSchema.getName().equals(fieldName)) {
+                    exist = true;
+                    if (!flinkType2HiveType(type).equalsIgnoreCase(fieldSchema.getType())) {
+                        LOG.info("table {}.{} field {} change type from {} to {}", databaseName, tableName, fieldName,
+                                fieldSchema.getType(), flinkType2HiveType(type));
+                        fieldSchema.setType(flinkType2HiveType(type));
+                        alter = true;
+                    }
+                    break;
+                }
+            }
+            if (!exist) {
+                LOG.info("table {}.{} add new field {} with type {}", databaseName, tableName, fieldName,
+                        flinkType2HiveType(type));
+                fieldSchemaList.add(schema);
+                alter = true;
+            }
+            if (alter) {
+                table.getSd().setCols(fieldSchemaList);
+                IMetaStoreClient metaStoreClient = getMetaStoreClient(client);
+                EnvironmentContext environmentContext = new EnvironmentContext();
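+                // CASCADE propagates the column change to the metadata of all existing partitions of the table.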
+                environmentContext.putToProperties("CASCADE", "true");
+                metaStoreClient.alter_table_with_environmentContext(databaseName, tableName, table, environmentContext);
+                LOG.info("alter table {}.{} success", databaseName, tableName);
+                return true;
+            }
+        } catch (TException e) {
+            throw new CatalogException(String.format("Failed to alter hive table %s.%s", databaseName, tableName), e);
+        } catch (NoSuchFieldException | IllegalAccessException e) {
+            throw new RuntimeException(e);
+        }
+        return false;
+    }
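For illustration only (not part of the patch), a minimal usage sketch of `alterTable`, assuming the enclosing utility class is named `HiveTableUtil`; the database, table, and field names are hypothetical:

    import org.apache.flink.table.types.logical.VarCharType;

    public class AlterTableExample {
        public static void main(String[] args) {
            // add (or retype) a column "user_name" as hive string on my_db.my_table
            boolean changed = HiveTableUtil.alterTable(
                    "my_db", "my_table", "user_name", new VarCharType(), "3.1.2");
            System.out.println("schema changed: " + changed);
        }
    }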
+
+    /**
+     * create hive table, partitioned by the configured sink partition field (default `pt`) unless the
+     * partition policy is NONE
+     *
+     * @param databaseName database name
+     * @param tableName table name
+     * @param schema flink row type of the table
+     * @param partitionPolicy policy of partitioning table
+     * @param hiveVersion hive version
+     * @param inputFormat the input format of storage descriptor
+     * @param outputFormat the output format of storage descriptor
+     * @param serializationLib the serialization library of storage descriptor
+     */
+    public static void createTable(String databaseName, String tableName, RowType schema,
+            PartitionPolicy partitionPolicy, String hiveVersion, String inputFormat, String outputFormat,
+            String serializationLib) {
+        HiveConf hiveConf = HiveTableInlongFactory.getHiveConf();
+        try (HiveMetastoreClientWrapper client = HiveMetastoreClientFactory.create(hiveConf, hiveVersion)) {
+
+            List<String> dbs = client.getAllDatabases();
+            boolean dbExist = dbs.stream().anyMatch(databaseName::equalsIgnoreCase);
+            // create database if not exists
+            if (!dbExist) {
+                Database defaultDb = client.getDatabase("default");
+                Database targetDb = new Database();
+                targetDb.setName(databaseName);
+                targetDb.setLocationUri(defaultDb.getLocationUri() + "/" + databaseName + ".db");
+                client.createDatabase(targetDb);
+            }
+
+            String sinkPartitionName = hiveConf.get(SINK_PARTITION_NAME.key(), SINK_PARTITION_NAME.defaultValue());
+            FieldSchema defaultPartition = new FieldSchema(sinkPartitionName, "string", "");
+
+            List<FieldSchema> fieldSchemaList = new ArrayList<>();
+            for (RowField field : schema.getFields()) {
+                if (!field.getName().equals(sinkPartitionName)) {
+                    FieldSchema hiveFieldSchema = new FieldSchema(field.getName(), flinkType2HiveType(field.getType()),
+                            "");
+                    fieldSchemaList.add(hiveFieldSchema);
+                }
+            }
+
+            Table table = new Table();
+            table.setDbName(databaseName);
+            table.setTableName(tableName);
+
+            if (PartitionPolicy.NONE != partitionPolicy) {
+                table.setPartitionKeys(Collections.singletonList(defaultPartition));
+            }
+
+            StorageDescriptor sd = new StorageDescriptor();
+            table.setSd(sd);
+            sd.setCols(fieldSchemaList);
+            sd.setInputFormat(inputFormat);
+            sd.setOutputFormat(outputFormat);
+            sd.setSerdeInfo(new SerDeInfo());
+            sd.getSerdeInfo().setSerializationLib(serializationLib);
+            client.createTable(table);
+            LOG.info("create table {}.{}", databaseName, tableName);
+        } catch (TException e) {
+            throw new CatalogException("Failed to create database", e);
+        }
+    }
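A hedged usage sketch (not part of the patch) of `createTable` building a hive text table from a Flink RowType; the utility class name `HiveTableUtil` is assumed, and the input/output format and serde class names shown are the usual hive text-table defaults, used here for illustration:

    import org.apache.flink.table.types.logical.IntType;
    import org.apache.flink.table.types.logical.LogicalType;
    import org.apache.flink.table.types.logical.RowType;
    import org.apache.flink.table.types.logical.VarCharType;
    import org.apache.inlong.sort.base.sink.PartitionPolicy;

    public class CreateTableExample {
        public static void main(String[] args) {
            RowType schema = RowType.of(
                    new LogicalType[]{new IntType(), new VarCharType(Integer.MAX_VALUE)},
                    new String[]{"id", "name"});
            // creates my_db (if absent) and my_db.my_table, partitioned by the configured sink partition field
            HiveTableUtil.createTable(
                    "my_db", "my_table", schema, PartitionPolicy.PROC_TIME, "3.1.2",
                    "org.apache.hadoop.mapred.TextInputFormat",
                    "org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat",
                    "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe");
        }
    }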
+
+    /**
+     * Get the hive writer factory for a table, creating and caching it on first access
+     *
+     * @param hiveShim hiveShim object
+     * @param hiveVersion hive version
+     * @param identifier object identifier
+     * @return hive writer factory, or null if the table does not exist
+     */
+    public static HiveWriterFactory getWriterFactory(HiveShim hiveShim, String hiveVersion,
+            ObjectIdentifier identifier) {
+        if (!CacheHolder.getFactoryMap().containsKey(identifier)) {
+            HiveConf hiveConf = HiveTableInlongFactory.getHiveConf();
+            try (HiveMetastoreClientWrapper client = HiveMetastoreClientFactory.create(hiveConf, hiveVersion)) {
+                List<String> tableNames = client.getAllTables(identifier.getDatabaseName());
+                LOG.info("table names: {}", Arrays.deepToString(tableNames.toArray()));
+                boolean tableExist = tableNames.stream().anyMatch(identifier.getObjectName()::equalsIgnoreCase);
+                if (!tableExist) {
+                    return null;
+                }
+
+                Table table = client.getTable(identifier.getDatabaseName(), identifier.getObjectName());
+                StorageDescriptor sd = table.getSd();
+
+                List<FieldSchema> fieldSchemaList = getTableFields(client, identifier);
+
+                List<FieldSchema> partitionSchemas = table.getPartitionKeys();
+                String[] partitions = partitionSchemas.stream().map(FieldSchema::getName).toArray(String[]::new);
+
+                TableSchema.Builder builder = new TableSchema.Builder();
+                for (FieldSchema fieldSchema : fieldSchemaList) {
+                    LogicalType logicalType = LogicalTypeParser.parse(fieldSchema.getType());
+                    if (logicalType instanceof TimestampType) {
+                        TimestampType timestampType = (TimestampType) logicalType;
+                        logicalType = new TimestampType(timestampType.isNullable(), timestampType.getKind(), 9);
+                    } else if (logicalType instanceof ZonedTimestampType) {
+                        ZonedTimestampType timestampType = (ZonedTimestampType) logicalType;
+                        logicalType = new ZonedTimestampType(timestampType.isNullable(), timestampType.getKind(), 9);
+                    }
+                    builder.field(fieldSchema.getName(), new AtomicDataType(logicalType));
+                }
+                TableSchema schema = builder.build();
+
+                Class hiveOutputFormatClz = hiveShim.getHiveOutputFormatClass(Class.forName(sd.getOutputFormat()));
+                boolean isCompressed = hiveConf.getBoolean(HiveConf.ConfVars.COMPRESSRESULT.varname, false);
+                HiveWriterFactory writerFactory = new HiveWriterFactory(new JobConf(hiveConf), hiveOutputFormatClz, sd,
+                        schema, partitions, HiveReflectionUtils.getTableMetadata(hiveShim, table), hiveShim,
+                        isCompressed, true);
+                CacheHolder.getFactoryMap().put(identifier, writerFactory);
+            } catch (TException e) {
+                throw new CatalogException("Failed to query Hive metaStore", e);
+            } catch (ClassNotFoundException e) {
+                throw new FlinkHiveException("Failed to get output format class", e);
+            } catch (NoSuchFieldException | IllegalAccessException e) {
+                throw new FlinkHiveException("Failed to get hive metastore client", e);
+            }
+        }
+        return CacheHolder.getFactoryMap().get(identifier);
+    }
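A hedged sketch (not part of the patch) of how a caller might fetch the cached writer factory; `HiveTableUtil` is an assumed name for the enclosing utility class, and the table names are hypothetical:

    import org.apache.flink.table.catalog.ObjectIdentifier;
    import org.apache.flink.table.catalog.hive.client.HiveShim;
    import org.apache.flink.table.catalog.hive.client.HiveShimLoader;
    import org.apache.inlong.sort.hive.HiveWriterFactory;

    public class WriterFactoryExample {
        public static void main(String[] args) {
            HiveShim hiveShim = HiveShimLoader.loadHiveShim("3.1.2");
            ObjectIdentifier id = HiveTableUtil.createObjectIdentifier("my_db", "my_table");
            HiveWriterFactory factory = HiveTableUtil.getWriterFactory(hiveShim, "3.1.2", id);
            if (factory == null) {
                // null means the table does not exist yet; the caller is expected to create it first
                System.out.println("table my_db.my_table not found");
            }
        }
    }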
+
+    /**
+     * get the underlying hive metastore client from the client wrapper via reflection
+     *
+     * @param client hive metastore client wrapper
+     * @return the wrapped IMetaStoreClient instance
+     */
+    public static IMetaStoreClient getMetaStoreClient(HiveMetastoreClientWrapper client)
+            throws NoSuchFieldException, IllegalAccessException {
+        Field clientField = HiveMetastoreClientWrapper.class.getDeclaredField("client");
+        clientField.setAccessible(true);
+        return (IMetaStoreClient) clientField.get(client);
+    }
+
+    /**
+     * get the field schemas of a hive table
+     *
+     * @param client hive metastore client wrapper
+     * @param identifier hive database and table name
+     * @return list of hive field schemas
+     */
+    public static List<FieldSchema> getTableFields(HiveMetastoreClientWrapper client, ObjectIdentifier identifier)
+            throws TException, NoSuchFieldException, IllegalAccessException {
+        IMetaStoreClient metaStoreClient = getMetaStoreClient(client);
+        return metaStoreClient.getSchema(identifier.getDatabaseName(), identifier.getObjectName());
+    }
+
+    /**
+     * Compute the value of the default partition field. If the raw data contains a date or timestamp
+     * field whose name matches the partition field (such as create_date or create_time), that value is
+     * used; otherwise, for the SOURCE_DATE_FIELD policy, the current processing time is used as a fallback.
+     *
+     * @param rawData row data
+     * @param schema flink field types
+     * @param partitionPolicy partition policy
+     * @param partitionField partition field name, used with the ASSIGN_FIELD policy
+     * @param timestampPattern timestamp pattern of the partition value
+     * @return default partition value, or null when no partition value can be derived
+     */
+    public static Object getDefaultPartitionValue(Map<String, Object> rawData, RowType schema,
+            PartitionPolicy partitionPolicy, String partitionField, String timestampPattern) {
+        if (PartitionPolicy.NONE == partitionPolicy) {
+            return null;
+        }
+
+        if (PartitionPolicy.PROC_TIME == partitionPolicy) {
+            return DateTimeFormatter.ofPattern(timestampPattern).format(LocalDateTime.now());
+        }
+
+        String defaultPartitionFieldName = DEFAULT_PARTITION_DATE_FIELD;
+        if (PartitionPolicy.ASSIGN_FIELD == partitionPolicy) {
+            defaultPartitionFieldName = partitionField;
+        }
+        Pattern pattern = Pattern.compile(Pattern.quote(defaultPartitionFieldName), Pattern.CASE_INSENSITIVE);
+        for (RowField field : schema.getFields()) {
+            LogicalTypeRoot type = field.getType().getTypeRoot();
+            if (type == TIMESTAMP_WITH_LOCAL_TIME_ZONE || type == TIMESTAMP_WITH_TIME_ZONE
+                    || type == TIMESTAMP_WITHOUT_TIME_ZONE || type == DATE) {
+                if (pattern.matcher(field.getName()).matches()) {
+                    String value = (String) rawData.get(field.getName());
+                    return formatDate(value, timestampPattern);
+                }
+            }
+        }
+        if (PartitionPolicy.SOURCE_DATE_FIELD == partitionPolicy) {
+            // no create or update time field, return proc time
+            return DateTimeFormatter.ofPattern(timestampPattern).format(LocalDateTime.now());
+        }
+        return null;
+    }
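For illustration (not part of the patch), a sketch of how the partition value is resolved under different policies; the class name `HiveTableUtil` and the sample schema are assumptions:

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.flink.table.types.logical.LogicalType;
    import org.apache.flink.table.types.logical.RowType;
    import org.apache.flink.table.types.logical.TimestampType;
    import org.apache.flink.table.types.logical.VarCharType;
    import org.apache.inlong.sort.base.sink.PartitionPolicy;

    public class PartitionValueExample {
        public static void main(String[] args) {
            RowType schema = RowType.of(
                    new LogicalType[]{new VarCharType(), new TimestampType()},
                    new String[]{"name", "create_time"});
            Map<String, Object> rawData = new HashMap<>();
            rawData.put("name", "alice");
            rawData.put("create_time", "2023-05-01 10:20:30");

            // PROC_TIME ignores the data and formats the current processing time
            Object procTime = HiveTableUtil.getDefaultPartitionValue(
                    rawData, schema, PartitionPolicy.PROC_TIME, null, "yyyyMMdd");

            // ASSIGN_FIELD reads the configured field (here create_time) from the raw data,
            // yielding "20230501" for the sample record above
            Object fromField = HiveTableUtil.getDefaultPartitionValue(
                    rawData, schema, PartitionPolicy.ASSIGN_FIELD, "create_time", "yyyyMMdd");
        }
    }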
+
+    public static String formatDate(String dateStr, String toPattern) {
+        if (StringUtils.isBlank(dateStr)) {
+            return null;
+        }
+        LocalDateTime localDateTime = parseDate(dateStr);
+        if (localDateTime == null) {
+            return null;
+        }
+        DateTimeFormatter formatter = DateTimeFormatter.ofPattern(toPattern);
+        return formatter.format(localDateTime);
+    }
+
+    public static LocalDateTime parseDate(String dateStr) {
+        if (StringUtils.isBlank(dateStr)) {
+            return null;
+        }
+        ZonedDateTime zonedDateTime = null;
+        try {
+            zonedDateTime = ZonedDateTime.parse(dateStr,
+                    DateTimeFormatter.ofPattern("yyyy-MM-dd['T'][' ']HH:mm:ss[.SSS][XXX][Z]"));
+            return LocalDateTime.ofInstant(zonedDateTime.toInstant(), ZoneId.systemDefault());
+        } catch (DateTimeParseException ignored) {
+            try {
+                return LocalDateTime.parse(dateStr, DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
+            } catch (DateTimeParseException ignored2) {
+            }
+            // a LocalDateTime cannot be parsed from a date-only string, so parse the date part and
+            // take the start of that day (requires java.time.LocalDate to be imported)
+            String[] datePatterns = new String[]{"yyyy-MM-dd", "yyyyMMdd"};
+            for (String pattern : datePatterns) {
+                try {
+                    return LocalDate.parse(dateStr, DateTimeFormatter.ofPattern(pattern)).atStartOfDay();
+                } catch (DateTimeParseException ignored3) {
+                }
+            }
+        }
+        return null;
+    }
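A hedged illustration (not part of the patch): `parseDate` first tries the zoned pattern and then falls back to the plain pattern list, returning null when nothing matches. `HiveTableUtil` is an assumed class name:

    public class ParseDateExample {
        public static void main(String[] args) {
            // zoned timestamp: converted to the system default zone
            System.out.println(HiveTableUtil.parseDate("2023-05-01T10:20:30.123+08:00"));
            // plain timestamp: parsed via the fallback pattern list
            System.out.println(HiveTableUtil.parseDate("2023-05-01 10:20:30"));
            // unparseable input yields null
            System.out.println(HiveTableUtil.parseDate("not-a-date"));
        }
    }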
+
+    /**
+     * flink type to hive type
+     *
+     * @param type flink field type
+     * @return hive type
+     */
+    public static String flinkType2HiveType(LogicalType type) {
+        switch (type.getTypeRoot()) {
+            case INTEGER:
+                return "int";
+            case BIGINT:
+                return "bigint";
+            case TINYINT:
+                return "tinyint";
+            case SMALLINT:
+                return "smallint";
+            case FLOAT:
+                return "float";
+            case DOUBLE:
+                return "double";
+            case DECIMAL:
+                DecimalType decimalType = (DecimalType) type;
+                return String.format("decimal(%s,%s)", decimalType.getPrecision(), decimalType.getScale());
+            case TIMESTAMP_WITH_TIME_ZONE:
+            case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+            case TIMESTAMP_WITHOUT_TIME_ZONE:
+            case TIME_WITHOUT_TIME_ZONE:
+                return "timestamp";
+            case DATE:
+                return "date";
+            case BOOLEAN:
+                return "boolean";
+            case BINARY:
+            case VARBINARY:
+                return "binary";
+            case ARRAY:
+                return "array";
+            case CHAR:
+            case VARCHAR:
+            default:
+                return "string";
+        }
+    }
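A few sample mappings performed by `flinkType2HiveType`, for illustration only (`HiveTableUtil` is an assumed class name):

    import org.apache.flink.table.types.logical.BigIntType;
    import org.apache.flink.table.types.logical.DecimalType;
    import org.apache.flink.table.types.logical.VarCharType;

    public class TypeMappingExample {
        public static void main(String[] args) {
            System.out.println(HiveTableUtil.flinkType2HiveType(new BigIntType()));       // bigint
            System.out.println(HiveTableUtil.flinkType2HiveType(new DecimalType(10, 2))); // decimal(10,2)
            System.out.println(HiveTableUtil.flinkType2HiveType(new VarCharType()));      // string
        }
    }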
+
+    /**
+     * convert json data to flink generic row data
+     *
+     * @param record json data
+     * @param allColumns hive column names
+     * @param allTypes hive column types
+     * @param replaceLineBreak whether to replace line breaks with blanks to avoid corrupting data in hive text tables
+     * @return generic row data and byte size of the data
+     */
+    public static Pair<GenericRowData, Integer> getRowData(Map<String, Object> record, String[] allColumns,
+            DataType[] allTypes, boolean replaceLineBreak) {
+        GenericRowData genericRowData = new GenericRowData(RowKind.INSERT, allColumns.length);
+        int byteSize = 0;
+        for (int index = 0; index < allColumns.length; index++) {
+            String columnName = allColumns[index];
+            LogicalType logicalType = allTypes[index].getLogicalType();
+            LogicalTypeRoot typeRoot = logicalType.getTypeRoot();
+            Object raw = record.get(columnName);
+            byteSize += raw == null ? 0 : String.valueOf(raw).getBytes(StandardCharsets.UTF_8).length;
+            switch (typeRoot) {
+                case BOOLEAN:
+                    genericRowData.setField(index, raw != null ? Boolean.parseBoolean(String.valueOf(raw)) : null);
+                    break;
+                case VARBINARY:
+                case BINARY:
+                    byte[] bytes = null;
+                    if (raw instanceof byte[]) {
+                        bytes = (byte[]) raw;
+                    } else if (raw instanceof String) {
+                        bytes = ((String) raw).getBytes(StandardCharsets.UTF_8);
+                    }
+                    genericRowData.setField(index, bytes);
+                    break;
+                case DECIMAL:
+                    genericRowData.setField(index, raw != null ? new BigDecimal(String.valueOf(raw)) : null);
+                    break;
+                case DOUBLE:
+                    genericRowData.setField(index, raw != null ? Double.valueOf(String.valueOf(raw)) : null);
+                    break;
+                case FLOAT:
+                    genericRowData.setField(index, raw != null ? Float.valueOf(String.valueOf(raw)) : null);
+                    break;
+                case INTEGER:
+                    genericRowData.setField(index, raw != null ? Integer.valueOf(String.valueOf(raw)) : null);
+                    break;
+                case BIGINT:
+                    genericRowData.setField(index, raw != null ? Long.valueOf(String.valueOf(raw)) : null);
+                    break;
+                case TINYINT:
+                    genericRowData.setField(index, raw != null ? Byte.valueOf(String.valueOf(raw)) : null);
+                    break;
+                case SMALLINT:
+                    genericRowData.setField(index, raw != null ? Short.valueOf(String.valueOf(raw)) : null);
+                    break;
+                case CHAR:
+                case VARCHAR:
+                    String value = null;
+                    if (raw != null) {
+                        value = String.valueOf(raw);
+                        if (replaceLineBreak) {
+                            value = value.replaceAll("[\r\n]", " ");
+                        }
+                    }
+                    genericRowData.setField(index, value);
+                    break;
+                case DATE:
+                case TIMESTAMP_WITHOUT_TIME_ZONE:
+                case TIMESTAMP_WITH_TIME_ZONE:
+                case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+                    if (raw instanceof String) {
+                        genericRowData.setField(index, parseDate((String) raw));
+                    } else if (raw instanceof Long) {
+                        genericRowData.setField(index, new Date((Long) raw));
+                    }
+                    break;
+                default:
+                    break;
+            }
+        }
+        return new ImmutablePair<>(genericRowData, byteSize);
+    }
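A hedged usage sketch (not part of the patch) of `getRowData`, converting a raw json record into a GenericRowData plus its byte size; `HiveTableUtil` is an assumed class name and the imports assume the commons-lang3 Pair used in the diff:

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.commons.lang3.tuple.Pair;
    import org.apache.flink.table.api.DataTypes;
    import org.apache.flink.table.data.GenericRowData;
    import org.apache.flink.table.types.DataType;

    public class RowDataExample {
        public static void main(String[] args) {
            String[] columns = {"id", "name"};
            DataType[] types = {DataTypes.INT(), DataTypes.STRING()};
            Map<String, Object> record = new HashMap<>();
            record.put("id", "42");
            record.put("name", "line1\nline2");

            // replaceLineBreak = true protects hive text tables from embedded newlines
            Pair<GenericRowData, Integer> result = HiveTableUtil.getRowData(record, columns, types, true);
            GenericRowData row = result.getLeft();   // [42, "line1 line2"]
            int byteSize = result.getRight();
        }
    }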
+
+    /**
+     * Convert a json node to a list of maps.
+     * The map keys are case-insensitive (lowercased), because Oracle CDC sends records with uppercase
+     * field names while hive tables only support lowercase field names
+     *
+     * @param data json node
+     * @return list of map data
+     */
+    public static List<Map<String, Object>> jsonNode2Map(JsonNode data) {
+        if (data == null) {
+            return new ArrayList<>();
+        }
+        List<Map<String, Object>> values = new ArrayList<>();
+        if (data.isArray()) {
+            for (int i = 0; i < data.size(); i++) {
+                values.add(jsonObject2Map(data.get(i)));
+            }
+        } else {
+            values.add(jsonObject2Map(data));
+        }
+        return values;
+    }
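For illustration (not part of the patch): after conversion, uppercase field names from Oracle CDC become accessible case-insensitively. The ObjectMapper here is local to the example, and `HiveTableUtil` is an assumed class name:

    import java.util.List;
    import java.util.Map;
    import com.fasterxml.jackson.databind.JsonNode;
    import com.fasterxml.jackson.databind.ObjectMapper;

    public class JsonNode2MapExample {
        public static void main(String[] args) throws Exception {
            ObjectMapper mapper = new ObjectMapper();
            JsonNode node = mapper.readTree("[{\"ID\": 1, \"NAME\": \"alice\"}]");
            List<Map<String, Object>> rows = HiveTableUtil.jsonNode2Map(node);
            // the case-insensitive map resolves lowercase lookups against the uppercase source keys
            System.out.println(rows.get(0).get("name")); // alice
        }
    }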
+
+    /**
+     * convert a json object, such as JsonNode or JsonObject, to a case-insensitive map
+     *
+     * @param data json object, such as JsonNode, JsonObject
+     * @return map data
+     */
+    public static Map<String, Object> jsonObject2Map(Object data) {
+        CaseInsensitiveMap map = new CaseInsensitiveMap();
+        map.putAll(objectMapper.convertValue(data, new TypeReference<Map<String, Object>>() {
+        }));
+        return map;
+    }
+
+    /**
+     * convert map to JsonNode
+     *
+     * @param data map data
+     * @return json node
+     */
+    public static JsonNode object2JsonNode(Map<String, Object> data) {
+        try {
+            return objectMapper.readTree(objectMapper.writeValueAsString(data));
+        } catch (JsonProcessingException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * create object identifier under the default catalog
+     *
+     * @param databaseName database name
+     * @param tableName table name
+     * @return object identifier of the table
+     */
+    public static ObjectIdentifier createObjectIdentifier(String databaseName, String tableName) {
+        return ObjectIdentifier.of("default_catalog", databaseName, tableName);
+    }
+}
diff --git a/licenses/inlong-sort-connectors/LICENSE b/licenses/inlong-sort-connectors/LICENSE
index 44bb810..1543e8d 100644
--- a/licenses/inlong-sort-connectors/LICENSE
+++ b/licenses/inlong-sort-connectors/LICENSE
@@ -469,6 +469,14 @@
       inlong-sort/sort-connectors/sort-hive/src/main/java/org/apache/inlong/sort/hive/HiveOptions.java
       inlong-sort/sort-connectors/sort-hive/src/main/java/org/apache/inlong/sort/hive/HadoopFileSystemFactory.java
       inlong-sort/sort-connectors/sort-hive/src/main/java/org/apache/inlong/sort/hive/table/HiveTableInlongFactory.java
+      inlong-sort/sort-connectors/sort-hive/src/main/java/org/apache/inlong/sort/hive/filesystem/DefaultHadoopFileCommitterFactory.java
+      inlong-sort/sort-connectors/sort-hive/src/main/java/org/apache/inlong/sort/hive/filesystem/HadoopPathBasedBulkFormatBuilder.java
+      inlong-sort/sort-connectors/sort-hive/src/main/java/org/apache/inlong/sort/hive/filesystem/HadoopPathBasedPartFileWriter.java
+      inlong-sort/sort-connectors/sort-hive/src/main/java/org/apache/inlong/sort/hive/filesystem/HadoopRenameFileCommitter.java
+      inlong-sort/sort-connectors/sort-hive/src/main/java/org/apache/inlong/sort/hive/filesystem/PartitionCommitter.java
+      inlong-sort/sort-connectors/sort-hive/src/main/java/org/apache/inlong/sort/hive/HiveBulkWriterFactory.java
+      inlong-sort/sort-connectors/sort-hive/src/main/java/org/apache/inlong/sort/hive/HiveOutputFormatFactory.java
+      inlong-sort/sort-connectors/sort-hive/src/main/java/org/apache/inlong/sort/hive/HiveWriterFactory.java
  Source  : flink-connector-hive 1.13.5 (Please note that the software have been modified.)
  License : https://github.com/apache/flink/blob/master/LICENSE