[INLONG-8092][Sort] Support all database and multiple tables transmission for Hive (#8096)
diff --git a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/Constants.java b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/Constants.java
index e04c8b1..84cf2c5 100644
--- a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/Constants.java
+++ b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/Constants.java
@@ -17,6 +17,7 @@
package org.apache.inlong.sort.base;
+import org.apache.inlong.sort.base.sink.PartitionPolicy;
import org.apache.inlong.sort.base.sink.SchemaUpdateExceptionPolicy;
import org.apache.flink.configuration.ConfigOption;
@@ -261,6 +262,20 @@
.withDescription("Because spark do not support iceberg data type: `timestamp without time zone` and"
+ "`time`, so type conversions must be mapped to types supported by spark.");
+ public static final ConfigOption<PartitionPolicy> SINK_PARTITION_POLICY =
+ ConfigOptions.key("sink.partition.policy")
+ .enumType(PartitionPolicy.class)
+ .defaultValue(PartitionPolicy.PROC_TIME)
+ .withDescription("The policy of partitioning table.");
+
+ public static final ConfigOption<String> SOURCE_PARTITION_FIELD_NAME =
+ ConfigOptions.key("source.partition.field.name")
+ .stringType()
+ .noDefaultValue()
+ .withDescription(
+ "The field name in source generic raw data, partition table by the field value dynamically."
+ + "Support regex expressions to match many fields in source generic raw data.");
+
// ========================================= dirty configuration =========================================
public static final String DIRTY_PREFIX = "dirty.";
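Editor's note: the two options added above are ordinary Flink ConfigOptions. A minimal sketch, not part of the patch, of how they could be read from a Flink Configuration (the key values set here are illustrative):

    import org.apache.flink.configuration.Configuration;
    import org.apache.inlong.sort.base.Constants;
    import org.apache.inlong.sort.base.sink.PartitionPolicy;

    public class PartitionOptionsExample {
        public static void main(String[] args) {
            Configuration config = new Configuration();
            // Hypothetical values a user might set in the table options.
            config.setString("sink.partition.policy", "SOURCE_DATE_FIELD");
            config.setString("source.partition.field.name", "create_time");

            // SINK_PARTITION_POLICY falls back to PROC_TIME when the key is absent.
            PartitionPolicy policy = config.get(Constants.SINK_PARTITION_POLICY);
            // SOURCE_PARTITION_FIELD_NAME has no default and may be null.
            String field = config.get(Constants.SOURCE_PARTITION_FIELD_NAME);
            System.out.println(policy + " on field " + field);
        }
    }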
diff --git a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/sink/PartitionPolicy.java b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/sink/PartitionPolicy.java
new file mode 100644
index 0000000..eb9eaf4
--- /dev/null
+++ b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/sink/PartitionPolicy.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.base.sink;
+
+public enum PartitionPolicy {
+
+ PROC_TIME("partition table by flink processing time"),
+ ASSIGN_FIELD("partition table by assigning fields"),
+ SOURCE_DATE_FIELD("partition table by date or timestamp field from source generic data"),
+ NONE("do not partition table");
+
+ PartitionPolicy(String description) {
+ this.description = description;
+ }
+
+ private final String description;
+
+ public String getDescription() {
+ return description;
+ }
+}
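Editor's note: a small, hypothetical sketch of how a sink might branch on PartitionPolicy when deriving a partition value for one record; resolvePartitionValue and its arguments are illustrative names, not part of the connector:

    import org.apache.inlong.sort.base.sink.PartitionPolicy;

    import java.time.LocalDate;
    import java.util.Map;

    public class PartitionPolicyExample {

        // Hypothetical helper: pick a partition value for one record according to the policy.
        static String resolvePartitionValue(PartitionPolicy policy, Map<String, Object> record, String field) {
            switch (policy) {
                case PROC_TIME:
                    // partition by the current processing time
                    return LocalDate.now().toString();
                case ASSIGN_FIELD:
                case SOURCE_DATE_FIELD:
                    // partition by a field taken from the source data
                    Object value = record.get(field);
                    return value == null ? null : value.toString();
                case NONE:
                default:
                    // the table is not partitioned
                    return null;
            }
        }
    }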
diff --git a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/hive/pom.xml b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/hive/pom.xml
index aff44ad..ff588fb 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/hive/pom.xml
+++ b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/hive/pom.xml
@@ -31,8 +31,135 @@
<properties>
<inlong.root.dir>${project.parent.parent.parent.parent.parent.basedir}</inlong.root.dir>
+ <hive3x.version>3.1.3</hive3x.version>
+ <hive2x.version>2.2.0</hive2x.version>
+ <orc.core.version>1.4.3</orc.core.version>
+ <aircompressor.version>0.8</aircompressor.version>
</properties>
+ <dependencyManagement>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.hive</groupId>
+ <artifactId>hive-exec</artifactId>
+ <exclusions>
+ <exclusion>
+ <groupId>org.apache.hive</groupId>
+ <artifactId>hive-vector-code-gen</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.hive</groupId>
+ <artifactId>hive-shims</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>commons-codec</groupId>
+ <artifactId>commons-codec</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>commons-httpclient</groupId>
+ <artifactId>commons-httpclient</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.logging.log4j</groupId>
+ <artifactId>log4j-slf4j-impl</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.logging.log4j</groupId>
+ <artifactId>log4j-1.2-api</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.antlr</groupId>
+ <artifactId>antlr-runtime</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.antlr</groupId>
+ <artifactId>ST4</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.ant</groupId>
+ <artifactId>ant</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-compress</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.ivy</groupId>
+ <artifactId>ivy</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.zookeeper</groupId>
+ <artifactId>zookeeper</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.curator</groupId>
+ <artifactId>apache-curator</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.curator</groupId>
+ <artifactId>curator-framework</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.codehaus.groovy</groupId>
+ <artifactId>groovy-all</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.calcite</groupId>
+ <artifactId>calcite-core</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.calcite</groupId>
+ <artifactId>calcite-druid</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.calcite.avatica</groupId>
+ <artifactId>avatica</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.calcite</groupId>
+ <artifactId>calcite-avatica</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.google.code.gson</groupId>
+ <artifactId>gson</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>stax</groupId>
+ <artifactId>stax-api</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>log4j</groupId>
+ <artifactId>log4j</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>log4j</groupId>
+ <artifactId>apache-log4j-extras</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>commons-io</groupId>
+ <artifactId>commons-io</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>commons-lang</groupId>
+ <artifactId>commons-lang</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ </dependencies>
+ </dependencyManagement>
+
<dependencies>
<dependency>
<groupId>org.apache.inlong</groupId>
@@ -50,6 +177,10 @@
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</exclusion>
+ <exclusion>
+ <groupId>commons-lang</groupId>
+ <artifactId>commons-lang</artifactId>
+ </exclusion>
</exclusions>
</dependency>
@@ -59,121 +190,6 @@
</dependency>
<dependency>
- <groupId>org.apache.hive</groupId>
- <artifactId>hive-exec</artifactId>
- <exclusions>
- <exclusion>
- <groupId>org.apache.hive</groupId>
- <artifactId>hive-vector-code-gen</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.apache.hive</groupId>
- <artifactId>hive-shims</artifactId>
- </exclusion>
- <exclusion>
- <groupId>commons-codec</groupId>
- <artifactId>commons-codec</artifactId>
- </exclusion>
- <exclusion>
- <groupId>commons-httpclient</groupId>
- <artifactId>commons-httpclient</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.apache.logging.log4j</groupId>
- <artifactId>log4j-slf4j-impl</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.apache.logging.log4j</groupId>
- <artifactId>log4j-1.2-api</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.antlr</groupId>
- <artifactId>antlr-runtime</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.antlr</groupId>
- <artifactId>ST4</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.apache.ant</groupId>
- <artifactId>ant</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.apache.commons</groupId>
- <artifactId>commons-compress</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.apache.ivy</groupId>
- <artifactId>ivy</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.apache.zookeeper</groupId>
- <artifactId>zookeeper</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.apache.curator</groupId>
- <artifactId>apache-curator</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.apache.curator</groupId>
- <artifactId>curator-framework</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.codehaus.groovy</groupId>
- <artifactId>groovy-all</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.apache.calcite</groupId>
- <artifactId>calcite-core</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.apache.calcite</groupId>
- <artifactId>calcite-druid</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.apache.calcite.avatica</groupId>
- <artifactId>avatica</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.apache.calcite</groupId>
- <artifactId>calcite-avatica</artifactId>
- </exclusion>
- <exclusion>
- <groupId>com.google.code.gson</groupId>
- <artifactId>gson</artifactId>
- </exclusion>
- <exclusion>
- <groupId>stax</groupId>
- <artifactId>stax-api</artifactId>
- </exclusion>
- <exclusion>
- <groupId>com.google.guava</groupId>
- <artifactId>guava</artifactId>
- </exclusion>
- <exclusion>
- <groupId>log4j</groupId>
- <artifactId>log4j</artifactId>
- </exclusion>
- <exclusion>
- <groupId>log4j</groupId>
- <artifactId>apache-log4j-extras</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-log4j12</artifactId>
- </exclusion>
- <exclusion>
- <groupId>commons-io</groupId>
- <artifactId>commons-io</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-api</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
-
- <dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-core</artifactId>
<exclusions>
@@ -209,9 +225,12 @@
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</exclusion>
+ <exclusion>
+ <groupId>commons-lang</groupId>
+ <artifactId>commons-lang</artifactId>
+ </exclusion>
</exclusions>
</dependency>
-
</dependencies>
<build>
@@ -235,6 +254,14 @@
<include>META-INF/services/org.apache.flink.table.factories.Factory</include>
</includes>
</filter>
+ <filter>
+ <artifact>*:*</artifact>
+ <excludes>
+ <exclude>META-INF/*.SF</exclude>
+ <exclude>META-INF/*.DSA</exclude>
+ <exclude>META-INF/*.RSA</exclude>
+ </excludes>
+ </filter>
</filters>
</configuration>
</execution>
@@ -242,4 +269,48 @@
</plugin>
</plugins>
</build>
+
+ <profiles>
+ <profile>
+ <id>hive3</id>
+ <activation>
+ <activeByDefault>true</activeByDefault>
+ </activation>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.hive</groupId>
+ <artifactId>hive-exec</artifactId>
+ <version>${hive3x.version}</version>
+ </dependency>
+ </dependencies>
+ </profile>
+ <profile>
+ <id>hive2</id>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.hive</groupId>
+ <artifactId>hive-exec</artifactId>
+ <version>${hive2x.version}</version>
+ </dependency>
+ <!-- https://nightlies.apache.org/flink/flink-docs-release-1.12/dev/table/connectors/hive/ -->
+ <!-- ORC dependencies, required by the ORC vectorized optimizations -->
+ <dependency>
+ <groupId>org.apache.orc</groupId>
+ <artifactId>orc-core</artifactId>
+ <version>${orc.core.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>io.airlift</groupId>
+ <artifactId>aircompressor</artifactId>
+ <version>${aircompressor.version}</version>
+ </dependency>
+ </dependencies>
+ </profile>
+ </profiles>
</project>
diff --git a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/HiveBulkWriterFactory.java b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/HiveBulkWriterFactory.java
new file mode 100644
index 0000000..89aa2d6
--- /dev/null
+++ b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/HiveBulkWriterFactory.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.hive;
+
+import org.apache.inlong.sort.hive.filesystem.InLongHadoopPathBasedBulkWriter;
+
+import org.apache.flink.formats.hadoop.bulk.HadoopPathBasedBulkWriter;
+import org.apache.flink.table.data.RowData;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
+import org.apache.hadoop.io.Writable;
+
+import java.io.IOException;
+import java.util.function.Function;
+
+/**
+ * Hive bulk writer factory for the path-based bulk file writer that writes to a specific Hadoop path.
+ */
+public class HiveBulkWriterFactory implements HadoopPathBasedBulkWriter.Factory<RowData> {
+
+ private static final long serialVersionUID = 1L;
+ private final HiveWriterFactory factory;
+
+ public HiveBulkWriterFactory(HiveWriterFactory factory) {
+ this.factory = factory;
+ }
+
+ public HiveWriterFactory getFactory() {
+ return factory;
+ }
+
+ @Override
+ public HadoopPathBasedBulkWriter<RowData> create(Path targetPath, Path inProgressPath) throws IOException {
+ FileSinkOperator.RecordWriter recordWriter = factory.createRecordWriter(inProgressPath);
+ Function<RowData, Writable> rowConverter = factory.createRowDataConverter();
+ FileSystem fs = FileSystem.get(inProgressPath.toUri(), factory.getJobConf());
+ return new InLongHadoopPathBasedBulkWriter(recordWriter, rowConverter, fs, inProgressPath);
+ }
+}
diff --git a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/HiveOptions.java b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/HiveOptions.java
index 15d80d9..492c453 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/HiveOptions.java
+++ b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/HiveOptions.java
@@ -66,4 +66,35 @@
.booleanType()
.defaultValue(false)
.withDescription("Regard upsert delete as insert kind.");
+
+ public static final ConfigOption<String> SINK_PARTITION_NAME =
+ ConfigOptions.key("sink.partition.name")
+ .stringType()
+ .defaultValue("pt")
+ .withDescription("The default partition name for creating new hive table.");
+
+ public static final ConfigOption<Integer> HIVE_SCHEMA_SCAN_INTERVAL =
+ ConfigOptions.key("sink.schema.scan.interval")
+ .intType()
+ .defaultValue(10)
+ .withDescription("The interval milliseconds to scan if source table schema changed.");
+
+ public static final ConfigOption<String> HIVE_STORAGE_INPUT_FORMAT =
+ ConfigOptions.key("hive.storage.input.format")
+ .stringType()
+ .defaultValue("org.apache.hadoop.mapred.TextInputFormat")
+ .withDescription("The input format of storage descriptor");
+
+ public static final ConfigOption<String> HIVE_STORAGE_OUTPUT_FORMAT =
+ ConfigOptions.key("hive.storage.output.format")
+ .stringType()
+ .defaultValue("org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat")
+ .withDescription("The output format of storage descriptor");
+
+ public static final ConfigOption<String> HIVE_STORAGE_SERIALIZATION_LIB =
+ ConfigOptions.key("hive.storage.serialization.lib")
+ .stringType()
+ .defaultValue("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe")
+ .withDescription("The serialization library of storage descriptor");
+
}
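Editor's note: the defaults of the three hive.storage.* options above describe Hive's plain text table layout. A minimal sketch, assuming a standalone snippet rather than the connector's own code, of how those defaults map onto a Hive StorageDescriptor, mirroring what HiveTableSink builds when sink.multiple.enable is true:

    import org.apache.hadoop.hive.metastore.api.SerDeInfo;
    import org.apache.hadoop.hive.metastore.api.StorageDescriptor;

    public class StorageDescriptorDefaultsExample {
        public static void main(String[] args) {
            StorageDescriptor sd = new StorageDescriptor();
            // hive.storage.input.format default
            sd.setInputFormat("org.apache.hadoop.mapred.TextInputFormat");
            // hive.storage.output.format default
            sd.setOutputFormat("org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat");
            // hive.storage.serialization.lib default
            SerDeInfo serDeInfo = new SerDeInfo();
            serDeInfo.setSerializationLib("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe");
            sd.setSerdeInfo(serDeInfo);
            System.out.println(sd);
        }
    }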
diff --git a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/HiveOutputFormatFactory.java b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/HiveOutputFormatFactory.java
new file mode 100644
index 0000000..f37740f
--- /dev/null
+++ b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/HiveOutputFormatFactory.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.hive;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem;
+import org.apache.flink.table.filesystem.OutputFormatFactory;
+import org.apache.flink.types.Row;
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter;
+import org.apache.hadoop.io.Writable;
+
+import java.io.IOException;
+import java.util.function.Function;
+
+/** Hive {@link OutputFormatFactory} that uses a {@link RecordWriter} to write records. */
+public class HiveOutputFormatFactory implements OutputFormatFactory<Row> {
+
+ private static final long serialVersionUID = 2L;
+
+ private final HiveWriterFactory factory;
+
+ public HiveOutputFormatFactory(HiveWriterFactory factory) {
+ this.factory = factory;
+ }
+
+ @Override
+ public HiveOutputFormat createOutputFormat(Path path) {
+ return new HiveOutputFormat(
+ factory.createRecordWriter(HadoopFileSystem.toHadoopPath(path)),
+ factory.createRowConverter());
+ }
+
+ private class HiveOutputFormat implements org.apache.flink.api.common.io.OutputFormat<Row> {
+
+ private final RecordWriter recordWriter;
+ private final Function<Row, Writable> rowConverter;
+
+ private HiveOutputFormat(RecordWriter recordWriter, Function<Row, Writable> rowConverter) {
+ this.recordWriter = recordWriter;
+ this.rowConverter = rowConverter;
+ }
+
+ @Override
+ public void configure(Configuration parameters) {
+ }
+
+ @Override
+ public void open(int taskNumber, int numTasks) {
+ }
+
+ @Override
+ public void writeRecord(Row record) throws IOException {
+ recordWriter.write(rowConverter.apply(record));
+ }
+
+ @Override
+ public void close() throws IOException {
+ recordWriter.close(false);
+ }
+ }
+}
diff --git a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/HiveRowDataPartitionComputer.java b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/HiveRowDataPartitionComputer.java
index f38a415..15de4c6 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/HiveRowDataPartitionComputer.java
+++ b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/HiveRowDataPartitionComputer.java
@@ -17,19 +17,41 @@
package org.apache.inlong.sort.hive;
+import org.apache.inlong.sort.base.format.DynamicSchemaFormatFactory;
+import org.apache.inlong.sort.base.format.JsonDynamicSchemaFormat;
+import org.apache.inlong.sort.base.sink.PartitionPolicy;
+import org.apache.inlong.sort.hive.util.CacheHolder;
+import org.apache.inlong.sort.hive.util.HiveTableUtil;
+
import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.catalog.hive.client.HiveShim;
+import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.util.DataFormatConverters;
import org.apache.flink.table.filesystem.RowDataPartitionComputer;
import org.apache.flink.table.functions.hive.conversion.HiveInspectors;
import org.apache.flink.table.functions.hive.conversion.HiveObjectConversion;
import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.utils.TypeConversions;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.mapred.JobConf;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.util.Arrays;
+import java.util.HashMap;
import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_DATABASE_PATTERN;
+import static org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_ENABLE;
+import static org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_FORMAT;
+import static org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_TABLE_PATTERN;
/**
* A {@link RowDataPartitionComputer} that converts Flink objects to Hive objects before computing
@@ -37,16 +59,46 @@
*/
public class HiveRowDataPartitionComputer extends RowDataPartitionComputer {
+ private static final Logger LOG = LoggerFactory.getLogger(HiveRowDataPartitionComputer.class);
+
private final DataFormatConverters.DataFormatConverter[] partitionConverters;
private final HiveObjectConversion[] hiveObjectConversions;
+ private transient JsonDynamicSchemaFormat jsonFormat;
+ private final boolean sinkMultipleEnable;
+ private final String sinkMultipleFormat;
+ private final String databasePattern;
+ private final String tablePattern;
+ private final HiveShim hiveShim;
+
+ private final String hiveVersion;
+ private final String inputFormat;
+ private final String outputFormat;
+ private final String serializationLib;
+
+ private final PartitionPolicy partitionPolicy;
+
+ private final String partitionField;
+
+ private final String timePattern;
+
public HiveRowDataPartitionComputer(
+ JobConf jobConf,
HiveShim hiveShim,
+ String hiveVersion,
String defaultPartValue,
String[] columnNames,
DataType[] columnTypes,
- String[] partitionColumns) {
+ String[] partitionColumns,
+ PartitionPolicy partitionPolicy,
+ String partitionField,
+ String timePattern,
+ String inputFormat,
+ String outputFormat,
+ String serializationLib) {
super(defaultPartValue, columnNames, columnTypes, partitionColumns);
+ this.hiveShim = hiveShim;
+ this.hiveVersion = hiveVersion;
this.partitionConverters =
Arrays.stream(partitionTypes)
.map(TypeConversions::fromLogicalToDataType)
@@ -60,20 +112,104 @@
HiveInspectors.getConversion(
objectInspector, partColType.getLogicalType(), hiveShim);
}
+ this.sinkMultipleEnable = Boolean.parseBoolean(jobConf.get(SINK_MULTIPLE_ENABLE.key(), "false"));
+ this.sinkMultipleFormat = jobConf.get(SINK_MULTIPLE_FORMAT.key());
+ this.databasePattern = jobConf.get(SINK_MULTIPLE_DATABASE_PATTERN.key());
+ this.tablePattern = jobConf.get(SINK_MULTIPLE_TABLE_PATTERN.key());
+ this.partitionPolicy = partitionPolicy;
+ this.partitionField = partitionField;
+ this.timePattern = timePattern;
+ this.inputFormat = inputFormat;
+ this.outputFormat = outputFormat;
+ this.serializationLib = serializationLib;
}
@Override
public LinkedHashMap<String, String> generatePartValues(RowData in) {
LinkedHashMap<String, String> partSpec = new LinkedHashMap<>();
- for (int i = 0; i < partitionIndexes.length; i++) {
- Object field = partitionConverters[i].toExternal(in, partitionIndexes[i]);
- String partitionValue =
- field != null ? hiveObjectConversions[i].toHiveObject(field).toString() : null;
- if (StringUtils.isEmpty(partitionValue)) {
- partitionValue = defaultPartValue;
+ if (sinkMultipleEnable) {
+ GenericRowData rowData = (GenericRowData) in;
+ JsonNode rootNode;
+ try {
+ if (jsonFormat == null) {
+ jsonFormat = (JsonDynamicSchemaFormat) DynamicSchemaFormatFactory.getFormat(sinkMultipleFormat);
+ }
+ rootNode = jsonFormat.deserialize((byte[]) rowData.getField(0));
+
+ String databaseName = jsonFormat.parse(rootNode, databasePattern);
+ String tableName = jsonFormat.parse(rootNode, tablePattern);
+ List<Map<String, Object>> physicalDataList = HiveTableUtil.jsonNode2Map(
+ jsonFormat.getPhysicalData(rootNode));
+
+ List<String> pkListStr = jsonFormat.extractPrimaryKeyNames(rootNode);
+ RowType schema = jsonFormat.extractSchema(rootNode, pkListStr);
+
+ Map<String, Object> rawData = physicalDataList.get(0);
+ ObjectIdentifier identifier = HiveTableUtil.createObjectIdentifier(databaseName, tableName);
+
+ HashMap<ObjectIdentifier, Long> ignoreWritingTableMap = CacheHolder.getIgnoreWritingTableMap();
+ // ignore writing data into this table
+ if (ignoreWritingTableMap.containsKey(identifier)) {
+ return partSpec;
+ }
+
+ HiveWriterFactory hiveWriterFactory = HiveTableUtil.getWriterFactory(hiveShim, hiveVersion, identifier);
+ if (hiveWriterFactory == null) {
+ HiveTableUtil.createTable(databaseName, tableName, schema, partitionPolicy, hiveVersion,
+ inputFormat, outputFormat, serializationLib);
+ hiveWriterFactory = HiveTableUtil.getWriterFactory(hiveShim, hiveVersion, identifier);
+ }
+
+ String[] partitionColumns = hiveWriterFactory.getPartitionColumns();
+ if (partitionColumns.length == 0) {
+ // non-partitioned table
+ return partSpec;
+ }
+
+ String[] columnNames = hiveWriterFactory.getAllColumns();
+ DataType[] allTypes = hiveWriterFactory.getAllTypes();
+ List<String> columnList = Arrays.asList(columnNames);
+ int[] partitionIndexes = Arrays.stream(partitionColumns).mapToInt(columnList::indexOf).toArray();
+
+ boolean replaceLineBreak = hiveWriterFactory.getStorageDescriptor().getInputFormat()
+ .contains("TextInputFormat");
+ Pair<GenericRowData, Integer> pair = HiveTableUtil.getRowData(rawData, columnNames, allTypes,
+ replaceLineBreak);
+ GenericRowData genericRowData = pair.getLeft();
+
+ Object field;
+ for (int i = 0; i < partitionIndexes.length; i++) {
+ int fieldIndex = partitionIndexes[i];
+ field = genericRowData.getField(fieldIndex);
+ DataType type = allTypes[fieldIndex];
+ ObjectInspector objectInspector = HiveInspectors.getObjectInspector(type);
+ HiveObjectConversion hiveConversion = HiveInspectors.getConversion(objectInspector,
+ type.getLogicalType(), hiveShim);
+ String partitionValue = field != null ? String.valueOf(hiveConversion.toHiveObject(field)) : null;
+ if (partitionValue == null) {
+ field = HiveTableUtil.getDefaultPartitionValue(rawData, schema, partitionPolicy, partitionField,
+ timePattern);
+ partitionValue = field != null ? String.valueOf(hiveConversion.toHiveObject(field)) : null;
+ }
+ if (StringUtils.isEmpty(partitionValue)) {
+ partitionValue = defaultPartValue;
+ }
+ partSpec.put(partitionColumns[i], partitionValue);
+ }
+ } catch (Exception e) {
+ // do not throw an exception, just log it, so that HadoopPathBasedPartFileWriter can decide whether to archive the dirty data
+ LOG.error("Generate partition values error", e);
}
- partSpec.put(partitionColumns[i], partitionValue);
+ } else {
+ for (int i = 0; i < partitionIndexes.length; i++) {
+ Object field = partitionConverters[i].toExternal(in, partitionIndexes[i]);
+ String partitionValue = field != null ? hiveObjectConversions[i].toHiveObject(field).toString() : null;
+ if (StringUtils.isEmpty(partitionValue)) {
+ partitionValue = defaultPartValue;
+ }
+ partSpec.put(partitionColumns[i], partitionValue);
+ }
}
return partSpec;
}
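Editor's note: to illustrate the multiple-table branch above, each incoming record arrives as a single byte[] field holding the raw change event, and the database/table patterns are resolved against it before the partition values are computed. A minimal sketch under the assumption that the raw data is canal-json; it reuses only the calls that appear in generatePartValues, and resolveTarget is a hypothetical helper:

    import org.apache.inlong.sort.base.format.DynamicSchemaFormatFactory;
    import org.apache.inlong.sort.base.format.JsonDynamicSchemaFormat;

    import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;

    public class MultipleSinkRoutingExample {

        // Hypothetical helper: resolve the target Hive database and table from one raw record.
        static String resolveTarget(byte[] rawRecord, String databasePattern, String tablePattern) throws Exception {
            JsonDynamicSchemaFormat format =
                    (JsonDynamicSchemaFormat) DynamicSchemaFormatFactory.getFormat("canal-json");
            JsonNode root = format.deserialize(rawRecord);
            String database = format.parse(root, databasePattern); // e.g. "${database}" -> "test_db"
            String table = format.parse(root, tablePattern);       // e.g. "${table}"    -> "orders"
            return database + "." + table;
        }
    }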
diff --git a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/HiveTableMetaStoreFactory.java b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/HiveTableMetaStoreFactory.java
index 9e433c6..3baef7f 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/HiveTableMetaStoreFactory.java
+++ b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/HiveTableMetaStoreFactory.java
@@ -48,7 +48,7 @@
private final String database;
private final String tableName;
- HiveTableMetaStoreFactory(JobConf conf, String hiveVersion, String database, String tableName) {
+ public HiveTableMetaStoreFactory(JobConf conf, String hiveVersion, String database, String tableName) {
this.conf = new JobConfWrapper(conf);
this.hiveVersion = hiveVersion;
this.database = database;
@@ -60,7 +60,15 @@
return new HiveTableMetaStore();
}
- private class HiveTableMetaStore implements TableMetaStore {
+ public String getDatabase() {
+ return database;
+ }
+
+ public String getTableName() {
+ return tableName;
+ }
+
+ public class HiveTableMetaStore implements TableMetaStore {
private HiveMetastoreClientWrapper client;
private StorageDescriptor sd;
diff --git a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/HiveTableSink.java b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/HiveTableSink.java
index f3c235d..b9792d4 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/HiveTableSink.java
+++ b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/HiveTableSink.java
@@ -19,6 +19,9 @@
import org.apache.inlong.sort.base.dirty.DirtyOptions;
import org.apache.inlong.sort.base.dirty.sink.DirtySink;
+import org.apache.inlong.sort.base.sink.PartitionPolicy;
+import org.apache.inlong.sort.base.sink.SchemaUpdateExceptionPolicy;
+import org.apache.inlong.sort.hive.filesystem.HadoopPathBasedBulkFormatBuilder;
import org.apache.inlong.sort.hive.filesystem.StreamingSink;
import org.apache.flink.api.common.functions.MapFunction;
@@ -28,15 +31,11 @@
import org.apache.flink.connectors.hive.read.HiveCompactReaderFactory;
import org.apache.flink.connectors.hive.util.HiveConfUtils;
import org.apache.flink.connectors.hive.util.JobConfUtils;
-import org.apache.flink.connectors.hive.write.HiveBulkWriterFactory;
-import org.apache.flink.connectors.hive.write.HiveOutputFormatFactory;
-import org.apache.flink.connectors.hive.write.HiveWriterFactory;
import org.apache.flink.hive.shaded.formats.parquet.row.ParquetRowDataBuilder;
import org.apache.flink.orc.OrcSplitReaderUtil;
import org.apache.flink.orc.writer.ThreadLocalClassLoaderConfiguration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
-import org.apache.flink.streaming.api.functions.sink.filesystem.HadoopPathBasedBulkFormatBuilder;
import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig;
import org.apache.flink.streaming.api.functions.sink.filesystem.PartFileInfo;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
@@ -75,9 +74,11 @@
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.SerDeInfo;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat;
import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.orc.TypeDescription;
@@ -127,6 +128,15 @@
private final DirtyOptions dirtyOptions;
private @Nullable final DirtySink<Object> dirtySink;
+ private final SchemaUpdateExceptionPolicy schemaUpdatePolicy;
+ private final PartitionPolicy partitionPolicy;
+ private final String partitionField;
+ private final String timePattern;
+ private final boolean sinkMultipleEnable;
+ private final String inputFormat;
+ private final String outputFormat;
+ private final String serializationLib;
+
public HiveTableSink(
ReadableConfig flinkConf,
JobConf jobConf,
@@ -136,7 +146,15 @@
final String inlongMetric,
final String auditHostAndPorts,
DirtyOptions dirtyOptions,
- @Nullable DirtySink<Object> dirtySink) {
+ @Nullable DirtySink<Object> dirtySink,
+ SchemaUpdateExceptionPolicy schemaUpdatePolicy,
+ PartitionPolicy partitionPolicy,
+ String partitionField,
+ String timePattern,
+ boolean sinkMultipleEnable,
+ String inputFormat,
+ String outputFormat,
+ String serializationLib) {
this.flinkConf = flinkConf;
this.jobConf = jobConf;
this.identifier = identifier;
@@ -152,6 +170,14 @@
this.auditHostAndPorts = auditHostAndPorts;
this.dirtyOptions = dirtyOptions;
this.dirtySink = dirtySink;
+ this.schemaUpdatePolicy = schemaUpdatePolicy;
+ this.partitionPolicy = partitionPolicy;
+ this.partitionField = partitionField;
+ this.timePattern = timePattern;
+ this.sinkMultipleEnable = sinkMultipleEnable;
+ this.inputFormat = inputFormat;
+ this.outputFormat = outputFormat;
+ this.serializationLib = serializationLib;
}
@Override
@@ -172,23 +198,36 @@
try (HiveMetastoreClientWrapper client =
HiveMetastoreClientFactory.create(HiveConfUtils.create(jobConf), hiveVersion)) {
- Table table = client.getTable(dbName, identifier.getObjectName());
- StorageDescriptor sd = table.getSd();
-
- Class hiveOutputFormatClz =
- hiveShim.getHiveOutputFormatClass(Class.forName(sd.getOutputFormat()));
+ StorageDescriptor sd;
+ Properties tableProps = new Properties();
+ Class hiveOutputFormatClz;
boolean isCompressed =
jobConf.getBoolean(HiveConf.ConfVars.COMPRESSRESULT.varname, false);
- HiveWriterFactory writerFactory =
- new HiveWriterFactory(
- jobConf,
- hiveOutputFormatClz,
- sd.getSerdeInfo(),
- tableSchema,
- getPartitionKeyArray(),
- HiveReflectionUtils.getTableMetadata(hiveShim, table),
- hiveShim,
- isCompressed);
+ if (sinkMultipleEnable) {
+ sd = new StorageDescriptor();
+ SerDeInfo serDeInfo = new SerDeInfo();
+ serDeInfo.setSerializationLib(this.serializationLib);
+ sd.setSerdeInfo(serDeInfo);
+ String defaultFs = jobConf.get("fs.defaultFS", "");
+ sd.setLocation(defaultFs + "/tmp");
+ hiveOutputFormatClz = hiveShim.getHiveOutputFormatClass(HiveIgnoreKeyTextOutputFormat.class);
+ } else {
+ Table table = client.getTable(dbName, identifier.getObjectName());
+ sd = table.getSd();
+ tableProps = HiveReflectionUtils.getTableMetadata(hiveShim, table);
+ hiveOutputFormatClz = hiveShim.getHiveOutputFormatClass(Class.forName(sd.getOutputFormat()));
+ }
+ HiveWriterFactory writerFactory = new HiveWriterFactory(
+ jobConf,
+ hiveOutputFormatClz,
+ sd,
+ tableSchema,
+ getPartitionKeyArray(),
+ tableProps,
+ hiveShim,
+ isCompressed,
+ sinkMultipleEnable);
+
String extension =
Utilities.getFileExtension(
jobConf,
@@ -204,14 +243,12 @@
Optional.ofNullable(configuredParallelism).orElse(dataStream.getParallelism());
if (isBounded) {
OutputFileConfig fileNaming = fileNamingBuilder.build();
- return createBatchSink(
- dataStream, converter, sd, writerFactory, fileNaming, parallelism);
+ return createBatchSink(dataStream, converter, sd, writerFactory, fileNaming, parallelism);
} else {
if (overwrite) {
throw new IllegalStateException("Streaming mode not support overwrite.");
}
- Properties tableProps = HiveReflectionUtils.getTableMetadata(hiveShim, table);
return createStreamSink(
dataStream, sd, tableProps, writerFactory, fileNamingBuilder, parallelism);
}
@@ -278,13 +315,22 @@
identifier, FileSystemOptions.SINK_PARTITION_COMMIT_POLICY_KIND.key()));
}
- HiveRowDataPartitionComputer partComputer =
- new HiveRowDataPartitionComputer(
- hiveShim,
- JobConfUtils.getDefaultPartitionName(jobConf),
- tableSchema.getFieldNames(),
- tableSchema.getFieldDataTypes(),
- getPartitionKeyArray());
+ HiveRowDataPartitionComputer partComputer;
+ partComputer = new HiveRowDataPartitionComputer(
+ jobConf,
+ hiveShim,
+ hiveVersion,
+ JobConfUtils.getDefaultPartitionName(jobConf),
+ tableSchema.getFieldNames(),
+ tableSchema.getFieldDataTypes(),
+ getPartitionKeyArray(),
+ partitionPolicy,
+ partitionField,
+ timePattern,
+ inputFormat,
+ outputFormat,
+ serializationLib);
+
TableBucketAssigner assigner = new TableBucketAssigner(partComputer);
HiveRollingPolicy rollingPolicy =
new HiveRollingPolicy(
@@ -387,9 +433,7 @@
if (dbName == null) {
dbName = identifier.getDatabaseName();
}
- LOG.info("11 ### dbName is " + dbName);
- return new HiveTableMetaStoreFactory(
- jobConf, hiveVersion, dbName, identifier.getObjectName());
+ return new HiveTableMetaStoreFactory(jobConf, hiveVersion, dbName, identifier.getObjectName());
}
private HadoopFileSystemFactory fsFactory() {
@@ -403,8 +447,9 @@
HiveRollingPolicy rollingPolicy,
OutputFileConfig outputFileConfig) {
HiveBulkWriterFactory hadoopBulkFactory = new HiveBulkWriterFactory(recordWriterFactory);
- return new HadoopPathBasedBulkFormatBuilder<>(
- new Path(sd.getLocation()), hadoopBulkFactory, jobConf, assigner)
+ return new HadoopPathBasedBulkFormatBuilder<>(new Path(sd.getLocation()), hadoopBulkFactory, jobConf, assigner,
+ dirtyOptions, dirtySink, schemaUpdatePolicy, partitionPolicy, hiveShim, hiveVersion,
+ sinkMultipleEnable, inputFormat, outputFormat, serializationLib)
.withRollingPolicy(rollingPolicy)
.withOutputFileConfig(outputFileConfig);
}
@@ -461,6 +506,10 @@
}
private List<String> getPartitionKeys() {
+ /*
+ * if (table != null) { return
+ * table.getPartitionKeys().stream().map(FieldSchema::getName).collect(Collectors.toList()); }
+ */
return catalogTable.getPartitionKeys();
}
@@ -507,7 +556,15 @@
inlongMetric,
auditHostAndPorts,
dirtyOptions,
- dirtySink);
+ dirtySink,
+ schemaUpdatePolicy,
+ partitionPolicy,
+ partitionField,
+ timePattern,
+ sinkMultipleEnable,
+ inputFormat,
+ outputFormat,
+ serializationLib);
sink.staticPartitionSpec = staticPartitionSpec;
sink.overwrite = overwrite;
sink.dynamicGrouping = dynamicGrouping;
diff --git a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/HiveWriterFactory.java b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/HiveWriterFactory.java
new file mode 100644
index 0000000..75b8e0d
--- /dev/null
+++ b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/HiveWriterFactory.java
@@ -0,0 +1,251 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.hive;
+
+import org.apache.flink.connectors.hive.CachedSerializedValue;
+import org.apache.flink.connectors.hive.FlinkHiveException;
+import org.apache.flink.connectors.hive.JobConfWrapper;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.catalog.hive.client.HiveShim;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.util.DataFormatConverters;
+import org.apache.flink.table.data.util.DataFormatConverters.DataFormatConverter;
+import org.apache.flink.table.functions.hive.conversion.HiveInspectors;
+import org.apache.flink.table.functions.hive.conversion.HiveObjectConversion;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.StringUtils;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter;
+import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
+import org.apache.hadoop.hive.serde2.Deserializer;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.SerDeUtils;
+import org.apache.hadoop.hive.serde2.Serializer;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.SequenceFileOutputFormat;
+import org.apache.hadoop.util.ReflectionUtils;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Properties;
+import java.util.function.Function;
+
+/**
+ * Factory for creating {@link RecordWriter} and converters for writing.
+ */
+public class HiveWriterFactory implements Serializable {
+
+ private static final long serialVersionUID = 1L;
+
+ private final Class hiveOutputFormatClz;
+
+ private final CachedSerializedValue<StorageDescriptor> storageDescriptor;
+
+ private final String[] allColumns;
+
+ private final DataType[] allTypes;
+
+ private final String[] partitionColumns;
+
+ private final Properties tableProperties;
+
+ private final JobConfWrapper confWrapper;
+
+ private final HiveShim hiveShim;
+
+ private final boolean isCompressed;
+
+ // SerDe in Hive-1.2.1 and Hive-2.3.4 can be of different classes, make sure to use a common
+ // base class
+ private transient Serializer recordSerDe;
+
+ /**
+ * Field number excluding partition fields.
+ */
+ private transient int formatFields;
+
+ // to convert Flink object to Hive object
+ private transient HiveObjectConversion[] hiveConversions;
+
+ private transient DataFormatConverter[] converters;
+
+ // StructObjectInspector represents the hive row structure.
+ private transient StructObjectInspector formatInspector;
+
+ private transient boolean initialized;
+
+ private final boolean sinkMultipleEnable;
+
+ public HiveWriterFactory(JobConf jobConf,
+ Class hiveOutputFormatClz,
+ StorageDescriptor storageDescriptor,
+ TableSchema schema,
+ String[] partitionColumns,
+ Properties tableProperties,
+ HiveShim hiveShim,
+ boolean isCompressed,
+ boolean sinkMultipleEnable) {
+ Preconditions.checkArgument(HiveOutputFormat.class.isAssignableFrom(hiveOutputFormatClz),
+ "The output format should be an instance of HiveOutputFormat");
+ this.confWrapper = new JobConfWrapper(jobConf);
+ this.hiveOutputFormatClz = hiveOutputFormatClz;
+ try {
+ this.storageDescriptor = new CachedSerializedValue<>(storageDescriptor);
+ } catch (IOException e) {
+ throw new FlinkHiveException("Failed to serialize SerDeInfo", e);
+ }
+ this.allColumns = schema.getFieldNames();
+ this.allTypes = schema.getFieldDataTypes();
+ this.partitionColumns = partitionColumns;
+ this.tableProperties = tableProperties;
+ this.hiveShim = hiveShim;
+ this.isCompressed = isCompressed;
+ this.sinkMultipleEnable = sinkMultipleEnable;
+ }
+
+ /**
+ * Create a {@link RecordWriter} from path.
+ */
+ public RecordWriter createRecordWriter(Path path) {
+ try {
+ checkInitialize();
+ JobConf conf = new JobConf(confWrapper.conf());
+
+ if (isCompressed) {
+ String codecStr = conf.get(HiveConf.ConfVars.COMPRESSINTERMEDIATECODEC.varname);
+ if (!StringUtils.isNullOrWhitespaceOnly(codecStr)) {
+ // noinspection unchecked
+ Class<? extends CompressionCodec> codec = (Class<? extends CompressionCodec>) Class.forName(
+ codecStr, true, Thread.currentThread().getContextClassLoader());
+ FileOutputFormat.setOutputCompressorClass(conf, codec);
+ }
+ String typeStr = conf.get(HiveConf.ConfVars.COMPRESSINTERMEDIATETYPE.varname);
+ if (!StringUtils.isNullOrWhitespaceOnly(typeStr)) {
+ SequenceFile.CompressionType style = SequenceFile.CompressionType.valueOf(typeStr);
+ SequenceFileOutputFormat.setOutputCompressionType(conf, style);
+ }
+ }
+
+ return hiveShim.getHiveRecordWriter(conf, hiveOutputFormatClz, recordSerDe.getSerializedClass(),
+ isCompressed, tableProperties, path);
+ } catch (Exception e) {
+ throw new FlinkHiveException(e);
+ }
+ }
+
+ public JobConf getJobConf() {
+ return confWrapper.conf();
+ }
+
+ public StorageDescriptor getStorageDescriptor() throws IOException, ClassNotFoundException {
+ return storageDescriptor.deserializeValue();
+ }
+
+ public String[] getAllColumns() {
+ return allColumns;
+ }
+
+ public DataType[] getAllTypes() {
+ return allTypes;
+ }
+
+ public String[] getPartitionColumns() {
+ return partitionColumns;
+ }
+
+ public void checkInitialize() throws Exception {
+ if (initialized) {
+ return;
+ }
+
+ JobConf jobConf = confWrapper.conf();
+ Object serdeLib = Class.forName(getStorageDescriptor().getSerdeInfo().getSerializationLib()).newInstance();
+ Preconditions.checkArgument(serdeLib instanceof Serializer && serdeLib instanceof Deserializer,
+ "Expect a SerDe lib implementing both Serializer and Deserializer, but actually got "
+ + serdeLib.getClass().getName());
+ this.recordSerDe = (Serializer) serdeLib;
+ ReflectionUtils.setConf(recordSerDe, jobConf);
+
+ // TODO: support partition properties; for now assume they are the same as the table properties
+ SerDeUtils.initializeSerDe((Deserializer) recordSerDe, jobConf, tableProperties, null);
+
+ this.formatFields = allColumns.length - partitionColumns.length;
+ this.hiveConversions = new HiveObjectConversion[formatFields];
+ this.converters = new DataFormatConverter[formatFields];
+ List<ObjectInspector> objectInspectors = new ArrayList<>(hiveConversions.length);
+ for (int i = 0; i < formatFields; i++) {
+ DataType type = allTypes[i];
+ ObjectInspector objectInspector = HiveInspectors.getObjectInspector(type);
+ objectInspectors.add(objectInspector);
+ hiveConversions[i] = HiveInspectors.getConversion(objectInspector, type.getLogicalType(), hiveShim);
+ converters[i] = DataFormatConverters.getConverterForDataType(type);
+ }
+
+ this.formatInspector = ObjectInspectorFactory.getStandardStructObjectInspector(
+ Arrays.asList(allColumns).subList(0, formatFields), objectInspectors);
+ this.initialized = true;
+ }
+
+ public Function<Row, Writable> createRowConverter() {
+ return row -> {
+ List<Object> fields = new ArrayList<>(formatFields);
+ for (int i = 0; i < formatFields; i++) {
+ fields.add(hiveConversions[i].toHiveObject(row.getField(i)));
+ }
+ return serialize(fields);
+ };
+ }
+
+ public Function<RowData, Writable> createRowDataConverter() {
+ return row -> {
+ List<Object> fields = new ArrayList<>(formatFields);
+ for (int i = 0; i < formatFields; i++) {
+ if (sinkMultipleEnable) {
+ GenericRowData rowData = (GenericRowData) row;
+ fields.add(hiveConversions[i].toHiveObject(rowData.getField(i)));
+ } else {
+ fields.add(hiveConversions[i].toHiveObject(converters[i].toExternal(row, i)));
+ }
+ }
+ return serialize(fields);
+ };
+ }
+
+ private Writable serialize(List<Object> fields) {
+ try {
+ return recordSerDe.serialize(fields, formatInspector);
+ } catch (SerDeException e) {
+ throw new FlinkHiveException(e);
+ }
+ }
+}
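Editor's note: the only difference between the two branches of createRowDataConverter above is how the Java object is obtained from the row. In single-table mode the internal field (for example StringData) is first turned back into an external Java object, whereas in multiple-table mode the GenericRowData already carries plain Java objects. A small self-contained sketch of that distinction, independent of Hive:

    import org.apache.flink.table.api.DataTypes;
    import org.apache.flink.table.data.GenericRowData;
    import org.apache.flink.table.data.StringData;
    import org.apache.flink.table.data.util.DataFormatConverters;
    import org.apache.flink.table.data.util.DataFormatConverters.DataFormatConverter;

    public class ConverterModesExample {
        public static void main(String[] args) {
            // Single-table mode: internal representation (StringData) -> external Java object via a converter.
            GenericRowData internalRow = GenericRowData.of(StringData.fromString("hello"));
            DataFormatConverter converter = DataFormatConverters.getConverterForDataType(DataTypes.STRING());
            Object external = converter.toExternal(internalRow, 0); // java.lang.String "hello"

            // Multiple-table mode: the row already holds plain Java objects, so no conversion is needed.
            GenericRowData externalRow = GenericRowData.of("hello");
            Object direct = externalRow.getField(0);                // already a java.lang.String

            System.out.println(external + " / " + direct);
        }
    }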
diff --git a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/filesystem/AbstractStreamingWriter.java b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/filesystem/AbstractStreamingWriter.java
index c3b408c..ecae4e23 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/filesystem/AbstractStreamingWriter.java
+++ b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/filesystem/AbstractStreamingWriter.java
@@ -17,15 +17,12 @@
package org.apache.inlong.sort.hive.filesystem;
-import org.apache.inlong.sort.base.dirty.DirtyData;
import org.apache.inlong.sort.base.dirty.DirtyOptions;
-import org.apache.inlong.sort.base.dirty.DirtyType;
import org.apache.inlong.sort.base.dirty.sink.DirtySink;
import org.apache.inlong.sort.base.metric.MetricOption;
import org.apache.inlong.sort.base.metric.MetricOption.RegisteredMetric;
import org.apache.inlong.sort.base.metric.MetricState;
-import org.apache.inlong.sort.base.metric.SinkMetricData;
-import org.apache.inlong.sort.base.util.CalculateObjectSizeUtils;
+import org.apache.inlong.sort.base.metric.sub.SinkTableMetricData;
import org.apache.inlong.sort.base.util.MetricStateUtils;
import org.apache.flink.api.common.state.ListState;
@@ -52,8 +49,6 @@
import javax.annotation.Nullable;
-import java.io.IOException;
-
import static org.apache.inlong.sort.base.Constants.DIRTY_BYTES_OUT;
import static org.apache.inlong.sort.base.Constants.DIRTY_RECORDS_OUT;
import static org.apache.inlong.sort.base.Constants.INLONG_METRIC_STATE_NAME;
@@ -83,7 +78,6 @@
private @Nullable final String auditHostAndPorts;
private final DirtyOptions dirtyOptions;
private @Nullable final DirtySink<Object> dirtySink;
-
// --------------------------- runtime fields -----------------------------
private transient Buckets<IN, String> buckets;
@@ -93,13 +87,10 @@
private transient long currentWatermark;
@Nullable
- private transient SinkMetricData metricData;
+ private transient SinkTableMetricData metricData;
private transient ListState<MetricState> metricStateListState;
private transient MetricState metricState;
- private Long dataSize = 0L;
- private Long rowSize = 0L;
-
public AbstractStreamingWriter(
long bucketCheckInterval,
StreamingFileSink.BucketsBuilder<IN, String, ? extends StreamingFileSink.BucketsBuilder<IN, String, ?>> bucketsBuilder,
@@ -141,11 +132,6 @@
protected void commitUpToCheckpoint(long checkpointId) throws Exception {
try {
helper.commitUpToCheckpoint(checkpointId);
- if (metricData != null) {
- metricData.invoke(rowSize, dataSize);
- }
- rowSize = 0L;
- dataSize = 0L;
} catch (Exception e) {
LOG.error("hive sink commitUpToCheckpoint occurs error.", e);
throw e;
@@ -155,18 +141,6 @@
@Override
public void open() throws Exception {
super.open();
- MetricOption metricOption = MetricOption.builder()
- .withInlongLabels(inlongMetric)
- .withAuditAddress(auditHostAndPorts)
- .withInitRecords(metricState != null ? metricState.getMetricValue(NUM_RECORDS_OUT) : 0L)
- .withInitBytes(metricState != null ? metricState.getMetricValue(NUM_BYTES_OUT) : 0L)
- .withInitDirtyRecords(metricState != null ? metricState.getMetricValue(DIRTY_RECORDS_OUT) : 0L)
- .withInitDirtyBytes(metricState != null ? metricState.getMetricValue(DIRTY_BYTES_OUT) : 0L)
- .withRegisterMetric(RegisteredMetric.ALL)
- .build();
- if (metricOption != null) {
- metricData = new SinkMetricData(metricOption, getRuntimeContext().getMetricGroup());
- }
if (dirtySink != null) {
dirtySink.open(new Configuration());
}
@@ -175,6 +149,39 @@
@Override
public void initializeState(StateInitializationContext context) throws Exception {
super.initializeState(context);
+
+ // init metric state
+ if (this.inlongMetric != null) {
+ this.metricStateListState = context.getOperatorStateStore().getUnionListState(
+ new ListStateDescriptor<>(
+ INLONG_METRIC_STATE_NAME, TypeInformation.of(new TypeHint<MetricState>() {
+ })));
+ }
+
+ if (context.isRestored()) {
+ metricState = MetricStateUtils.restoreMetricState(metricStateListState,
+ getRuntimeContext().getIndexOfThisSubtask(), getRuntimeContext().getNumberOfParallelSubtasks());
+
+ }
+
+ MetricOption metricOption = MetricOption.builder().withInlongLabels(inlongMetric)
+ .withAuditAddress(auditHostAndPorts)
+ .withInitRecords(metricState != null ? metricState.getMetricValue(NUM_RECORDS_OUT) : 0L)
+ .withInitBytes(metricState != null ? metricState.getMetricValue(NUM_BYTES_OUT) : 0L)
+ .withInitDirtyRecords(metricState != null ? metricState.getMetricValue(DIRTY_RECORDS_OUT) : 0L)
+ .withInitDirtyBytes(metricState != null ? metricState.getMetricValue(DIRTY_BYTES_OUT) : 0L)
+ .withRegisterMetric(RegisteredMetric.ALL).build();
+
+ if (metricOption != null) {
+ metricData = new SinkTableMetricData(metricOption, getRuntimeContext().getMetricGroup());
+ metricData.registerSubMetricsGroup(metricState);
+ if (this.bucketsBuilder instanceof HadoopPathBasedBulkFormatBuilder) {
+ ((HadoopPathBasedBulkFormatBuilder) this.bucketsBuilder).setMetricData(metricData);
+ }
+ }
+
+ // metricData must be initialized first so that HadoopPathBasedPartFileWriter can use it to report metric data.
buckets = bucketsBuilder.createBuckets(getRuntimeContext().getIndexOfThisSubtask());
// Set listener before the initialization of Buckets.
@@ -203,18 +210,6 @@
bucketCheckInterval);
currentWatermark = Long.MIN_VALUE;
-
- // init metric state
- if (this.inlongMetric != null) {
- this.metricStateListState = context.getOperatorStateStore().getUnionListState(
- new ListStateDescriptor<>(
- INLONG_METRIC_STATE_NAME, TypeInformation.of(new TypeHint<MetricState>() {
- })));
- }
- if (context.isRestored()) {
- metricState = MetricStateUtils.restoreMetricState(metricStateListState,
- getRuntimeContext().getIndexOfThisSubtask(), getRuntimeContext().getNumberOfParallelSubtasks());
- }
}
@Override
@@ -235,44 +230,11 @@
@Override
public void processElement(StreamRecord<IN> element) throws Exception {
- try {
- helper.onElement(
- element.getValue(),
- getProcessingTimeService().getCurrentProcessingTime(),
- element.hasTimestamp() ? element.getTimestamp() : null,
- currentWatermark);
- rowSize = rowSize + 1;
- if (element.getValue() != null) {
- dataSize = dataSize + CalculateObjectSizeUtils.getDataSize(element.getValue());
- }
- } catch (IOException e) {
- throw e;
- } catch (Exception e) {
- LOGGER.error("StreamingWriter write failed", e);
- if (!dirtyOptions.ignoreDirty()) {
- throw new RuntimeException(e);
- }
- if (metricData != null) {
- metricData.invokeDirtyWithEstimate(element.getValue());
- }
- if (dirtySink != null) {
- DirtyData.Builder<Object> builder = DirtyData.builder();
- try {
- builder.setData(element.getValue())
- .setDirtyType(DirtyType.UNDEFINED)
- .setLabels(dirtyOptions.getLabels())
- .setLogTag(dirtyOptions.getLogTag())
- .setDirtyMessage(e.getMessage())
- .setIdentifier(dirtyOptions.getIdentifier());
- dirtySink.invoke(builder.build());
- } catch (Exception ex) {
- if (!dirtyOptions.ignoreSideOutputErrors()) {
- throw new RuntimeException(ex);
- }
- LOGGER.warn("Dirty sink failed", ex);
- }
- }
- }
+ helper.onElement(
+ element.getValue(),
+ getProcessingTimeService().getCurrentProcessingTime(),
+ element.hasTimestamp() ? element.getTimestamp() : null,
+ currentWatermark);
}
@Override
diff --git a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/filesystem/DefaultHadoopFileCommitterFactory.java b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/filesystem/DefaultHadoopFileCommitterFactory.java
new file mode 100644
index 0000000..644b9a1
--- /dev/null
+++ b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/filesystem/DefaultHadoopFileCommitterFactory.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.hive.filesystem;
+
+import org.apache.flink.formats.hadoop.bulk.HadoopFileCommitter;
+import org.apache.flink.formats.hadoop.bulk.HadoopFileCommitterFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+
+import java.io.IOException;
+
+/**
+ * The default Hadoop file committer factory, which always uses {@link HadoopRenameFileCommitter}.
+ */
+public class DefaultHadoopFileCommitterFactory implements HadoopFileCommitterFactory {
+
+ private static final long serialVersionUID = 1L;
+
+ private final boolean sinkMultipleEnable;
+
+ public DefaultHadoopFileCommitterFactory(boolean sinkMultipleEnable) {
+ this.sinkMultipleEnable = sinkMultipleEnable;
+ }
+
+ @Override
+ public HadoopFileCommitter create(Configuration configuration, Path targetFilePath) throws IOException {
+ return new HadoopRenameFileCommitter(configuration, targetFilePath, sinkMultipleEnable);
+ }
+
+ @Override
+ public HadoopFileCommitter recoverForCommit(Configuration configuration, Path targetFilePath, Path tempFilePath) {
+ return new HadoopRenameFileCommitter(configuration, targetFilePath, tempFilePath, sinkMultipleEnable);
+ }
+}
\ No newline at end of file
diff --git a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/filesystem/HadoopPathBasedBulkFormatBuilder.java b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/filesystem/HadoopPathBasedBulkFormatBuilder.java
new file mode 100644
index 0000000..9fb3791
--- /dev/null
+++ b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/filesystem/HadoopPathBasedBulkFormatBuilder.java
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.hive.filesystem;
+
+import org.apache.inlong.sort.base.dirty.DirtyOptions;
+import org.apache.inlong.sort.base.dirty.sink.DirtySink;
+import org.apache.inlong.sort.base.metric.sub.SinkTableMetricData;
+import org.apache.inlong.sort.base.sink.PartitionPolicy;
+import org.apache.inlong.sort.base.sink.SchemaUpdateExceptionPolicy;
+
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.formats.hadoop.bulk.HadoopFileCommitterFactory;
+import org.apache.flink.formats.hadoop.bulk.HadoopPathBasedBulkWriter;
+import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner;
+import org.apache.flink.streaming.api.functions.sink.filesystem.BucketFactory;
+import org.apache.flink.streaming.api.functions.sink.filesystem.BucketWriter;
+import org.apache.flink.streaming.api.functions.sink.filesystem.Buckets;
+import org.apache.flink.streaming.api.functions.sink.filesystem.DefaultBucketFactoryImpl;
+import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig;
+import org.apache.flink.streaming.api.functions.sink.filesystem.SerializableConfiguration;
+import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
+import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.CheckpointRollingPolicy;
+import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.OnCheckpointRollingPolicy;
+import org.apache.flink.table.catalog.hive.client.HiveShim;
+import org.apache.flink.util.Preconditions;
+import org.apache.hadoop.conf.Configuration;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+
+/** Buckets builder to create buckets that use {@link HadoopPathBasedPartFileWriter}. */
+public class HadoopPathBasedBulkFormatBuilder<IN, BucketID, T extends HadoopPathBasedBulkFormatBuilder<IN, BucketID, T>>
+ extends
+ StreamingFileSink.BucketsBuilder<IN, BucketID, T> {
+
+ private static final long serialVersionUID = 1L;
+
+ private final Path basePath;
+
+ private final HadoopPathBasedBulkWriter.Factory<IN> writerFactory;
+
+ private final HadoopFileCommitterFactory fileCommitterFactory;
+
+ private SerializableConfiguration serializableConfiguration;
+
+ private BucketAssigner<IN, BucketID> bucketAssigner;
+
+ private CheckpointRollingPolicy<IN, BucketID> rollingPolicy;
+
+ private BucketFactory<IN, BucketID> bucketFactory;
+
+ private OutputFileConfig outputFileConfig;
+
+ @Nullable
+ private transient SinkTableMetricData metricData;
+
+ private final DirtyOptions dirtyOptions;
+
+ private final @Nullable DirtySink<Object> dirtySink;
+
+ private final SchemaUpdateExceptionPolicy schemaUpdatePolicy;
+ private final PartitionPolicy partitionPolicy;
+ private final HiveShim hiveShim;
+ private final String hiveVersion;
+ private final String inputFormat;
+ private final String outputFormat;
+ private final String serializationLib;
+
+ public HadoopPathBasedBulkFormatBuilder(
+ org.apache.hadoop.fs.Path basePath,
+ HadoopPathBasedBulkWriter.Factory<IN> writerFactory,
+ Configuration configuration,
+ BucketAssigner<IN, BucketID> assigner,
+ DirtyOptions dirtyOptions,
+ @Nullable DirtySink<Object> dirtySink,
+ SchemaUpdateExceptionPolicy schemaUpdatePolicy,
+ PartitionPolicy partitionPolicy,
+ HiveShim hiveShim,
+ String hiveVersion,
+ boolean sinkMultipleEnable,
+ String inputFormat,
+ String outputFormat,
+ String serializationLib) {
+ this(
+ basePath,
+ writerFactory,
+ new DefaultHadoopFileCommitterFactory(sinkMultipleEnable),
+ configuration,
+ assigner,
+ OnCheckpointRollingPolicy.build(),
+ new DefaultBucketFactoryImpl<>(),
+ OutputFileConfig.builder().build(),
+ dirtyOptions,
+ dirtySink,
+ schemaUpdatePolicy,
+ partitionPolicy,
+ hiveShim,
+ hiveVersion,
+ inputFormat,
+ outputFormat,
+ serializationLib);
+ }
+
+ public HadoopPathBasedBulkFormatBuilder(
+ org.apache.hadoop.fs.Path basePath,
+ HadoopPathBasedBulkWriter.Factory<IN> writerFactory,
+ HadoopFileCommitterFactory fileCommitterFactory,
+ Configuration configuration,
+ BucketAssigner<IN, BucketID> assigner,
+ CheckpointRollingPolicy<IN, BucketID> policy,
+ BucketFactory<IN, BucketID> bucketFactory,
+ OutputFileConfig outputFileConfig,
+ DirtyOptions dirtyOptions,
+ @Nullable DirtySink<Object> dirtySink,
+ SchemaUpdateExceptionPolicy schemaUpdatePolicy,
+ PartitionPolicy partitionPolicy,
+ HiveShim hiveShim,
+ String hiveVersion,
+ String inputFormat,
+ String outputFormat,
+ String serializationLib) {
+
+ this.basePath = new Path(Preconditions.checkNotNull(basePath).toString());
+ this.writerFactory = writerFactory;
+ this.fileCommitterFactory = fileCommitterFactory;
+ this.serializableConfiguration = new SerializableConfiguration(configuration);
+ this.bucketAssigner = Preconditions.checkNotNull(assigner);
+ this.rollingPolicy = Preconditions.checkNotNull(policy);
+ this.bucketFactory = Preconditions.checkNotNull(bucketFactory);
+ this.outputFileConfig = Preconditions.checkNotNull(outputFileConfig);
+ this.dirtyOptions = dirtyOptions;
+ this.dirtySink = dirtySink;
+ this.schemaUpdatePolicy = schemaUpdatePolicy;
+ this.partitionPolicy = partitionPolicy;
+ this.hiveShim = hiveShim;
+ this.hiveVersion = hiveVersion;
+ this.inputFormat = inputFormat;
+ this.outputFormat = outputFormat;
+ this.serializationLib = serializationLib;
+ }
+
+ public T withBucketAssigner(BucketAssigner<IN, BucketID> assigner) {
+ this.bucketAssigner = Preconditions.checkNotNull(assigner);
+ return self();
+ }
+
+ public T withRollingPolicy(CheckpointRollingPolicy<IN, BucketID> rollingPolicy) {
+ this.rollingPolicy = Preconditions.checkNotNull(rollingPolicy);
+ return self();
+ }
+
+ public T withBucketFactory(BucketFactory<IN, BucketID> factory) {
+ this.bucketFactory = Preconditions.checkNotNull(factory);
+ return self();
+ }
+
+ public T withOutputFileConfig(OutputFileConfig outputFileConfig) {
+ this.outputFileConfig = outputFileConfig;
+ return self();
+ }
+
+ public T withConfiguration(Configuration configuration) {
+ this.serializableConfiguration = new SerializableConfiguration(configuration);
+ return self();
+ }
+
+ @Override
+ public BucketWriter<IN, BucketID> createBucketWriter() {
+ return new HadoopPathBasedPartFileWriter.HadoopPathBasedBucketWriter<>(
+ serializableConfiguration.getConfiguration(),
+ writerFactory,
+ fileCommitterFactory,
+ metricData,
+ dirtyOptions,
+ dirtySink,
+ schemaUpdatePolicy,
+ partitionPolicy,
+ hiveShim,
+ hiveVersion,
+ inputFormat,
+ outputFormat,
+ serializationLib);
+ }
+
+ @Override
+ public Buckets<IN, BucketID> createBuckets(int subtaskIndex) throws IOException {
+ return new Buckets<>(
+ basePath,
+ bucketAssigner,
+ bucketFactory,
+ createBucketWriter(),
+ rollingPolicy,
+ subtaskIndex,
+ outputFileConfig);
+ }
+
+ public void setMetricData(@Nullable SinkTableMetricData metricData) {
+ this.metricData = metricData;
+ }
+
+}
\ No newline at end of file
diff --git a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/filesystem/HadoopPathBasedPartFileWriter.java b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/filesystem/HadoopPathBasedPartFileWriter.java
new file mode 100644
index 0000000..de1d87c
--- /dev/null
+++ b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/filesystem/HadoopPathBasedPartFileWriter.java
@@ -0,0 +1,816 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.hive.filesystem;
+
+import org.apache.inlong.sort.base.dirty.DirtyOptions;
+import org.apache.inlong.sort.base.dirty.DirtySinkHelper;
+import org.apache.inlong.sort.base.dirty.DirtyType;
+import org.apache.inlong.sort.base.dirty.sink.DirtySink;
+import org.apache.inlong.sort.base.format.DynamicSchemaFormatFactory;
+import org.apache.inlong.sort.base.format.JsonDynamicSchemaFormat;
+import org.apache.inlong.sort.base.metric.sub.SinkTableMetricData;
+import org.apache.inlong.sort.base.sink.PartitionPolicy;
+import org.apache.inlong.sort.base.sink.SchemaUpdateExceptionPolicy;
+import org.apache.inlong.sort.hive.HiveBulkWriterFactory;
+import org.apache.inlong.sort.hive.HiveWriterFactory;
+import org.apache.inlong.sort.hive.util.CacheHolder;
+import org.apache.inlong.sort.hive.util.HiveTableUtil;
+
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.commons.lang3.tuple.ImmutableTriple;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.commons.lang3.tuple.Triple;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.connectors.hive.FlinkHiveException;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.formats.hadoop.bulk.HadoopFileCommitter;
+import org.apache.flink.formats.hadoop.bulk.HadoopFileCommitterFactory;
+import org.apache.flink.formats.hadoop.bulk.HadoopPathBasedBulkWriter;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.flink.streaming.api.functions.sink.filesystem.AbstractPartFileWriter;
+import org.apache.flink.streaming.api.functions.sink.filesystem.BucketWriter;
+import org.apache.flink.streaming.api.functions.sink.filesystem.InProgressFileWriter;
+import org.apache.flink.streaming.api.functions.sink.filesystem.WriterProperties;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.catalog.hive.client.HiveShim;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.binary.BinaryRowData;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.JobConf;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+
+import static org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_DATABASE_PATTERN;
+import static org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_ENABLE;
+import static org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_FORMAT;
+import static org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_TABLE_PATTERN;
+import static org.apache.inlong.sort.hive.HiveOptions.HIVE_SCHEMA_SCAN_INTERVAL;
+
+/**
+ * The part-file writer that writes to the specified hadoop path.
+ */
+public class HadoopPathBasedPartFileWriter<IN, BucketID> extends AbstractPartFileWriter<IN, BucketID> {
+
+ private static final Logger LOG = LoggerFactory.getLogger(HadoopPathBasedPartFileWriter.class);
+
+ private final InLongHadoopPathBasedBulkWriter writer;
+
+ private final HadoopFileCommitter fileCommitter;
+
+ private BucketID bucketID;
+ private Path targetPath;
+ private Path inProgressPath;
+ private final HiveShim hiveShim;
+ private final String hiveVersion;
+
+ private final boolean sinkMultipleEnable;
+ private final String sinkMultipleFormat;
+
+ private final String databasePattern;
+ private final String tablePattern;
+
+ private final SchemaUpdateExceptionPolicy schemaUpdatePolicy;
+
+ private final PartitionPolicy partitionPolicy;
+
+ private final String inputFormat;
+
+ private final String outputFormat;
+
+ private final String serializationLib;
+
+ private transient JsonDynamicSchemaFormat jsonFormat;
+
+ @Nullable
+ private final transient SinkTableMetricData metricData;
+
+ private final DirtyOptions dirtyOptions;
+
+ private final @Nullable DirtySink<Object> dirtySink;
+
+ public HadoopPathBasedPartFileWriter(final BucketID bucketID,
+ Path targetPath,
+ Path inProgressPath,
+ InLongHadoopPathBasedBulkWriter writer,
+ HadoopFileCommitter fileCommitter,
+ long createTime,
+ HiveShim hiveShim,
+ String hiveVersion,
+ boolean sinkMultipleEnable,
+ String sinkMultipleFormat,
+ String databasePattern,
+ String tablePattern,
+ @Nullable SinkTableMetricData metricData,
+ DirtyOptions dirtyOptions,
+ @Nullable DirtySink<Object> dirtySink,
+ SchemaUpdateExceptionPolicy schemaUpdatePolicy,
+ PartitionPolicy partitionPolicy,
+ String inputFormat,
+ String outputFormat,
+ String serializationLib) {
+ super(bucketID, createTime);
+
+ this.bucketID = bucketID;
+ this.targetPath = targetPath;
+ this.inProgressPath = inProgressPath;
+ this.writer = writer;
+ this.fileCommitter = fileCommitter;
+ this.hiveShim = hiveShim;
+ this.hiveVersion = hiveVersion;
+ this.sinkMultipleEnable = sinkMultipleEnable;
+ this.sinkMultipleFormat = sinkMultipleFormat;
+ this.databasePattern = databasePattern;
+ this.tablePattern = tablePattern;
+ this.metricData = metricData;
+ this.schemaUpdatePolicy = schemaUpdatePolicy;
+ this.dirtyOptions = dirtyOptions;
+ this.dirtySink = dirtySink;
+ this.partitionPolicy = partitionPolicy;
+ this.inputFormat = inputFormat;
+ this.outputFormat = outputFormat;
+ this.serializationLib = serializationLib;
+ }
+
+ @Override
+ public void write(IN element, long currentTime) {
+ String databaseName = null;
+ String tableName = null;
+ int recordNum = 1;
+ int recordSize = 0;
+ JsonNode rootNode = null;
+ ObjectIdentifier identifier = null;
+ HashMap<ObjectIdentifier, Long> ignoreWritingTableMap = CacheHolder.getIgnoreWritingTableMap();
+ try {
+ if (sinkMultipleEnable) {
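+ // multi-table (whole database) mode: the element wraps the raw JSON change record
+ // as a single byte[] field, which is parsed with the configured dynamic schema format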
+ GenericRowData rowData = (GenericRowData) element;
+ if (jsonFormat == null) {
+ jsonFormat = (JsonDynamicSchemaFormat) DynamicSchemaFormatFactory.getFormat(sinkMultipleFormat);
+ }
+ byte[] rawData = (byte[]) rowData.getField(0);
+ rootNode = jsonFormat.deserialize(rawData);
+ LOG.debug("root node: {}", rootNode);
+ boolean isDDL = jsonFormat.extractDDLFlag(rootNode);
+ if (isDDL) {
+ // Ignore DDL changes for now
+ return;
+ }
+ databaseName = jsonFormat.parse(rootNode, databasePattern);
+ tableName = jsonFormat.parse(rootNode, tablePattern);
+
+ List<Map<String, Object>> physicalDataList = HiveTableUtil.jsonNode2Map(
+ jsonFormat.getPhysicalData(rootNode));
+ recordNum = physicalDataList.size();
+
+ identifier = HiveTableUtil.createObjectIdentifier(databaseName, tableName);
+
+ // skip tables that have been marked to stop writing (see the STOP_PARTIAL handling below)
+ if (ignoreWritingTableMap.containsKey(identifier)) {
+ return;
+ }
+
+ List<String> pkListStr = jsonFormat.extractPrimaryKeyNames(rootNode);
+ RowType schema = jsonFormat.extractSchema(rootNode, pkListStr);
+ HiveWriterFactory writerFactory = getHiveWriterFactory(identifier, schema, hiveVersion);
+
+ // parse the real location of hive table
+ Path inProgressFilePath = getInProgressPath(writerFactory);
+
+ LOG.debug("in progress file path: {}", inProgressFilePath);
+ Pair<RecordWriter, Function<RowData, Writable>> pair = getRecordWriterAndRowConverter(writerFactory,
+ inProgressFilePath);
+ // reset record writer and row converter
+ writer.setRecordWriter(pair.getLeft());
+ writer.setRowConverter(pair.getRight());
+ writer.setInProgressPath(inProgressFilePath);
+
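+ // TextInputFormat tables store one record per line, so embedded line breaks
+ // in field values need to be replaced before writing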
+ boolean replaceLineBreak = writerFactory.getStorageDescriptor().getInputFormat()
+ .contains("TextInputFormat");
+
+ for (Map<String, Object> record : physicalDataList) {
+ // check and alter hive table if schema has changed
+ boolean changed = checkSchema(identifier, writerFactory, schema);
+ if (changed) {
+ // remove cache and reload hive writer factory
+ CacheHolder.getFactoryMap().remove(identifier);
+ writerFactory = HiveTableUtil.getWriterFactory(hiveShim, hiveVersion, identifier);
+ assert writerFactory != null;
+ FileSinkOperator.RecordWriter recordWriter = writerFactory.createRecordWriter(
+ inProgressFilePath);
+ Function<RowData, Writable> rowConverter = writerFactory.createRowDataConverter();
+ writer.setRecordWriter(recordWriter);
+ writer.setRowConverter(rowConverter);
+ CacheHolder.getRecordWriterHashMap().put(inProgressFilePath, recordWriter);
+ CacheHolder.getRowConverterHashMap().put(inProgressFilePath, rowConverter);
+ }
+
+ LOG.debug("record: {}", record);
+ LOG.debug("columns : {}", Arrays.deepToString(writerFactory.getAllColumns()));
+ LOG.debug("types: {}", Arrays.deepToString(writerFactory.getAllTypes()));
+ Pair<GenericRowData, Integer> rowDataPair = HiveTableUtil.getRowData(record,
+ writerFactory.getAllColumns(), writerFactory.getAllTypes(), replaceLineBreak);
+ GenericRowData genericRowData = rowDataPair.getLeft();
+ recordSize += rowDataPair.getRight();
+ LOG.debug("generic row data: {}", genericRowData);
+ writer.addElement(genericRowData);
+ }
+ if (metricData != null) {
+ metricData.outputMetrics(databaseName, tableName, recordNum, recordSize);
+ }
+ } else {
+ RowData data = (RowData) element;
+ writer.addElement(data);
+ if (metricData != null) {
+ if (data instanceof BinaryRowData) {
+ // mysql cdc sends BinaryRowData
+ metricData.invoke(1, ((BinaryRowData) data).getSizeInBytes());
+ } else {
+ // oracle cdc sends GenericRowData
+ metricData.invoke(1, data.toString().getBytes(StandardCharsets.UTF_8).length);
+ }
+ }
+ }
+ } catch (Exception e) {
+ if (schemaUpdatePolicy == null || SchemaUpdateExceptionPolicy.THROW_WITH_STOP == schemaUpdatePolicy) {
+ throw new FlinkHiveException("Failed to write data", e);
+ } else if (SchemaUpdateExceptionPolicy.STOP_PARTIAL == schemaUpdatePolicy) {
+ if (identifier != null) {
+ ignoreWritingTableMap.put(identifier, 1L);
+ }
+ } else if (SchemaUpdateExceptionPolicy.LOG_WITH_IGNORE == schemaUpdatePolicy) {
+ handleDirtyData(databaseName, tableName, recordNum, recordSize, rootNode, jsonFormat, e);
+ }
+ LOG.error("Failed to write data", e);
+ }
+ markWrite(currentTime);
+ }
+
+ /**
+ * Report dirty-data metrics and archive the dirty data.
+ *
+ * @param databaseName database name
+ * @param tableName table name
+ * @param recordNum number of records
+ * @param recordSize record size in bytes
+ * @param data raw data
+ * @param jsonFormat json formatter for parsing the raw data
+ * @param e exception
+ */
+ private void handleDirtyData(String databaseName,
+ String tableName,
+ int recordNum,
+ int recordSize,
+ JsonNode data,
+ JsonDynamicSchemaFormat jsonFormat,
+ Exception e) {
+ // upload metrics for dirty data
+ if (null != metricData) {
+ if (sinkMultipleEnable) {
+ metricData.outputDirtyMetrics(databaseName, tableName, recordNum, recordSize);
+ } else {
+ metricData.invokeDirty(recordNum, recordSize);
+ }
+ }
+
+ if (!dirtyOptions.ignoreDirty()) {
+ return;
+ }
+ if (data == null || jsonFormat == null) {
+ return;
+ }
+ Triple<String, String, String> triple = getDirtyLabelTagAndIdentity(data, jsonFormat);
+ String label = triple.getLeft();
+ String tag = triple.getMiddle();
+ String identify = triple.getRight();
+ if (label == null || tag == null || identify == null) {
+ LOG.warn("dirty label or tag or identify is null, ignore dirty data writing");
+ return;
+ }
+ // archive dirty data
+ DirtySinkHelper<Object> dirtySinkHelper = new DirtySinkHelper<>(dirtyOptions, dirtySink);
+ List<Map<String, Object>> physicalDataList = HiveTableUtil.jsonNode2Map(jsonFormat.getPhysicalData(data));
+ for (Map<String, Object> record : physicalDataList) {
+ JsonNode jsonNode = HiveTableUtil.object2JsonNode(record);
+ dirtySinkHelper.invoke(jsonNode, DirtyType.BATCH_LOAD_ERROR, label, tag, identify, e);
+ }
+ }
+
+ /**
+ * Parse the dirty label, log tag and identifier.
+ *
+ * @param data raw data
+ * @param jsonFormat json formatter
+ * @return dirty label, log tag and identifier
+ */
+ private Triple<String, String, String> getDirtyLabelTagAndIdentity(JsonNode data,
+ JsonDynamicSchemaFormat jsonFormat) {
+ String dirtyLabel = null;
+ String dirtyLogTag = null;
+ String dirtyIdentify = null;
+ try {
+ if (dirtyOptions.ignoreDirty()) {
+ if (dirtyOptions.getLabels() != null) {
+ dirtyLabel = jsonFormat.parse(data,
+ DirtySinkHelper.regexReplace(dirtyOptions.getLabels(), DirtyType.BATCH_LOAD_ERROR, null));
+ }
+ if (dirtyOptions.getLogTag() != null) {
+ dirtyLogTag = jsonFormat.parse(data,
+ DirtySinkHelper.regexReplace(dirtyOptions.getLogTag(), DirtyType.BATCH_LOAD_ERROR, null));
+ }
+ if (dirtyOptions.getIdentifier() != null) {
+ dirtyIdentify = jsonFormat.parse(data,
+ DirtySinkHelper.regexReplace(dirtyOptions.getIdentifier(), DirtyType.BATCH_LOAD_ERROR,
+ null));
+ }
+ }
+ } catch (Exception e) {
+ LOG.warn("Parse dirty options failed. {}", ExceptionUtils.stringifyException(e));
+ }
+ return new ImmutableTriple<>(dirtyLabel, dirtyLogTag, dirtyIdentify);
+ }
+
+ /**
+ * Get the Hive writer factory, automatically creating the table if it does not exist.
+ *
+ * @param identifier hive database and table name
+ * @param schema hive fields with flink types
+ * @return hive writer factory
+ */
+ private HiveWriterFactory getHiveWriterFactory(ObjectIdentifier identifier, RowType schema, String hiveVersion) {
+ HiveWriterFactory writerFactory = HiveTableUtil.getWriterFactory(hiveShim, hiveVersion, identifier);
+ if (writerFactory == null) {
+ // hive table may not exist, auto create
+ HiveTableUtil.createTable(identifier.getDatabaseName(), identifier.getObjectName(), schema, partitionPolicy,
+ hiveVersion, inputFormat, outputFormat, serializationLib);
+ writerFactory = HiveTableUtil.getWriterFactory(hiveShim, hiveVersion, identifier);
+ }
+ return writerFactory;
+ }
+
+ /**
+ * Get the in-progress (temporary) HDFS path for data writing, resolved under the real table location.
+ *
+ * @param writerFactory hive writer factory
+ * @return the in-progress HDFS path under the hive table location
+ */
+ private Path getInProgressPath(HiveWriterFactory writerFactory) throws IOException, ClassNotFoundException {
+ String location = writerFactory.getStorageDescriptor().getLocation();
+ String path = targetPath.toUri().getPath();
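+ // strip everything up to the "/tmp/" staging prefix and rebase the relative path
+ // onto the real table location from the storage descriptor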
+ path = path.substring(path.indexOf("/tmp/") + 5);
+ Path targetPath = new Path(location + "/" + path);
+ return new Path(targetPath.getParent() + "/" + inProgressPath.getName());
+ }
+
+ /**
+ * get hive record writer and row converter
+ *
+ * @param writerFactory hive writer factory
+ * @param inProgressFilePath temp hdfs file path for writing data
+ * @return pair of hive record writer and row converter objects
+ */
+ private Pair<RecordWriter, Function<RowData, Writable>> getRecordWriterAndRowConverter(
+ HiveWriterFactory writerFactory, Path inProgressFilePath) {
+ FileSinkOperator.RecordWriter recordWriter;
+ Function<RowData, Writable> rowConverter;
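+ // record writers and row converters are cached per in-progress file so that
+ // rows targeting the same table reuse a single writer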
+ if (!CacheHolder.getRecordWriterHashMap().containsKey(inProgressFilePath)) {
+ recordWriter = writerFactory.createRecordWriter(inProgressFilePath);
+ rowConverter = writerFactory.createRowDataConverter();
+ CacheHolder.getRecordWriterHashMap().put(inProgressFilePath, recordWriter);
+ CacheHolder.getRowConverterHashMap().put(inProgressFilePath, rowConverter);
+ } else {
+ recordWriter = CacheHolder.getRecordWriterHashMap().get(inProgressFilePath);
+ rowConverter = CacheHolder.getRowConverterHashMap().get(inProgressFilePath);
+ }
+ return new ImmutablePair<>(recordWriter, rowConverter);
+ }
+
+ /**
+ * Check whether the source table schema has changed.
+ *
+ * @param identifier hive database name and table name
+ * @param writerFactory hive writer factory
+ * @param schema hive fields with flink types
+ * @return whether the schema has changed
+ */
+ private boolean checkSchema(ObjectIdentifier identifier, HiveWriterFactory writerFactory, RowType schema) {
+ HashMap<ObjectIdentifier, Long> schemaCheckTimeMap = CacheHolder.getSchemaCheckTimeMap();
+ long lastUpdate = schemaCheckTimeMap.getOrDefault(identifier, -1L);
+ // check the schema at most once every HIVE_SCHEMA_SCAN_INTERVAL milliseconds
+ int scanSchemaInterval = Integer.parseInt(writerFactory.getJobConf()
+ .get(HIVE_SCHEMA_SCAN_INTERVAL.key(), HIVE_SCHEMA_SCAN_INTERVAL.defaultValue() + ""));
+ boolean changed = false;
+ if (System.currentTimeMillis() - lastUpdate >= scanSchemaInterval) {
+ changed = HiveTableUtil.changeSchema(schema, writerFactory.getAllColumns(),
+ writerFactory.getAllTypes(), identifier.getDatabaseName(), identifier.getObjectName(), hiveVersion);
+ schemaCheckTimeMap.put(identifier, System.currentTimeMillis());
+ }
+ return changed;
+ }
+
+ @Override
+ public InProgressFileRecoverable persist() {
+ throw new UnsupportedOperationException("The path based writers do not support persisting");
+ }
+
+ @Override
+ public PendingFileRecoverable closeForCommit() throws IOException {
+ if (sinkMultipleEnable) {
+ LOG.info("record writer cache {}", CacheHolder.getRecordWriterHashMap());
+ Iterator<Path> iterator = CacheHolder.getRecordWriterHashMap().keySet().iterator();
+ while (iterator.hasNext()) {
+ Path inProgressFilePath = iterator.next();
+ // one flink batch may write many hive tables, and they share the same in-progress file name
+ if (inProgressFilePath.getName().equals(this.inProgressPath.getName())) {
+ FileSinkOperator.RecordWriter recordWriter = CacheHolder.getRecordWriterHashMap()
+ .get(inProgressFilePath);
+ writer.setRecordWriter(recordWriter);
+ writer.flush();
+ writer.finish();
+ // clear cache
+ iterator.remove();
+ CacheHolder.getRowConverterHashMap().remove(inProgressFilePath);
+
+ // parse the target location of hive table
+ String tmpFileName = inProgressFilePath.getName();
+ String targetPathName = tmpFileName.substring(1, tmpFileName.lastIndexOf(".inprogress"));
+ Path targetPath = new Path(inProgressFilePath.getParent() + "/" + targetPathName);
+
+ LOG.info("file committer target path {}, in progress file {}", targetPath, inProgressFilePath);
+ HadoopRenameFileCommitter committer = new HadoopRenameFileCommitter(
+ ((HadoopRenameFileCommitter) fileCommitter).getConfiguration(),
+ targetPath,
+ inProgressFilePath,
+ true);
+ CacheHolder.getFileCommitterHashMap().put(inProgressFilePath, committer);
+ }
+ }
+ } else {
+ writer.flush();
+ writer.finish();
+ fileCommitter.preCommit();
+ }
+ return new HadoopPathBasedPendingFile(fileCommitter, getSize()).getRecoverable();
+ }
+
+ @Override
+ public void dispose() {
+ writer.dispose();
+ }
+
+ @Override
+ public long getSize() throws IOException {
+ return writer.getSize();
+ }
+
+ static class HadoopPathBasedPendingFile implements BucketWriter.PendingFile {
+
+ private final HadoopFileCommitter fileCommitter;
+
+ private final long fileSize;
+
+ public HadoopPathBasedPendingFile(HadoopFileCommitter fileCommitter, long fileSize) {
+ this.fileCommitter = fileCommitter;
+ this.fileSize = fileSize;
+ }
+
+ @Override
+ public void commit() throws IOException {
+ fileCommitter.commit();
+ }
+
+ @Override
+ public void commitAfterRecovery() throws IOException {
+ fileCommitter.commitAfterRecovery();
+ }
+
+ public PendingFileRecoverable getRecoverable() {
+ return new HadoopPathBasedPendingFileRecoverable(fileCommitter.getTargetFilePath(),
+ fileCommitter.getTempFilePath(), fileSize);
+ }
+ }
+
+ @VisibleForTesting
+ static class HadoopPathBasedPendingFileRecoverable implements PendingFileRecoverable {
+
+ private final Path targetFilePath;
+
+ private final Path tempFilePath;
+
+ private final long fileSize;
+
+ @Deprecated
+ // Retained for compatibility
+ public HadoopPathBasedPendingFileRecoverable(Path targetFilePath, Path tempFilePath) {
+ this.targetFilePath = targetFilePath;
+ this.tempFilePath = tempFilePath;
+ this.fileSize = -1L;
+ }
+
+ public HadoopPathBasedPendingFileRecoverable(Path targetFilePath, Path tempFilePath, long fileSize) {
+ this.targetFilePath = targetFilePath;
+ this.tempFilePath = tempFilePath;
+ this.fileSize = fileSize;
+ }
+
+ public Path getTargetFilePath() {
+ return targetFilePath;
+ }
+
+ public Path getTempFilePath() {
+ return tempFilePath;
+ }
+
+ public org.apache.flink.core.fs.Path getPath() {
+ return new org.apache.flink.core.fs.Path(targetFilePath.toString());
+ }
+
+ public long getSize() {
+ return fileSize;
+ }
+ }
+
+ @VisibleForTesting
+ static class HadoopPathBasedPendingFileRecoverableSerializer
+ implements
+ SimpleVersionedSerializer<PendingFileRecoverable> {
+
+ static final HadoopPathBasedPendingFileRecoverableSerializer INSTANCE =
+ new HadoopPathBasedPendingFileRecoverableSerializer();
+
+ private static final Charset CHARSET = StandardCharsets.UTF_8;
+
+ private static final int MAGIC_NUMBER = 0x2c853c90;
+
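+ // version 2 appends the file size to the serialized payload; version 1 payloads remain readable via deserializeV1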
+ @Override
+ public int getVersion() {
+ return 2;
+ }
+
+ @Override
+ public byte[] serialize(PendingFileRecoverable pendingFileRecoverable) {
+ if (!(pendingFileRecoverable instanceof HadoopPathBasedPartFileWriter.HadoopPathBasedPendingFileRecoverable)) {
+ throw new UnsupportedOperationException("Only HadoopPathBasedPendingFileRecoverable is supported.");
+ }
+
+ HadoopPathBasedPendingFileRecoverable hadoopRecoverable =
+ (HadoopPathBasedPendingFileRecoverable) pendingFileRecoverable;
+ Path path = hadoopRecoverable.getTargetFilePath();
+ Path inProgressPath = hadoopRecoverable.getTempFilePath();
+
+ byte[] pathBytes = path.toUri().toString().getBytes(CHARSET);
+ byte[] inProgressBytes = inProgressPath.toUri().toString().getBytes(CHARSET);
+
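+ // fixed header is 12 bytes: magic number (4) plus two length prefixes (4 + 4);
+ // the trailing long stores the file size added in serializer version 2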
+ byte[] targetBytes = new byte[12 + pathBytes.length + inProgressBytes.length + Long.BYTES];
+ ByteBuffer bb = ByteBuffer.wrap(targetBytes).order(ByteOrder.LITTLE_ENDIAN);
+ bb.putInt(MAGIC_NUMBER);
+ bb.putInt(pathBytes.length);
+ bb.put(pathBytes);
+ bb.putInt(inProgressBytes.length);
+ bb.put(inProgressBytes);
+ bb.putLong(hadoopRecoverable.getSize());
+
+ return targetBytes;
+ }
+
+ @Override
+ public HadoopPathBasedPendingFileRecoverable deserialize(int version, byte[] serialized) throws IOException {
+ switch (version) {
+ case 1:
+ return deserializeV1(serialized);
+ case 2:
+ return deserializeV2(serialized);
+ default:
+ throw new IOException("Unrecognized version or corrupt state: " + version);
+ }
+ }
+
+ private HadoopPathBasedPendingFileRecoverable deserializeV1(byte[] serialized) throws IOException {
+ final ByteBuffer bb = ByteBuffer.wrap(serialized).order(ByteOrder.LITTLE_ENDIAN);
+
+ if (bb.getInt() != MAGIC_NUMBER) {
+ throw new IOException("Corrupt data: Unexpected magic number.");
+ }
+
+ byte[] targetFilePathBytes = new byte[bb.getInt()];
+ bb.get(targetFilePathBytes);
+ String targetFilePath = new String(targetFilePathBytes, CHARSET);
+
+ byte[] tempFilePathBytes = new byte[bb.getInt()];
+ bb.get(tempFilePathBytes);
+ String tempFilePath = new String(tempFilePathBytes, CHARSET);
+
+ return new HadoopPathBasedPendingFileRecoverable(new Path(targetFilePath), new Path(tempFilePath));
+ }
+
+ private HadoopPathBasedPendingFileRecoverable deserializeV2(byte[] serialized) throws IOException {
+ final ByteBuffer bb = ByteBuffer.wrap(serialized).order(ByteOrder.LITTLE_ENDIAN);
+
+ if (bb.getInt() != MAGIC_NUMBER) {
+ throw new IOException("Corrupt data: Unexpected magic number.");
+ }
+
+ byte[] targetFilePathBytes = new byte[bb.getInt()];
+ bb.get(targetFilePathBytes);
+ String targetFilePath = new String(targetFilePathBytes, CHARSET);
+
+ byte[] tempFilePathBytes = new byte[bb.getInt()];
+ bb.get(tempFilePathBytes);
+ String tempFilePath = new String(tempFilePathBytes, CHARSET);
+
+ long fileSize = bb.getLong();
+
+ return new HadoopPathBasedPendingFileRecoverable(new Path(targetFilePath), new Path(tempFilePath),
+ fileSize);
+ }
+ }
+
+ private static class UnsupportedInProgressFileRecoverableSerializable
+ implements
+ SimpleVersionedSerializer<InProgressFileRecoverable> {
+
+ static final UnsupportedInProgressFileRecoverableSerializable INSTANCE =
+ new UnsupportedInProgressFileRecoverableSerializable();
+
+ @Override
+ public int getVersion() {
+ throw new UnsupportedOperationException("Persists the path-based part file write is not supported");
+ }
+
+ @Override
+ public byte[] serialize(InProgressFileRecoverable obj) {
+ throw new UnsupportedOperationException("Persists the path-based part file write is not supported");
+ }
+
+ @Override
+ public InProgressFileRecoverable deserialize(int version, byte[] serialized) {
+ throw new UnsupportedOperationException("Persists the path-based part file write is not supported");
+ }
+ }
+
+ /**
+ * Factory to create {@link HadoopPathBasedPartFileWriter}. This writer does not support
+ * snapshotting the in-progress files. For pending files, it stores the target path and the
+ * staging file path into the state.
+ */
+ public static class HadoopPathBasedBucketWriter<IN, BucketID> implements BucketWriter<IN, BucketID> {
+
+ private final Configuration configuration;
+
+ private final HadoopPathBasedBulkWriter.Factory<IN> bulkWriterFactory;
+
+ private final HadoopFileCommitterFactory fileCommitterFactory;
+
+ private final HiveWriterFactory hiveWriterFactory;
+
+ @Nullable
+ private final transient SinkTableMetricData metricData;
+
+ private final DirtyOptions dirtyOptions;
+
+ private final @Nullable DirtySink<Object> dirtySink;
+
+ private final SchemaUpdateExceptionPolicy schemaUpdatePolicy;
+
+ private final PartitionPolicy partitionPolicy;
+
+ private final String inputFormat;
+
+ private final String outputFormat;
+
+ private final String serializationLib;
+
+ private final HiveShim hiveShim;
+ private final String hiveVersion;
+
+ public HadoopPathBasedBucketWriter(Configuration configuration,
+ HadoopPathBasedBulkWriter.Factory<IN> bulkWriterFactory,
+ HadoopFileCommitterFactory fileCommitterFactory, @Nullable SinkTableMetricData metricData,
+ DirtyOptions dirtyOptions, @Nullable DirtySink<Object> dirtySink,
+ SchemaUpdateExceptionPolicy schemaUpdatePolicy,
+ PartitionPolicy partitionPolicy,
+ HiveShim hiveShim,
+ String hiveVersion,
+ String inputFormat,
+ String outputFormat,
+ String serializationLib) {
+ this.configuration = configuration;
+ this.bulkWriterFactory = bulkWriterFactory;
+ this.hiveWriterFactory = ((HiveBulkWriterFactory) this.bulkWriterFactory).getFactory();
+ this.fileCommitterFactory = fileCommitterFactory;
+ this.metricData = metricData;
+ this.dirtyOptions = dirtyOptions;
+ this.dirtySink = dirtySink;
+ this.schemaUpdatePolicy = schemaUpdatePolicy;
+ this.partitionPolicy = partitionPolicy;
+ this.hiveShim = hiveShim;
+ this.hiveVersion = hiveVersion;
+ this.inputFormat = inputFormat;
+ this.outputFormat = outputFormat;
+ this.serializationLib = serializationLib;
+ }
+
+ @Override
+ public HadoopPathBasedPartFileWriter<IN, BucketID> openNewInProgressFile(BucketID bucketID,
+ org.apache.flink.core.fs.Path flinkPath, long creationTime) throws IOException {
+
+ Path path = new Path(flinkPath.toUri());
+ HadoopFileCommitter fileCommitter = fileCommitterFactory.create(configuration, path);
+ Path inProgressFilePath = fileCommitter.getTempFilePath();
+
+ InLongHadoopPathBasedBulkWriter writer = (InLongHadoopPathBasedBulkWriter) bulkWriterFactory.create(path,
+ inProgressFilePath);
+ JobConf jobConf = hiveWriterFactory.getJobConf();
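+ // multi-table sink options are propagated through the Hive JobConf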
+ boolean sinkMultipleEnable = Boolean.parseBoolean(jobConf.get(SINK_MULTIPLE_ENABLE.key(), "false"));
+ String sinkMultipleFormat = jobConf.get(SINK_MULTIPLE_FORMAT.key());
+ String databasePattern = jobConf.get(SINK_MULTIPLE_DATABASE_PATTERN.key());
+ String tablePattern = jobConf.get(SINK_MULTIPLE_TABLE_PATTERN.key());
+ return new HadoopPathBasedPartFileWriter<>(bucketID,
+ path,
+ inProgressFilePath,
+ writer,
+ fileCommitter,
+ creationTime,
+ hiveShim,
+ hiveVersion,
+ sinkMultipleEnable,
+ sinkMultipleFormat,
+ databasePattern,
+ tablePattern,
+ metricData,
+ dirtyOptions,
+ dirtySink,
+ schemaUpdatePolicy,
+ partitionPolicy,
+ inputFormat,
+ outputFormat,
+ serializationLib);
+ }
+
+ @Override
+ public PendingFile recoverPendingFile(PendingFileRecoverable pendingFileRecoverable) throws IOException {
+ if (!(pendingFileRecoverable instanceof HadoopPathBasedPartFileWriter.HadoopPathBasedPendingFileRecoverable)) {
+ throw new UnsupportedOperationException("Only HadoopPathBasedPendingFileRecoverable is supported.");
+ }
+
+ HadoopPathBasedPendingFileRecoverable hadoopRecoverable =
+ (HadoopPathBasedPendingFileRecoverable) pendingFileRecoverable;
+ return new HadoopPathBasedPendingFile(
+ fileCommitterFactory.recoverForCommit(configuration, hadoopRecoverable.getTargetFilePath(),
+ hadoopRecoverable.getTempFilePath()),
+ hadoopRecoverable.getSize());
+ }
+
+ @Override
+ public WriterProperties getProperties() {
+ return new WriterProperties(UnsupportedInProgressFileRecoverableSerializable.INSTANCE,
+ HadoopPathBasedPendingFileRecoverableSerializer.INSTANCE, false);
+ }
+
+ @Override
+ public InProgressFileWriter<IN, BucketID> resumeInProgressFileFrom(BucketID bucketID,
+ InProgressFileRecoverable inProgressFileSnapshot, long creationTime) {
+
+ throw new UnsupportedOperationException("Resume is not supported");
+ }
+
+ @Override
+ public boolean cleanupInProgressFileRecoverable(InProgressFileRecoverable inProgressFileRecoverable) {
+ return false;
+ }
+ }
+}
\ No newline at end of file
diff --git a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/filesystem/HadoopRenameFileCommitter.java b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/filesystem/HadoopRenameFileCommitter.java
new file mode 100644
index 0000000..6d4816e
--- /dev/null
+++ b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/filesystem/HadoopRenameFileCommitter.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.hive.filesystem;
+
+import org.apache.inlong.sort.hive.util.CacheHolder;
+
+import org.apache.flink.formats.hadoop.bulk.HadoopFileCommitter;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.UUID;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * The Hadoop file committer that directly renames the in-progress file to the target file. For
+ * file systems like S3, renaming may lead to additional copies.
+ */
+public class HadoopRenameFileCommitter implements HadoopFileCommitter {
+
+ private static final Logger LOG = LoggerFactory.getLogger(HadoopRenameFileCommitter.class);
+
+ private final Configuration configuration;
+
+ private Path targetFilePath;
+
+ private Path tempFilePath;
+
+ private boolean sinkMultipleEnable;
+
+ public HadoopRenameFileCommitter(Configuration configuration,
+ Path targetFilePath,
+ boolean sinkMultipleEnable)
+ throws IOException {
+ this.configuration = configuration;
+ this.targetFilePath = targetFilePath;
+ this.tempFilePath = generateTempFilePath();
+ this.sinkMultipleEnable = sinkMultipleEnable;
+ }
+
+ public HadoopRenameFileCommitter(Configuration configuration,
+ Path targetFilePath,
+ Path inProgressPath,
+ boolean sinkMultipleEnable) {
+ this.configuration = configuration;
+ this.targetFilePath = targetFilePath;
+ this.tempFilePath = inProgressPath;
+ this.sinkMultipleEnable = sinkMultipleEnable;
+ }
+
+ @Override
+ public Path getTargetFilePath() {
+ return targetFilePath;
+ }
+
+ @Override
+ public Path getTempFilePath() {
+ return tempFilePath;
+ }
+
+ public Configuration getConfiguration() {
+ return configuration;
+ }
+
+ @Override
+ public void preCommit() {
+ // Do nothing.
+ }
+
+ @Override
+ public void commit() throws IOException {
+ if (sinkMultipleEnable) {
+ commitMultiple(true);
+ } else {
+ rename(true);
+ }
+ }
+
+ @Override
+ public void commitAfterRecovery() throws IOException {
+ if (sinkMultipleEnable) {
+ commitMultiple(false);
+ } else {
+ rename(false);
+ }
+ }
+
+ private void commitMultiple(boolean assertFileExists) throws IOException {
+ LOG.info("file committer cache {}", CacheHolder.getFileCommitterHashMap());
+ Iterator<Path> iterator = CacheHolder.getFileCommitterHashMap().keySet().iterator();
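+ // committers were cached per in-progress file in closeForCommit; match by file name,
+ // since each table directory holds its own copy of the same in-progress file name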
+ while (iterator.hasNext()) {
+ Path path = iterator.next();
+ if (path.getName().equals(tempFilePath.getName())) {
+ HadoopRenameFileCommitter committer = CacheHolder.getFileCommitterHashMap().get(path);
+ committer.rename(assertFileExists);
+ iterator.remove();
+ }
+ }
+ }
+
+ private void rename(boolean assertFileExists) throws IOException {
+ FileSystem fileSystem = FileSystem.get(targetFilePath.toUri(), configuration);
+
+ if (!fileSystem.exists(tempFilePath)) {
+ if (assertFileExists) {
+ throw new IOException(
+ String.format("In progress file(%s) not exists.", tempFilePath));
+ } else {
+ // Bypass the re-commit if the source file does not exist.
+ // TODO: in the future we may also need to check if the target file exists.
+ return;
+ }
+ }
+
+ try {
+ // If file exists, it will be overwritten.
+ fileSystem.rename(tempFilePath, targetFilePath);
+ } catch (IOException e) {
+ throw new IOException(
+ String.format(
+ "Could not commit file from %s to %s", tempFilePath, targetFilePath),
+ e);
+ }
+ }
+
+ private Path generateTempFilePath() throws IOException {
+ checkArgument(targetFilePath.isAbsolute(), "Target file must be absolute");
+
+ FileSystem fileSystem = FileSystem.get(targetFilePath.toUri(), configuration);
+
+ Path parent = targetFilePath.getParent();
+ String name = targetFilePath.getName();
+
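+ // keep generating random candidate names until one that does not exist yet is found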
+ while (true) {
+ Path candidate =
+ new Path(parent, "." + name + ".inprogress." + UUID.randomUUID());
+ if (!fileSystem.exists(candidate)) {
+ return candidate;
+ }
+ }
+ }
+}
\ No newline at end of file
diff --git a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/filesystem/InLongHadoopPathBasedBulkWriter.java b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/filesystem/InLongHadoopPathBasedBulkWriter.java
new file mode 100644
index 0000000..57dd7d5
--- /dev/null
+++ b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/filesystem/InLongHadoopPathBasedBulkWriter.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.hive.filesystem;
+
+import org.apache.flink.formats.hadoop.bulk.HadoopPathBasedBulkWriter;
+import org.apache.flink.table.data.RowData;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter;
+import org.apache.hadoop.io.Writable;
+
+import java.io.IOException;
+import java.util.function.Function;
+
+public class InLongHadoopPathBasedBulkWriter implements HadoopPathBasedBulkWriter<RowData> {
+
+ private FileSinkOperator.RecordWriter recordWriter;
+ private Function<RowData, Writable> rowConverter;
+ private FileSystem fs;
+ private Path inProgressPath;
+
+ public InLongHadoopPathBasedBulkWriter(FileSinkOperator.RecordWriter recordWriter,
+ Function<RowData, Writable> rowConverter,
+ FileSystem fs,
+ Path inProgressPath) {
+ this.recordWriter = recordWriter;
+ this.rowConverter = rowConverter;
+ this.fs = fs;
+ this.inProgressPath = inProgressPath;
+ }
+
+ public void setRecordWriter(RecordWriter recordWriter) {
+ this.recordWriter = recordWriter;
+ }
+
+ public void setInProgressPath(Path inProgressPath) throws IOException {
+ this.inProgressPath = inProgressPath;
+ if (this.fs.getScheme().equals("file")) {
+ // update fs with hdfs scheme
+ this.fs = FileSystem.get(inProgressPath.toUri(), fs.getConf());
+ }
+ }
+
+ public void setRowConverter(Function<RowData, Writable> rowConverter) {
+ this.rowConverter = rowConverter;
+ }
+
+ @Override
+ public long getSize() throws IOException {
+ // it's possible the in-progress file hasn't yet been created, due to writer lazy
+ // init or data buffering
+ return fs.exists(inProgressPath) ? fs.getFileStatus(inProgressPath).getLen() : 0;
+ }
+
+ @Override
+ public void dispose() {
+ // close silently.
+ try {
+ recordWriter.close(true);
+ } catch (IOException ignored) {
+ }
+ }
+
+ @Override
+ public void addElement(RowData element) throws IOException {
+ recordWriter.write(rowConverter.apply(element));
+ }
+
+ @Override
+ public void flush() {
+ }
+
+ @Override
+ public void finish() throws IOException {
+ recordWriter.close(false);
+ }
+}
diff --git a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/filesystem/PartitionCommitter.java b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/filesystem/PartitionCommitter.java
new file mode 100644
index 0000000..cbd897f
--- /dev/null
+++ b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/filesystem/PartitionCommitter.java
@@ -0,0 +1,363 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.hive.filesystem;
+
+import org.apache.inlong.sort.hive.HiveTableMetaStoreFactory;
+import org.apache.inlong.sort.hive.table.HiveTableInlongFactory;
+
+import org.apache.commons.lang.time.StopWatch;
+import org.apache.flink.api.common.state.CheckpointListener;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.catalog.hive.client.HiveMetastoreClientFactory;
+import org.apache.flink.table.catalog.hive.client.HiveMetastoreClientWrapper;
+import org.apache.flink.table.catalog.hive.factories.HiveCatalogFactoryOptions;
+import org.apache.flink.table.filesystem.EmptyMetaStoreFactory;
+import org.apache.flink.table.filesystem.FileSystemFactory;
+import org.apache.flink.table.filesystem.MetastoreCommitPolicy;
+import org.apache.flink.table.filesystem.PartitionCommitPolicy;
+import org.apache.flink.table.filesystem.SuccessFileCommitPolicy;
+import org.apache.flink.table.filesystem.TableMetaStoreFactory;
+import org.apache.flink.table.filesystem.stream.PartitionCommitInfo;
+import org.apache.flink.table.filesystem.stream.PartitionCommitTrigger;
+import org.apache.flink.table.filesystem.stream.TaskTracker;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.mapred.JobConf;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.table.filesystem.FileSystemOptions.SINK_PARTITION_COMMIT_POLICY_CLASS;
+import static org.apache.flink.table.filesystem.FileSystemOptions.SINK_PARTITION_COMMIT_POLICY_KIND;
+import static org.apache.flink.table.filesystem.FileSystemOptions.SINK_PARTITION_COMMIT_SUCCESS_FILE_NAME;
+import static org.apache.flink.table.utils.PartitionPathUtils.extractPartitionSpecFromPath;
+import static org.apache.flink.table.utils.PartitionPathUtils.generatePartitionPath;
+import static org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_ENABLE;
+
+/**
+ * Committer operator for partitions. This is the single (non-parallel) task. It collects all the
+ * partition information sent from upstream, and triggers the partition submission decision when it
+ * judges to collect the partitions from all tasks of a checkpoint.
+ *
+ * <p>NOTE: It processes records after the checkpoint completes successfully. Receive records from
+ * upstream {@link CheckpointListener#notifyCheckpointComplete}.
+ *
+ * <p>Processing steps: 1.Partitions are sent from upstream. Add partition to trigger. 2.{@link
+ * TaskTracker} say it have already received partition data from all tasks in a checkpoint.
+ * 3.Extracting committable partitions from {@link PartitionCommitTrigger}. 4.Using {@link
+ * PartitionCommitPolicy} chain to commit partitions.
+ */
+public class PartitionCommitter extends AbstractStreamOperator<Void>
+ implements
+ OneInputStreamOperator<PartitionCommitInfo, Void> {
+
+ /**
+ * partitions whose hdfs files were modified within the last HDFS_FILES_MODIFIED_THRESHOLD milliseconds will be committed
+ */
+ private static final long HDFS_FILES_MODIFIED_THRESHOLD = 5 * 60 * 1000L;
+
+ private static final long serialVersionUID = 1L;
+
+ private final Configuration conf;
+
+ private Path locationPath;
+
+ private final ObjectIdentifier tableIdentifier;
+
+ private final List<String> partitionKeys;
+
+ private final TableMetaStoreFactory metaStoreFactory;
+
+ private final FileSystemFactory fsFactory;
+
+ private transient PartitionCommitTrigger trigger;
+
+ private transient TaskTracker taskTracker;
+
+ private transient long currentWatermark;
+
+ private transient List<PartitionCommitPolicy> policies;
+
+ private final String hiveVersion;
+
+ private final boolean sinkMultipleEnable;
+
+ public PartitionCommitter(
+ Path locationPath,
+ ObjectIdentifier tableIdentifier,
+ List<String> partitionKeys,
+ TableMetaStoreFactory metaStoreFactory,
+ FileSystemFactory fsFactory,
+ Configuration conf) {
+ this.locationPath = locationPath;
+ this.tableIdentifier = tableIdentifier;
+ this.partitionKeys = partitionKeys;
+ this.metaStoreFactory = metaStoreFactory;
+ this.fsFactory = fsFactory;
+ this.conf = conf;
+ PartitionCommitPolicy.validatePolicyChain(
+ metaStoreFactory instanceof EmptyMetaStoreFactory,
+ conf.get(SINK_PARTITION_COMMIT_POLICY_KIND));
+ this.hiveVersion = conf.get(HiveCatalogFactoryOptions.HIVE_VERSION);
+ this.sinkMultipleEnable = conf.get(SINK_MULTIPLE_ENABLE);
+ }
+
+ @Override
+ public void initializeState(StateInitializationContext context) throws Exception {
+ super.initializeState(context);
+ this.currentWatermark = Long.MIN_VALUE;
+ this.trigger =
+ PartitionCommitTrigger.create(
+ context.isRestored(),
+ context.getOperatorStateStore(),
+ conf,
+ getUserCodeClassloader(),
+ partitionKeys,
+ getProcessingTimeService());
+ this.policies =
+ PartitionCommitPolicy.createPolicyChain(
+ getUserCodeClassloader(),
+ conf.get(SINK_PARTITION_COMMIT_POLICY_KIND),
+ conf.get(SINK_PARTITION_COMMIT_POLICY_CLASS),
+ conf.get(SINK_PARTITION_COMMIT_SUCCESS_FILE_NAME),
+ () -> {
+ try {
+ return fsFactory.create(locationPath.toUri());
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ });
+ }
+
+ @Override
+ public void processElement(StreamRecord<PartitionCommitInfo> element) throws Exception {
+ PartitionCommitInfo message = element.getValue();
+ for (String partition : message.getPartitions()) {
+ trigger.addPartition(partition);
+ }
+
+ if (taskTracker == null) {
+ taskTracker = new TaskTracker(message.getNumberOfTasks());
+ }
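+        // partitions become committable only after every upstream writer task has reported this checkpoint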
+ boolean needCommit = taskTracker.add(message.getCheckpointId(), message.getTaskId());
+ if (needCommit) {
+ commitPartitions(message.getCheckpointId());
+ }
+ }
+
+ private void commitPartitions(long checkpointId) throws Exception {
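+        // a checkpointId of Long.MAX_VALUE marks the end of input of a bounded job,
+        // in which case all remaining partitions reported by the trigger are committed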
+ List<String> partitions =
+ checkpointId == Long.MAX_VALUE ? trigger.endInput() : trigger.committablePartitions(checkpointId);
+ if (partitions.isEmpty()) {
+ return;
+ }
+
+ if (sinkMultipleEnable) {
+ StopWatch stopWatch = new StopWatch();
+ stopWatch.start();
+ HiveConf hiveConf = HiveTableInlongFactory.getHiveConf();
+ try (HiveMetastoreClientWrapper client = HiveMetastoreClientFactory.create(hiveConf, hiveVersion)) {
+                // the committer does not know which tables produced the partitions, so it lists all
+                // databases and tables; with many databases and tables this may become a bottleneck
+ List<String> dbs = client.getAllDatabases().stream().filter(db -> !db.equalsIgnoreCase("default"))
+ .collect(Collectors.toList());
+                // hdfs directory where the hive database is located
+ Path dbPath;
+ List<String> partitionColumns = new ArrayList<>();
+ FileSystem fs;
+ for (String db : dbs) {
+ Database database = client.getDatabase(db);
+ dbPath = new Path(database.getLocationUri());
+ try {
+ List<String> tables = client.getAllTables(db);
+ for (String tableName : tables) {
+                        if (partitionColumns.isEmpty()) {
+                            Table table = client.getTable(db, tableName);
+                            // partition columns are resolved once and reused for every table,
+                            // so all tables are expected to share the same partition field names
+                            partitionColumns = table.getPartitionKeys().stream().map(FieldSchema::getName)
+                                    .collect(Collectors.toList());
+                            if (partitionColumns.isEmpty()) {
+                                continue;
+                            }
+ }
+
+ // reset partitionKeys for MetastoreCommitPolicy
+ partitionKeys.clear();
+ partitionKeys.addAll(partitionColumns);
+
+ for (String partition : partitions) {
+ LinkedHashMap<String, String> partSpec = extractPartitionSpecFromPath(
+ new Path(partition));
+                            // all tables' hdfs files must be located under the same warehouse path,
+                            // e.g. hdfs://0.0.0.0:9020/user/hive/warehouse; otherwise the partition
+                            // of that table will not be committed
+ Path tablePath = new Path(dbPath, tableName);
+ Path partitionPath = new Path(tablePath, generatePartitionPath(partSpec));
+
+ fs = fsFactory.create(partitionPath.toUri());
+                            if (!fs.exists(partitionPath)) {
+                                // the partition path does not exist under the warehouse directory;
+                                // tables located elsewhere are not resolved from the metastore here,
+                                // so their partitions are skipped
+                                continue;
+                            }
+
+ if (stopWatch.getStartTime()
+ - fs.getFileStatus(partitionPath)
+ .getModificationTime() > HDFS_FILES_MODIFIED_THRESHOLD) {
+                                // last modified more than HDFS_FILES_MODIFIED_THRESHOLD ms ago,
+                                // assume the partition was already committed and skip it
+ continue;
+ }
+
+ // reset locationPath for MetastoreCommitPolicy
+ locationPath = partitionPath;
+
+ LOG.info("Partition {} of table {}.{} is ready to be committed", partSpec, db,
+ tableName);
+
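+                            // create a metastore factory for this specific table so that
+                            // MetastoreCommitPolicy registers the partition on the right table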
+ HiveTableMetaStoreFactory factory = new HiveTableMetaStoreFactory(
+ new JobConf(HiveTableInlongFactory.getHiveConf()), hiveVersion, db, tableName);
+ try (TableMetaStoreFactory.TableMetaStore metaStore = factory.createTableMetaStore()) {
+ PartitionCommitPolicy.Context context = new CommitPolicyContextImpl(
+ new ArrayList<>(partSpec.values()), partitionPath);
+ for (PartitionCommitPolicy policy : policies) {
+ if (policy instanceof MetastoreCommitPolicy) {
+ ((MetastoreCommitPolicy) policy).setMetastore(metaStore);
+ } else if (policy instanceof SuccessFileCommitPolicy) {
+ String successFileName = conf.get(SINK_PARTITION_COMMIT_SUCCESS_FILE_NAME);
+ policy = new SuccessFileCommitPolicy(successFileName, fs);
+ }
+ policy.commit(context);
+ }
+ }
+ }
+ }
+ } catch (Exception e) {
+                    LOG.error("Failed to commit partitions", e);
+ break;
+ }
+ }
+ }
+ stopWatch.stop();
+            LOG.info("Committing partitions took {} ms", stopWatch.getTime());
+ } else {
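+            // single-table mode: commit against the table's own metastore factory and location path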
+ try (TableMetaStoreFactory.TableMetaStore metaStore = metaStoreFactory.createTableMetaStore()) {
+ for (String partition : partitions) {
+ LinkedHashMap<String, String> partSpec = extractPartitionSpecFromPath(new Path(partition));
+ LOG.info("Partition {} of table {} is ready to be committed", partSpec, tableIdentifier);
+ Path path = new Path(locationPath, generatePartitionPath(partSpec));
+ PartitionCommitPolicy.Context context = new CommitPolicyContextImpl(
+ new ArrayList<>(partSpec.values()), path);
+ for (PartitionCommitPolicy policy : policies) {
+ if (policy instanceof MetastoreCommitPolicy) {
+ ((MetastoreCommitPolicy) policy).setMetastore(metaStore);
+ }
+ policy.commit(context);
+ }
+ }
+ }
+ }
+ }
+
+ @Override
+ public void processWatermark(Watermark mark) throws Exception {
+ super.processWatermark(mark);
+ this.currentWatermark = mark.getTimestamp();
+ }
+
+ @Override
+ public void snapshotState(StateSnapshotContext context) throws Exception {
+ super.snapshotState(context);
+ trigger.snapshotState(context.getCheckpointId(), currentWatermark);
+ }
+
+ private class CommitPolicyContextImpl implements PartitionCommitPolicy.Context {
+
+ private final List<String> partitionValues;
+ private final Path partitionPath;
+
+ private CommitPolicyContextImpl(List<String> partitionValues, Path partitionPath) {
+ this.partitionValues = partitionValues;
+ this.partitionPath = partitionPath;
+ }
+
+ @Override
+ public String catalogName() {
+ return tableIdentifier.getCatalogName();
+ }
+
+ @Override
+ public String databaseName() {
+ return tableIdentifier.getDatabaseName();
+ }
+
+ @Override
+ public String tableName() {
+ return tableIdentifier.getObjectName();
+ }
+
+ @Override
+ public List<String> partitionKeys() {
+ return partitionKeys;
+ }
+
+ @Override
+ public List<String> partitionValues() {
+ return partitionValues;
+ }
+
+ @Override
+ public Path partitionPath() {
+ return partitionPath;
+ }
+ }
+}
diff --git a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/filesystem/StreamingSink.java b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/filesystem/StreamingSink.java
index 447b4bb..d90d463 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/filesystem/StreamingSink.java
+++ b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/filesystem/StreamingSink.java
@@ -35,7 +35,6 @@
import org.apache.flink.table.filesystem.FileSystemFactory;
import org.apache.flink.table.filesystem.TableMetaStoreFactory;
import org.apache.flink.table.filesystem.stream.PartitionCommitInfo;
-import org.apache.flink.table.filesystem.stream.PartitionCommitter;
import org.apache.flink.table.filesystem.stream.compact.CompactBucketWriter;
import org.apache.flink.table.filesystem.stream.compact.CompactCoordinator;
import org.apache.flink.table.filesystem.stream.compact.CompactMessages.CoordinatorInput;
@@ -152,10 +151,9 @@
FileSystemFactory fsFactory,
Configuration options) {
DataStream<?> stream = writer;
- if (partitionKeys.size() > 0 && options.contains(SINK_PARTITION_COMMIT_POLICY_KIND)) {
- PartitionCommitter committer =
- new PartitionCommitter(
- locationPath, identifier, partitionKeys, msFactory, fsFactory, options);
+ if (options.contains(SINK_PARTITION_COMMIT_POLICY_KIND)) {
+ PartitionCommitter committer = new PartitionCommitter(locationPath, identifier, partitionKeys, msFactory,
+ fsFactory, options);
stream =
writer.transform(
PartitionCommitter.class.getSimpleName(), Types.VOID, committer)
@@ -165,4 +163,5 @@
return stream.addSink(new DiscardingSink<>()).name("end").setParallelism(1);
}
+
}
diff --git a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/table/HiveTableInlongFactory.java b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/table/HiveTableInlongFactory.java
index 8a8bf15..d846334 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/table/HiveTableInlongFactory.java
+++ b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/table/HiveTableInlongFactory.java
@@ -20,6 +20,8 @@
import org.apache.inlong.sort.base.dirty.DirtyOptions;
import org.apache.inlong.sort.base.dirty.sink.DirtySink;
import org.apache.inlong.sort.base.dirty.utils.DirtySinkFactoryUtils;
+import org.apache.inlong.sort.base.sink.PartitionPolicy;
+import org.apache.inlong.sort.base.sink.SchemaUpdateExceptionPolicy;
import org.apache.inlong.sort.hive.HiveTableSink;
import com.google.common.base.Preconditions;
@@ -50,27 +52,30 @@
import static org.apache.flink.table.catalog.hive.factories.HiveCatalogFactoryOptions.HIVE_CONF_DIR;
import static org.apache.flink.table.catalog.hive.factories.HiveCatalogFactoryOptions.HIVE_VERSION;
import static org.apache.flink.table.factories.FactoryUtil.PROPERTY_VERSION;
+import static org.apache.flink.table.filesystem.FileSystemOptions.PARTITION_TIME_EXTRACTOR_TIMESTAMP_PATTERN;
import static org.apache.flink.table.filesystem.FileSystemOptions.STREAMING_SOURCE_ENABLE;
import static org.apache.flink.table.filesystem.FileSystemOptions.STREAMING_SOURCE_PARTITION_INCLUDE;
import static org.apache.inlong.sort.base.Constants.AUDIT_KEYS;
import static org.apache.inlong.sort.base.Constants.INLONG_AUDIT;
import static org.apache.inlong.sort.base.Constants.INLONG_METRIC;
+import static org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_DATABASE_PATTERN;
+import static org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_ENABLE;
+import static org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_FORMAT;
+import static org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_SCHEMA_UPDATE_POLICY;
+import static org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_TABLE_PATTERN;
+import static org.apache.inlong.sort.base.Constants.SINK_PARTITION_POLICY;
+import static org.apache.inlong.sort.base.Constants.SOURCE_PARTITION_FIELD_NAME;
import static org.apache.inlong.sort.hive.HiveOptions.HIVE_DATABASE;
+import static org.apache.inlong.sort.hive.HiveOptions.HIVE_STORAGE_INPUT_FORMAT;
+import static org.apache.inlong.sort.hive.HiveOptions.HIVE_STORAGE_OUTPUT_FORMAT;
+import static org.apache.inlong.sort.hive.HiveOptions.HIVE_STORAGE_SERIALIZATION_LIB;
/**
* DynamicTableSourceFactory for hive table source
*/
public class HiveTableInlongFactory implements DynamicTableSourceFactory, DynamicTableSinkFactory {
- private final HiveConf hiveConf;
-
- public HiveTableInlongFactory() {
- this.hiveConf = new HiveConf();
- }
-
- public HiveTableInlongFactory(HiveConf hiveConf) {
- this.hiveConf = hiveConf;
- }
+ private static final HiveConf hiveConf = new HiveConf();
@Override
public String factoryIdentifier() {
@@ -94,26 +99,39 @@
options.add(INLONG_METRIC);
options.add(INLONG_AUDIT);
options.add(AUDIT_KEYS);
+ options.add(SINK_MULTIPLE_ENABLE);
+ options.add(SINK_MULTIPLE_DATABASE_PATTERN);
+ options.add(SINK_MULTIPLE_TABLE_PATTERN);
+ options.add(SINK_MULTIPLE_FORMAT);
+ options.add(SINK_MULTIPLE_SCHEMA_UPDATE_POLICY);
+ options.add(SOURCE_PARTITION_FIELD_NAME);
return options;
}
@Override
public DynamicTableSink createDynamicTableSink(Context context) {
+ final FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
final boolean isHiveTable = HiveCatalog.isHiveTable(context.getCatalogTable().getOptions());
Map<String, String> options = context.getCatalogTable().getOptions();
// temporary table doesn't have the IS_GENERIC flag but we still consider it generic
if (isHiveTable) {
updateHiveConf(options);
// new HiveValidator().validate(properties);
- Integer configuredParallelism =
- Configuration.fromMap(context.getCatalogTable().getOptions())
- .get(FileSystemOptions.SINK_PARALLELISM);
- final String inlongMetric = context.getCatalogTable().getOptions()
- .getOrDefault(INLONG_METRIC.key(), INLONG_METRIC.defaultValue());
- final String auditHostAndPorts = context.getCatalogTable().getOptions()
- .getOrDefault(INLONG_AUDIT.key(), INLONG_AUDIT.defaultValue());
+ Integer configuredParallelism = helper.getOptions().get(FileSystemOptions.SINK_PARALLELISM);
+ final String inlongMetric = helper.getOptions().get(INLONG_METRIC);
+ final String auditHostAndPorts = helper.getOptions().get(INLONG_AUDIT);
final DirtyOptions dirtyOptions = DirtyOptions.fromConfig(Configuration.fromMap(options));
final DirtySink<Object> dirtySink = DirtySinkFactoryUtils.createDirtySink(context, dirtyOptions);
+ SchemaUpdateExceptionPolicy schemaUpdatePolicy = helper.getOptions()
+ .get(SINK_MULTIPLE_SCHEMA_UPDATE_POLICY);
+ PartitionPolicy partitionPolicy = helper.getOptions().get(SINK_PARTITION_POLICY);
+ String partitionField = helper.getOptions().get(SOURCE_PARTITION_FIELD_NAME);
+ String timestampPattern = helper.getOptions().getOptional(PARTITION_TIME_EXTRACTOR_TIMESTAMP_PATTERN)
+ .orElse("yyyy-MM-dd");
+ boolean sinkMultipleEnable = helper.getOptions().get(SINK_MULTIPLE_ENABLE);
+ String inputFormat = helper.getOptions().get(HIVE_STORAGE_INPUT_FORMAT);
+ String outputFormat = helper.getOptions().get(HIVE_STORAGE_OUTPUT_FORMAT);
+ String serializationLib = helper.getOptions().get(HIVE_STORAGE_SERIALIZATION_LIB);
return new HiveTableSink(
context.getConfiguration(),
new JobConf(hiveConf),
@@ -123,7 +141,15 @@
inlongMetric,
auditHostAndPorts,
dirtyOptions,
- dirtySink);
+ dirtySink,
+ schemaUpdatePolicy,
+ partitionPolicy,
+ partitionField,
+ timestampPattern,
+ sinkMultipleEnable,
+ inputFormat,
+ outputFormat,
+ serializationLib);
} else {
return FactoryUtil.createTableSink(
null, // we already in the factory of catalog
@@ -194,4 +220,8 @@
hiveConf.set(entry.getKey(), entry.getValue());
}
}
+
+ public static HiveConf getHiveConf() {
+ return hiveConf;
+ }
}
diff --git a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/util/CacheHolder.java b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/util/CacheHolder.java
new file mode 100644
index 0000000..ceb9c8b
--- /dev/null
+++ b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/util/CacheHolder.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.hive.util;
+
+import org.apache.inlong.sort.hive.HiveWriterFactory;
+import org.apache.inlong.sort.hive.filesystem.HadoopRenameFileCommitter;
+
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.data.RowData;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter;
+import org.apache.hadoop.io.Writable;
+
+import java.util.HashMap;
+import java.util.function.Function;
+
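+/**
+ * Holder of static caches shared by the hive multiple-table sink: writer factories, record writers,
+ * file committers and row converters are cached per table or per partition path so they can be
+ * reused across records, together with bookkeeping maps for schema check times and skipped tables.
+ */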
+public class CacheHolder {
+
+ /**
+ * hive writer factory cache
+ */
+ private static final HashMap<ObjectIdentifier, HiveWriterFactory> factoryMap = new HashMap<>(16);
+ /**
+ * hive record writer cache
+ */
+ private static final HashMap<Path, FileSinkOperator.RecordWriter> recordWriterHashMap = new HashMap<>(16);
+ /**
+ * hive hdfs file committer cache
+ */
+ private static final HashMap<Path, HadoopRenameFileCommitter> fileCommitterHashMap = new HashMap<>(16);
+ /**
+ * hive row converter cache
+ */
+ private static final HashMap<Path, Function<RowData, Writable>> rowConverterHashMap = new HashMap<>(16);
+ /**
+ * hive schema check time cache
+ */
+ private static final HashMap<ObjectIdentifier, Long> schemaCheckTimeMap = new HashMap<>(16);
+ /**
+     * tables whose writing is skipped after an exception when the schema update policy is STOP_PARTIAL
+ */
+ private static final HashMap<ObjectIdentifier, Long> ignoreWritingTableMap = new HashMap<>(16);
+
+ public static HashMap<Path, RecordWriter> getRecordWriterHashMap() {
+ return recordWriterHashMap;
+ }
+
+ public static HashMap<Path, Function<RowData, Writable>> getRowConverterHashMap() {
+ return rowConverterHashMap;
+ }
+
+ public static HashMap<ObjectIdentifier, HiveWriterFactory> getFactoryMap() {
+ return factoryMap;
+ }
+
+ public static HashMap<Path, HadoopRenameFileCommitter> getFileCommitterHashMap() {
+ return fileCommitterHashMap;
+ }
+
+ public static HashMap<ObjectIdentifier, Long> getSchemaCheckTimeMap() {
+ return schemaCheckTimeMap;
+ }
+
+ public static HashMap<ObjectIdentifier, Long> getIgnoreWritingTableMap() {
+ return ignoreWritingTableMap;
+ }
+
+}
diff --git a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/util/HiveTableUtil.java b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/util/HiveTableUtil.java
new file mode 100644
index 0000000..188a8dc
--- /dev/null
+++ b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/util/HiveTableUtil.java
@@ -0,0 +1,594 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.hive.util;
+
+import org.apache.inlong.sort.base.sink.PartitionPolicy;
+import org.apache.inlong.sort.hive.HiveWriterFactory;
+import org.apache.inlong.sort.hive.table.HiveTableInlongFactory;
+
+import org.apache.commons.collections.map.CaseInsensitiveMap;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.flink.connectors.hive.FlinkHiveException;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.type.TypeReference;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+import org.apache.flink.table.catalog.hive.client.HiveMetastoreClientFactory;
+import org.apache.flink.table.catalog.hive.client.HiveMetastoreClientWrapper;
+import org.apache.flink.table.catalog.hive.client.HiveShim;
+import org.apache.flink.table.catalog.hive.util.HiveReflectionUtils;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.types.AtomicDataType;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.RowType.RowField;
+import org.apache.flink.table.types.logical.TimestampType;
+import org.apache.flink.table.types.logical.ZonedTimestampType;
+import org.apache.flink.table.types.logical.utils.LogicalTypeParser;
+import org.apache.flink.types.RowKind;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.api.EnvironmentContext;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.SerDeInfo;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.reflect.Field;
+import java.math.BigDecimal;
+import java.nio.charset.StandardCharsets;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.time.ZonedDateTime;
+import java.time.format.DateTimeFormatter;
+import java.time.format.DateTimeParseException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.regex.Pattern;
+
+import static org.apache.flink.table.types.logical.LogicalTypeRoot.DATE;
+import static org.apache.flink.table.types.logical.LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE;
+import static org.apache.flink.table.types.logical.LogicalTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE;
+import static org.apache.flink.table.types.logical.LogicalTypeRoot.TIMESTAMP_WITH_TIME_ZONE;
+import static org.apache.inlong.sort.hive.HiveOptions.SINK_PARTITION_NAME;
+
+/**
+ * Utility class for listing, creating and altering hive tables
+ */
+public class HiveTableUtil {
+
+ private static final Logger LOG = LoggerFactory.getLogger(HiveTableUtil.class);
+ private static final String DEFAULT_PARTITION_DATE_FIELD = "[(create)(update)].*[(date)(time)]";
+ private static final ObjectMapper objectMapper = new ObjectMapper();
+
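+    /**
+     * Align the hive table schema with the schema of the incoming data: fields missing on the hive
+     * side are added, and fields whose type root differs are altered; fields that exist only in the
+     * hive table are left untouched.
+     *
+     * @return true if the hive table was altered
+     */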
+ public static boolean changeSchema(RowType schema,
+ String[] hiveColumns,
+ DataType[] hiveTypes,
+ String databaseName,
+ String tableName,
+ String hiveVersion) {
+ boolean changed = false;
+
+ Map<String, LogicalType> flinkTypeMap = new HashMap<>();
+ for (RowField field : schema.getFields()) {
+            // lowercase the field name because Oracle CDC emits uppercase field names
+ flinkTypeMap.put(field.getName().toLowerCase(), field.getType());
+ }
+
+ List<String> columnsFromData = new ArrayList<>(flinkTypeMap.keySet());
+ List<String> columnsFromHive = Arrays.asList(hiveColumns);
+ columnsFromData.removeAll(columnsFromHive);
+ // add new field
+ for (String fieldName : columnsFromData) {
+ boolean result = alterTable(databaseName, tableName, fieldName, flinkTypeMap.get(fieldName), hiveVersion);
+ if (result) {
+ changed = true;
+ }
+ }
+ // modify field type
+ for (int i = 0; i < hiveColumns.length; i++) {
+ if (!flinkTypeMap.containsKey(hiveColumns[i])) {
+                // the hive table has a field that the source data does not have; ignore it,
+                // as dynamically dropping fields is not supported yet
+ continue;
+ }
+ LogicalType flinkType = flinkTypeMap.get(hiveColumns[i]);
+ if (hiveTypes[i].getLogicalType().getTypeRoot() != flinkType.getTypeRoot()) {
+ // field type changed
+ boolean result = alterTable(databaseName, tableName, hiveColumns[i], flinkType, hiveVersion);
+ if (result) {
+ changed = true;
+ }
+ }
+ }
+ return changed;
+ }
+
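+    /**
+     * Add the given field to the hive table, or change its type if it already exists with a
+     * different type, cascading the change to existing partitions.
+     *
+     * @return true if the table was altered
+     */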
+ public static boolean alterTable(String databaseName, String tableName, String fieldName, LogicalType type,
+ String hiveVersion) {
+ HiveConf hiveConf = HiveTableInlongFactory.getHiveConf();
+ try (HiveMetastoreClientWrapper client = HiveMetastoreClientFactory.create(hiveConf, hiveVersion)) {
+ FieldSchema schema = new FieldSchema(fieldName, flinkType2HiveType(type), "");
+
+ Table table = client.getTable(databaseName, tableName);
+ ObjectIdentifier identifier = createObjectIdentifier(databaseName, tableName);
+ List<FieldSchema> fieldSchemaList = getTableFields(client, identifier);
+ // remove partition keys
+ fieldSchemaList.removeAll(table.getPartitionKeys());
+ boolean alter = false;
+ boolean exist = false;
+ for (FieldSchema fieldSchema : fieldSchemaList) {
+ if (fieldSchema.getName().equals(fieldName)) {
+ exist = true;
+ if (!flinkType2HiveType(type).equalsIgnoreCase(fieldSchema.getType())) {
+ LOG.info("table {}.{} field {} change type from {} to {}", databaseName, tableName, fieldName,
+ fieldSchema.getType(), flinkType2HiveType(type));
+ fieldSchema.setType(flinkType2HiveType(type));
+ alter = true;
+ }
+ break;
+ }
+ }
+ if (!exist) {
+ LOG.info("table {}.{} add new field {} with type {}", databaseName, tableName, fieldName,
+ flinkType2HiveType(type));
+ fieldSchemaList.add(schema);
+ alter = true;
+ }
+ if (alter) {
+ table.getSd().setCols(fieldSchemaList);
+ IMetaStoreClient metaStoreClient = getMetaStoreClient(client);
+ EnvironmentContext environmentContext = new EnvironmentContext();
+ environmentContext.putToProperties("CASCADE", "true");
+ metaStoreClient.alter_table_with_environmentContext(databaseName, tableName, table, environmentContext);
+ LOG.info("alter table {}.{} success", databaseName, tableName);
+ return true;
+ }
+ } catch (TException e) {
+ throw new CatalogException(String.format("Failed to alter hive table %s.%s", databaseName, tableName), e);
+ } catch (NoSuchFieldException | IllegalAccessException e) {
+ throw new RuntimeException(e);
+ }
+ return false;
+ }
+
+ /**
+     * Create a hive table, partitioned by the configured sink partition field unless the partition
+     * policy is NONE. The database is created first if it does not exist.
+ *
+ * @param databaseName database name
+ * @param tableName table name
+ * @param schema flink field type
+ * @param partitionPolicy policy of partitioning table
+ * @param hiveVersion hive version
+ * @param inputFormat the input format of storage descriptor
+ * @param outputFormat the output format of storage descriptor
+ * @param serializationLib the serialization library of storage descriptor
+ */
+ public static void createTable(String databaseName, String tableName, RowType schema,
+ PartitionPolicy partitionPolicy, String hiveVersion, String inputFormat, String outputFormat,
+ String serializationLib) {
+ HiveConf hiveConf = HiveTableInlongFactory.getHiveConf();
+ try (HiveMetastoreClientWrapper client = HiveMetastoreClientFactory.create(hiveConf, hiveVersion)) {
+
+ List<String> dbs = client.getAllDatabases();
+ boolean dbExist = dbs.stream().anyMatch(databaseName::equalsIgnoreCase);
+ // create database if not exists
+ if (!dbExist) {
+ Database defaultDb = client.getDatabase("default");
+ Database targetDb = new Database();
+ targetDb.setName(databaseName);
+ targetDb.setLocationUri(defaultDb.getLocationUri() + "/" + databaseName + ".db");
+ client.createDatabase(targetDb);
+ }
+
+ String sinkPartitionName = hiveConf.get(SINK_PARTITION_NAME.key(), SINK_PARTITION_NAME.defaultValue());
+ FieldSchema defaultPartition = new FieldSchema(sinkPartitionName, "string", "");
+
+ List<FieldSchema> fieldSchemaList = new ArrayList<>();
+ for (RowField field : schema.getFields()) {
+ if (!field.getName().equals(sinkPartitionName)) {
+ FieldSchema hiveFieldSchema = new FieldSchema(field.getName(), flinkType2HiveType(field.getType()),
+ "");
+ fieldSchemaList.add(hiveFieldSchema);
+ }
+ }
+
+ Table table = new Table();
+ table.setDbName(databaseName);
+ table.setTableName(tableName);
+
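+            // unless partitioning is disabled, the configured sink partition field is the only partition key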
+ if (PartitionPolicy.NONE != partitionPolicy) {
+ table.setPartitionKeys(Collections.singletonList(defaultPartition));
+ }
+
+ StorageDescriptor sd = new StorageDescriptor();
+ table.setSd(sd);
+ sd.setCols(fieldSchemaList);
+ sd.setInputFormat(inputFormat);
+ sd.setOutputFormat(outputFormat);
+ sd.setSerdeInfo(new SerDeInfo());
+ sd.getSerdeInfo().setSerializationLib(serializationLib);
+ client.createTable(table);
+ LOG.info("create table {}.{}", databaseName, tableName);
+ } catch (TException e) {
+            throw new CatalogException("Failed to create hive database or table", e);
+ }
+ }
+
+ /**
+     * Get the hive writer factory of a table, creating and caching it on first access
+ *
+ * @param hiveShim hiveShim object
+ * @param hiveVersion hive version
+ * @param identifier object identifier
+     * @return hive writer factory, or null if the table does not exist
+ */
+ public static HiveWriterFactory getWriterFactory(HiveShim hiveShim, String hiveVersion,
+ ObjectIdentifier identifier) {
+ if (!CacheHolder.getFactoryMap().containsKey(identifier)) {
+ HiveConf hiveConf = HiveTableInlongFactory.getHiveConf();
+ try (HiveMetastoreClientWrapper client = HiveMetastoreClientFactory.create(hiveConf, hiveVersion)) {
+ List<String> tableNames = client.getAllTables(identifier.getDatabaseName());
+ LOG.info("table names: {}", Arrays.deepToString(tableNames.toArray()));
+ boolean tableExist = tableNames.stream().anyMatch(identifier.getObjectName()::equalsIgnoreCase);
+ if (!tableExist) {
+ return null;
+ }
+
+ Table table = client.getTable(identifier.getDatabaseName(), identifier.getObjectName());
+ StorageDescriptor sd = table.getSd();
+
+ List<FieldSchema> fieldSchemaList = getTableFields(client, identifier);
+
+ List<FieldSchema> partitionSchemas = table.getPartitionKeys();
+ String[] partitions = partitionSchemas.stream().map(FieldSchema::getName).toArray(String[]::new);
+
+ TableSchema.Builder builder = new TableSchema.Builder();
+ for (FieldSchema fieldSchema : fieldSchemaList) {
+ LogicalType logicalType = LogicalTypeParser.parse(fieldSchema.getType());
+ if (logicalType instanceof TimestampType) {
+ TimestampType timestampType = (TimestampType) logicalType;
+ logicalType = new TimestampType(timestampType.isNullable(), timestampType.getKind(), 9);
+ } else if (logicalType instanceof ZonedTimestampType) {
+ ZonedTimestampType timestampType = (ZonedTimestampType) logicalType;
+ logicalType = new ZonedTimestampType(timestampType.isNullable(), timestampType.getKind(), 9);
+ }
+ builder.field(fieldSchema.getName(), new AtomicDataType(logicalType));
+ }
+ TableSchema schema = builder.build();
+
+ Class hiveOutputFormatClz = hiveShim.getHiveOutputFormatClass(Class.forName(sd.getOutputFormat()));
+ boolean isCompressed = hiveConf.getBoolean(HiveConf.ConfVars.COMPRESSRESULT.varname, false);
+ HiveWriterFactory writerFactory = new HiveWriterFactory(new JobConf(hiveConf), hiveOutputFormatClz, sd,
+ schema, partitions, HiveReflectionUtils.getTableMetadata(hiveShim, table), hiveShim,
+ isCompressed, true);
+ CacheHolder.getFactoryMap().put(identifier, writerFactory);
+ } catch (TException e) {
+ throw new CatalogException("Failed to query Hive metaStore", e);
+ } catch (ClassNotFoundException e) {
+ throw new FlinkHiveException("Failed to get output format class", e);
+ } catch (NoSuchFieldException | IllegalAccessException e) {
+ throw new FlinkHiveException("Failed to get hive metastore client", e);
+ }
+ }
+ return CacheHolder.getFactoryMap().get(identifier);
+ }
+
+ /**
+     * Get the underlying IMetaStoreClient from the flink metastore client wrapper via reflection
+     *
+     * @param client hive metastore client wrapper
+     * @return the wrapped hive metastore client
+ */
+ public static IMetaStoreClient getMetaStoreClient(HiveMetastoreClientWrapper client)
+ throws NoSuchFieldException, IllegalAccessException {
+ Field clientField = HiveMetastoreClientWrapper.class.getDeclaredField("client");
+ clientField.setAccessible(true);
+ return (IMetaStoreClient) clientField.get(client);
+ }
+
+ /**
+ * get hive table fields schema
+ *
+ * @param client hive metastore client
+ * @param identifier hive database and table name
+ * @return hive field schema
+ */
+ public static List<FieldSchema> getTableFields(HiveMetastoreClientWrapper client, ObjectIdentifier identifier)
+ throws TException, NoSuchFieldException, IllegalAccessException {
+ IMetaStoreClient metaStoreClient = getMetaStoreClient(client);
+ return metaStoreClient.getSchema(identifier.getDatabaseName(), identifier.getObjectName());
+ }
+
+ /**
+     * Compute the value of the default partition field according to the partition policy: the
+     * current processing time (PROC_TIME), the value of the assigned field (ASSIGN_FIELD), or the
+     * value of the first date or timestamp field whose name matches the configured pattern, such as
+     * create_date or create_time (SOURCE_DATE_FIELD), falling back to the processing time.
+ *
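+     * <p>Illustrative example (assumed values): with partitionPolicy = SOURCE_DATE_FIELD and
+     * timestampPattern = "yyyy-MM-dd", a timestamp field "create_time" with the value
+     * "2023-05-01 12:00:00" yields the partition value "2023-05-01".
+     *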
+ * @param rawData row data
+ * @param schema flink field types
+ * @param partitionPolicy partition policy
+ * @param partitionField partition field
+ * @param timestampPattern timestamp pattern
+ * @return default partition value
+ */
+ public static Object getDefaultPartitionValue(Map<String, Object> rawData, RowType schema,
+ PartitionPolicy partitionPolicy, String partitionField, String timestampPattern) {
+ if (PartitionPolicy.NONE == partitionPolicy) {
+ return null;
+ }
+
+ if (PartitionPolicy.PROC_TIME == partitionPolicy) {
+ return DateTimeFormatter.ofPattern(timestampPattern).format(LocalDateTime.now());
+ }
+
+ String defaultPartitionFieldName = DEFAULT_PARTITION_DATE_FIELD;
+ if (PartitionPolicy.ASSIGN_FIELD == partitionPolicy) {
+ defaultPartitionFieldName = partitionField;
+ }
+        // the partition field name may be a regular expression (see SOURCE_PARTITION_FIELD_NAME),
+        // so it must not be quoted before compiling
+        Pattern pattern = Pattern.compile(defaultPartitionFieldName, Pattern.CASE_INSENSITIVE);
+ for (RowField field : schema.getFields()) {
+ LogicalTypeRoot type = field.getType().getTypeRoot();
+ if (type == TIMESTAMP_WITH_LOCAL_TIME_ZONE || type == TIMESTAMP_WITH_TIME_ZONE
+ || type == TIMESTAMP_WITHOUT_TIME_ZONE || type == DATE) {
+ if (pattern.matcher(field.getName()).matches()) {
+ String value = (String) rawData.get(field.getName());
+ return formatDate(value, timestampPattern);
+ }
+ }
+ }
+ if (PartitionPolicy.SOURCE_DATE_FIELD == partitionPolicy) {
+            // no matching date or timestamp field found, fall back to processing time
+ return DateTimeFormatter.ofPattern(timestampPattern).format(LocalDateTime.now());
+ }
+ return null;
+ }
+
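+    /**
+     * Parse a date or datetime string in one of the supported input formats and re-format it with
+     * the given pattern; returns null when the value is blank or cannot be parsed.
+     */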
+ public static String formatDate(String dateStr, String toPattern) {
+ if (StringUtils.isBlank(dateStr)) {
+ return null;
+ }
+ LocalDateTime localDateTime = parseDate(dateStr);
+ if (localDateTime == null) {
+ return null;
+ }
+ DateTimeFormatter formatter = DateTimeFormatter.ofPattern(toPattern);
+ return formatter.format(localDateTime);
+ }
+
+    public static LocalDateTime parseDate(String dateStr) {
+        if (StringUtils.isBlank(dateStr)) {
+            return null;
+        }
+        try {
+            ZonedDateTime zonedDateTime = ZonedDateTime.parse(dateStr,
+                    DateTimeFormatter.ofPattern("yyyy-MM-dd['T'][' ']HH:mm:ss[.SSS][XXX][Z]"));
+            return LocalDateTime.ofInstant(zonedDateTime.toInstant(), ZoneId.systemDefault());
+        } catch (DateTimeParseException ignored) {
+            // fall back to common zone-less patterns; date-only values are parsed as the start of day,
+            // because LocalDateTime cannot be parsed from a string without a time part
+            try {
+                return LocalDateTime.parse(dateStr, DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
+            } catch (DateTimeParseException ignored2) {
+            }
+            for (String pattern : new String[]{"yyyy-MM-dd", "yyyyMMdd"}) {
+                try {
+                    return LocalDate.parse(dateStr, DateTimeFormatter.ofPattern(pattern)).atStartOfDay();
+                } catch (DateTimeParseException ignored2) {
+                }
+            }
+        }
+        return null;
+    }
+
+ /**
+     * Map a flink logical type to the corresponding hive type name; unsupported types default to string
+ *
+ * @param type flink field type
+ * @return hive type
+ */
+ public static String flinkType2HiveType(LogicalType type) {
+ switch (type.getTypeRoot()) {
+ case INTEGER:
+ return "int";
+ case BIGINT:
+ return "bigint";
+ case TINYINT:
+ return "tinyint";
+ case SMALLINT:
+ return "smallint";
+ case FLOAT:
+ return "float";
+ case DOUBLE:
+ return "double";
+ case DECIMAL:
+ DecimalType decimalType = (DecimalType) type;
+ return String.format("decimal(%s,%s)", decimalType.getPrecision(), decimalType.getScale());
+ case TIMESTAMP_WITH_TIME_ZONE:
+ case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+ case TIMESTAMP_WITHOUT_TIME_ZONE:
+ case TIME_WITHOUT_TIME_ZONE:
+ return "timestamp";
+ case DATE:
+ return "date";
+ case BOOLEAN:
+ return "boolean";
+ case BINARY:
+ case VARBINARY:
+ return "binary";
+ case ARRAY:
+ return "array";
+ case CHAR:
+ case VARCHAR:
+ default:
+ return "string";
+ }
+ }
+
+ /**
+     * Convert a parsed json record to a flink generic row data object and compute its byte size
+ *
+ * @param record json data
+ * @param allColumns hive column names
+ * @param allTypes hive column types
+     * @param replaceLineBreak whether to replace line breaks with blanks, which avoids corrupting
+     *            rows when writing to a hive text-format table
+ * @return generic row data and byte size of the data
+ */
+ public static Pair<GenericRowData, Integer> getRowData(Map<String, Object> record, String[] allColumns,
+ DataType[] allTypes, boolean replaceLineBreak) {
+ GenericRowData genericRowData = new GenericRowData(RowKind.INSERT, allColumns.length);
+ int byteSize = 0;
+ for (int index = 0; index < allColumns.length; index++) {
+ String columnName = allColumns[index];
+ LogicalType logicalType = allTypes[index].getLogicalType();
+ LogicalTypeRoot typeRoot = logicalType.getTypeRoot();
+ Object raw = record.get(columnName);
+ byteSize += raw == null ? 0 : String.valueOf(raw).getBytes(StandardCharsets.UTF_8).length;
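+            // convert the raw value to the java type expected for this column's logical type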
+ switch (typeRoot) {
+ case BOOLEAN:
+ genericRowData.setField(index, raw != null ? Boolean.parseBoolean(String.valueOf(raw)) : null);
+ break;
+ case VARBINARY:
+ case BINARY:
+ byte[] bytes = null;
+ if (raw instanceof byte[]) {
+ bytes = (byte[]) raw;
+ } else if (raw instanceof String) {
+ bytes = ((String) raw).getBytes(StandardCharsets.UTF_8);
+ }
+ genericRowData.setField(index, bytes);
+ break;
+ case DECIMAL:
+ genericRowData.setField(index, raw != null ? new BigDecimal(String.valueOf(raw)) : null);
+ break;
+ case DOUBLE:
+ genericRowData.setField(index, raw != null ? Double.valueOf(String.valueOf(raw)) : null);
+ break;
+ case FLOAT:
+ genericRowData.setField(index, raw != null ? Float.valueOf(String.valueOf(raw)) : null);
+ break;
+ case INTEGER:
+ genericRowData.setField(index, raw != null ? Integer.valueOf(String.valueOf(raw)) : null);
+ break;
+ case BIGINT:
+ genericRowData.setField(index, raw != null ? Long.valueOf(String.valueOf(raw)) : null);
+ break;
+                case TINYINT:
+                    genericRowData.setField(index, raw != null ? Byte.valueOf(String.valueOf(raw)) : null);
+                    break;
+                case SMALLINT:
+                    genericRowData.setField(index, raw != null ? Short.valueOf(String.valueOf(raw)) : null);
+                    break;
+ case CHAR:
+ case VARCHAR:
+ String value = null;
+ if (raw != null) {
+ value = String.valueOf(record.get(columnName));
+ if (replaceLineBreak) {
+ value = value.replaceAll("[\r\n]", " ");
+ }
+ }
+ genericRowData.setField(index, value);
+ break;
+ case DATE:
+ case TIMESTAMP_WITHOUT_TIME_ZONE:
+ case TIMESTAMP_WITH_TIME_ZONE:
+ case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+ if (raw instanceof String) {
+ genericRowData.setField(index, parseDate((String) raw));
+ } else if (raw instanceof Long) {
+ genericRowData.setField(index, new Date((Long) raw));
+ }
+ break;
+ default:
+ break;
+ }
+ }
+ return new ImmutablePair<>(genericRowData, byteSize);
+ }
+
+ /**
+     * Convert a json node to a list of maps. Map keys are treated case-insensitively because Oracle
+     * CDC sends records with uppercase field names while hive tables only support lowercase field names.
+ *
+ * @param data json node
+ * @return list of map data
+ */
+ public static List<Map<String, Object>> jsonNode2Map(JsonNode data) {
+ if (data == null) {
+ return new ArrayList<>();
+ }
+ List<Map<String, Object>> values = new ArrayList<>();
+ if (data.isArray()) {
+ for (int i = 0; i < data.size(); i++) {
+                values.add(jsonObject2Map(data.get(i)));
+ }
+ } else {
+ values.add(jsonObject2Map(data));
+ }
+ return values;
+ }
+
+ /**
+     * Convert a json object, such as a JsonNode or JsonObject, to a case-insensitive map
+     *
+     * @param data json object, such as JsonNode, JsonObject
+     * @return map data
+ */
+ public static Map<String, Object> jsonObject2Map(Object data) {
+ CaseInsensitiveMap map = new CaseInsensitiveMap();
+ map.putAll(objectMapper.convertValue(data, new TypeReference<Map<String, Object>>() {
+ }));
+ return map;
+ }
+
+ /**
+ * convert map to JsonNode
+ *
+ * @param data map data
+ * @return json node
+ */
+ public static JsonNode object2JsonNode(Map<String, Object> data) {
+ try {
+ return objectMapper.readTree(objectMapper.writeValueAsString(data));
+ } catch (JsonProcessingException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ /**
+ * create object identifier
+ *
+ * @param databaseName database name
+ * @param tableName table name
+     * @return object identifier within the default catalog
+ */
+ public static ObjectIdentifier createObjectIdentifier(String databaseName, String tableName) {
+ return ObjectIdentifier.of("default_catalog", databaseName, tableName);
+ }
+}
diff --git a/licenses/inlong-sort-connectors/LICENSE b/licenses/inlong-sort-connectors/LICENSE
index 44bb810..1543e8d 100644
--- a/licenses/inlong-sort-connectors/LICENSE
+++ b/licenses/inlong-sort-connectors/LICENSE
@@ -469,6 +469,14 @@
inlong-sort/sort-connectors/sort-hive/src/main/java/org/apache/inlong/sort/hive/HiveOptions.java
inlong-sort/sort-connectors/sort-hive/src/main/java/org/apache/inlong/sort/hive/HadoopFileSystemFactory.java
inlong-sort/sort-connectors/sort-hive/src/main/java/org/apache/inlong/sort/hive/table/HiveTableInlongFactory.java
+ inlong-sort/sort-connectors/sort-hive/src/main/java/org/apache/inlong/sort/hive/filesystem/DefaultHadoopFileCommitterFactory.java
+ inlong-sort/sort-connectors/sort-hive/src/main/java/org/apache/inlong/sort/hive/filesystem/HadoopPathBasedBulkFormatBuilder.java
+ inlong-sort/sort-connectors/sort-hive/src/main/java/org/apache/inlong/sort/hive/filesystem/HadoopPathBasedPartFileWriter.java
+ inlong-sort/sort-connectors/sort-hive/src/main/java/org/apache/inlong/sort/hive/filesystem/HadoopRenameFileCommitter.java
+ inlong-sort/sort-connectors/sort-hive/src/main/java/org/apache/inlong/sort/hive/filesystem/PartitionCommitter.java
+ inlong-sort/sort-connectors/sort-hive/src/main/java/org/apache/inlong/sort/hive/HiveBulkWriterFactory.java
+ inlong-sort/sort-connectors/sort-hive/src/main/java/org/apache/inlong/sort/hive/HiveOutputFormatFactory.java
+ inlong-sort/sort-connectors/sort-hive/src/main/java/org/apache/inlong/sort/hive/HiveWriterFactory.java
Source : flink-connector-hive 1.13.5 (Please note that the software have been modified.)
License : https://github.com/apache/flink/blob/master/LICENSE